module LoraRb::Call
It contains all the methods for selecting the items.
Attributes
client[R]
contex[R]
ssl_socket[R]
tcp_socket[R]
Private Instance Methods
_http_headers(headers={})
click to toggle source
# File lib/lora-rb/resiot/http/call.rb, line 65
# Builds the headers for an HTTP request: the caller's headers plus the JSON
# content type and the stored token as Authorization. Those two keys replace
# any value the caller supplied for them. The argument itself is not modified.
def _http_headers(headers={})
  mandatory = { 'Content-Type' => 'application/json', 'Authorization' => @token }
  headers.merge(mandatory)
end
all_enqueued_downlink_id(enqueued_messages:nil, eui:nil, request_id:nil, debug:false)
click to toggle source
It returns an array of ids from the enqueued downlink response. Response example: {"reply"=>[
{"confirmed"=>true, "id"=>5034, "payload"=>"0200", "port"=>40, "priority"=>0}, {"confirmed"=>true, "id"=>5035, "payload"=>"0200", "port"=>40, "priority"=>0}, {"confirmed"=>true, "id"=>5036, "payload"=>"0200", "port"=>40, "priority"=>0}, {"confirmed"=>true, "id"=>5037, "payload"=>"0200", "port"=>40, "priority"=>0}], "status"=>200}
Returns:
- [5034, 5035, 5036, 5037]
# File lib/lora-rb/a2a/mqtt/call.rb, line 283
# Collects the 'id' of every entry in the 'reply' list of an enqueued-downlink
# response. When no response is passed in, it is taken from the last element
# returned by sub_get_enqueued_messages.
#
# Raises EuiNotFound when the response status is 404, a RuntimeError for any
# other status different from 200, and a RuntimeError when 'reply' is present
# but is not an Array. Returns [] when 'reply' is nil.
def all_enqueued_downlink_id(enqueued_messages:nil, eui:nil, request_id:nil, debug:false)
  enqueued_messages ||= sub_get_enqueued_messages(eui: eui, request_id: request_id, debug: debug).last

  status = enqueued_messages['status']
  unless status == 200
    error = "Cannot process enqueued messages since status=#{status}"
    raise EuiNotFound, error if status == 404

    raise error
  end

  reply = enqueued_messages['reply']
  return [] if reply.nil?
  raise "An array was expected not: #{reply}" unless reply.is_a?(Array)

  ids = reply.map { |message| message['id'] }
  puts ids.to_s if debug == :full
  ids
end
generate_request_id()
click to toggle source
# File lib/lora-rb/a2a/mqtt/call.rb, line 269
# Builds a request id: the current time formatted as yymmddHHMMSS followed by
# one random byte rendered as two hex digits.
def generate_request_id
  timestamp = Time.now.strftime('%y%m%d%H%M%S')
  timestamp + SecureRandom.hex(1)
end
get_eui_from_topic(topic)
click to toggle source
# File lib/lora-rb/a2a/mqtt/call.rb, line 259
# Extracts the 16 word characters that follow "devices/" in a topic.
# Returns nil when the topic is nil or has no such segment.
def get_eui_from_topic(topic)
  found = topic&.match(%r{devices/(\w{16})})
  found && found[1]
end
get_port_from_topic(topic)
click to toggle source
# File lib/lora-rb/a2a/mqtt/call.rb, line 264
# Extracts the digits that follow "uplink/" in a topic, provided they are
# followed by another "/" or by the end of the string. The port is returned as
# a String; nil is returned when the topic is nil or does not match.
def get_port_from_topic(topic)
  return if topic.nil?

  hit = topic.match(%r{uplink/(\d+)(/|\Z)})
  hit[1] if hit
