class MqttSN
Constants
- ADVERTISE_TYPE
- CLEAN_FLAG
- CONNACK_TYPE
- CONNECT_TYPE
- DISCONNECT_TYPE
- GWINFO_TYPE
- MAX_IDLE
- Nretry
- PINGREQ_TYPE
- PINGRESP_TYPE
- PUBACK_TYPE
- PUBCOMP_TYPE
- PUBLISH_TYPE
- PUBREC_TYPE
- PUBREL_TYPE
- QOS0_FLAG
- QOS1_FLAG
- QOS2_FLAG
- QOSM1_FLAG
- REGACK_TYPE
- REGISTER_TYPE
- RETAIN_FLAG
- SEARCHGW_TYPE
- SUBACK_TYPE
- SUBSCRIBE_TYPE
- TOPIC_PREDEFINED_FLAG
- TOPIC_SHORT_FLAG
- Tretry
- UNSUBACK_TYPE
- UNSUBSCRIBE_TYPE
- WILLMSGREQ_TYPE
- WILLMSGRESP_TYPE
- WILLMSGUPD_TYPE
- WILLMSG_TYPE
- WILLTOPICREQ_TYPE
- WILLTOPICRESP_TYPE
- WILLTOPICUPD_TYPE
- WILLTOPIC_TYPE
- WILL_FLAG
Attributes
active_gw_id[RW]
clients[RW]
gateways[RW]
http_log[RW]
options[RW]
state[RW]
Public Class Methods
build_packet(m)
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 358
# Serializes a list of byte values into an MQTT-SN wire packet.
#
# m:: Array of Integers (0..255): the message type byte followed by the
#     remaining packet bytes.
#
# Returns a binary (ASCII-8BIT) String: one length byte (number of bytes in
# +m+ plus the length byte itself) followed by the bytes of +m+.
#
# Raises RangeError when a byte value or the resulting length does not fit
# into a single byte.
def self.build_packet m
  body = m.to_a
  bytes = [body.size + 1] + body
  # Validate explicitly: pack("C*") would silently wrap out-of-range values.
  bytes.each do |b|
    raise RangeError, "#{b} out of char range" unless b.between?(0, 255)
  end
  # pack always yields a binary string, independent of the byte values, and
  # does not mutate any string literal.
  bytes.pack("C*")
end
hexdump(data)
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 349
# Renders the bytes of +data+ as comma-separated, two-digit upper-case hex
# values (for example "02,16"). An empty string yields "".
def self.hexdump data
  data.each_byte.map { |byte| format("%02X", byte) }.join(",")
end
new(hash={})
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 171
# Creates a client or forwarder instance and starts its background threads.
#
# hash:: options; the keys read here are
#        :server_uri    - URI of a default gateway (registered as gateway id 0)
#        :broadcast_uri - multicast URI; enables autodiscovery when no
#                         :server_uri is given
#        :forwarder     - truthy to run in forwarder mode
#        :keepalive     - keepalive value, converted with to_i (default 25)
#        :local_port    - local listening port in forwarder mode (default 1882)
#        :debug, :verbose - logging switches
#
# Terminates the process (exit -1) when neither :server_uri nor
# :broadcast_uri is given.
def initialize(hash={})
  @options=hash # kept as-is; other methods read :gw_id and :http_port from it
  @server_uri=hash[:server_uri]
  @debug=hash[:debug]
  @verbose=hash[:verbose]
  @state=:disconnected
  @bcast_port=5000
  @keepalive=(hash[:keepalive]||25).to_i
  @forwarder=hash[:forwarder] # flag to indicate forward mode
  @will_topic=nil
  @will_msg=nil
  @active_gw_id=nil
  @http_log=[]
  # Socket::Ifaddr#addr is nil for interfaces without an address, so guard
  # before asking for the address family.
  @local_ifs=Socket.getifaddrs.map { |i| i.addr.ip_address if i.addr and i.addr.ipv4? }.compact
  @sem=Mutex.new
  @gsem=Mutex.new
  @log_q = Queue.new # log queue
  @msg_id=1
  @clients={}
  @gateways={}
  @autodiscovery=false
  @broadcast_uri=hash[:broadcast_uri]
  if @server_uri
    note "Using Default Gateway: #{@server_uri}"
    add_gateway(0,{uri: @server_uri,source: "default"})
    pick_new_gateway
  elsif @broadcast_uri
    note "Autodiscovery Active, using #{@broadcast_uri}"
    @autodiscovery=true
  else
    puts "No autodiscovery and no Default Gateway -- cannot proceed"
    exit -1
  end

  @log_t=Thread.new do
    log_thread
  end

  @topics={} # hash of registered topics is stored here
  @iq = Queue.new
  @dataq = Queue.new

  # Must be set before the roam thread starts: in forwarder mode that thread
  # reads @bcast_period immediately to build its ADVERTISE packet.
  @bcast_period=60 if @forwarder

  if @broadcast_uri
    @bcast_s=open_multicast_send_port
    @bcast=open_multicast_recv_port
    @roam_t=Thread.new do
      roam_thread @bcast
    end
  end

  if @forwarder
    @s,@server,@port = MqttSN::open_port @server_uri
    @local_port=hash[:local_port]||1882
    note "Open port to Gateway: #{@server_uri}: #{@server},#{@port} -- Listening local port: #{@local_port} @ IPs: #{@local_ifs}"
    @s.bind("0.0.0.0",@local_port)
  else
    client_thread
  end
end
open_port(uri_s)
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 113
# Parses +uri_s+ and creates a fresh UDP socket for it.
#
# Returns [UDPSocket, host, port] for "udp" URIs.
# Any failure (including a non-udp scheme) has its backtrace printed and is
# re-raised as a RuntimeError prefixed with "Error: Cannot open socket for".
def self.open_port uri_s
  target = URI.parse(uri_s)
  unless target.scheme == 'udp'
    raise "Error: Cannot open socket for '#{uri_s}', unsupported scheme: '#{target.scheme}'"
  end
  [UDPSocket.new, target.host, target.port]
