class MqttSN

Constants

CLEAN_FLAG
CONNACK_TYPE
CONNECT_TYPE
DISCONNECT_TYPE
GWINFO_TYPE
MAX_IDLE
Nretry
PINGREQ_TYPE
PINGRESP_TYPE
PUBACK_TYPE
PUBCOMP_TYPE
PUBLISH_TYPE
PUBREC_TYPE
PUBREL_TYPE
QOS0_FLAG
QOS1_FLAG
QOS2_FLAG
QOSM1_FLAG
REGACK_TYPE
REGISTER_TYPE
RETAIN_FLAG
SEARCHGW_TYPE
SUBACK_TYPE
SUBSCRIBE_TYPE
TOPIC_PREDEFINED_FLAG
TOPIC_SHORT_FLAG
Tretry
UNSUBACK_TYPE
UNSUBSCRIBE_TYPE
WILLMSGREQ_TYPE
WILLMSGRESP_TYPE
WILLMSGUPD_TYPE
WILLMSG_TYPE
WILLTOPICREQ_TYPE
WILLTOPICRESP_TYPE
WILLTOPICUPD_TYPE
WILLTOPIC_TYPE
WILL_FLAG

Attributes

active_gw_id[RW]
clients[RW]
gateways[RW]
http_log[RW]
options[RW]
state[RW]

Public Class Methods

build_packet(m) click to toggle source
# File lib/mqtt-sn-ruby.rb, line 358
# Serialises a list of byte values into an MQTT-SN wire packet.
#
# m:: enumerable of integers (0..255), the packet without its length byte.
#
# Returns a String whose first byte is the total packet length (the bytes
# of +m+ plus the length byte itself), followed by the bytes of +m+.
def self.build_packet m
  packet=" " #placeholder, replaced by the length byte below
  m.each { |byte| packet << byte.chr }
  packet[0]=(m.count+1).chr
  packet
end
hexdump(data) click to toggle source
# File lib/mqtt-sn-ruby.rb, line 349
# Formats the bytes of +data+ as upper-case, two-digit hex values
# separated by commas (e.g. "02,16"). Returns "" for empty input.
def self.hexdump data
  data.each_byte.map { |byte| sprintf("%02X",byte) }.join(",")
end
new(hash={}) click to toggle source
# File lib/mqtt-sn-ruby.rb, line 171
# Sets up client or forwarder state and starts the background threads.
#
# Keys of +hash+ read here:
# :server_uri::    URI of a default gateway; registered as gateway 0.
# :broadcast_uri:: multicast URI; enables autodiscovery when no
#                  :server_uri is given and starts the roam thread.
# :keepalive::     keep-alive value, converted with to_i (default 25).
# :forwarder::     truthy to run in forwarder mode instead of client mode.
# :local_port::    port bound in forwarder mode (default 1882).
# :debug, :verbose:: logging switches.
#
# The whole hash is kept in @options. When neither :server_uri nor
# :broadcast_uri is given, a message is printed and the process exits.
def initialize(hash={})
    @options=hash #save these
    @server_uri=hash[:server_uri]
    @debug=hash[:debug]
    @verbose=hash[:verbose]
    @state=:disconnected
    @bcast_port=5000
    @keepalive=(hash[:keepalive]||25).to_i
    @forwarder=hash[:forwarder] #flag to indicate forward mode
    @will_topic=nil
    @will_msg=nil
    @active_gw_id=nil
    @http_log=[]
    #Ifaddr#addr is nil for interfaces without an address -- skip those
    @local_ifs=Socket.getifaddrs.select { |i| i.addr and i.addr.ipv4? }.map { |i| i.addr.ip_address }


    @sem=Mutex.new
    @gsem=Mutex.new
    @log_q = Queue.new #log queue :)
    @msg_id=1
    @clients={}
    @gateways={}
    @autodiscovery=false
    @broadcast_uri=hash[:broadcast_uri]

    if @server_uri
      note "Using Default Gateway: #{@server_uri}"
      add_gateway(0,{uri: @server_uri,source: "default"})
      pick_new_gateway
    elsif @broadcast_uri
      note "Autodiscovery Active, using #{@broadcast_uri}"
      @autodiscovery=true
    else
      puts "No autodiscovery and no Default Gateway -- cannot proceed"
      exit -1
    end


    #drains @log_q to stdout
    @log_t=Thread.new do
      log_thread
    end

    @topics={} #hash of registered topics is stored here
    @iq = Queue.new
    @dataq = Queue.new

    if @broadcast_uri
      @bcast_s=open_multicast_send_port
      @bcast=open_multicast_recv_port

      @roam_t=Thread.new do
        roam_thread @bcast
      end
    end
    if @forwarder
      @s,@server,@port = MqttSN::open_port @server_uri
      @local_port=hash[:local_port]||1882
      note "Open port to Gateway: #{@server_uri}: #{@server},#{@port} -- Listening local port: #{@local_port} @ IPs: #{@local_ifs}"
      @s.bind("0.0.0.0",@local_port)
      @bcast_period=60
    else
      client_thread
    end
end
open_port(uri_s) click to toggle source
# File lib/mqtt-sn-ruby.rb, line 113
# Opens a socket for the gateway described by +uri_s+.
#
# uri_s:: URI string; only the "udp" scheme is supported.
#
# Returns [socket, host, port] with a fresh UDPSocket.
# Raises RuntimeError when the URI cannot be parsed, the socket cannot be
# created, or the scheme is not "udp".
def self.open_port uri_s
  begin
    uri = URI.parse(uri_s)
    return [UDPSocket.new,uri.host,uri.port] if uri.scheme== 'udp'
  rescue => e
    pp e.backtrace
    raise "Error: Cannot open socket for '#{uri_s}': #{e}"
  end
  #raised outside the begin block so it is not rescued and wrapped a second time
  raise "Error: Cannot open socket for '#{uri_s}', unsupported scheme: '#{uri.scheme}'"