end
sub_delete_enqueued_messages(eui:, id: nil, debug: false)
click to toggle source
Delete one or all enqueued messages from the network queue
# File lib/lora-rb/a2a/mqtt/call.rb, line 220
# Deletes one enqueued downlink (when +id+ is given) or every id reported by
# all_enqueued_downlink_id for the device.
#
# It subscribes to the :queued response url, publishes one delete request per
# id, reads one response per request through sub_read_data, and finally
# unsubscribes.
#
# Returns a two-element array: the topic of the last response read (the
# subscribed url when nothing had to be deleted) and the list of responses.
# Raises a RuntimeError when no :queued url is configured.
def sub_delete_enqueued_messages(eui:, id: nil, debug: false)
  request_id = generate_request_id
  ids_to_delete = id ? [id] : all_enqueued_downlink_id(eui: eui, request_id: request_id, debug: debug)

  queued_response_url = @downlink_response_urls.philter({name: :queued}, get: :url).first
  raise "queued_topic not found in the config yml!" unless queued_response_url

  queued_response_url = merge_tags_to_url(queued_response_url,
                                          username: @username,
                                          appid: @appid,
                                          deveui: eui,
                                          clientid: @client_id,
                                          requestid: request_id)
  puts " response_topic #{queued_response_url}" if debug
  @client.subscribe(queued_response_url)

  # Keep the topic reported by the reads separate from the subscribed url:
  # sub_read_data returns the topic the message arrived on (nil on timeout),
  # and the subscription must be released with the url that was subscribed.
  last_topic = queued_response_url
  responses = ids_to_delete.map do |id_to_delete|
    delete_enqueued_downlink_url = merge_tags_to_url(@delete_enqueued_downlink_url,
                                                     username: @username,
                                                     appid: @appid,
                                                     deveui: eui,
                                                     id: id_to_delete,
                                                     clientid: @client_id,
                                                     requestid: request_id)
    puts " delete_enqueued_downlink_url #{delete_enqueued_downlink_url}" if debug

    # Request the deletion and wait for its response
    @client.publish(delete_enqueued_downlink_url, ''.to_json, false)
    last_topic, response_message = sub_read_data(topic: nil, debug: debug)
    puts " Found response #{response_message} " if debug == :full
    response_message
  end

  @client.unsubscribe(queued_response_url)
  [last_topic, responses]
end
sub_get_enqueued_messages(eui:, request_id: nil, debug: false)
click to toggle source
Get enqueued messages from the network queue
# File lib/lora-rb/a2a/mqtt/call.rb, line 185
# Asks the network for the downlinks currently enqueued for a device.
#
# It subscribes to the :queued response url, publishes the request on the
# enqueued-downlinks url, reads one response through sub_read_data and then
# unsubscribes. A request id is generated when none is given.
#
# Returns a two-element array: response topic and response message.
# Raises a RuntimeError when +eui+ is nil/false or no :queued url is configured.
def sub_get_enqueued_messages(eui:, request_id: nil, debug: false)
  request_id ||= generate_request_id
  raise "eui cannot be empty!" unless eui

  url_template = @downlink_response_urls.philter({name: :queued}, get: :url).first
  raise "queued_topic not found in the config yml!" unless url_template

  tags = { username: @username, appid: @appid, deveui: eui, clientid: @client_id, requestid: request_id }

  queued_response_url = merge_tags_to_url(url_template, **tags)
  puts " queued_response_url #{queued_response_url}" if debug
  @client.subscribe(queued_response_url)

  enqueued_downlinks_url = merge_tags_to_url(@enqueued_downlinks_url, **tags)
  puts " enqueued_downlinks_url #{enqueued_downlinks_url}" if debug

  # Request the enqueued items and wait for the answer
  @client.publish(enqueued_downlinks_url, ''.to_json, false)
  answer = sub_read_data(topic: nil, debug: debug)
  response_topic, response_message = answer
  puts " Topic #{response_topic} response #{response_message} " if debug == :full

  @client.unsubscribe(queued_response_url)
  return response_topic, response_message
