class Orchestrator::Device::Processor
Constants
- CONFIG_DEFAULTS
- DUMMY_RESOLVER
- FAILURE
- SEND_DEFAULTS
Any command that waits: send('power on?').then( execute after command complete ). Named commands mean the promise may never resolve. Totally replaces emit, as we don't care, and makes cross-module requests super easy – non-wait commands resolve after they have been written to the socket!
- SUCCESS
- TERMINATE_MSG
- UNNAMED
Attributes
config[R]
last_receive_at[R]
For statistics only
last_sent_at[R]
For statistics only
queue[R]
thread[R]
timeout[R]
For statistics only
transport[RW]
Public Class Methods
new(man)
click to toggle source
init -> mod.load -> post_init, so config can be set in on_load if desired.
# File lib/orchestrator/device/processor.rb, line 75
#
# Wires the processor to its manager and prepares all per-instance state:
# private copies of the default option hashes, the command queue, the list of
# pending responses and the callbacks used when settling responses.
#
# man - the owning manager; it must respond to #thread and #logger.
def initialize(man)
  # Collaborators supplied by the manager
  @man    = man
  @thread = man.thread
  @logger = man.logger

  # Private copies, so later merges never mutate the shared constants
  @defaults = SEND_DEFAULTS.dup
  @config   = CONFIG_DEFAULTS.dup

  # Response bookkeeping
  @responses = []
  @wait      = false
  @connected = false
  @checking  = Mutex.new
  @bonus     = 0

  # Timestamps kept for statistics
  @last_sent_at    = 0
  @last_receive_at = 0

  # #send_next is handed to the queue as its callback for queued commands
  @queue = CommandQueue.new(@thread, method(:send_next))

  # Used to indicate when we can start the next response processing
  @head = ::Libuv::Q::ResolvedPromise.new(@thread, true)
  @tail = ::Libuv::Q::ResolvedPromise.new(@thread, true)

  # Method variables: reusable callbacks that hop back onto the thread
  @resolver = proc do |resp|
    @thread.schedule { resolve_callback(resp) }
  end
  @resp_success = proc do |result|
    @thread.next_tick { resp_success(result) }
  end
  @resp_failure = proc do |reason|
    @thread.next_tick { resp_failure(reason) }
  end
end
Public Instance Methods
buffer(data)
click to toggle source
# File lib/orchestrator/device/processor.rb, line 173
#
# Accepts a chunk of received data, records when it arrived and appends the
# resulting message(s) to the pending responses. Processing starts right away
# unless a command is currently being waited on.
#
# data - the received chunk; force_encoding is applied to it in place when no
#        tokenizing buffer is configured and an :encoding is set.
def buffer(data)
  @last_receive_at = @thread.now

  if @buffer
    # The tokenizing buffer may yield zero or more complete messages
    @responses.concat(@buffer.extract(data))
  else
    # With a tokenizing buffer the encoding is enforced by the buffer itself
    encoding = @config[:encoding]
    data.force_encoding(encoding) if encoding
    @responses.push(data)
  end

  # if we are waiting we don't want to process this data just yet
  check_next unless @wait
end
check_next()
click to toggle source
# File lib/orchestrator/device/processor.rb, line 196
#
# Drains the pending responses through #check_data, one at a time, stopping
# as soon as a response puts the processor into the waiting state.
# Does nothing when the checking mutex is already held or nothing is pending.
def check_next
  return if @checking.locked? || @responses.empty?

  @checking.synchronize do
    loop do
      check_data(@responses.shift)
      # Stop when waiting on a result or when everything has been consumed
      break if @wait || @responses.empty?
    end
  end
end
config=(options)
click to toggle source
# File lib/orchestrator/device/processor.rb, line 111
#
# Merges the supplied options into the current configuration.
#
# options - a Hash of configuration overrides. The mere presence of the
#           :tokenize key (whatever its value) triggers a buffer rebuild.
def config=(options)
  @config.update(options)

  # use tokenize to signal a buffer update
  rebuild_buffer = options.include?(:tokenize)
  new_buffer if rebuild_buffer
end
connected()
click to toggle source
Callbacks ————————-
# File lib/orchestrator/device/processor.rb, line 148
#
# Connection callback: marks the processor as connected, rebuilds the
# receive buffer and notifies the manager. The :connected status is only
# tracked when the :update_status config option is set.
def connected
  @connected = true
  new_buffer
  @man.notify_connected

  @man.trak(:connected, true) if @config[:update_status]