end
parse_message(r) click to toggle source
# File lib/mqtt-sn-ruby.rb, line 799
# Decodes one raw MQTT-SN packet into a hash.
#
# r:: String holding the packet; byte 0 is the length, byte 1 the type.
#
# Returns a hash with at least :type (and :status for known types); the
# other keys depend on the message type. Returns {} and prints a
# "Malformed ..." line when the packet is empty, its length byte does not
# match its size, or it is too short for the fields of its type.
# Unknown types yield {type: :unknown, type_byte: <n>}.
def self.parse_message r
  if not r[0]
    puts "Malformed empty packet #{r}"
    return {}
  end
  len=r[0].ord
  #a packet needs at least its length byte and its type byte
  if len!= r.size or len<2
    puts "Malformed packet #{r}"
    return {}
  end
  #true (after reporting) when the packet is shorter than its type requires
  truncated=lambda do |min_len|
    puts "Malformed packet #{r}" if len<min_len
    len<min_len
  end
  #big-endian 16-bit value starting at index i
  word=lambda { |i| (r[i].ord<<8)+r[i+1].ord }

  #the last byte is read as a return code; only some types report it
  case r[len-1].ord
  when 0x00
    status=:ok
  when 0x01
    status=:rejected_congestion
  when 0x02
    status=:rejected_invalid_topic_id
  when 0x03
    status=:rejected_not_supported
  else
    status=:unknown_error
  end
  type_byte=r[1].ord

  case type_byte
  when CONNECT_TYPE
    return {} if truncated.call(6)
    duration=word.call(4)
    id=r[6,len-6]
    m={type: :connect, flags: r[2].ord, duration: duration, client_id: id, status: :ok}
  when CONNACK_TYPE
    m={type: :connect_ack,status: status}
  when SUBACK_TYPE
    return {} if truncated.call(7)
    topic_id=word.call(3)
    msg_id=word.call(5)
    m={type: :sub_ack, topic_id: topic_id, msg_id: msg_id, status: status}
  when SUBSCRIBE_TYPE
    return {} if truncated.call(5)
    msg_id=word.call(3)
    topic=r[5,len-5]
    m={type: :subscribe, flags: r[2].ord, topic: topic, msg_id: msg_id, status: :ok}
  when UNSUBACK_TYPE
    return {} if truncated.call(4)
    msg_id=word.call(2)
    m={type: :unsub_ack, msg_id: msg_id, status: :ok}
  when PUBLISH_TYPE
    return {} if truncated.call(7)
    topic_id=word.call(3)
    msg_id=word.call(5)
    msg=r[7,len-7].force_encoding("UTF-8")
    flags=r[2].ord
    topic_type=:long
    topic=""
    if flags&0x03==TOPIC_SHORT_FLAG
      topic_type=:short
      topic=r[3].chr+r[4].chr #short topics carry their two characters in the id field
    elsif flags&0x03==TOPIC_PREDEFINED_FLAG
      topic_type=:predefined
      topic=""
    end
    qos=(flags>>5)&0x03
    qos=-1 if qos==3
    m={type: :publish, qos: qos, topic_id: topic_id, topic_type:topic_type, topic: topic, msg_id: msg_id, msg: msg,flags: flags, status: :ok}
    m[:retain]=true if flags & RETAIN_FLAG == RETAIN_FLAG
  when PUBREL_TYPE
    return {} if truncated.call(4)
    msg_id=word.call(2)
    m={type: :pub_rel, msg_id: msg_id, status: :ok}
  when DISCONNECT_TYPE
    m={type: :disconnect,status: :ok}
  when REGISTER_TYPE
    return {} if truncated.call(6)
    topic_id=word.call(2)
    msg_id=word.call(4)
    topic=r[6,len-6]
    m={type: :register, topic_id: topic_id, msg_id: msg_id, topic: topic,status: :ok}
  when REGACK_TYPE
    return {} if truncated.call(4)
    topic_id=word.call(2)
    m={type: :register_ack,topic_id: topic_id,status: status}
  when PUBREC_TYPE
    return {} if truncated.call(4)
    msg_id=word.call(2)
    m={type: :pubrec,msg_id: msg_id,status: :ok}
  when PUBACK_TYPE
    return {} if truncated.call(6)
    topic_id=word.call(2)
    msg_id=word.call(4)
    m={type: :publish_ack,topic_id: topic_id,msg_id: msg_id, status: status}
  when PUBCOMP_TYPE
    return {} if truncated.call(4)
    msg_id=word.call(2)
    m={type: :pubcomp,status: :ok, msg_id: msg_id}

  when WILLTOPICREQ_TYPE
    m={type: :will_topic_req, status: :ok}
  when WILLMSGREQ_TYPE
    m={type: :will_msg_req, status: :ok}

  when WILLTOPICRESP_TYPE
    m={type: :will_topic_resp, status: :ok}
  when WILLMSGRESP_TYPE
    m={type: :will_msg_resp, status: :ok}

  when SEARCHGW_TYPE
    return {} if truncated.call(3)
    m={type: :searchgw, radius: r[2].ord, status: :ok}
  when GWINFO_TYPE
    return {} if truncated.call(3)
    m={type: :gwinfo, gw_id: r[2].ord, status: :ok}
  when ADVERTISE_TYPE
    return {} if truncated.call(5)
    duration=word.call(3)
    m={type: :advertise, gw_id: r[2].ord, duration: duration, status: :ok}

  when PINGREQ_TYPE
    m={type: :ping, status: :ok}
  when PINGRESP_TYPE
    m={type: :pong, status: :ok}

  else
    m={type: :unknown, type_byte: type_byte }
  end
  m
end
poll_packet(socket) click to toggle source
# File lib/mqtt-sn-ruby.rb, line 627
def self.poll_packet socket
  #decide how to get data -- UDP-socket or FM-radio
  begin
    r,stuff=socket.recvfrom_nonblock(200) #get_packet --high level func!
    client_ip=stuff[2]
    client_port=stuff[1]
    return [r,client_ip,client_port]
  rescue IO::WaitReadable
    sleep 0.1
  rescue => e
    puts "Error: receive thread died: #{e}"
    pp e.backtrace
  end
  return nil
end
poll_packet_block(socket) click to toggle source
# File lib/mqtt-sn-ruby.rb, line 643
def self.poll_packet_block socket
  #decide how to get data -- UDP-socket or FM-radio
  r,stuff=socket.recvfrom(200) #get_packet --high level func!
  client_ip=stuff[2]
  client_port=stuff[1]
  return [r,client_ip,client_port]
end
send_raw_packet(msg,socket,server,port) click to toggle source
# File lib/mqtt-sn-ruby.rb, line 558
# Sends the already-built packet +msg+ to server:port through +socket+.
#
# Returns the hex dump of +msg+ after sending. When +socket+ is nil an
# error line is printed and nil is returned.
def self.send_raw_packet msg,socket,server,port
  unless socket
    puts "Error: no socket at send_raw_packet"
    return
  end
  socket.send(msg, 0, server, port)
  MqttSN::hexdump msg
end

Public Instance Methods

