class Adrian::Dispatcher

Attributes

running[R]

Public Class Methods

new(options = {}) click to toggle source
# File lib/adrian/dispatcher.rb, line 7
def initialize(options = {})
  @failure_handler     = FailureHandler.new
  @stop_when_done      = !!options[:stop_when_done]
  @stop_when_signalled = options.fetch(:stop_when_signalled, true)
  @sleep               = options[:sleep] || 0.5
  @options             = options
end

Public Instance Methods

delegate_work(item, worker_class) click to toggle source
# File lib/adrian/dispatcher.rb, line 59
def delegate_work(item, worker_class)
  worker = worker_class.new(item)
  worker.report_to(self)
  worker.perform
end
on_done() click to toggle source
# File lib/adrian/dispatcher.rb, line 19
def on_done
  @failure_handler.add_rule(nil, &Proc.new)
end
on_failure(*exceptions) click to toggle source
# File lib/adrian/dispatcher.rb, line 15
def on_failure(*exceptions)
  @failure_handler.add_rule(*exceptions, &Proc.new)
end
start(queue, worker_class) click to toggle source
# File lib/adrian/dispatcher.rb, line 23
def start(queue, worker_class)
  trap_stop_signals if @stop_when_signalled
  @running = true

  while @running do
    begin
      item = queue.pop
    rescue Adrian::Queue::ItemTooOldError => e
      if handler = @failure_handler.handle(e)
        handler.call(e.item, nil, e)
      end
      item = nil
      next
    end

    if item
      delegate_work(item, worker_class)
    else
      if @stop_when_done
        stop
      else
        sleep(@sleep) if @sleep
      end
    end
  end
end
stop() click to toggle source
# File lib/adrian/dispatcher.rb, line 50
def stop
  @running = false
end
trap_stop_signals() click to toggle source
# File lib/adrian/dispatcher.rb, line 54
def trap_stop_signals
  Signal.trap('TERM') { stop }
  Signal.trap('INT')  { stop }
end
work_done(item, worker, exception = nil) click to toggle source
# File lib/adrian/dispatcher.rb, line 65
def work_done(item, worker, exception = nil)
  if handler = @failure_handler.handle(exception)
    handler.call(item, worker, exception)
  else
    raise exception if exception
  end
end