rescue => e
  pp e.backtrace
  raise "Error: Cannot open socket for '#{uri_s}': #{e}"
end
parse_message(r)
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 799
# Decodes one raw MQTT-SN packet into a Hash.
#
# r:: String holding the packet; its first byte must equal the string size.
#
# Returns a Hash with at least :type (plus :status and the type-specific
# fields decoded below), {type: :unknown, type_byte: n} for unrecognised
# message types, or {} for malformed input. Malformed input is: an empty
# packet, a length byte that does not match, or a packet too short for the
# fields of its message type (these used to raise NoMethodError).
def self.parse_message r
  if not r[0]
    puts "Malformed empty packet #{r}"
    return {}
  end
  len=r[0].ord
  # A packet needs at least the length byte and the type byte.
  if len!= r.size or len<2
    puts "Malformed packet #{r}"
    return {}
  end

  # The last byte is interpreted as a return code; only the *ack types use it.
  status = case r[len-1].ord
           when 0x00 then :ok
           when 0x01 then :rejected_congestion
           when 0x02 then :rejected_invalid_topic_id
           when 0x03 then :rejected_not_supported
           else :unknown_error
           end
  type_byte=r[1].ord

  # Bounds-checked field readers: abort decoding when the packet is shorter
  # than the field being read.
  u8  = lambda { |i| c=r[i]; throw :truncated unless c; c.ord }
  u16 = lambda { |i| (u8.call(i)<<8)+u8.call(i+1) }

  m = catch(:truncated) do
    case type_byte
    when CONNECT_TYPE
      duration=u16.call(4)
      {type: :connect, flags: u8.call(2), duration: duration, client_id: r[6,len-6], status: :ok}
    when CONNACK_TYPE
      {type: :connect_ack,status: status}
    when SUBACK_TYPE
      topic_id=u16.call(3)
      msg_id=u16.call(5)
      {type: :sub_ack, topic_id: topic_id, msg_id: msg_id, status: status}
    when SUBSCRIBE_TYPE
      msg_id=u16.call(3)
      {type: :subscribe, flags: u8.call(2), topic: r[5,len-5], msg_id: msg_id, status: :ok}
    when UNSUBACK_TYPE
      {type: :unsub_ack, msg_id: u16.call(2), status: :ok}
    when PUBLISH_TYPE
      flags=u8.call(2)
      topic_id=u16.call(3)
      msg_id=u16.call(5)
      msg=r[7,len-7].force_encoding("UTF-8")
      topic_type=:long
      topic=""
      if (flags&0x03)==TOPIC_SHORT_FLAG
        topic_type=:short
        topic=r[3].chr+r[4].chr # short topics carry their two characters in the topic id field
      elsif (flags&0x03)==TOPIC_PREDEFINED_FLAG
        topic_type=:predefined
        topic=""
      end
      qos=(flags>>5)&0x03
      qos=-1 if qos==3
      pub={type: :publish, qos: qos, topic_id: topic_id, topic_type:topic_type, topic: topic, msg_id: msg_id, msg: msg,flags: flags, status: :ok}
      pub[:retain]=true if (flags & RETAIN_FLAG) == RETAIN_FLAG
      pub
    when PUBREL_TYPE
      {type: :pub_rel, msg_id: u16.call(2), status: :ok}
    when DISCONNECT_TYPE
      {type: :disconnect,status: :ok}
    when REGISTER_TYPE
      topic_id=u16.call(2)
      msg_id=u16.call(4)
      {type: :register, topic_id: topic_id, msg_id: msg_id, topic: r[6,len-6],status: :ok}
    when REGACK_TYPE
      {type: :register_ack,topic_id: u16.call(2),status: status}
    when PUBREC_TYPE
      {type: :pubrec,msg_id: u16.call(2),status: :ok}
    when PUBACK_TYPE
      topic_id=u16.call(2)
      msg_id=u16.call(4)
      {type: :publish_ack,topic_id: topic_id,msg_id: msg_id, status: status}
    when PUBCOMP_TYPE
      {type: :pubcomp,status: :ok, msg_id: u16.call(2)}
    when WILLTOPICREQ_TYPE
      {type: :will_topic_req, status: :ok}
    when WILLMSGREQ_TYPE
      {type: :will_msg_req, status: :ok}
    when WILLTOPICRESP_TYPE
      {type: :will_topic_resp, status: :ok}
    when WILLMSGRESP_TYPE
      {type: :will_msg_resp, status: :ok}
    when SEARCHGW_TYPE
      {type: :searchgw, radius: u8.call(2), status: :ok}
    when GWINFO_TYPE
      {type: :gwinfo, gw_id: u8.call(2), status: :ok}
    when ADVERTISE_TYPE
      duration=u16.call(3)
      {type: :advertise, gw_id: u8.call(2), duration: duration, status: :ok}
    when PINGREQ_TYPE
      {type: :ping, status: :ok}
    when PINGRESP_TYPE
      {type: :pong, status: :ok}
    else
      {type: :unknown, type_byte: type_byte }
    end
  end

  # catch returns nil only when a reader hit the end of the packet.
  if m.nil?
    puts "Malformed packet #{r}"
    return {}
  end
  m
end
poll_packet(socket)
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 627
# Non-blocking receive of one datagram (at most 200 bytes) from +socket+.
#
# Returns [data, sender_ip, sender_port] when a datagram was waiting.
# Returns nil when nothing was readable (after sleeping 0.1 s) or when the
# receive failed (the error and its backtrace are printed).
def self.poll_packet socket
  payload, sender = socket.recvfrom_nonblock(200)
  [payload, sender[2], sender[1]]
rescue IO::WaitReadable
  sleep 0.1
  nil
rescue => e
  puts "Error: receive thread died: #{e}"
  pp e.backtrace
  nil
end
poll_packet_block(socket)
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 643
# Blocking receive of one datagram (at most 200 bytes) from +socket+.
#
# Returns [data, sender_ip, sender_port], taken from the payload and from
# elements 2 and 1 of the address info returned by recvfrom.
def self.poll_packet_block socket
  payload, sender = socket.recvfrom(200)
  [payload, *sender.values_at(2, 1)]