add_gateway(gw_id,hash) click to toggle source
# File lib/mqtt-sn-ruby.rb, line 158
# Registers gateway +gw_id+ or refreshes an already known one.
#
# gw_id:: key under which the gateway is kept in @gateways.
# hash::  gateway attributes (e.g. :uri, :source) merged into the entry.
#
# A new gateway starts with zeroed counters and status :ok. A known
# gateway gets a fresh :stamp and the merged attributes, unless its :uri
# differs from the stored one -- then only a conflict note is logged.
def add_gateway gw_id,hash
  known=@gateways[gw_id]
  if !known
    fresh={stamp: Time.now.to_i, status: :ok, last_use: 0,last_ping: 0,counter_send:0, last_send: 0,counter_recv:0, last_recv: 0}
    @gateways[gw_id]=fresh.merge(hash)
  elsif known[:uri]!=hash[:uri]
    note "conflict -- gateway has moved? or duplicate"
  else
    known[:stamp]=Time.now.to_i
    @gateways[gw_id]=known.merge hash
  end
end
client_thread() click to toggle source
# File lib/mqtt-sn-ruby.rb, line 967
# Starts the two background threads used in client (non-forwarder) mode:
# a keep-alive thread that pings the active gateway, and a work thread
# that receives, parses and dispatches packets from the active gateway.
# Both threads loop forever; errors inside an iteration are printed with
# their backtrace and the loop continues.
def client_thread
  Thread.new do #ping thread
    while true do
      begin
        #idle while not connected
        while @state==:disconnected
          sleep 1
        end
        sleep @keepalive
        if @active_gw_id and @gateways[@active_gw_id] and @gateways[@active_gw_id][:socket] #if we are connected...
          send :ping, timeout: 5,expect: :pong do |status,message|
            if status!=:ok
              #no pong: mark ourselves disconnected and release the gateway
              note "Error:#{@id} no pong! -- sending disconnect to app"
              @state=:disconnected
              gateway_close :timeout
            end
          end
        end
      rescue => e
        puts "Error: receive thread died: #{e}"
        pp e.backtrace
      end
    end
  end

  Thread.new do #work thread
    while true do
      begin
        #sleep briefly only when no packet was handled in this pass
        do_sleep=true
          if @active_gw_id and @gateways[@active_gw_id] and @gateways[@active_gw_id][:socket] #if we are connected...
            if pac=MqttSN::poll_packet(@gateways[@active_gw_id][:socket]) #cannot block -- gateway may change...
              r,client_ip,client_port=pac
              m=MqttSN::parse_message r
              if @debug and m
                m[:debug]=MqttSN::hexdump r
              end
              _,port,_,_= @gateways[@active_gw_id][:socket].addr
              src="udp://0.0.0.0:#{port}"
              logger "id %-24.24s -> %-24.24s | %s",@gateways[@active_gw_id][:uri],src,m.to_json
              process_message m
              do_sleep=false
              #receive statistics of the active gateway
              @gateways[@active_gw_id][:last_recv]=Time.now.to_i
              @gateways[@active_gw_id][:counter_recv]+=1
            end
          end
        if do_sleep
          sleep 0.01
        end
      rescue => e
        puts "Error: receive thread died: #{e}"
        pp e.backtrace
      end
    end
  end
end
connect(id,&block) click to toggle source
# File lib/mqtt-sn-ruby.rb, line 676
# Connects to the gateway as client +id+, running the will-topic /
# will-message handshake when the gateway asks for it.
#
# id::    client id passed to the :connect message.
# block:: optional; called once with (:ok, message) when a connect_ack
#         arrives, or (:fail, message) when any step of the exchange
#         fails. This now also holds for the will handshake path, where
#         the block was previously never called.
def connect id,&block
  send :connect,id: id, clean: false, duration: @keepalive, expect: [:connect_ack,:will_topic_req] do |s,m|
    if s!=:ok
      block.call :fail,m if block
    elsif m[:type]==:will_topic_req
      #gateway wants our will: topic first, then message, then it acks the connect
      send :will_topic, topic: @will_topic, expect: [:will_msg_req] do |topic_status,topic_reply|
        if topic_status==:ok
          send :will_msg, msg: @will_msg, expect: [:connect_ack] do |msg_status,msg_reply|
            block.call((msg_status==:ok ? :ok : :fail),msg_reply) if block
          end
        else
          block.call :fail,topic_reply if block
        end
      end
    elsif m[:type]==:connect_ack
      block.call :ok,m if block
    end
  end
end
disconnect() click to toggle source
# File lib/mqtt-sn-ruby.rb, line 695
# Sends a :disconnect message and waits for the gateway's :disconnect reply.
def disconnect
  send(:disconnect, expect: :disconnect) { |_status,_message| }
end
forwarder_thread() click to toggle source
# File lib/mqtt-sn-ruby.rb, line 236
# Main loop of forwarder mode: relays packets between local clients and
# the gateway at @server:@port. Never returns.
#
# Every client (keyed by "ip:port") gets its own UDP socket towards the
# gateway plus a thread that copies gateway replies back to the client
# through the service socket @s. A maintenance thread removes clients
# that are marked disconnected or idle for longer than MAX_IDLE.
#
# Raises RuntimeError when the instance was not created in forwarder mode.
def forwarder_thread
  if not @forwarder
    raise "Cannot Forward if no Forwarder!"
  end
  begin
    last_kill=0
    stime=Time.now.to_i
    Thread.new do #maintenance
      while true do
        sleep 1
        now=Time.now.to_i
        changes=false
        #iterate over a copy, entries are deleted from @clients inside the loop
        @clients.dup.each do |key,data|
          if data[:state]==:disconnected
            dest="#{data[:ip]}:#{data[:port]}"
            note "- %s",dest
            # NOTE(review): the client's socket and thread are not closed/stopped here -- confirm whether that is intended
            @clients.delete key
            changes=true
          elsif data[:last_send]<now-MAX_IDLE and data[:last_recv]<now-MAX_IDLE
            #idle in both directions: tell both sides to disconnect, then forget the client
            dest="#{data[:ip]}:#{data[:port]}"
            note "-- %s",dest
            kill_client key
            @clients.delete key
            changes=true
          end
        end
        if changes
           note "cli:#{@clients.to_json}"
        end
      end
    end
    while true
      pac=MqttSN::poll_packet_block(@s) #data from clients to our service socket
      r,client_ip,client_port=pac
      key="#{client_ip}:#{client_port}"
      if not @clients[key]
        #first packet from this client: create its gateway-side socket and reply thread
        uri="udp://#{client_ip}:#{client_port}"
        @clients[key]={ip:client_ip, port:client_port, socket: UDPSocket.new, uri: uri, state: :active, counter_send:0, last_send:0 , counter_recv:0, last_recv:0}
        c=@clients[key]
        puts "thread start for #{key}"

        @clients[key][:thread]=Thread.new(key) do |my_key|
          while true
            pacc=MqttSN::poll_packet_block(@clients[my_key][:socket]) #if we get data from server destined to our client
            rr,client_ip,client_port=pacc
            @s.send(rr, 0, @clients[my_key][:ip], @clients[my_key][:port]) # send_packet to client
            mm=MqttSN::parse_message rr
            _,port,_,_ = @clients[my_key][:socket].addr
            dest="#{@server}:#{port}"
            logger "sc %-24.24s <- %-24.24s | %s",@clients[my_key][:uri],@gateways[@active_gw_id][:uri],mm.to_json
            @gateways[@active_gw_id][:last_recv]=Time.now.to_i
            @gateways[@active_gw_id][:counter_recv]+=1
            @clients[my_key][:last_send]=Time.now.to_i
            @clients[my_key][:counter_send]+=1

            case mm[:type]
            when :disconnect
              #the maintenance thread removes clients in this state
              @clients[my_key][:state]=:disconnected
            end
          end
        end
        dest="#{client_ip}:#{client_port}"
        note "+ %s\n",dest
        note "cli: #{@clients.to_json}"
      end
      @clients[key][:stamp]=Time.now.to_i
      m=MqttSN::parse_message r
      case m[:type]
      when :publish
        if m[:qos]==-1
          @clients[key][:state]=:disconnected #one shot
        end
      end
      sbytes=@clients[key][:socket].send(r, 0, @server, @port) # to rsmb -- ok as is
      _,port,_,_ = @clients[key][:socket].addr
      dest="#{@server}:#{port}"
      # NOTE(review): @gateways[@active_gw_id] is used unguarded here although the logging below handles a nil @active_gw_id -- confirm
      @gateways[@active_gw_id][:last_send]=Time.now.to_i
      @gateways[@active_gw_id][:counter_send]+=1
      @clients[key][:last_recv]=Time.now.to_i
      @clients[key][:counter_recv]+=1
      begin
        if @active_gw_id
          logger "cs %-24.24s -> %-24.24s | %s", @clients[key][:uri],@gateways[@active_gw_id][:uri],m.to_json
        else
          logger "cs %-24.24s -> %-24.24s | %s", @clients[key][:uri],"??",m.to_json
        end
      rescue Exception =>e
        puts "logging fails #{e}"
      end
     end
  end
