class Stalking::Producer
Public Class Methods
new(options = {}, &block)
click to toggle source
# File lib/stalking/producer.rb, line 4 def initialize(options = {}, &block) @timeout = options[:timeout] || 0.5 @pri = options[:pri] || 65536 @delay = options[:delay] || 0 @ttr = options[:ttr] || 120 @logger = options[:logger] || Stalking::NullLogger.new @servers = options[:servers] || ["localhost:11300"] @tries = options[:tries] || [@servers.size, 2].max @connections = {} end
Public Instance Methods
enqueue(name, args = {}, options = {})
click to toggle source
# File lib/stalking/producer.rb, line 16 def enqueue(name, args = {}, options = {}) @logger.info "Enqueue #{name.inspect} with #{args.inspect} and #{options.inspect}" # Send the job to a random server. (@servers.shuffle * @tries).first(@tries).each do |server| return true if enqueue_at(server, name, args, options) end @logger.error "Enqueue #{name.inspect} with #{args.inspect} and #{options.inspect} failed" # All servers are currently unavailable. # Enqueuing the job has failed. false end
Private Instance Methods
enq(connection, name, args = {}, options = {})
click to toggle source
# File lib/stalking/producer.rb, line 51 def enq(connection, name, args = {}, options = {}) pri = options[:pri] || @pri delay = options[:delay] || @delay ttr = options[:ttr] || @ttr connection.use name connection.put [name, args].to_json, pri, delay, ttr end
enqueue_at(server, name, args = {}, options = {})
click to toggle source
# File lib/stalking/producer.rb, line 35 def enqueue_at(server, name, args = {}, options = {}) Timeout::timeout @timeout do @connections[server] ||= Beanstalk::Pool.new([server]) enq @connections[server], name, args, options end true rescue Beanstalk::NotConnected, Timeout::Error, StandardError => e @connections[server] = nil # Reset the connection. @logger.fatal e false end