class Stalking::Consumer
Public Class Methods
new(options = {}, &block)
click to toggle source
# File lib/stalking/consumer.rb, line 4
#
# Builds a consumer. When a block is supplied it is evaluated in the context
# of the freshly created Handlers instance and the consumer then enters
# #work straight away; without a block only the instance state is set up.
#
# @param options [Hash] keys read here:
#   :servers - value stored in @servers (default: ["localhost:11300"])
#   :logger  - value stored in @logger (default: Stalking::NullLogger.new)
# @param block [Proc] optional; evaluated with instance_eval on @handlers
def initialize(options = {}, &block)
  @servers = options[:servers] || ["localhost:11300"]
  @logger = options[:logger] || Stalking::NullLogger.new
  @handlers = Handlers.new

  # Nothing more to do unless the caller configured handlers inline.
  return unless block_given?

  @handlers.instance_eval(&block)
  work
end
Public Instance Methods
handle(handlers, name, args)
click to toggle source
# File lib/stalking/consumer.rb, line 16
#
# Invokes every callable in +handlers+, in order, with the job name and its
# arguments.
#
# @param handlers [#each] collection of objects responding to +call+
# @param name job name passed through to each callable
# @param args job arguments passed through to each callable
# @return whatever +handlers.each+ returns
def handle(handlers, name, args)
  handlers.each { |callback| callback.call(name, args) }
end
handle_error(e, name, args)
click to toggle source
# File lib/stalking/consumer.rb, line 22
#
# Passes a failure to every registered error handler. If a handler itself
# raises a StandardError, that secondary error is logged via @logger.fatal
# and the remaining handlers are not run.
#
# @param e the error being reported
# @param name job name the error belongs to
# @param args job arguments the error belongs to
def handle_error(e, name, args)
  begin
    @handlers.error_handlers.each do |error_handler|
      error_handler.call(e, name, args)
    end
  rescue => handler_failure
    # An error handler blew up; log it rather than letting it escape.
    @logger.fatal(handler_failure)
  end
end
Private Instance Methods
connect()
click to toggle source
# File lib/stalking/consumer.rb, line 104
#
# Returns the memoized Beanstalk::Pool, creating it from @servers on first
# use, and issues a +watch+ for every registered job handler name on each
# call. When Beanstalk::NotConnected is raised the error is logged, the
# method sleeps for one second and tries again, without an upper bound on
# the number of attempts.
#
# @return the connection pool stored in @connection
def connect
  begin
    @connection ||= Beanstalk::Pool.new(@servers)

    tube_names = @handlers.job_handlers.keys
    tube_names.each do |tube|
      @connection.watch(tube)
    end

    @connection
  rescue Beanstalk::NotConnected => failure
    @logger.fatal(failure)
    sleep(1)
    retry
  end
end
Also aliased as: connection
lock() { || ... }
click to toggle source
# File lib/stalking/consumer.rb, line 96
#
# Runs the given block with @locked set to true and always resets it to
# false afterwards, even when the block raises. The signal handlers
# installed by #looping read @locked to decide whether to exit at once.
#
# @yield the work to run while locked
# @return the block's result
def lock
  @locked = true
  begin
    yield
  ensure
    @locked = false
  end
end
looping() { || ... }
click to toggle source
# File lib/stalking/consumer.rb, line 76
#
# Yields repeatedly until a QUIT or USR2 signal asks the loop to stop.
# On either signal the process exits immediately unless @locked is truthy;
# if it is, the current iteration is allowed to finish and no further
# iteration is started.
#
# @yield one unit of work per iteration
def looping
  keep_going = true

  # Both signals share the same handler: exit now when idle, otherwise
  # let the in-flight iteration finish first.
  %w[QUIT USR2].each do |signal_name|
    trap(signal_name) do
      exit unless @locked
      keep_going = false
    end
  end

  yield while keep_going
end
perform(job)
click to toggle source
# File lib/stalking/consumer.rb, line 56
#
# Decodes a job body (a JSON array of [name, args]) and runs the matching
# job handler, surrounded by the before and after handlers, under a timeout
# of one second less than the job's ttr.
#
# A body that is not valid JSON is reported through #handle_error (with nil
# name and args) instead of raising, so one malformed job cannot take the
# consumer down. Jobs without a registered handler are ignored.
#
# @param job object responding to +body+ and +ttr+
# @raise [Beanstalk::NotConnected] re-raised untouched so the caller can
#   reconnect
def perform(job)
  begin
    name, args = JSON.parse(job.body)
  rescue JSON::ParserError => e
    handle_error e, nil, nil
    return
  end

  handler = @handlers.job_handlers[name]
  return unless handler

  begin
    # NOTE(review): a ttr of 1 yields a timeout of 0, which Timeout treats
    # as "no timeout" - confirm that is acceptable.
    Timeout.timeout(job.ttr - 1) do
      handle @handlers.before_handlers, name, args
      handler.call args
      handle @handlers.after_handlers, name, args
    end
  rescue Beanstalk::NotConnected
    raise # connection problems are handled by the caller
  rescue Timeout::Error, StandardError => e
    handle_error e, name, args
  end
end
reconnect()
click to toggle source
# File lib/stalking/consumer.rb, line 120
#
# Discards the memoized connection and establishes a new one.
#
# @return whatever #connect returns
def reconnect
  # Clearing the ivar forces #connect to build a fresh pool.
  @connection = nil
  connect
end
work()
click to toggle source
# File lib/stalking/consumer.rb, line 32
#
# Main loop: reserves a job from the connection, performs it and deletes it,
# with the perform/delete pair wrapped in #lock.
#
# EOFError and Beanstalk::NotConnected are logged, followed by a one second
# pause, a reconnect and a restart of the loop. Any other StandardError is
# logged and re-raised.
#
# NOTE(review): the retry re-enters #looping, which resets its stop flag and
# re-installs its signal traps - confirm a pending stop request surviving a
# reconnect is not required.
def work
  begin
    looping do
      reserved = connection.reserve

      lock do
        perform(reserved)
        reserved.delete
      end
    end
  rescue EOFError, Beanstalk::NotConnected => connection_error
    @logger.fatal(connection_error)
    sleep(1)
    reconnect
    retry
  rescue => fatal_error
    @logger.fatal(fatal_error)
    raise fatal_error
  end
end