end
gateway_close(cause) click to toggle source
# File lib/mqtt-sn-ruby.rb, line 1039
# Releases the active gateway, if any: stamps its :last_use (so it is
# picked last next time), closes and clears its socket and resets
# @active_gw_id. Runs under @gsem.
#
# cause:: reason, only used in the log note.
def gateway_close cause
  @gsem.synchronize do #one command at a time --
    next unless @active_gw_id
    note "Closing gw #{@active_gw_id} cause: #{cause}"
    gw=@gateways[@active_gw_id]
    gw[:last_use]=Time.now.to_i
    if gw[:socket]
      gw[:socket].close
      gw[:socket]=nil
    end
    @active_gw_id=nil
  end
end
goto_sleep(duration) click to toggle source
# File lib/mqtt-sn-ruby.rb, line 700
# Sends a :disconnect message carrying +duration+ and waits for the
# gateway's :disconnect reply.
def goto_sleep duration
  send(:disconnect, duration: duration, expect: :disconnect) { |_status,_message| }
end
kill_client(key) click to toggle source
# File lib/mqtt-sn-ruby.rb, line 329
# Forwarder helper: sends a DISCONNECT to the client stored under +key+
# (through the service socket) and one to the gateway (through the
# client's own socket). Unknown keys are ignored.
def kill_client key
  puts "Killing Client #{key}:"
  client=@clients[key]
  return unless client
  puts "Really Killing #{key}"
  send_packet [DISCONNECT_TYPE],@s,client[:ip], client[:port]
  send_packet [DISCONNECT_TYPE],client[:socket], @server,@port
end
kill_clients() click to toggle source
# File lib/mqtt-sn-ruby.rb, line 338
# Forwarder helper: for every known client, sends a DISCONNECT to the
# client (through the service socket) and one to the gateway (through the
# client's own socket). Progress is printed to stdout.
def kill_clients
  puts "Killing Clients:"
  @clients.each_pair do |key,client|
    puts "Killing #{key}"
    send_packet [DISCONNECT_TYPE],@s,client[:ip], client[:port]
    send_packet [DISCONNECT_TYPE],client[:socket], @server,@port
  end
  puts "Killing Clients Done."
end
log_empty?() click to toggle source
# File lib/mqtt-sn-ruby.rb, line 87
# True when nothing is waiting in the log queue, or when @verbose is off.
def log_empty?
  !@verbose || @log_q.empty?
end
log_flush() click to toggle source
# File lib/mqtt-sn-ruby.rb, line 91
# Blocks, polling every 0.1 s, until log_empty? reports true.
def log_flush
  sleep 0.1 until log_empty?
end
log_thread() click to toggle source
# File lib/mqtt-sn-ruby.rb, line 97
# Body of the logging thread: forever prints queued log lines to stdout,
# sleeping 0.01 s whenever the queue is empty. Errors are printed with
# their backtrace and the loop continues.
def log_thread
  loop do
    begin
      if @log_q.empty?
        sleep 0.01
      else
        line=@log_q.pop
        puts line
      end
    rescue => e
      puts "Error: receive thread died: #{e}"
      pp e.backtrace
    end
  end
end
logger(str,*args) click to toggle source
# File lib/mqtt-sn-ruby.rb, line 58
# Formats a traffic log line and hands it to the log sinks.
#
# str, args:: sprintf format and arguments of the message.
#
# The line is prefixed with an ISO-8601 timestamp and the active gateway
# id in parentheses (client mode) or the own :gw_id option in brackets
# (forwarder mode). It is queued for printing when @verbose or @debug is
# set, and appended to @http_log when the :http_port option is set.
def logger str,*args
  body=sprintf(str,*args)
  text=if @forwarder
    sprintf("%s: [%3.3s] | %s",Time.now.iso8601,@options[:gw_id],body)
  else
    sprintf("%s: (%3.3s) | %s",Time.now.iso8601,@active_gw_id,body)
  end
  @log_q << text if @verbose or @debug
  @http_log << {stamp: Time.now.to_i, text: text} if @options[:http_port] and @http_log
end
note(str,*args) click to toggle source
# File lib/mqtt-sn-ruby.rb, line 73
# Queues an informational line (always, regardless of @verbose).
#
# str, args:: sprintf format and arguments of the message.
#
# The line is prefixed with an ISO-8601 timestamp, pushed to the log
# queue and, when the :http_port option is set, appended to @http_log.
# Errors while formatting or queueing are printed, never raised.
def note str,*args
  body=sprintf(str,*args)
  text=sprintf("%s: %s",Time.now.iso8601,body)
  @log_q << text
  @http_log << {stamp: Time.now.to_i, text: text} if @options[:http_port] and @http_log
