class Corleone::Worker
Constants
- MAX_RETRIES
Public Class Methods
new(runner_class, server_uri)
click to toggle source
# File lib/corleone/worker.rb, line 3 def initialize(runner_class, server_uri) @name = `hostname`.strip + '-' + SecureRandom.hex @runner_class = runner_class @input_queue = Queue.new @output_queue = Queue.new @pool = Corleone::Pool.new do DRbObject.new_with_uri(server_uri) end end
Public Instance Methods
block_until_server_ready()
click to toggle source
# File lib/corleone/worker.rb, line 15 def block_until_server_ready loop do begin conn = @pool.get conn.ping break rescue DRb::DRbConnError Kernel.sleep(5) end end end
handle_example(message)
click to toggle source
# File lib/corleone/worker.rb, line 48 def handle_example(message) @input_queue << message.payload loop do result = @output_queue.pop break if result.instance_of?(Corleone::Message::Finished) publish_result(result) end end
handle_message(message)
click to toggle source
# File lib/corleone/worker.rb, line 35 def handle_message(message) case message when Corleone::Message::Item handle_example(message) when Corleone::Message::ZeroItems handle_zero_items when Corleone::Message::RunnerArgs handle_runner_args(message.payload) else logger.warn("invalid received message: #{message}") end end
handle_runner_args(payload)
click to toggle source
# File lib/corleone/worker.rb, line 58 def handle_runner_args(payload) logger.debug("runner_args arguments: #{payload}") @runner = @runner_class.new(payload, logger) start_runner end
handle_zero_items()
click to toggle source
# File lib/corleone/worker.rb, line 64 def handle_zero_items @quit = true @input_queue << Corleone::Message::Stop.new @runner_thread.join end
logger()
click to toggle source
# File lib/corleone/worker.rb, line 27 def logger @logger ||= RemoteServerLogger.new("WORKER #{@name}", @pool.get) end
publish_result(result)
click to toggle source
# File lib/corleone/worker.rb, line 70 def publish_result(result) conn = @pool.get conn.return_result(result) ensure @pool.return(conn) end
start()
click to toggle source
# File lib/corleone/worker.rb, line 77 def start logger.info("starting worker") conn = @pool.get conn.check_in(@name) runner_args = conn.get_runner_args handle_message(runner_args) loop do message = conn.get_item handle_message(message) break if @quit end conn.check_out(@name) rescue StandardError => e logger.warn("exception raised: #{e}") e.backtrace.each do |line| logger.warn(" #{line}") end ensure @pool.return(conn) end
start_runner()
click to toggle source
# File lib/corleone/worker.rb, line 31 def start_runner @runner_thread = Thread.new { @runner.run_each(@input_queue, @output_queue) } end