end
send_raw_packet(msg,socket,server,port)
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 558
# Sends the already-serialized packet +msg+ to +server+:+port+ over +socket+.
#
# Returns the hex dump of +msg+ after sending. When +socket+ is nil/false
# nothing is sent, an error line is printed and nil is returned.
def self.send_raw_packet msg,socket,server,port
  unless socket
    puts "Error: no socket at send_raw_packet"
    return
  end
  socket.send(msg, 0, server, port)
  MqttSN::hexdump msg
end
Public Instance Methods
add_gateway(gw_id,hash)
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 158
# Registers or refreshes gateway +gw_id+ in @gateways.
#
# * unknown id: a new entry with zeroed counters/timestamps is created and
#   +hash+ is merged on top of it;
# * known id with a different :uri: only a conflict note is logged;
# * known id with the same :uri: the stamp is refreshed and +hash+ merged in.
def add_gateway gw_id,hash
  known = @gateways[gw_id]
  if !known
    fresh = {
      stamp: Time.now.to_i, status: :ok,
      last_use: 0, last_ping: 0,
      counter_send: 0, last_send: 0,
      counter_recv: 0, last_recv: 0
    }
    @gateways[gw_id] = fresh.merge(hash)
  elsif known[:uri] != hash[:uri]
    note "conflict -- gateway has moved? or duplicate"
  else
    known[:stamp] = Time.now.to_i
    @gateways[gw_id] = known.merge(hash)
  end
end
client_thread()
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 967 def client_thread Thread.new do #ping thread while true do begin while @state==:disconnected sleep 1 end sleep @keepalive if @active_gw_id and @gateways[@active_gw_id] and @gateways[@active_gw_id][:socket] #if we are connected... send :ping, timeout: 5,expect: :pong do |status,message| if status!=:ok note "Error:#{@id} no pong! -- sending disconnect to app" @state=:disconnected gateway_close :timeout end end end rescue => e puts "Error: receive thread died: #{e}" pp e.backtrace end end end Thread.new do #work thread while true do begin do_sleep=true if @active_gw_id and @gateways[@active_gw_id] and @gateways[@active_gw_id][:socket] #if we are connected... if pac=MqttSN::poll_packet(@gateways[@active_gw_id][:socket]) #cannot block -- gateway may change... r,client_ip,client_port=pac m=MqttSN::parse_message r if @debug and m m[:debug]=MqttSN::hexdump r end _,port,_,_= @gateways[@active_gw_id][:socket].addr src="udp://0.0.0.0:#{port}" logger "id %-24.24s -> %-24.24s | %s",@gateways[@active_gw_id][:uri],src,m.to_json process_message m do_sleep=false @gateways[@active_gw_id][:last_recv]=Time.now.to_i @gateways[@active_gw_id][:counter_recv]+=1 end end if do_sleep sleep 0.01 end rescue => e puts "Error: receive thread died: #{e}" pp e.backtrace end end end end
connect(id,&block)
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 676
# Sends CONNECT as client +id+ and, when the gateway asks for it, completes
# the will handshake (will topic, then will message) using @will_topic and
# @will_msg.
#
# id::    client id passed to send :connect
# block:: optional; called once with (:ok, message) when the connect
#         acknowledgement arrives - directly or at the end of the will
#         handshake - and with (:fail, message) when any step fails.
#         Previously the block was never called when the will handshake ran.
def connect id,&block
  send :connect,id: id, clean: false, duration: @keepalive, expect: [:connect_ack,:will_topic_req] do |s,m|
    if s!=:ok
      block.call :fail,m if block
    elsif m[:type]==:connect_ack
      block.call :ok,m if block
    elsif m[:type]==:will_topic_req
      send :will_topic, topic: @will_topic, expect: [:will_msg_req] do |topic_status,topic_reply|
        if topic_status==:ok
          send :will_msg, msg: @will_msg, expect: [:connect_ack] do |msg_status,msg_reply|
            # The connect acknowledgement ends the handshake: report the outcome.
            if block
              block.call((msg_status==:ok ? :ok : :fail), msg_reply)
            end
          end
        else
          block.call :fail,topic_reply if block
        end
      end
    end
  end
end
disconnect()
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 695
# Sends a disconnect and waits for the :disconnect reply; the outcome is
# deliberately ignored (empty callback).
def disconnect
  send(:disconnect, expect: :disconnect) { |_status, _message| }