rescue => e
  pp e.backtrace
  puts "note dies: #{e} '#{str}'"
end
open_multicast_recv_port() click to toggle source
# File lib/mqtt-sn-ruby.rb, line 136
# Creates the UDP socket that receives multicast traffic for the group
# and port given by @broadcast_uri: joins the group, enables
# SO_REUSEPORT (falling back to the numeric option 15 when the symbolic
# name is rejected) and binds to the URI's port on any address.
# Returns the bound socket.
def open_multicast_recv_port
  target=URI.parse(@broadcast_uri)
  membership=IPAddr.new(target.host).hton + IPAddr.new("0.0.0.0").hton
  listener=UDPSocket.new
  listener.setsockopt(Socket::IPPROTO_IP, Socket::IP_ADD_MEMBERSHIP, membership)
  begin
    listener.setsockopt(:SOL_SOCKET, :SO_REUSEPORT, 1)
  rescue #on raspian this is needed...
    puts "WARNING: :SO_REUSEPORT not defined -- guessing it is 15 ;)"
    listener.setsockopt(:SOL_SOCKET, 15, 1)
  end
  listener.bind(Socket::INADDR_ANY, target.port)
  listener
end
open_multicast_send_port() click to toggle source
# File lib/mqtt-sn-ruby.rb, line 127
# Creates the UDP socket used for sending to the multicast group of
# @broadcast_uri, with its IP TTL set to 1. Returns the socket.
def open_multicast_send_port
  target=URI.parse(@broadcast_uri)
  #the value is not used afterwards; building it still raises on an unparsable group address
  _membership=IPAddr.new(target.host).hton + IPAddr.new("0.0.0.0").hton
  sender=UDPSocket.new
  sender.setsockopt(Socket::IPPROTO_IP, Socket::IP_TTL, [1].pack('i'))
  sender
end
pick_new_gateway() click to toggle source
# File lib/mqtt-sn-ruby.rb, line 1054
# Closes the current gateway and activates another one.
#
# Candidates are gateways that have a :uri and status :ok. Iterating in
# hash order, a candidate replaces the current choice when nothing is
# chosen yet, when it has never been used (:last_use 0), or when it was
# used longer ago than the current choice. The chosen gateway gets a
# freshly opened socket and a new :last_use stamp.
#
# Returns the id of the active gateway, or nil when none is usable.
# Errors are printed with their backtrace, never raised.
def pick_new_gateway
  begin
    gateway_close nil
    @gsem.synchronize do #one command at a time --
      chosen=nil
      chosen_last_use=0
      @gateways.each do |gw_id,data|
        next unless data[:uri] and data[:status]==:ok
        if !chosen or data[:last_use]==0 or chosen_last_use>data[:last_use]
          chosen=gw_id
          chosen_last_use=data[:last_use]
        end
      end
      if chosen
        @active_gw_id=chosen
        gw=@gateways[@active_gw_id]
        note "Opening Gateway #{@active_gw_id}: #{gw[:uri]}"
        @s,@server,@port = MqttSN::open_port gw[:uri]
        gw[:socket]=@s
        gw[:last_use]=Time.now.to_i
      end
    end
  rescue => e
    puts "Error: receive thread died: #{e}"
    pp e.backtrace
  end
  @active_gw_id
end
ping() click to toggle source
# File lib/mqtt-sn-ruby.rb, line 743
# Sends a :ping and waits (timeout 2) for the :pong; a missing pong is
# reported on stdout.
def ping
  send :ping, timeout: 2,expect: :pong do |status,_message|
    puts "Error:#{@id} no pong!" unless status==:ok
  end
end
process_broadcast_message(m,client_ip,client_port) click to toggle source
# File lib/mqtt-sn-ruby.rb, line 1022
# Handles one parsed message received on the multicast socket.
#
# m::           parsed message hash.
# client_ip::   sender address; used to build the gateway URI.
# client_port:: sender port (not used here).
#
# :searchgw is answered with a GWINFO broadcast when running as a
# forwarder. :advertise and :gwinfo register the sender as a gateway at
# udp://<client_ip>:1882, with the advertised duration or 180.
def process_broadcast_message m,client_ip,client_port
  kind=m[:type]
  if kind==:searchgw
    if @forwarder
      _,port,_,_=@bcast.addr
      #logger "ib %-24.24s -> %-24.24s | %s","udp://0.0.0.0:#{port}",@broadcast_uri,m.to_json
      send_packet_bcast [GWINFO_TYPE,@options[:gw_id]]
    end
  elsif kind==:advertise or kind==:gwinfo
    gateway={uri: "udp://#{client_ip}:1882", source: kind, duration: m[:duration]||180, stamp: Time.now.to_i}
    add_gateway(m[:gw_id],gateway)
  end
end
process_message(m) click to toggle source
# File lib/mqtt-sn-ruby.rb, line 914
# Reacts to one parsed message from the gateway and returns it.
#
# Register, publish and pub_rel messages are acknowledged here (unless
# @transfer is set); published data is queued on @dataq. Connect and
# subscribe acks update @state, a pong stamps the active gateway.
# Everything that was not fully handled here -- discovery messages are
# simply dropped -- is pushed on @iq for a waiting send call.
def process_message m
  handled=false
  case m[:type]
  when :register
    @topics[m[:topic]]=m[:topic_id]
    unless @transfer
      send :register_ack,topic_id: m[:topic_id], msg_id: m[:msg_id], return_code: 0
      handled=true
    end
  when :disconnect
    @state=:disconnected unless @transfer
  when :pub_rel
    unless @transfer
      send :pub_comp, msg_id: m[:msg_id]
      handled=true
    end
  when :publish
    #long topics arrive as ids; map the id back to the registered name
    m[:topic]=@topics.key(m[:topic_id]) if m[:topic_type]==:long
    unless @transfer
      @dataq<<m
      case m[:qos]
      when 1
        send :publish_ack,topic_id: m[:topic_id], msg_id: m[:msg_id], return_code: 0
      when 2
        send :pub_rec, msg_id: m[:msg_id]
      end
      handled=true
    end
  when :connect_ack
    @state=:connected
  when :sub_ack
    @state=:subscribed
  when :pong
    @gsem.synchronize do #one command at a time --
      gw=@active_gw_id && @gateways[@active_gw_id]
      gw[:last_ping]=Time.now.to_i if gw
    end
  when :searchgw, :advertise, :gwinfo
    handled=true
  end
 # puts "got :#{@id} #{m.to_json}"  if @verbose
  @iq<<m if !handled and m
  m
end
pub(options={}) click to toggle source

toplevel funcs:

# File lib/mqtt-sn-ruby.rb, line 1139
# Top-level publish helper.
#
# options:: :topic, :msg, :qos and :id (client id for the connection).
#
# With qos -1 the message is published right away without connecting
# (topic defaults to "XX"). Otherwise it connects and publishes,
# retrying the connection until one attempt succeeds (topic defaults to
# "test/message/123"). The message defaults to "test_value". Finally
# waits for the log queue to drain.
def pub options={}
  payload=options[:msg]||"test_value"
  if options[:qos]==-1
    publish options[:topic]||"XX", payload, qos: options[:qos]
    puts "Sent."
  else
    delivered=false
    until delivered
      connect options[:id] do |status,_reply|
        if status==:ok
          publish options[:topic]||"test/message/123", payload, qos: options[:qos]
          puts "Sent ok."
          delivered=true
        else
          disconnect
        end
      end
    end
  end
  log_flush
end
publish(topic,msg,hash={}) click to toggle source
# File lib/mqtt-sn-ruby.rb, line 764
# Publishes +msg+ on +topic+.
#
# topic:: "=<n>" selects predefined topic id n; a two-character topic is
#         sent as a short topic; anything else is a long topic, which is
#         registered with the gateway first when it is not yet known.
# msg::   payload string.
# hash::  :qos (1 and 2 wait for the acknowledgements, anything else is
#         sent without waiting; defaults to 0) and :retain.
#
# The +topic+ argument is not modified.
def publish topic,msg,hash={}
  if topic[0]=="="
    #read the id after the "=" without altering the caller's string
    topic_id=topic[1..-1].to_i
    topic_type=:predefined
  elsif topic.size==2
    topic_id=((topic[0].ord&0xff)<<8)+(topic[1].ord&0xff)
    topic_type=:short
  else
    topic_type=:long
    if not @topics[topic]
      register_topic topic
    end
    topic_id=@topics[topic]
  end
  case hash[:qos]
  when 1
    send :publish,msg: msg, retain: hash[:retain], topic_id: topic_id, topic_type: topic_type, qos: 1, expect: [:publish_ack] do |s,m|
    end
  when 2
    send :publish,msg: msg, retain: hash[:retain], topic_id: topic_id, topic_type: topic_type, qos: 2, expect: [:pubrec] do |s,m|
      if s==:ok and m[:type]==:pubrec
        #second half of the QoS 2 exchange
        send :pubrel,msg_id: m[:msg_id], expect: :pubcomp do |rel_status,rel_reply|
        end
      end
    end
  else
    send :publish,msg: msg, retain: hash[:retain],topic_id: topic_id, topic_type: topic_type, qos: hash[:qos]||0
  end
end
register_topic(topic) click to toggle source
# File lib/mqtt-sn-ruby.rb, line 752
# Registers the long topic name +topic+ with the gateway and stores the
# topic id from the acknowledgement in @topics.
#
# Returns the stored topic id.
# Raises RuntimeError when the registration is not acknowledged with :ok.
def register_topic topic
  send :register,topic: topic, expect: :register_ack do |status,reply|
    raise "Error:#{@id} Register topic #{topic} failed!" unless status==:ok
    @topics[topic]=reply[:topic_id]
  end
  @topics[topic]
end
roam_thread(socket) click to toggle source
# File lib/mqtt-sn-ruby.rb, line 1085
# Body of the roaming/discovery thread; never returns.
#
# socket:: the multicast receive socket to read from.
#
# As a forwarder it starts a thread that broadcasts ADVERTISE every
# @bcast_period seconds. As an autodiscovering client it starts a thread
# that broadcasts :searchgw (every 5 s while no gateway is known, every
# 30 s afterwards). It then loops forever reading multicast packets and
# handing them to process_broadcast_message; errors are printed with
# their backtrace and the loop continues.
def roam_thread socket
  @last_bcast=0
  if @forwarder
    Thread.new do
      while true do
        #packet: type, gateway id, period as high byte and low byte
        send_packet_bcast [ADVERTISE_TYPE,@options[:gw_id],@bcast_period>>8,@bcast_period&0xff]
        sleep @bcast_period
      end
    end
  elsif @autodiscovery  #client should try to find some gateways..
    Thread.new do
      while true do
        send :searchgw #replies may or may not come -- even multiple!
        if @gateways=={}
          sleep 5
        else
          #pp @gateways
          sleep 30
        end
      end
    end
    # NOTE(review): with pick_new_gateway commented out this thread only sleeps in a loop -- confirm whether it is still needed
    Thread.new do
      while true do
        if @active_gw_id and @gateways[@active_gw_id] and @gateways[@active_gw_id][:socket]
        else # not so ok -- pick one!
          #pick_new_gateway
        end
        sleep 0.01
      end
    end
  end
  while true do
    begin
      if @bcast
       pac=MqttSN::poll_packet_block(socket)
        r,client_ip,client_port=pac
        m=MqttSN::parse_message r
        if @debug and m
          m[:debug]=MqttSN::hexdump r
        end
        _,port,_,_ = @bcast.addr
        src="udp://#{client_ip}:#{client_port}"
        logger "ib %-24.24s <- %-24.24s | %s",@broadcast_uri,src,m.to_json
        process_broadcast_message m,client_ip,client_port
      end
    rescue => e
      puts "Error: receive thread died: #{e}"
      pp e.backtrace
    end
  end