end
disconnected()
click to toggle source
# File lib/orchestrator/device/processor.rb, line 157
#
# Disconnection callback: marks the processor as offline, notifies the
# manager, optionally flushes whatever the buffer still holds, drops the
# buffer and fails the command currently being waited on (if any).
def disconnected
  @connected = false
  @man.notify_disconnected

  @man.trak(:connected, false) if @config[:update_status]

  # Give any partially buffered data one last chance to be processed
  flush_pending = @buffer && @config[:flush_buffer_on_disconnect]
  check_data(@buffer.flush) if flush_pending
  @buffer = nil

  resp_failure(:disconnected) if @queue.waiting
end
queue_command(options)
click to toggle source
Public interface
# File lib/orchestrator/device/processor.rb, line 119
#
# Normalises a command description, merges in the send defaults and pushes
# it onto the command queue with its priority plus the current bonus.
#
# options - Hash describing the command. Keys read here:
#           :data       - payload; an Array is converted with array_to_str
#           :hex_string - when exactly true, :data is converted with hex_to_byte
#           :wait       - when exactly false, :retries is forced to 0
#           :name       - a String name is converted to a Symbol
#           :priority   - numeric priority (may come from the defaults)
#           :defer      - optional deferred that is rejected if queuing fails
#
# Any StandardError raised while queuing is logged; it is also passed to
# the command's deferred when one was supplied. The error is not re-raised.
def queue_command(options)
  # Make sure we are sending appropriately formatted data
  raw = options[:data]
  if raw.is_a?(Array)
    options[:data] = array_to_str(raw)
  elsif options[:hex_string] == true
    options[:data] = hex_to_byte(raw)
  end

  # No retries when the caller explicitly opted out of waiting
  options[:retries] = 0 if options[:wait] == false

  name = options[:name]
  options[:name] = name.to_sym if name.is_a?(String)

  # merge in the defaults
  options = @defaults.merge(options)
  @queue.push(options, options[:priority] + @bonus)
rescue => e
  # A command may have been supplied without a deferred; guard so that the
  # original error is still logged instead of being masked by a NoMethodError
  defer = options[:defer]
  defer.reject(e) if defer
  @logger.print_error(e, 'error queuing command')
end
send_options(options)
click to toggle source
Helper functions ——————
# File lib/orchestrator/device/processor.rb, line 107
#
# Folds the supplied options into this processor's send defaults, in place.
#
# options - Hash of default overrides for commands queued afterwards.
#
# Returns the updated defaults Hash.
def send_options(options)
  @defaults.update(options)
end
terminate()
click to toggle source
# File lib/orchestrator/device/processor.rb, line 192
#
# Requests shutdown of the processor by handing #do_terminate to the
# thread's scheduler rather than running it inline.
def terminate
  shutdown = method(:do_terminate)
  @thread.schedule shutdown
end
Protected Instance Methods
call_emit(cmd)
click to toggle source
If an emit callback was in place for the current command, invoke it on the next tick.
# File lib/orchestrator/device/processor.rb, line 356
#
# Runs the command's :emit callback, if it has one, on the thread's next
# tick. Errors raised by the callback are logged and never propagate.
#
# cmd - the command Hash; only the :emit key is read.
def call_emit(cmd)
  callback = cmd[:emit]
  return unless callback

  @thread.next_tick do
    begin
      callback.call
    rescue => e
      @logger.print_error(e, 'error in emit callback')
    end
  end
end
check_data(data)
click to toggle source
Check transport response data
# File lib/orchestrator/device/processor.rb, line 232
#
# Check transport response data.
#
# Passes one received message to the manager. When a command is being waited
# on, a fresh deferred is created whose outcome is routed to the success and
# failure callbacks, and the command is passed along with the early resolver;
# otherwise the message is delivered with the dummy resolver and no command.
# Unless the manager answers :async, its return value settles the deferred.
#
# data - a single received message.
def check_data(data)
  # Provide commands with a bonus in this section
  @bonus = @config[:priority_bonus]

  resp =
    begin
      waiting = @queue.waiting
      if waiting
        @wait  = true
        @defer = @thread.defer
        @defer.promise.then(@resp_success, @resp_failure)

        # Disconnect before processing the response
        transport.disconnect if waiting[:force_disconnect]

        # Send response, early resolver and command
        @man.notify_received(data, @resolver, waiting)
      else
        # Don't need to trigger Queue next here as we are not waiting on anything
        @man.notify_received(data, DUMMY_RESOLVER, nil)
      end
    rescue => e
      # NOTE:: This error should never be called
      @logger.print_error(e, 'error processing response data')
      @defer.reject(:abort) if @defer
      nil
    ensure
      # The bonus only applies while the response is being processed
      @bonus = 0
    end

  # Check if response is a success or failure
  resolve_callback(resp) unless resp == :async
