class Stalking::Producer

Public Class Methods

new(options = {}, &block) click to toggle source
# File lib/stalking/producer.rb, line 4
def initialize(options = {}, &block)
  @timeout = options[:timeout] || 0.5
  @pri = options[:pri] || 65536
  @delay = options[:delay] || 0
  @ttr = options[:ttr] || 120
  @logger = options[:logger] || Stalking::NullLogger.new
  @servers = options[:servers] || ["localhost:11300"]
  @tries = options[:tries] || [@servers.size, 2].max

  @connections = {}
end

Public Instance Methods

enqueue(name, args = {}, options = {}) click to toggle source
# File lib/stalking/producer.rb, line 16
def enqueue(name, args = {}, options = {})
  @logger.info "Enqueue #{name.inspect} with #{args.inspect} and #{options.inspect}"

  # Send the job to a random server.

  (@servers.shuffle * @tries).first(@tries).each do |server|
    return true if enqueue_at(server, name, args, options)
  end

  @logger.error "Enqueue #{name.inspect} with #{args.inspect} and #{options.inspect} failed"

  # All servers are currently unavailable.
  # Enqueuing the job has failed.

  false
end

Private Instance Methods

enq(connection, name, args = {}, options = {}) click to toggle source
# File lib/stalking/producer.rb, line 51
def enq(connection, name, args = {}, options = {})
  pri = options[:pri] || @pri
  delay = options[:delay] || @delay
  ttr = options[:ttr] || @ttr

  connection.use name
  connection.put [name, args].to_json, pri, delay, ttr
end
enqueue_at(server, name, args = {}, options = {}) click to toggle source
# File lib/stalking/producer.rb, line 35
def enqueue_at(server, name, args = {}, options = {})
  Timeout::timeout @timeout do
    @connections[server] ||= Beanstalk::Pool.new([server])

    enq @connections[server], name, args, options
  end

  true
rescue Beanstalk::NotConnected, Timeout::Error, StandardError => e
  @connections[server] = nil # Reset the connection.

  @logger.fatal e

  false
end