end
send(type,hash={}) click to toggle source
# File lib/mqtt-sn-ruby.rb, line 369
# Builds and sends one MQTT-SN message, optionally waiting for a reply.
#
# type::  message kind: :connect, :register, :register_ack, :publish_ack,
#         :pub_rec, :pub_comp, :will_topic, :will_topic_upd, :will_msg,
#         :will_msg_upd, :subscribe, :unsubscribe, :publish, :pubrel,
#         :ping, :searchgw or :disconnect.
# hash::  fields of the message (e.g. :id, :topic, :topic_id, :msg,
#         :msg_id, :qos, :retain, :duration, :return_code) plus
#         :expect:: reply type (or array of types) to wait for; without
#                   it the packet is sent and the method returns at once.
#         :timeout:: seconds to wait per attempt (default Tretry).
# block:: called with (status, message) after waiting; status is :ok or
#         :timeout.
#
# While disconnected only :connect, :will_topic, :will_msg, :searchgw and
# QoS -1 publishes are sent. :searchgw goes out on the broadcast socket,
# everything else to the active gateway. With :expect the exchange runs
# under @sem; the packet is re-sent up to Nretry times, and when no
# expected reply arrives the state becomes :disconnected and the gateway
# is closed.
#
# Raises RuntimeError when a required field is missing.
# NOTE(review): @msg_id only ever grows; confirm what should happen once
# it no longer fits into the two message-id bytes.
def send type,hash={},&block
  if @state==:disconnected and type!=:connect and type!=:will_topic  and type!=:will_msg  and type!=:searchgw
    if type==:disconnect
      return #already disconnected.. nothing to do
    elsif type==:publish and hash[:qos]==-1
      #QoS -1 publishes are allowed without a connection
    else
      note "Error: Cannot #{type} while unconnected, send :connect first!"
      return nil
    end
  end
  case type
  when :connect
    #fall back to a generated id only when none was given
    if not hash[:id] or hash[:id]==""
      hash[:id]="mqtt-sn-ruby-#{$$}"
    end
    note "Connecting as '#{hash[:id]}'"
    flags=0
    flags+=CLEAN_FLAG if hash[:clean]
    flags+=RETAIN_FLAG if hash[:retain]
    flags+=WILL_FLAG if @will_topic
    p=[CONNECT_TYPE,flags,0x01,hash[:duration]>>8 ,hash[:duration] & 0xff]
    hash[:id].each_byte do |b|
      p<<b
    end
    @id=hash[:id]
  when :register
    raise "Need :topic to Publish!" if not hash[:topic]
    p=[REGISTER_TYPE,0,0,@msg_id >>8 ,@msg_id & 0xff]
    hash[:topic].each_byte do |b|
      p<<b
    end
    @msg_id+=1
  when :register_ack
    p=[REGACK_TYPE,hash[:topic_id]>>8 ,hash[:topic_id] & 0xff,hash[:msg_id]>>8 ,hash[:msg_id] & 0xff,hash[:return_code]]
  when :publish_ack
    p=[PUBACK_TYPE,hash[:topic_id]>>8 ,hash[:topic_id] & 0xff,hash[:msg_id]>>8 ,hash[:msg_id] & 0xff,hash[:return_code]]
  when :pub_rec
    p=[PUBREC_TYPE,hash[:msg_id]>>8 ,hash[:msg_id] & 0xff]
  when :pub_comp
    p=[PUBCOMP_TYPE,hash[:msg_id]>>8 ,hash[:msg_id] & 0xff]
  when :will_topic
    raise "Need :topic to :will_topic" if not hash[:topic]
    p=[WILLTOPIC_TYPE,0]
    hash[:topic].each_byte do |b|
      p<<b
    end
  when :will_topic_upd
    raise "Need :topic to :will_topic_upd" if not hash[:topic]
    p=[WILLTOPICUPD_TYPE,0]
    hash[:topic].each_byte do |b|
      p<<b
    end
  when :will_msg
    raise "Need :msg to :will_msg" if not hash[:msg]
    p=[WILLMSG_TYPE]
    hash[:msg].each_byte do |b|
      p<<b
    end
  when :will_msg_upd
    raise "Need :msg to :will_msg_upd" if not hash[:msg]
    p=[WILLMSGUPD_TYPE]
    hash[:msg].each_byte do |b|
      p<<b
    end

  when :subscribe
    raise "Need :topic to :subscribe" if not hash[:topic]
    qos=hash[:qos]||0
    flags=0
    if qos==-1
      flags+=QOSM1_FLAG
    else
      flags+=QOS1_FLAG*qos
    end
    p=[SUBSCRIBE_TYPE,flags,@msg_id >>8 ,@msg_id & 0xff]
    hash[:topic].each_byte do |b|
      p<<b
    end
    @msg_id+=1

  when :unsubscribe
    raise "Need :topic to :unsubscribe" if not hash[:topic]
    p=[UNSUBSCRIBE_TYPE,0,@msg_id >>8 ,@msg_id & 0xff]
    hash[:topic].each_byte do |b|
      p<<b
    end
    @msg_id+=1

  when :publish
    raise "Need :topic_id to Publish!" if not hash[:topic_id]
    qos=hash[:qos]||0
    flags=0
    flags+=RETAIN_FLAG if hash[:retain]
    if qos==-1
      flags+=QOSM1_FLAG
    else
      flags+=QOS1_FLAG*qos
    end
    if hash[:topic_type]==:short
      flags+=TOPIC_SHORT_FLAG
    elsif hash[:topic_type]==:predefined
      flags+=TOPIC_PREDEFINED_FLAG
    end
    p=[PUBLISH_TYPE,flags,hash[:topic_id] >>8 ,hash[:topic_id] & 0xff,@msg_id >>8 ,@msg_id & 0xff]
    hash[:msg].each_byte do |b|
      p<<b.ord
    end
    @msg_id+=1
  when :pubrel
    raise "Need the original :msg_id of the Publish for PubRel!" if not hash[:msg_id]
    p=[PUBREL_TYPE,hash[:msg_id] >>8 ,hash[:msg_id] & 0xff]
  when :ping
    p=[PINGREQ_TYPE]
   when :searchgw
    p=[SEARCHGW_TYPE,0]
  when :disconnect
    if hash[:duration]
      p=[DISCONNECT_TYPE,hash[:duration] >>8 ,hash[:duration] & 0xff]
    else
      p=[DISCONNECT_TYPE]
    end
  else
    puts "Error: Strange send?? #{type}"
    return nil
  end
  status=:timeout
  m={}
  #fire and forget when no reply is expected
  if not hash[:expect]
    if type==:searchgw
      send_packet_bcast p
    else
      send_packet_gw p
    end
    return
  end
  @sem.synchronize do #one command at a time --
    #drop stale replies left over from earlier exchanges
    while not @iq.empty?
      mp=@iq.pop
      puts "WARN:#{@id} ************** Purged message: #{mp}"
    end
    @iq.clear
    if type==:searchgw
      raw=send_packet_bcast p
    else
      raw=send_packet_gw p
    end
    hash[:debug]=raw if @debug
    #puts "send:#{@id} #{type},#{hash.to_json}" if @verbose
    timeout=hash[:timeout]||Tretry
    retries=0
    while retries<Nretry do
      stime=Time.now.to_i
      while Time.now.to_i<stime+timeout
        if not @iq.empty?
          m=@iq.pop
          if Array(hash[:expect]).include? m[:type]
            status=:ok
            break
          else
            puts "WARN:#{@id} ************** Discarded message: #{m}"
          end
        end
        sleep 0.1
      end
      if status==:ok
        break
      else
        retries+=1
        send_packet_gw p
        puts "fail to get ack, retry #{retries} :#{@id} #{type},#{hash.to_json}"
        #need to set DUP flag !
      end
    end
    if status==:timeout
      note "Warn: ack timeouted, assume disconnected"
      #must be :disconnected -- that is the value every state check compares against
      @state=:disconnected
      gateway_close :timeout
    end
  end #sem
  if block
    block.call  status,m
  end