end
clear_timers()
click to toggle source
# File lib/orchestrator/device/processor.rb, line 438
#
# Cancels the pending command-timeout timer, if one is set, and forgets it.
#
# Returns nil.
def clear_timers
  timer = @timeout
  timer.cancel if timer
  @timeout = nil
end
do_terminate()
click to toggle source
# File lib/orchestrator/device/processor.rb, line 224
#
# Performs the actual shutdown: rejects the deferred of the command being
# waited on (if any) and cancels everything still queued, both with
# TERMINATE_MSG as the reason.
def do_terminate
  active = @queue.waiting
  active[:defer].reject(TERMINATE_MSG) if active

  @queue.cancel_all(TERMINATE_MSG)
end
new_buffer()
click to toggle source
# File lib/orchestrator/device/processor.rb, line 210
#
# Rebuilds the receive buffer from the :tokenize config option.
#   * callable value   -> the buffer is whatever the callable returns
#   * other truthy     -> a ::UV::BufferedTokenizer built from the config
#   * nil / false      -> any existing buffer is removed
def new_buffer
  tokenizer = @config[:tokenize]

  unless tokenizer
    # remove the buffer if none
    @buffer = nil if @buffer
    return
  end

  @buffer = tokenizer.respond_to?(:call) ? tokenizer.call : ::UV::BufferedTokenizer.new(@config)
end
process_send(command)
click to toggle source
# File lib/orchestrator/device/processor.rb, line 393
#
# Sends the command, first honouring its :delay_on_receive option: if less
# than that amount of time has passed since data was last received, the send
# is rescheduled (and the delay re-evaluated) once the remaining gap elapses.
#
# command - the command Hash; :delay_on_receive must be numeric.
#
# Returns the result of #transport_send when sent immediately, otherwise a
# promise that resolves with the eventual send result or is rejected with
# :shutdown if the scheduled timer is aborted.
def process_send(command)
  hold_off = command[:delay_on_receive]
  return transport_send(command) unless hold_off > 0

  remaining = @last_receive_at + hold_off - @thread.now
  return transport_send(command) unless remaining > 0

  deferred = @thread.defer
  timer = schedule.in(remaining) { deferred.resolve(process_send(command)) }

  # in case of shutdown we still need to settle this promise
  timer.catch { deferred.reject(:shutdown) }

  deferred.promise
end
resolve_callback(resp)
click to toggle source
# File lib/orchestrator/device/processor.rb, line 266
#
# Settles the deferred created for the response currently being processed:
# rejected when resp is one of the FAILURE values, resolved otherwise.
# The deferred is forgotten afterwards; without one this is a no-op.
#
# resp - the value returned by (or early-resolved from) the module.
#
# Returns nil.
def resolve_callback(resp)
  deferred = @defer
  return unless deferred

  if FAILURE.include?(resp)
    deferred.reject(resp)
  else
    deferred.resolve(resp)
  end

  @defer = nil
end
resp_failure(result_raw)
click to toggle source
# File lib/orchestrator/device/processor.rb, line 277
#
# Handles a failed response for the command currently being waited on.
# The command is re-queued (with the configured priority bonus) while it has
# retries left; otherwise its deferred is rejected with a CommandFailure.
# Afterwards the waiting state is always cleared, already-received data is
# processed and, if still connected, the next command is sent.
#
# result_raw - the failure reason. An Integer is reported as :timeout
#              (presumably the value the timeout timer passes to its
#              callback - confirm against the scheduler).
def resp_failure(result_raw)
  cmd = @queue.waiting

  if cmd
    begin
      # Integer rather than Fixnum: Fixnum no longer exists on current Rubies
      # and the resulting NameError would be swallowed by the rescue below,
      # leaving the command neither rejected nor retried
      result = result_raw.is_a?(Integer) ? :timeout : result_raw

      # A command may carry neither :data nor :path; to_s keeps a nil path
      # from raising while leaving string paths unchanged
      detail = cmd[:data] ? cmd[:data].inspect : cmd[:path].to_s
      debug = "with #{result}: <#{cmd[:name] || UNNAMED}> #{detail}"
      @logger.debug "command failed #{debug}"

      if cmd[:retries] == 0
        err = Error::CommandFailure.new "command aborted #{debug}"
        cmd[:defer].reject(err)
        @logger.warn err.message
      else
        cmd[:retries] -= 1
        cmd[:wait_count] = 0 # reset our ignore count
        @queue.push(cmd, cmd[:priority] + @config[:priority_bonus])
      end
    rescue => e
      # Prevent the queue from ever pausing - this should never be called
      @logger.print_error(e, 'error handling request failure')
    end
  end

  clear_timers
  @wait = false
  @queue.waiting = nil

  check_next                  # Process already received
  @queue.shift if @connected  # Then send a new command