end
forwarder_thread()
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 236 def forwarder_thread if not @forwarder raise "Cannot Forward if no Forwarder!" end begin last_kill=0 stime=Time.now.to_i Thread.new do #maintenance while true do sleep 1 now=Time.now.to_i changes=false @clients.dup.each do |key,data| if data[:state]==:disconnected dest="#{data[:ip]}:#{data[:port]}" note "- %s",dest @clients.delete key changes=true elsif data[:last_send]<now-MAX_IDLE and data[:last_recv]<now-MAX_IDLE dest="#{data[:ip]}:#{data[:port]}" note "-- %s",dest kill_client key @clients.delete key changes=true end end if changes note "cli:#{@clients.to_json}" end end end while true pac=MqttSN::poll_packet_block(@s) #data from clients to our service sovket r,client_ip,client_port=pac key="#{client_ip}:#{client_port}" if not @clients[key] uri="udp://#{client_ip}:#{client_port}" @clients[key]={ip:client_ip, port:client_port, socket: UDPSocket.new, uri: uri, state: :active, counter_send:0, last_send:0 , counter_recv:0, last_recv:0} c=@clients[key] puts "thread start for #{key}" @clients[key][:thread]=Thread.new(key) do |my_key| while true pacc=MqttSN::poll_packet_block(@clients[my_key][:socket]) #if we get data from server destined to our client rr,client_ip,client_port=pacc @s.send(rr, 0, @clients[my_key][:ip], @clients[my_key][:port]) # send_packet to client mm=MqttSN::parse_message rr _,port,_,_ = @clients[my_key][:socket].addr dest="#{@server}:#{port}" logger "sc %-24.24s <- %-24.24s | %s",@clients[my_key][:uri],@gateways[@active_gw_id][:uri],mm.to_json @gateways[@active_gw_id][:last_recv]=Time.now.to_i @gateways[@active_gw_id][:counter_recv]+=1 @clients[my_key][:last_send]=Time.now.to_i @clients[my_key][:counter_send]+=1 case mm[:type] when :disconnect @clients[my_key][:state]=:disconnected end end end dest="#{client_ip}:#{client_port}" note "+ %s\n",dest note "cli: #{@clients.to_json}" end @clients[key][:stamp]=Time.now.to_i m=MqttSN::parse_message r case m[:type] when :publish if m[:qos]==-1 
@clients[key][:state]=:disconnected #one shot end end sbytes=@clients[key][:socket].send(r, 0, @server, @port) # to rsmb -- ok as is _,port,_,_ = @clients[key][:socket].addr dest="#{@server}:#{port}" @gateways[@active_gw_id][:last_send]=Time.now.to_i @gateways[@active_gw_id][:counter_send]+=1 @clients[key][:last_recv]=Time.now.to_i @clients[key][:counter_recv]+=1 begin if @active_gw_id logger "cs %-24.24s -> %-24.24s | %s", @clients[key][:uri],@gateways[@active_gw_id][:uri],m.to_json else logger "cs %-24.24s -> %-24.24s | %s", @clients[key][:uri],"??",m.to_json end rescue Exception =>e puts "logging fails #{e}" end end end end
gateway_close(cause)
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 1039
# Releases the currently active gateway, if any, while holding @gsem: logs
# the +cause+, stamps the gateway's :last_use, closes and clears its socket
# and resets @active_gw_id to nil. Does nothing when no gateway is active.
def gateway_close cause
  @gsem.synchronize do
    next unless @active_gw_id
    note "Closing gw #{@active_gw_id} cause: #{cause}"
    gw = @gateways[@active_gw_id]
    # Stamp the use time so this gateway is the last one to be reused.
    gw[:last_use] = Time.now.to_i
    if gw[:socket]
      gw[:socket].close
      gw[:socket] = nil
    end
    @active_gw_id = nil
  end
end
goto_sleep(duration)
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 700
# Sends a disconnect carrying +duration+ and waits for the :disconnect
# reply; the outcome is ignored (empty callback).
def goto_sleep duration
  send(:disconnect, duration: duration, expect: :disconnect) { |_status, _message| }
end
kill_client(key)
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 329
# Sends a DISCONNECT packet both to forwarded client +key+ (via the service
# socket @s) and to @server:@port (via the client's own socket).
# Unknown keys are only logged.
def kill_client key
  puts "Killing Client #{key}:"
  victim = @clients[key]
  return unless victim
  puts "Really Killing #{key}"
  send_packet [DISCONNECT_TYPE], @s, victim[:ip], victim[:port]
  send_packet [DISCONNECT_TYPE], victim[:socket], @server, @port
end
kill_clients()
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 338
# Sends DISCONNECT packets for every entry in @clients: one to the client
# (via @s) and one to @server:@port (via the client's own socket).
def kill_clients
  puts "Killing Clients:"
  @clients.each_pair do |key, client|
    puts "Killing #{key}"
    send_packet [DISCONNECT_TYPE], @s, client[:ip], client[:port]
    send_packet [DISCONNECT_TYPE], client[:socket], @server, @port
  end
  puts "Killing Clients Done."
end
log_empty?()
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 87
# True when the log queue holds nothing, or when verbose mode is off.
def log_empty?
  @log_q.empty? || !@verbose
end
log_flush()
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 91
# Blocks, polling every 0.1 s, until log_empty? reports true.
def log_flush
  sleep 0.1 until log_empty?
end
log_thread()
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 97
# Endless loop body of the logging thread: prints queued log lines one by
# one and sleeps 0.01 s whenever the queue is empty. Errors are printed and
# the loop continues.
def log_thread
  while true do
    begin
      if @log_q.empty?
        sleep 0.01
      else
        puts @log_q.pop
      end
    rescue => e
      puts "Error: receive thread died: #{e}"
      pp e.backtrace
    end
  end
end
logger(str,*args)
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 58
# Formats a traffic log line and hands it to the configured sinks.
#
# str, args:: sprintf-style format string and its arguments
#
# The line is prefixed with an ISO 8601 timestamp and a 3-character id:
# "[gw_id]" from the options in forwarder mode, "(active gateway id)"
# otherwise. It is queued for printing when verbose or debug is on, and
# appended to @http_log when an :http_port option is set.
def logger str,*args
  body = format(str, *args)
  text = if @forwarder
           format("%s: [%3.3s] | %s", Time.now.iso8601, @options[:gw_id], body)
         else
           format("%s: (%3.3s) | %s", Time.now.iso8601, @active_gw_id, body)
         end
  @log_q << text if (@verbose || @debug)
  if @options[:http_port] && @http_log
    @http_log << {stamp: Time.now.to_i, text: text}
  end
end
note(str,*args)
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 73
# Queues a timestamped status line (sprintf-style +str+ with +args+) for
# printing, and appends it to @http_log when an :http_port option is set.
# Never raises: any error is printed together with its backtrace.
def note str,*args
  line = format("%s: %s", Time.now.iso8601, format(str, *args))
  @log_q << line
  if @options[:http_port] && @http_log
    @http_log << {stamp: Time.now.to_i, text: line}
  end
rescue => e
  pp e.backtrace
  puts "note dies: #{e} '#{str}'"
