class Corleone::Worker

Constants

MAX_RETRIES

Public Class Methods

new(runner_class, server_uri) click to toggle source
# File lib/corleone/worker.rb, line 3
def initialize(runner_class, server_uri)
  @name = `hostname`.strip + '-' + SecureRandom.hex
  @runner_class = runner_class
  @input_queue = Queue.new
  @output_queue = Queue.new
  @pool = Corleone::Pool.new do
    DRbObject.new_with_uri(server_uri)
  end
end

Public Instance Methods

block_until_server_ready() click to toggle source
# File lib/corleone/worker.rb, line 15
def block_until_server_ready
  loop do
    begin
      conn = @pool.get
      conn.ping
      break
    rescue DRb::DRbConnError
      Kernel.sleep(5)
    end
  end
end
handle_example(message) click to toggle source
# File lib/corleone/worker.rb, line 48
def handle_example(message)
  @input_queue << message.payload

  loop do
    result = @output_queue.pop
    break if result.instance_of?(Corleone::Message::Finished)
    publish_result(result)
  end
end
handle_message(message) click to toggle source
# File lib/corleone/worker.rb, line 35
def handle_message(message)
  case message
  when Corleone::Message::Item
    handle_example(message)
  when Corleone::Message::ZeroItems
    handle_zero_items
  when Corleone::Message::RunnerArgs
    handle_runner_args(message.payload)
  else
    logger.warn("invalid received message: #{message}")
  end
end
handle_runner_args(payload) click to toggle source
# File lib/corleone/worker.rb, line 58
def handle_runner_args(payload)
  logger.debug("runner_args arguments: #{payload}")
  @runner = @runner_class.new(payload, logger)
  start_runner
end
handle_zero_items() click to toggle source
# File lib/corleone/worker.rb, line 64
def handle_zero_items
  @quit = true
  @input_queue << Corleone::Message::Stop.new
  @runner_thread.join
end
logger() click to toggle source
# File lib/corleone/worker.rb, line 27
def logger
  @logger ||= RemoteServerLogger.new("WORKER #{@name}", @pool.get)
end
publish_result(result) click to toggle source
# File lib/corleone/worker.rb, line 70
def publish_result(result)
  conn = @pool.get
  conn.return_result(result)
ensure
  @pool.return(conn)
end
start() click to toggle source
# File lib/corleone/worker.rb, line 77
def start
  logger.info("starting worker")
  conn = @pool.get

  conn.check_in(@name)
  runner_args = conn.get_runner_args
  handle_message(runner_args)

  loop do
    message = conn.get_item
    handle_message(message)
    break if @quit
  end

  conn.check_out(@name)
rescue StandardError => e
  logger.warn("exception raised: #{e}")
  e.backtrace.each do |line|
    logger.warn("    #{line}")
  end
ensure
  @pool.return(conn)
end
start_runner() click to toggle source
# File lib/corleone/worker.rb, line 31
def start_runner
  @runner_thread = Thread.new { @runner.run_each(@input_queue, @output_queue) }
end