class Stalking::Consumer

Public Class Methods

new(options = {}, &block) click to toggle source
# File lib/stalking/consumer.rb, line 4
def initialize(options = {}, &block)
  @servers = options[:servers] || ["localhost:11300"]
  @logger = options[:logger] || Stalking::NullLogger.new
  @handlers = Handlers.new

  if block_given?
    @handlers.instance_eval &block

    work
  end
end

Public Instance Methods

handle(handlers, name, args) click to toggle source
# File lib/stalking/consumer.rb, line 16
def handle(handlers, name, args)
  handlers.each do |handler|
    handler.call name, args
  end
end
handle_error(e, name, args) click to toggle source
# File lib/stalking/consumer.rb, line 22
def handle_error(e, name, args)
  @handlers.error_handlers.each do |handler|
    handler.call e, name, args
  end
rescue => e
  @logger.fatal e
end

Private Instance Methods

connect() click to toggle source
# File lib/stalking/consumer.rb, line 104
def connect
  @connection ||= Beanstalk::Pool.new(@servers)

  @handlers.job_handlers.keys.each { |name| @connection.watch name }

  @connection
rescue Beanstalk::NotConnected => e
  @logger.fatal e

  sleep 1

  retry
end
Also aliased as: connection
connection()
Alias for: connect
lock() { || ... } click to toggle source
# File lib/stalking/consumer.rb, line 96
def lock
  @locked = true

  yield
ensure
  @locked = false
end
looping() { || ... } click to toggle source
# File lib/stalking/consumer.rb, line 76
def looping
  continue = true

  trap "QUIT" do
    exit unless @locked

    continue = false
  end

  trap "USR2" do
    exit unless @locked

    continue = false
  end

  while continue
    yield
  end
end
perform(job) click to toggle source
# File lib/stalking/consumer.rb, line 56
def perform(job)
  name, args = JSON.parse(job.body)

  if handler = @handlers.job_handlers[name]
    begin
      Timeout::timeout(job.ttr - 1) do
        handle @handlers.before_handlers, name, args

        handler.call args

        handle @handlers.after_handlers, name, args
      end
    rescue Beanstalk::NotConnected => e
      raise e # Re-raise
    rescue Timeout::Error, StandardError => e
      handle_error e, name, args
    end
  end
end
reconnect() click to toggle source
# File lib/stalking/consumer.rb, line 120
def reconnect
  @connection = nil

  connect
end
work() click to toggle source
# File lib/stalking/consumer.rb, line 32
def work
  looping do
    job = connection.reserve

    lock do
      perform job

      job.delete
    end
  end
rescue EOFError, Beanstalk::NotConnected => e
  @logger.fatal e

  sleep 1

  reconnect

  retry
rescue => e
  @logger.fatal e

  raise e
end