module QueueKit::Worker

Public Class Methods

new(queue, options = {}) click to toggle source
# File lib/queue_kit/worker.rb, line 5
# Builds a worker around +queue+. A new worker begins in the stopped state.
#
# queue   - Object stored as the source of work items.
# options - Hash of optional collaborators. When a key is absent, the bound
#           method of this worker named on the right is used instead:
#           :processor     - falls back to #process
#           :cooler        - falls back to #cool
#           :error_handler - falls back to #handle_error
#           The same Hash is then passed on to #instrumenter_from.
def initialize(queue, options = {})
  @queue = queue

  # Look up a collaborator, falling back to one of this worker's own methods.
  # The fallback Method object is only built when the key is missing.
  collaborator = lambda do |key, own_method|
    options.fetch(key) { method(own_method) }
  end

  @processor = collaborator.call(:processor, :process)
  @cooler = collaborator.call(:cooler, :cool)
  @error_handler = collaborator.call(:error_handler, :handle_error)
  @stopped = true

  instrumenter_from(options)
end

Public Instance Methods

cool() click to toggle source
# File lib/queue_kit/worker.rb, line 19
# Default cooler: deliberately does nothing and returns nil. #initialize
# installs it when no :cooler option is given.
def cool
  nil
end
default_instrument_options() click to toggle source
# File lib/queue_kit/worker.rb, line 92
# Returns a fresh Hash that identifies this worker under the :worker key.
def default_instrument_options
  defaults = {}
  defaults[:worker] = self
  defaults
end
handle_error(err) click to toggle source
# File lib/queue_kit/worker.rb, line 22
# Default error handler: raises +err+ again instead of swallowing it.
# #initialize installs it when no :error_handler option is given.
#
# err - The exception (or anything Kernel#raise accepts) to raise.
def handle_error(err)
  raise(err)
end
name() click to toggle source
# File lib/queue_kit/worker.rb, line 66
# Returns a String label for this worker built from its class, the host name
# and the current process id. The value is computed once and cached in @name.
def name
  return @name if @name

  host = Socket.gethostname
  @name = "#{self.class} #{host}:#{Process.pid}"
end
process(item) click to toggle source
# File lib/queue_kit/worker.rb, line 15
# Default processor: always raises, because this worker does not know how to
# handle items itself. #initialize installs it when no :processor option is
# given.
#
# item - The work item that could not be handled.
#
# Raises NotImplementedError mentioning the inspected item.
def process(item)
  description = item.inspect
  raise NotImplementedError, "This worker can't do anything with #{description}"
end
procline(string) click to toggle source
# File lib/queue_kit/worker.rb, line 41
# Sets the program name ($0) to a QueueKit-prefixed status line and reports
# the change through #debug.
#
# string - The status text appended after the "QueueKit-<version>: " prefix.
#
# Returns whatever #debug returns.
def procline(string)
  title = "QueueKit-#{QueueKit::VERSION}: #{string}"
  $0 = title

  debug do
    ["worker.procline", {:message => string}]
  end
end
run() click to toggle source
# File lib/queue_kit/worker.rb, line 30
# Starts the worker, then performs units of work until the worker is no
# longer running. Between two units of work a "worker.interval" event is
# handed to #debug.
def run
  start
  tick = lambda { "worker.interval" }

  loop do
    work

    if working?
      debug(&tick)
    else
      break
    end
  end
end
set_popping_procline() click to toggle source
# File lib/queue_kit/worker.rb, line 87
# Records the current time in @last_job_at and updates the process line to
# show that the worker has been waiting since that moment (epoch seconds).
def set_popping_procline
  now = Time.now
  @last_job_at = now
  procline("Waiting since #{now.to_i}")
end
set_working_procline() click to toggle source
# File lib/queue_kit/worker.rb, line 83
# Updates the process line to show that the worker started processing an
# item at the current time (epoch seconds).
def set_working_procline
  started_at = Time.now.to_i
  procline("Processing since #{started_at}")
end
start() click to toggle source
# File lib/queue_kit/worker.rb, line 70
# Marks the worker as running. The waiting process line is set first, so the
# stopped flag is only cleared once that has succeeded.
#
# Returns false (the new value of the stopped flag).
def start
  set_popping_procline

  @stopped = false
end
stop() click to toggle source
# File lib/queue_kit/worker.rb, line 75
# Flags the worker as stopped; #working? reports false from now on.
#
# Returns true (the new value of the stopped flag).
def stop
  @stopped = true
end
trap_signals(signal_handler) click to toggle source
# File lib/queue_kit/worker.rb, line 26
# Hands this worker and +signal_handler+ to SignalChecker.trap.
#
# signal_handler - Passed through to SignalChecker.trap unchanged.
#
# Returns whatever SignalChecker.trap returns.
def trap_signals(signal_handler)
  worker = self
  SignalChecker.trap(worker, signal_handler)
end
work() click to toggle source
# File lib/queue_kit/worker.rb, line 46
# Performs one unit of work via #work!, with any exception it raises routed
# through #wrap_error.
#
# Returns whatever #wrap_error returns.
def work
  wrap_error do
    work!
  end
end
work!() click to toggle source
# File lib/queue_kit/worker.rb, line 50
# Pops one item from the queue and hands it to the processor. Exceptions are
# not rescued here; #work is the wrapped variant.
#
# When the queue yields nothing (nil or false), the cooler is called instead,
# but only while the worker is still running.
#
# Returns the result of #set_popping_procline after processing an item, the
# cooler's result when cooling, or nil when idle and stopped.
def work!
  item = @queue.pop

  unless item
    return working? ? @cooler.call : nil
  end

  set_working_procline
  @processor.call(item)
  set_popping_procline
end
working?() click to toggle source
# File lib/queue_kit/worker.rb, line 79
# Returns true while the worker has not been flagged as stopped, false
# otherwise.
def working?
  @stopped ? false : true
end
wrap_error() { || ... } click to toggle source
# File lib/queue_kit/worker.rb, line 60
# Yields to the given block and routes any exception it raises to the
# configured error handler (@error_handler).
#
# Process-control exceptions (SystemExit, SignalException and therefore
# Interrupt, plus NoMemoryError) are re-raised untouched and never reach the
# handler. Otherwise a handler that merely logs and carries on would swallow
# `exit` and Ctrl-C, leaving the worker loop impossible to terminate.
#
# Every other Exception is still handed over, including ScriptError
# subclasses such as the NotImplementedError raised by the default #process.
#
# Returns the block's value, or the error handler's return value when an
# exception was handled.
def wrap_error
  yield
rescue SystemExit, SignalException, NoMemoryError
  # Let termination requests and fatal conditions propagate as-is.
  raise
rescue Exception => exception
  @error_handler.call(exception)
end