end
sub_initialize(options={})
click to toggle source
# File lib/lora-rb/a2a/mqtt/call.rb, line 10
# Validates the connection options, connects the MQTT client (unless mocked)
# and stores the configured topics/urls in instance variables.
#
# Required options: :uplink_url, :downlink_url, :downlink_response_urls,
# :host, :port, :username, :password, plus :ssl_file when :ssl is set. A
# RuntimeError is raised for the first one that is missing.
# Optional options read here: :enqueued_downlinks_url,
# :delete_enqueued_downlink_url, :timeout, :wait_response (true when the key
# is absent) and :debug.
#
# Returns {'hello' => 'Lora-Rb: Ready to start!'}.
def sub_initialize(options={})
  require 'mqtt'

  %i[uplink_url downlink_url downlink_response_urls host port username password].each do |key|
    raise "Specify #{key} to continue!" unless options[key]
  end
  raise 'Specify ssl_file to continue!' if options[:ssl] && !options[:ssl_file]

  @username = options[:username]
  @client_id = "#{@username}::ssg#{generate_request_id}"

  connection_attributes = {
    host: options[:host],
    port: options[:port],
    ssl: options[:ssl],
    username: @username,
    password: options[:password],
    client_id: @client_id
  }
  connection_attributes[:cert_file] = options[:ssl_file] if options[:ssl]

  # The class variable @@mock is set by the stubbed class. The other methods
  # (send_cmd etc.) are entirely stubbed.
  mocked = defined?(@@mock) && @@mock
  @client = MQTT::Client.connect(connection_attributes) unless mocked

  uplink_tags = { username: @username, appid: @appid }
  @topic = merge_tags_to_url(options[:uplink_url], uplink_tags, { debug: options[:debug] })

  @downlink_url = options[:downlink_url]
  @downlink_response_urls = options[:downlink_response_urls]
  @enqueued_downlinks_url = options[:enqueued_downlinks_url]
  @delete_enqueued_downlink_url = options[:delete_enqueued_downlink_url]
  @timeout = options[:timeout]
  @wait_response = options.fetch(:wait_response, true)

  { 'hello' => 'Lora-Rb: Ready to start!' }
end
sub_listen(options={}, &block)
click to toggle source
Wait for messages in the queue.
# File lib/lora-rb/a2a/mqtt/call.rb, line 152
# Waits for messages and hands each one to the given block.
#
# The topic is options[:topic] when that key is present (even if nil),
# otherwise @topic. Every received payload is parsed as JSON. When the parsed
# payload is a Hash it is enriched with 'eui' and 'port' taken from the topic
# (existing values are kept) and, if @encoding is set, its 'payload' or 'data'
# entry is decoded with lora_decode. Any other JSON value (array, string,
# number) is passed on untouched.
#
# Options: :topic, :debug (truthy, or :full to also print the raw json) and
# :test (stop after the first message).
def sub_listen(options={}, &block)
  topic = options.has_key?(:topic) ? options[:topic] : @topic
  if topic
    topic = topic.dup
    puts " Waiting for messages in #{topic}. To exit press CTRL+C" if options[:debug]
  end

  @client.get(topic) do |received_topic, raw_message|
    # Block is executed for every message received
    puts " original json: #{received_topic}: #{raw_message}" if options[:debug] == :full
    message = JSON.parse(raw_message)

    # Only JSON objects can carry the extra keys. Arrays, strings and integers
    # also respond to [] but raise as soon as a string key is read or assigned.
    if message.is_a?(Hash)
      eui = get_eui_from_topic(received_topic)
      message['eui'] ||= eui if eui
      port = get_port_from_topic(received_topic)
      message['port'] ||= port if port
      if @encoding
        payload_key = %w(payload data).find { |tag| message[tag] }
        message[payload_key] = lora_decode(message[payload_key]) if payload_key
      end
    end

    puts " #{message}" if options[:debug] # After reworking
    block.call(received_topic, message) if block_given?
    break if options[:test]
  end
end
sub_quit()
click to toggle source
Close the connection
# File lib/lora-rb/a2a/mqtt/call.rb, line 180
# Closes the connection by disconnecting the client; returns whatever the
# client's disconnect returns.
def sub_quit
  @client.disconnect