end
send_packet(m,socket,server,port) click to toggle source
# File lib/mqtt-sn-ruby.rb, line 567
# Builds a packet from the byte list +m+, sends it to server:port through
# +socket+ and logs the parsed packet with destination and local port.
def send_packet m,socket,server,port
  raw=MqttSN::build_packet m
  MqttSN::send_raw_packet raw,socket,server,port
  dest="#{server}:#{port}"
  local_port=socket.addr[1]
  logger "od %-18.18s <- %-18.18s | %s",dest,":#{local_port}",MqttSN::parse_message(raw).to_json
end
send_packet_bcast(m) click to toggle source
# File lib/mqtt-sn-ruby.rb, line 576
# Builds a packet from the byte list +m+, sends it through the broadcast
# send socket to the host and port of @broadcast_uri and logs it.
def send_packet_bcast m
  target=URI.parse(@broadcast_uri)
  raw=MqttSN::build_packet m
  MqttSN::send_raw_packet raw,@bcast_s,target.host,target.port
  local_port=@bcast_s.addr[1]
  logger "ob %-24.24s <- %-24.24s | %s",@broadcast_uri,"udp://0.0.0.0:#{local_port}",MqttSN::parse_message(raw).to_json
end
send_packet_gw(m) click to toggle source
# File lib/mqtt-sn-ruby.rb, line 585
# Builds a packet from the byte list +m+ and sends it to the active
# gateway.
#
# Without an active gateway it first tries to pick one, giving up (and
# not sending) after more than 30 attempts; a failed attempt is followed
# by a 0.5 s pause. The send itself runs under @gsem and updates the
# gateway's send statistics; if the gateway vanished meanwhile, a message
# is printed and the method sleeps for a second.
def send_packet_gw m
  msg=MqttSN::build_packet m
  debug={}
  debug[:debug]=MqttSN::hexdump(msg) if @debug
  #truthy while an active gateway with an open socket exists
  gw_ready=lambda { @active_gw_id and @gateways[@active_gw_id] and @gateways[@active_gw_id][:socket] }

  unless gw_ready.call
    note "No active gw, wait ."
    waits=0
    until gw_ready.call
      sleep 0.5 unless pick_new_gateway
      waits+=1
      if waits>30
        puts "\nNone Found -- not sending"
        return
      end
    end
    note "Gw Ok!"
  end
  sent=false
  @gsem.synchronize do
    if gw_ready.call
      sent=true
      gw=@gateways[@active_gw_id]
      gw[:last_send]=Time.now.to_i
      gw[:counter_send]+=1
      target=URI.parse(gw[:uri])
      MqttSN::send_raw_packet msg,gw[:socket],target.host,target.port
      local_port=gw[:socket].addr[1]
      src="udp://0.0.0.0:#{local_port}"
      logger "od %-24.24s <- %-24.24s | %s",gw[:uri],src,MqttSN::parse_message(msg).merge(debug).to_json  ###utf-8
    end
  end
  unless sent
    puts "no gw to send.."
    sleep 1
  end
end
sub(options={}) click to toggle source
# File lib/mqtt-sn-ruby.rb, line 1160
  # Top-level subscribe helper; loops forever, reconnecting and
  # re-subscribing every time the subscription ends.
  #
  # options:: :id (client id), :topic (default "test/message/123"), :qos.
  # block::   optional; receives (status, message) for everything except
  #           the :sub_ack and :disconnect notifications, which are only
  #           logged. Without a block such events are logged as well.
  def sub options={},&block
    loop do
      note "Connecting.."
      connect options[:id] do |cs,cm|
        note "connect result: #{cs} #{cm}"
        next unless cs==:ok
        note "Subscribing.."
        subscribe options[:topic]||"test/message/123", qos: options[:qos] do |s,m|
          case s
          when :sub_ack
            note "Subscribed Ok! Waiting for Messages!"
          when :disconnect
            note "Disconnected -- switch to new gateway"
          else
            if block
              block.call s,m
            else
              note "Got Message: #{s}: #{m}"
            end
          end
        end
      end
      puts "Disconnected..."
    end
end
subscribe(topic,hash={}) { |status, message| ... } click to toggle source

Mid-level function that sends a subscription to the broker. If a code block is provided, it is run when new messages are received. When the subscription is established with the broker, a :sub_ack status is passed to the code block. Incoming messages are indicated with the :got_data status. If the connection to the broker is disrupted, a :disconnect status is passed.

# File lib/mqtt-sn-ruby.rb, line 711
# Subscribes to +topic+ and, when a block is given, keeps delivering
# events to it until the connection state becomes :disconnected.
#
# topic:: topic name to subscribe to.
# hash::  :qos for the subscription.
# block:: receives (:sub_ack, reply) once the subscribe exchange has
#         finished, (:got_data, message) for every message taken from
#         the data queue (polled every 0.1 s), and (:disconnect, {})
#         before the loop ends.
def subscribe topic,hash={},&block  # :yields: status, message
  send :subscribe, topic: topic, qos: hash[:qos],expect: :sub_ack do |status,reply|
    #when subs topic has no wild cards, we get topic id here:
    if status==:ok and reply[:topic_id] and reply[:topic_id]>0 and reply[:topic_type]==:long
      @topics[topic]=reply[:topic_id]
    end
    next unless block
    block.call :sub_ack,reply
    while true
      block.call :got_data,@dataq.pop unless @dataq.empty?
      sleep 0.1
      next unless @state==:disconnected
      block.call :disconnect,{}
      #gateway_close :subscribe_disconnected
      break
    end
  end
end
unsubscribe(topic) click to toggle source
# File lib/mqtt-sn-ruby.rb, line 738
# Sends an :unsubscribe for +topic+ and waits for the :unsub_ack.
def unsubscribe topic
  send(:unsubscribe, topic: topic, expect: :unsub_ack) { |_status,_message| }
end
will_and_testament(topic,msg) click to toggle source
# File lib/mqtt-sn-ruby.rb, line 651
# Sets the will topic and will message of this client.
#
# When not disconnected, changed values are pushed to the gateway with
# :will_topic_upd / :will_msg_upd and the outcome of each update is
# printed. While disconnected nothing is sent; the values are used by
# the next connect. In both cases the new values are stored afterwards.
def will_and_testament topic,msg
  if @state!=:disconnected #if already connected, send changes, otherwise wait until connect does it.
    if @will_topic!=topic
      send :will_topic_upd, topic: topic, expect: :will_topic_resp do |status,message|
        if status==:ok
          puts "will topic updated"
        else
          puts "Error:#{@id} will topic update failed!"
        end
      end
    end
    if @will_msg!=msg
      send :will_msg_upd, msg: msg, expect: :will_msg_resp do |status,message|
        if status==:ok
          puts "will msg updated"
        else
          puts "Error:#{@id} will msg update failed!"
        end
      end
    end
  end
  @will_topic=topic
  @will_msg=msg
end