end
open_multicast_recv_port()
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 136
# Creates the UDP socket used to receive multicast traffic for
# @broadcast_uri: joins the multicast group, enables SO_REUSEPORT (falling
# back to the raw option number 15 when the symbol is not supported) and
# binds to the URI's port on all interfaces. Returns the socket.
def open_multicast_recv_port
  group = URI.parse(@broadcast_uri)
  membership = IPAddr.new(group.host).hton + IPAddr.new("0.0.0.0").hton
  UDPSocket.new.tap do |sock|
    sock.setsockopt(Socket::IPPROTO_IP, Socket::IP_ADD_MEMBERSHIP, membership)
    begin
      sock.setsockopt(:SOL_SOCKET, :SO_REUSEPORT, 1)
    rescue
      # on raspbian this is needed...
      puts "WARNING: :SO_REUSEPORT not defined -- guessing it is 15 ;)"
      sock.setsockopt(:SOL_SOCKET, 15, 1)
    end
    sock.bind(Socket::INADDR_ANY, group.port)
  end
end
open_multicast_send_port()
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 127
# Creates the UDP socket used for sending to @broadcast_uri, with IP_TTL set
# to 1. The URI and its host are still parsed (and thereby validated) even
# though the resulting values are not used further. Returns the socket.
def open_multicast_send_port
  target = URI.parse(@broadcast_uri)
  IPAddr.new(target.host).hton + IPAddr.new("0.0.0.0").hton
  UDPSocket.new.tap do |sock|
    sock.setsockopt(Socket::IPPROTO_IP, Socket::IP_TTL, [1].pack('i'))
  end
end
pick_new_gateway()
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 1054
# Closes the active gateway and selects a new one from @gateways.
#
# Only gateways with a :uri and status :ok are candidates. Walking the table
# in order, a candidate replaces the current pick when nothing is picked
# yet, when it has never been used (:last_use == 0), or when it was used
# less recently than the current pick. A socket is opened for the winner.
#
# Returns the new @active_gw_id (nil when no gateway qualified). Errors are
# printed, not raised.
def pick_new_gateway
  begin
    gateway_close nil
    @gsem.synchronize do
      chosen = nil
      chosen_used = 0
      @gateways.each do |gw_id, info|
        next unless info[:uri] && info[:status] == :ok
        if chosen.nil? || info[:last_use] == 0 || chosen_used > info[:last_use]
          chosen = gw_id
          chosen_used = info[:last_use]
        end
      end
      if chosen
        @active_gw_id = chosen
        gw = @gateways[@active_gw_id]
        note "Opening Gateway #{@active_gw_id}: #{gw[:uri]}"
        @s, @server, @port = MqttSN::open_port gw[:uri]
        gw[:socket] = @s
        gw[:last_use] = Time.now.to_i
      end
    end
  rescue => e
    puts "Error: receive thread died: #{e}"
    pp e.backtrace
  end
  @active_gw_id
end
ping()
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 743
# Sends a ping and waits up to 2 seconds per attempt for the :pong reply;
# prints an error line when no pong was received.
def ping
  send(:ping, timeout: 2, expect: :pong) do |status, _message|
    puts "Error:#{@id} no pong!" unless status == :ok
  end
end
process_broadcast_message(m,client_ip,client_port)
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 1022
# Reacts to a parsed message received on the broadcast socket.
#
# m::           parsed message Hash (see parse_message)
# client_ip::   sender address; used to build the gateway URI (port 1882)
# client_port:: sender port (unused)
#
# :searchgw is answered with a GWINFO broadcast, in forwarder mode only.
# :advertise and :gwinfo register the sender as a gateway; the duration
# defaults to 180 when the message carries none.
def process_broadcast_message m,client_ip,client_port
  kind = m[:type]
  if kind == :searchgw
    return unless @forwarder
    @bcast.addr # local address lookup kept from the original (its result is unused)
    send_packet_bcast [GWINFO_TYPE, @options[:gw_id]]
  elsif kind == :advertise || kind == :gwinfo
    add_gateway(m[:gw_id], {
      uri: "udp://#{client_ip}:1882",
      source: kind,
      duration: m[:duration] || 180,
      stamp: Time.now.to_i
    })
  end
end
process_message(m)
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 914
# Handles one parsed message received from the gateway.
#
# Depending on the type it updates the topic table or the connection state,
# sends the protocol acknowledgement, and queues published data on @dataq.
# Messages that are not fully handled here are pushed onto @iq so that a
# pending send can pick them up as its expected reply. Returns +m+.
def process_message m
  consumed = false
  case m[:type]
  when :register
    @topics[m[:topic]] = m[:topic_id]
    unless @transfer
      send :register_ack, topic_id: m[:topic_id], msg_id: m[:msg_id], return_code: 0
      consumed = true
    end
  when :disconnect
    @state = :disconnected unless @transfer
  when :pub_rel
    unless @transfer
      send :pub_comp, msg_id: m[:msg_id]
      consumed = true
    end
  when :publish
    # Long topics arrive as ids; map the id back to the registered name.
    m[:topic] = @topics.key(m[:topic_id]) if m[:topic_type] == :long
    unless @transfer
      @dataq << m
      if m[:qos] == 1
        send :publish_ack, topic_id: m[:topic_id], msg_id: m[:msg_id], return_code: 0
      elsif m[:qos] == 2
        send :pub_rec, msg_id: m[:msg_id]
      end
      consumed = true
    end
  when :connect_ack
    @state = :connected
  when :sub_ack
    @state = :subscribed
  when :pong
    @gsem.synchronize do
      gw = @active_gw_id && @gateways[@active_gw_id]
      gw[:last_ping] = Time.now.to_i if gw
    end
  when :searchgw, :advertise, :gwinfo
    consumed = true
  end
  @iq << m if m && !consumed
  m
end
pub(options={})
click to toggle source
toplevel funcs:
# File lib/mqtt-sn-ruby.rb, line 1139
# Top-level helper: publishes one message and flushes the log.
#
# options:: :topic, :msg, :qos, :id
#
# With qos -1 the message (topic default "XX") is published without
# connecting. Otherwise the client connects as :id and publishes (topic
# default "test/message/123"), repeating the connect until one succeeds.
# The message default is "test_value".
def pub options={}
  if options[:qos]==-1
    publish options[:topic]||"XX", options[:msg]||"test_value", qos: options[:qos]
    puts "Sent."
  else
    delivered=false
    until delivered
      connect options[:id] do |s,m|
        if s==:ok
          publish options[:topic]||"test/message/123", options[:msg]||"test_value", qos: options[:qos]
          puts "Sent ok."
          delivered=true
        else
          disconnect
        end
      end
    end
  end
  log_flush
