class Wamp::Worker::Proxy::Dispatcher
Constants
- TIMEOUT
We want to timeout every few seconds so higher level code can look for a shutdown
Attributes
session[RW]
Public Class Methods
new(name, session=nil, uuid: nil)
click to toggle source
Constructor
Calls superclass method
Wamp::Worker::Proxy::Base::new
# File lib/wamp/worker/proxy/dispatcher.rb, line 16 def initialize(name, session=nil, uuid: nil) super name, uuid: uuid self.session = session end
Public Instance Methods
check_queues()
click to toggle source
Check the queues
# File lib/wamp/worker/proxy/dispatcher.rb, line 29 def check_queues check_queue [self.command_req_queue, self.background_res_queue] end
increment_ticker()
click to toggle source
Increments the ticker
# File lib/wamp/worker/proxy/dispatcher.rb, line 23 def increment_ticker self.ticker.increment end
process(descriptor)
click to toggle source
Executes the request
@param request [Descriptor] - The request
# File lib/wamp/worker/proxy/dispatcher.rb, line 36 def process(descriptor) return unless descriptor != nil raise(RuntimeError, "must have a session to process a descriptor") unless self.session != nil # Create the callback callback = -> result, error, details { # Need to remove the session from the details response details&.delete(:session) # Create the params params = { result: result, error: error, details: details } # Push the response back self.queue.push descriptor.handle, descriptor.command, params } # Call the session if descriptor.command == :call # invoke the call method procedure = descriptor.params[:procedure] args = descriptor.params[:args] kwargs = descriptor.params[:kwargs] options = descriptor.params[:options] self.session.call(procedure, args, kwargs, options, &callback) elsif descriptor.command == :publish # invoke the publish method topic = descriptor.params[:topic] args = descriptor.params[:args] kwargs = descriptor.params[:kwargs] options = descriptor.params[:options] self.session.publish(topic, args, kwargs, options, &callback) elsif descriptor.command == :yield # invoke the yield method request = descriptor.params[:request] options = descriptor.params[:options] check_defer = descriptor.params[:check_defer] result_hash = descriptor.params[:result] || {} # Get the response from the descriptor params result = Wamp::Client::Response.from_hash(result_hash) self.session.yield(request, result, options, check_defer) else # Return error if the command is not supported error = Wamp::Client::Response::CallError.new( Wamp::Client::Response::DEFAULT_ERROR, ["unsupported proxy command '#{descriptor.command}'"]) callback.call(nil, error.to_hash, {}) end end
Private Instance Methods
check_queue(queue_name)
click to toggle source
This methods blocks waiting for a value to appear in the queue
@param queue_name [String] - the name of the queue
# File lib/wamp/worker/proxy/dispatcher.rb, line 104 def check_queue(queue_name) # Wait for a value to appear in the queue. We have a timeout so # the thread can check if the worker has been killed self.queue.pop(queue_name, wait: true, timeout: TIMEOUT) end