class Wamp::Worker::Runner::Main

This class is the main runner

Attributes

challenge[R]
client[R]
descriptor_queue[R]
queue_monitor[R]

Public Class Methods

new(name=nil, **options) click to toggle source

Constructor

Calls superclass method Wamp::Worker::Runner::Base::new
# File lib/wamp/worker/runner.rb, line 102
def initialize(name=nil, **options)
  super name

  # Combine the options
  options = Wamp::Worker.config.connection(self.name).merge options

  # Setup different options
  @challenge = options[:challenge]
  @client = options[:client] || Wamp::Client::Connection.new(options)
  @active = false

  # Create a queue for passing messages to the main runner
  @descriptor_queue = ::Queue.new

  # Note: since the queue monitor is attached to the same worker,
  # we need to lock the UUIDs together.  This will make sure they
  # delegate background tasks correctly
  uuid = self.dispatcher.uuid

  # Create a command queue monitor
  @queue_monitor = Background.new(self.name, uuid: uuid) do |runner|
    descriptor = runner.dispatcher.check_queues
    self.descriptor_queue.push(descriptor) if descriptor
  end

  # Add the tick loop handler
  self.client.transport_class.add_tick_loop { self.tick_handler }

  # Initialize the last tick
  @last_tick = Time.now.to_i

  # Catch SIGINT
  Signal.trap('INT') { self.stop }
  Signal.trap('TERM') { self.stop }
end

Public Instance Methods

_start() click to toggle source

Starts the run loop

# File lib/wamp/worker/runner.rb, line 140
def _start

  # On join, we need to subscribe and register the different handlers
  self.client.on :join do |session, details|
    self.join_handler session, details
  end

  # On leave, we will print a message
  self.client.on :leave do |reason, details|
    self.leave_handler(reason, details)
  end

  # On challenge, we will run the users challenge code
  self.client.on :challenge do |authmethod, details|
    self.challenge_handler(authmethod, details)
  end

  # Start the monitors
  self.queue_monitor.start

  # Log info
  logger.info("#{self.class.name} '#{self.name}' started")

  # Start the connection
  self.client.open
end
_stop() click to toggle source

Stops the run loop

# File lib/wamp/worker/runner.rb, line 169
def _stop

  # Stop the other threads
  self.queue_monitor.stop

  # Stop the event machine
  self.client.close
end
challenge_handler(authmethod, extra) click to toggle source
# File lib/wamp/worker/runner.rb, line 198
def challenge_handler(authmethod, extra)
  logger.info("#{self.class.name} runner '#{self.name}' challenge")

  if self.challenge
    self.challenge.call(authmethod, extra)
  else
    self.stop
    raise(ArgumentError, "client asked for '#{authmethod}' challenge, but no ':challenge' option was provided")
  end
end
join_handler(session, details) click to toggle source
# File lib/wamp/worker/runner.rb, line 178
def join_handler(session, details)
  logger.info("#{self.class.name} runner '#{self.name}' joined session with realm '#{details[:realm]}'")

  # Set the session
  self.dispatcher.session = session

  # Register for the procedures
  Wamp::Worker.register_procedures(self.name, self.dispatcher, session)

  # Subscribe to the topics
  Wamp::Worker.subscribe_topics(self.name, self.dispatcher, session)
end
leave_handler(reason, details) click to toggle source
# File lib/wamp/worker/runner.rb, line 191
def leave_handler(reason, details)
  logger.info("#{self.class.name} runner '#{self.name}' left session: #{reason}")

  # Clear the session
  self.dispatcher.session = nil
end
tick_handler() click to toggle source

This method periodically checks if any work has come in from the queues

# File lib/wamp/worker/runner.rb, line 211
def tick_handler

  # This code will implement the ticker every second.  This tells the
  # requestors that the worker is alive
  current_time = Time.now.to_i
  if current_time > @last_tick
    self.dispatcher.increment_ticker
    @last_tick = current_time
  end

  # Loop until the queue is empty
  until self.descriptor_queue.empty? do

    # Pop the value and process it
    descriptor = self.descriptor_queue.pop
    self.dispatcher.process(descriptor)

  end
end