end
publish(topic,msg,hash={})
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 764
# Publishes +msg+ to +topic+.
#
# topic:: "=<n>" selects predefined topic id n; a two-character string is
#         sent as a short topic; anything else is a long topic, registered
#         first when not yet in @topics. The string is not modified.
# msg::   payload String
# hash::  :qos (1 and 2 wait for the matching acknowledgements, anything
#         else is sent without waiting; default 0) and :retain
def publish topic,msg,hash={}
  if topic[0]=="="
    # Read the id from a slice instead of deleting the "=" in place, which
    # mutated the caller's string (and failed on frozen strings).
    topic_id=topic[1..-1].to_i
    topic_type=:predefined
  elsif topic.size==2
    topic_id=((topic[0].ord&0xff)<<8)+(topic[1].ord&0xff)
    topic_type=:short
  else
    topic_type=:long
    if not @topics[topic]
      register_topic topic
    end
    topic_id=@topics[topic]
  end
  case hash[:qos]
  when 1
    send :publish,msg: msg, retain: hash[:retain], topic_id: topic_id, topic_type: topic_type, qos: 1, expect: [:publish_ack] do |s,m|
      # outcome intentionally ignored
    end
  when 2
    send :publish,msg: msg, retain: hash[:retain], topic_id: topic_id, topic_type: topic_type, qos: 2, expect: [:pubrec] do |s,m|
      if s==:ok and m[:type]==:pubrec
        # second half of the QoS 2 exchange
        send :pubrel,msg_id: m[:msg_id], expect: :pubcomp do |rel_status,rel_reply|
        end
      end
    end
  else
    send :publish,msg: msg, retain: hash[:retain],topic_id: topic_id, topic_type: topic_type, qos: hash[:qos]||0
  end
end
register_topic(topic)
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 752
# Registers the long topic name +topic+ and stores the id from the reply in
# @topics. Returns the stored topic id.
# Raises RuntimeError when the registration is not acknowledged.
def register_topic topic
  send :register, topic: topic, expect: :register_ack do |s,m|
    raise "Error:#{@id} Register topic #{topic} failed!" unless s==:ok
    @topics[topic]=m[:topic_id]
  end
  @topics[topic]