end
sub_read_data(options={})
click to toggle source
Receive the payload from the network. There is a timeout. Use this method only to retrieve data from the queue; if you have to wait for data, use listen instead. If topic is nil, it uses the subscribed topics.
# File lib/lora-rb/a2a/mqtt/call.rb, line 121
# Reads one message from the client, giving up after a timeout.
#
# The topic is options[:topic] when that key is present (even if nil),
# otherwise @topic. The timeout is options[:timeout] or @timeout. On timeout
# the message becomes {'error' => 'Timeout'} and the topic is left as it was.
#
# The payload is parsed as JSON. When the parsed payload is a Hash it is
# enriched with 'eui' and 'port' taken from the topic (existing values are
# kept) and, if @encoding is set, its 'payload' or 'data' entry is decoded
# with lora_decode. Any other JSON value is returned untouched.
#
# Returns a two-element array: topic and parsed message.
def sub_read_data(options={})
  topic = options.has_key?(:topic) ? options[:topic] : @topic
  if topic
    topic = topic.dup
    puts " Reading topic #{topic}..." if options[:debug]
  end

  message = nil
  begin
    Timeout.timeout(options[:timeout] || @timeout) do
      topic, message = @client.get(topic)
    end
  rescue Timeout::Error
    message = {error: 'Timeout'}.to_json
  end

  message = JSON.parse(message)

  # Only JSON objects can carry the extra keys. Arrays, strings and integers
  # also respond to [] but raise as soon as a string key is read or assigned.
  if message.is_a?(Hash)
    eui = get_eui_from_topic(topic)
    message['eui'] ||= eui if eui
    port = get_port_from_topic(topic)
    message['port'] ||= port if port
    if @encoding
      payload_key = %w(payload data).find { |tag| message[tag] }
      message[payload_key] = lora_decode(message[payload_key]) if payload_key
    end
  end

  return topic, message
end
sub_send_cmd(options={})
click to toggle source
Send the request to device
# File lib/lora-rb/a2a/mqtt/call.rb, line 52
# Sends a downlink request to a device.
#
# Options:
#   :eui        device the request is addressed to
#   :port       port where the message should be sent
#   :data       message payload (encoded with lora_encode when @encoding is set)
#   :confirmed  (optional) tells whether a confirmed downlink is requested
#   :window     (optional) which rx window should be used
#   :priority   (optional) message priority, used to sort messages in the queue
#   :wait_response  wait for the responses (defaults to @wait_response)
#   :delete_previous_enqueued_commands  clear the queue first (defaults to true)
#   :debug      truthy, or :full for more output
#
# When waiting for responses, every configured response url that has a :url is
# subscribed before publishing, one response is read per subscribed topic, and
# the topics are unsubscribed afterwards.
#
# Returns an array of { topic:, json: } hashes (empty when not waiting).
# Raises a RuntimeError when responses are requested but no response url is
# configured.
def sub_send_cmd(options={})
  options = { wait_response: @wait_response, delete_previous_enqueued_commands: true }.merge(options)

  sub_delete_enqueued_messages(eui: options[:eui], debug: options[:debug]) if options[:delete_previous_enqueued_commands]

  h_request = {
    'port' => options[:port], # Port where the message should be sent
    'payload' => (@encoding ? lora_encode(options[:data]) : options[:data]), # Message payload
    'confirmed' => options[:confirmed] # (Optional) Tells whether a confirmed downlink is requested. Default: false
  }
  # (Optional) Specifies which rx window should be used: RX1, RX2 or any(BOTH). Default: BOTH
  h_request['window'] = options[:window] if options[:window]
  # (Optional) Message priority, used to sort messages in the queue. Default: 0
  h_request['priority'] = options[:priority] if options[:priority]

  request_id = generate_request_id
  tags = { username: @username, appid: @appid, deveui: options[:eui], clientid: @client_id, requestid: request_id }

  publish_url = merge_tags_to_url(@downlink_url, **tags)
  puts " publish #{h_request.to_json} to #{publish_url}" if options[:debug]

  responses = []
  response_topics = []

  if options[:wait_response]
    @downlink_response_urls.each do |dru_hash|
      response_url = dru_hash[:url]
      next unless response_url

      response_topic = merge_tags_to_url(response_url, **tags)
      puts " Subscribing response #{response_topic}" if options[:debug] == :full
      response_topics << response_topic
    end
    raise "cannot subscribe without topic, response_topics is empty!" if response_topics.empty?

    @client.subscribe(*response_topics)
  end

  @client.publish(publish_url, h_request.to_json, false)

  if options[:wait_response]
    # Read one response per subscribed topic. Entries without a url were never
    # subscribed, so reading for them could only time out or consume a message
    # that belongs to another topic.
    response_topics.size.times do
      response_topic, response_message = sub_read_data(topic: nil, debug: options[:debug])
      response = { topic: response_topic, json: response_message }
      puts " Found response #{response} " if options[:debug] == :full
      responses << response
    end
    @client.unsubscribe(*response_topics)
  end

  responses
end