end
resp_success(result)
click to toggle source
We only care about queued commands here. Promises resolve on the next tick, so processing
is guaranteed to have completed.
Check for queue wait, as we may have gone offline.
# File lib/orchestrator/device/processor.rb, line 317
#
# Handles a non-failure result for the response just processed.
#   * no command waiting         -> just restore a consistent idle state
#   * result is nil/false/:ignore -> count it against the command's
#                                    :max_waits and keep waiting, or fail
#                                    with :max_waits_exceeded
#   * result is :abort            -> reject the command with a CommandFailure
#   * anything else               -> resolve the command and run its emit
#                                    callback
# After an :abort or a real result the waiting state is cleared, pending
# data is processed and the next command is sent.
#
# result - the value the module produced for the response.
def resp_success(result)
  pending = @queue.waiting

  unless pending
    # ensure consistent state (offline event may have occurred)
    clear_timers
    @wait = false
    return check_next
  end

  if result && result != :ignore
    if result == :abort
      err = Error::CommandFailure.new "module aborted command with #{result}: <#{pending[:name] || UNNAMED}> #{(pending[:data] || pending[:path]).inspect}"
      pending[:defer].reject(err)
    else
      pending[:defer].resolve(result)
      call_emit pending
    end

    clear_timers
    @wait = false
    @queue.waiting = nil

    check_next    # Process pending
    @queue.shift  # Send the next command
  else
    # It must have been a nil or :ignore
    pending[:wait_count] = (pending[:wait_count] || 0) + 1

    if pending[:wait_count] > pending[:max_waits]
      resp_failure(:max_waits_exceeded)
    else
      check_next
    end
  end
end
schedule()
click to toggle source
# File lib/orchestrator/device/processor.rb, line 443
#
# Returns the scheduler obtained from the manager, fetching it on first use
# and reusing the same object afterwards.
def schedule
  @schedule = @man.get_scheduler unless @schedule
  @schedule
end
send_next(command)
click to toggle source
Callback for queued commands
# File lib/orchestrator/device/processor.rb, line 371
#
# Callback for queued commands.
#
# Honours the command's :delay option: if less than that amount of time has
# passed since the previous send, the command is handed to #process_send
# once the remaining gap elapses; otherwise it is handed over immediately.
#
# command - the command Hash; :delay must be numeric.
#
# Returns the result of #process_send when not delayed, otherwise a promise
# that resolves with that result or is rejected with :shutdown if the
# scheduled timer is aborted.
def send_next(command)
  min_gap = command[:delay]
  return process_send(command) unless min_gap > 0

  remaining = @last_sent_at + min_gap - @thread.now
  return process_send(command) unless remaining > 0

  deferred = @thread.defer
  timer = schedule.in(remaining) { deferred.resolve(process_send(command)) }

  # in case of shutdown we still need to settle this promise
  timer.catch { deferred.reject(:shutdown) }

  deferred.promise
end
transport_send(command)
click to toggle source
# File lib/orchestrator/device/processor.rb, line 417
#
# Transmits the command over the transport and records the send time.
# If the queue is waiting on a command, a timeout timer is started that
# triggers the failure callback; otherwise the command's deferred is
# resolved with :no_wait straight away and its emit callback is run.
# A command flagged :clear_queue cancels everything still queued.
#
# command - the command Hash (:timeout, :defer, :clear_queue and :name are
#           read here).
#
# Returns nil.
def transport_send(command)
  @transport.transmit(command)
  @last_sent_at = @thread.now

  if @queue.waiting
    # Set up timers for command timeout
    @timeout = schedule.in(command[:timeout], @resp_failure)
  else
    # resolve the send promise early as we are not waiting for the response
    command[:defer].resolve(:no_wait)
    call_emit(command) # the command has been sent
  end

  # Useful for emergency stops etc
  @queue.cancel_all("Command #{command[:name]} cleared the queue") if command[:clear_queue]

  nil # ensure promise chain is not propagated
end