end
roam_thread(socket)
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 1085 def roam_thread socket @last_bcast=0 if @forwarder Thread.new do while true do send_packet_bcast [ADVERTISE_TYPE,@options[:gw_id],@bcast_period>>8,@bcast_period&0xff] sleep @bcast_period end end elsif @autodiscovery #client should try to find some gateways.. Thread.new do while true do send :searchgw #replies may or may not come -- even multiple! if @gateways=={} sleep 5 else #pp @gateways sleep 30 end end end Thread.new do while true do if @active_gw_id and @gateways[@active_gw_id] and @gateways[@active_gw_id][:socket] else # not so ok -- pick one! #pick_new_gateway end sleep 0.01 end end end while true do begin if @bcast pac=MqttSN::poll_packet_block(socket) r,client_ip,client_port=pac m=MqttSN::parse_message r if @debug and m m[:debug]=MqttSN::hexdump r end _,port,_,_ = @bcast.addr src="udp://#{client_ip}:#{client_port}" logger "ib %-24.24s <- %-24.24s | %s",@broadcast_uri,src,m.to_json process_broadcast_message m,client_ip,client_port end rescue => e puts "Error: receive thread died: #{e}" pp e.backtrace end end end
send(type,hash={})
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 369
# Builds and sends one MQTT-SN message and optionally waits for its reply.
# NOTE: this intentionally shadows Object#send within the class.
#
# type::  message kind: :connect, :register, :register_ack, :publish_ack,
#         :pub_rec, :pub_comp, :will_topic, :will_topic_upd, :will_msg,
#         :will_msg_upd, :subscribe, :unsubscribe, :publish, :pubrel,
#         :ping, :searchgw or :disconnect
# hash::  type-specific fields plus
#         :expect  - reply type (or Array of types) to wait for
#         :timeout - seconds to wait per attempt (default Tretry)
# block:: called with (status, message) after waiting; status is :ok or
#         :timeout. Not called when no :expect is given.
#
# While disconnected only :connect, :will_topic, :will_msg, :searchgw and
# QoS -1 publishes are sent; :disconnect returns silently, anything else
# logs an error and returns nil.
#
# With :expect the packet is re-sent up to Nretry times. When no expected
# reply arrives the client is marked :disconnected and the gateway closed.
#
# Raises RuntimeError when a required field for +type+ is missing.
def send type,hash={},&block
  if @state==:disconnected and type!=:connect and type!=:will_topic and type!=:will_msg and type!=:searchgw
    if type==:disconnect
      return #already disconnected.. nothing to do
    elsif type==:publish and hash[:qos]==-1
      # QoS -1 publishes need no connection
    else
      note "Error: Cannot #{type} while unconnected, send :connect first!"
      return nil
    end
  end

  case type
  when :connect
    # Fall back to a generated id only when none was supplied. (This used to
    # be an assignment, which replaced every caller-supplied id.)
    if not hash[:id] or hash[:id]==""
      hash[:id]="mqtt-sn-ruby-#{$$}"
    end
    note "Connecting as '#{hash[:id]}'"
    flags=0
    flags+=CLEAN_FLAG if hash[:clean]
    flags+=RETAIN_FLAG if hash[:retain]
    flags+=WILL_FLAG if @will_topic
    pkt=[CONNECT_TYPE,flags,0x01,hash[:duration]>>8 ,hash[:duration] & 0xff]
    pkt.concat(hash[:id].bytes)
    @id=hash[:id]
  when :register
    raise "Need :topic to Publish!" if not hash[:topic]
    pkt=[REGISTER_TYPE,0,0,@msg_id >>8 ,@msg_id & 0xff]
    pkt.concat(hash[:topic].bytes)
    @msg_id+=1
  when :register_ack
    pkt=[REGACK_TYPE,hash[:topic_id]>>8 ,hash[:topic_id] & 0xff,hash[:msg_id]>>8 ,hash[:msg_id] & 0xff,hash[:return_code]]
  when :publish_ack
    pkt=[PUBACK_TYPE,hash[:topic_id]>>8 ,hash[:topic_id] & 0xff,hash[:msg_id]>>8 ,hash[:msg_id] & 0xff,hash[:return_code]]
  when :pub_rec
    pkt=[PUBREC_TYPE,hash[:msg_id]>>8 ,hash[:msg_id] & 0xff]
  when :pub_comp
    pkt=[PUBCOMP_TYPE,hash[:msg_id]>>8 ,hash[:msg_id] & 0xff]
  when :will_topic
    raise "Need :topic to :will_topic" if not hash[:topic]
    pkt=[WILLTOPIC_TYPE,0]
    pkt.concat(hash[:topic].bytes)
  when :will_topic_upd
    raise "Need :topic to :will_topic_upd" if not hash[:topic]
    pkt=[WILLTOPICUPD_TYPE,0]
    pkt.concat(hash[:topic].bytes)
  when :will_msg
    raise "Need :msg to :will_msg" if not hash[:msg]
    pkt=[WILLMSG_TYPE]
    pkt.concat(hash[:msg].bytes)
  when :will_msg_upd
    raise "Need :msg to :will_msg_upd" if not hash[:msg]
    pkt=[WILLMSGUPD_TYPE]
    pkt.concat(hash[:msg].bytes)
  when :subscribe
    raise "Need :topic to :subscribe" if not hash[:topic]
    qos=hash[:qos]||0
    flags=0
    if qos==-1
      flags+=QOSM1_FLAG
    else
      flags+=QOS1_FLAG*qos
    end
    pkt=[SUBSCRIBE_TYPE,flags,@msg_id >>8 ,@msg_id & 0xff]
    pkt.concat(hash[:topic].bytes)
    @msg_id+=1
  when :unsubscribe
    raise "Need :topic to :unsubscribe" if not hash[:topic]
    pkt=[UNSUBSCRIBE_TYPE,0,@msg_id >>8 ,@msg_id & 0xff]
    pkt.concat(hash[:topic].bytes)
    @msg_id+=1
  when :publish
    raise "Need :topic_id to Publish!" if not hash[:topic_id]
    qos=hash[:qos]||0
    flags=0
    flags+=RETAIN_FLAG if hash[:retain]
    if qos==-1
      flags+=QOSM1_FLAG
    else
      flags+=QOS1_FLAG*qos
    end
    if hash[:topic_type]==:short
      flags+=TOPIC_SHORT_FLAG
    elsif hash[:topic_type]==:predefined
      flags+=TOPIC_PREDEFINED_FLAG
    end
    pkt=[PUBLISH_TYPE,flags,hash[:topic_id] >>8 ,hash[:topic_id] & 0xff,@msg_id >>8 ,@msg_id & 0xff]
    pkt.concat(hash[:msg].bytes)
    @msg_id+=1
  when :pubrel
    raise "Need the original :msg_id of the Publish for PubRel!" if not hash[:msg_id]
    pkt=[PUBREL_TYPE,hash[:msg_id] >>8 ,hash[:msg_id] & 0xff]
  when :ping
    pkt=[PINGREQ_TYPE]
  when :searchgw
    pkt=[SEARCHGW_TYPE,0]
  when :disconnect
    if hash[:duration]
      pkt=[DISCONNECT_TYPE,hash[:duration] >>8 ,hash[:duration] & 0xff]
    else
      pkt=[DISCONNECT_TYPE]
    end
  else
    puts "Error: Strange send?? #{type}"
    return nil
  end

  status=:timeout
  m={}

  # Fire-and-forget: nothing to wait for, the block is not called.
  if not hash[:expect]
    if type==:searchgw
      send_packet_bcast pkt
    else
      send_packet_gw pkt
    end
    return
  end

  @sem.synchronize do #one command at a time --
    # Drop stale replies so they cannot be mistaken for the answer to this request.
    while not @iq.empty?
      mp=@iq.pop
      puts "WARN:#{@id} ************** Purged message: #{mp}"
    end
    @iq.clear

    if type==:searchgw
      raw=send_packet_bcast pkt
    else
      raw=send_packet_gw pkt
    end
    hash[:debug]=raw if @debug
    timeout=hash[:timeout]||Tretry
    retries=0
    while retries<Nretry do
      stime=Time.now.to_i
      while Time.now.to_i<stime+timeout
        if not @iq.empty?
          m=@iq.pop
          if Array(hash[:expect]).include? m[:type]
            status=:ok
            break
          else
            puts "WARN:#{@id} ************** Discarded message: #{m}"
          end
        end
        sleep 0.1
      end
      if status==:ok
        break
      else
        retries+=1
        send_packet_gw pkt
        puts "fail to get ack, retry #{retries} :#{@id} #{type},#{hash.to_json}"
        #need to set DUP flag !
      end
    end
    if status==:timeout
      note "Warn: ack timeouted, assume disconnected"
      # Must be :disconnected -- the value every other state check compares against.
      @state=:disconnected
      gateway_close :timeout
    end
  end #sem

  if block
    block.call status,m
  end
end
send_packet(m,socket,server,port)
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 567
# Serializes the byte list +m+, sends it over +socket+ to +server+:+port+
# and logs the transfer (destination, local source port, decoded packet).
def send_packet m,socket,server,port
  wire = MqttSN::build_packet m
  MqttSN::send_raw_packet wire, socket, server, port
  target = "#{server}:#{port}"
  origin = ":#{socket.addr[1]}"
  logger "od %-18.18s <- %-18.18s | %s", target, origin, MqttSN::parse_message(wire).to_json
