class Wamp::Worker::Proxy::Dispatcher

Constants

TIMEOUT

We want to time out every few seconds so that higher-level code can check for a shutdown

Attributes

session[RW]

Public Class Methods

new(name, session=nil, uuid: nil) click to toggle source

Constructor

Calls superclass method Wamp::Worker::Proxy::Base::new
# File lib/wamp/worker/proxy/dispatcher.rb, line 16
# Builds the dispatcher and remembers the session it should dispatch to.
#
# @param name - forwarded unchanged to the superclass constructor
# @param session - stored through the session writer; may be nil
# @param uuid - forwarded to the superclass constructor as the uuid keyword
def initialize(name, session=nil, uuid: nil)
  super(name, uuid: uuid)

  # Go through the writer so the session attribute is the single source of truth
  self.session = session
end

Public Instance Methods

check_queues() click to toggle source

Check the queues

# File lib/wamp/worker/proxy/dispatcher.rb, line 29
# Hands the command request queue and the background response queue to
# check_queue as one list, and returns whatever check_queue returns.
def check_queues
  watched_queues = [command_req_queue, background_res_queue]
  check_queue(watched_queues)
end
increment_ticker() click to toggle source

Increments the ticker

# File lib/wamp/worker/proxy/dispatcher.rb, line 23
# Calls increment on the ticker and returns the result of that call.
def increment_ticker
  current_ticker = ticker
  current_ticker.increment
end
process(descriptor) click to toggle source

Executes the request

@param descriptor [Descriptor] - The descriptor of the request to execute

# File lib/wamp/worker/proxy/dispatcher.rb, line 36
# Executes the command carried by a descriptor against the session.
#
# A nil descriptor is ignored.  :call and :publish are forwarded to the
# session with a callback that pushes the outcome onto the queue under the
# descriptor's handle and command.  :yield is forwarded to the session
# without a callback.  Any other command pushes an error response.
#
# @param descriptor - object exposing handle, command and params
# @raise [RuntimeError] when a descriptor is given but no session is set
def process(descriptor)
  return if descriptor.nil?

  if session.nil?
    raise RuntimeError, "must have a session to process a descriptor"
  end

  # Relays a result/error/details triple back through the queue
  respond = lambda do |result, error, details|
    # The session must not travel back inside the details response
    details.delete(:session) unless details.nil?

    payload = { result: result, error: error, details: details }

    queue.push(descriptor.handle, descriptor.command, payload)
  end

  case descriptor.command
  when :call
    # Forward the call to the session
    request = descriptor.params
    session.call(
      request[:procedure],
      request[:args],
      request[:kwargs],
      request[:options],
      &respond
    )
  when :publish
    # Forward the publish to the session
    request = descriptor.params
    session.publish(
      request[:topic],
      request[:args],
      request[:kwargs],
      request[:options],
      &respond
    )
  when :yield
    # Forward the yield to the session; no callback is passed here
    invocation = descriptor.params[:request]
    yield_options = descriptor.params[:options]
    defer_flag = descriptor.params[:check_defer]
    raw_result = descriptor.params[:result] || {}

    # Rebuild the response object from its hash form
    response = Wamp::Client::Response.from_hash(raw_result)

    session.yield(invocation, response, yield_options, defer_flag)
  else
    # Unknown commands are answered with an error response
    failure = Wamp::Client::Response::CallError.new(
      Wamp::Client::Response::DEFAULT_ERROR,
      ["unsupported proxy command '#{descriptor.command}'"]
    )
    respond.call(nil, failure.to_hash, {})
  end
end

Private Instance Methods

check_queue(queue_name) click to toggle source

This method blocks waiting for a value to appear in the queue

@param queue_name [String, Array] - the name of the queue to wait on (check_queues passes an array of queues)

# File lib/wamp/worker/proxy/dispatcher.rb, line 104
# Pops from the queue for the given queue name(s), asking it to wait with a
# timeout of TIMEOUT, and returns whatever the pop returns.
#
# @param queue_name - passed through unchanged as the first argument to pop
def check_queue(queue_name)
  # The timeout bounds the wait so the thread can check whether the worker
  # has been killed
  pop_options = { wait: true, timeout: TIMEOUT }

  queue.pop(queue_name, **pop_options)
end