end
send_packet_bcast(m)
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 576
# Serializes the byte list +m+, sends it to the host/port of @broadcast_uri
# through the multicast send socket and logs the transfer.
def send_packet_bcast m
  group = URI.parse(@broadcast_uri)
  wire = MqttSN::build_packet m
  MqttSN::send_raw_packet wire, @bcast_s, group.host, group.port
  origin = "udp://0.0.0.0:#{@bcast_s.addr[1]}"
  logger "ob %-24.24s <- %-24.24s | %s", @broadcast_uri, origin, MqttSN::parse_message(wire).to_json
end
send_packet_gw(m)
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 585
# Serializes the byte list +m+ and sends it to the active gateway.
#
# When no gateway with an open socket is active, tries to pick one (31
# attempts at most, pausing 0.5 s after each failed pick) and gives up
# without sending when none is found. The send itself, the gateway
# statistics update and the log line all happen while holding @gsem.
# If the gateway vanished in the meantime a message is printed and the
# method sleeps for 1 s.
def send_packet_gw m
  msg = MqttSN::build_packet m
  debug = {}
  debug[:debug] = MqttSN::hexdump(msg) if @debug

  # True-ish when an active gateway with an open socket exists.
  gw_ready = lambda do
    @active_gw_id && @gateways[@active_gw_id] && @gateways[@active_gw_id][:socket]
  end

  attempts = 0
  unless gw_ready.call
    note "No active gw, wait ."
    until gw_ready.call
      sleep 0.5 unless pick_new_gateway
      attempts += 1
      if attempts > 30
        puts "\nNone Found -- not sending"
        return
      end
    end
    note "Gw Ok!"
  end

  delivered = false
  @gsem.synchronize do
    if gw_ready.call
      delivered = true
      gw = @gateways[@active_gw_id]
      gw[:last_send] = Time.now.to_i
      gw[:counter_send] += 1
      target = URI.parse(gw[:uri])
      MqttSN::send_raw_packet msg, gw[:socket], target.host, target.port
      src = "udp://0.0.0.0:#{gw[:socket].addr[1]}"
      logger "od %-24.24s <- %-24.24s | %s", gw[:uri], src, MqttSN::parse_message(msg).merge(debug).to_json
    end
  end
  if not delivered
    puts "no gw to send.."
    sleep 1
  end
end
sub(options={})
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 1160
# Top-level helper: connects and subscribes in an endless loop,
# reconnecting whenever the subscription ends.
#
# options:: :id, :topic (default "test/message/123"), :qos
# block::   optional; receives (status, message) for every event other than
#           :sub_ack and :disconnect. Without a block such events are logged.
def sub options={},&block
  loop do
    note "Connecting.."
    connect options[:id] do |cs,cm|
      note "connect result: #{cs} #{cm}"
      next unless cs==:ok
      note "Subscribing.."
      subscribe options[:topic]||"test/message/123", qos: options[:qos] do |s,m|
        case s
        when :sub_ack
          note "Subscribed Ok! Waiting for Messages!"
        when :disconnect
          note "Disconnected -- switch to new gateway"
        else
          if block
            block.call s,m
          else
            note "Got Message: #{s}: #{m}"
          end
        end
      end
    end
    puts "Disconnected..."
  end
end
subscribe(topic,hash={}) { |status, message| ... }
click to toggle source
Mid-level function to send a subscription to the broker. If a code block is provided, it is run when new messages are received. When the subscription is established with the broker, a :sub_ack message is given to the code block. Incoming messages are indicated with the :got_data status. If the connection to the broker is disrupted, a :disconnect message is given.
# File lib/mqtt-sn-ruby.rb, line 711
# Sends a subscription for +topic+ and, when a block is given, delivers
# events to it until the client becomes disconnected.
#
# topic:: topic name to subscribe to
# hash::  :qos
# block:: called with (:sub_ack, reply) once the send has completed, then
#         with (:got_data, message) for each message taken from the data
#         queue (polled every 0.1 s), and finally with (:disconnect, {})
#         when the state turns :disconnected.
def subscribe topic,hash={},&block # :yields: status, message
  send :subscribe, topic: topic, qos: hash[:qos],expect: :sub_ack do |s,m|
    # when the subscribed topic has no wildcards, the reply carries its topic id
    if s==:ok && m[:topic_id] && m[:topic_id]>0 && m[:topic_type]==:long
      @topics[topic]=m[:topic_id]
    end
    next unless block
    block.call :sub_ack,m
    while true
      block.call :got_data,@dataq.pop unless @dataq.empty?
      sleep 0.1
      if @state==:disconnected
        block.call :disconnect,{}
        break
      end
    end
  end
end
unsubscribe(topic)
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 738
# Sends an unsubscribe for +topic+ and waits for the :unsub_ack reply; the
# outcome is ignored (empty callback).
def unsubscribe topic
  send(:unsubscribe, topic: topic, expect: :unsub_ack) { |_s, _m| }
end
will_and_testament(topic,msg)
click to toggle source
# File lib/mqtt-sn-ruby.rb, line 651
# Sets the will topic and will message.
#
# While disconnected the values are only stored (connect uses them later).
# Otherwise each value that differs from the stored one is also sent as an
# update, and the result of every update is printed.
def will_and_testament topic,msg
  if @state!=:disconnected #if already connected, send changes, otherwise wait until connect does it.
    if @will_topic!=topic
      send :will_topic_upd, topic: topic, expect: :will_topic_resp do |status,message|
        # Report success only when the update was acknowledged.
        if status==:ok
          puts "will topic updated"
        else
          puts "Error:#{@id} will topic update failed!"
        end
      end
    end
    if @will_msg!=msg
      send :will_msg_upd, msg: msg, expect: :will_msg_resp do |status,message|
        if status==:ok
          puts "will msg updated"
        else
          puts "Error:#{@id} will msg update failed!"
        end
      end
    end
  end
  @will_topic=topic
  @will_msg=msg
end