class Quebert::Backend::Beanstalk

Manages jobs on a Beanstalk queue out of process.

Constants

TTR_BUFFER

A buffer time, in seconds, added to the Beanstalk TTR so that Quebert can do its own job cleanup. The job will perform based on the Beanstalk TTR, but Beanstalk hangs on to the job just a little longer so that Quebert can bury the job or schedule a retry with the appropriate delay.
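The arithmetic can be sketched as follows. This is only an illustration: the 5-second value and the `beanstalk_ttr` helper are assumptions, since the actual value of TTR_BUFFER is not shown on this page.

```ruby
# Hypothetical value for illustration only; the real TTR_BUFFER is defined
# in lib/quebert/backend/beanstalk.rb and its value is not shown here.
EXAMPLE_TTR_BUFFER = 5

# Hypothetical helper mirroring the `job.ttr + TTR_BUFFER` expression in #put.
def beanstalk_ttr(job_ttr, buffer = EXAMPLE_TTR_BUFFER)
  job_ttr + buffer
end

job_ttr = 10                          # seconds the job itself is given to run
reserved_for = beanstalk_ttr(job_ttr) # seconds Beanstalk keeps the job reserved
puts "job ttr: #{job_ttr}s, beanstalk ttr: #{reserved_for}s"
```

The difference between the two numbers is the window in which the job can still be buried or rescheduled before Beanstalk releases it.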

Attributes

host[R]
queue[R]
queues[W]

Public Class Methods

configure(opts = {})
# File lib/quebert/backend/beanstalk.rb, line 26
def self.configure(opts = {})
  new(opts.fetch(:host, "127.0.0.1:11300"), opts.fetch(:queue))
end
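The option handling in configure relies on `Hash#fetch`: `:host` has a default, while `:queue` has none and is therefore required. A minimal sketch of that behavior, using a hypothetical `resolve_options` helper that is not part of Quebert:

```ruby
# Stand-in for the option handling in Backend::Beanstalk.configure;
# `resolve_options` is a hypothetical helper used only for this example.
def resolve_options(opts = {})
  host  = opts.fetch(:host, "127.0.0.1:11300") # falls back to a local beanstalkd
  queue = opts.fetch(:queue)                    # raises KeyError when missing
  [host, queue]
end

p resolve_options(:queue => "emails")

begin
  resolve_options(:host => "10.0.0.5:11300")    # no :queue given
rescue KeyError => e
  puts "missing option: #{e.message}"
end
```

In other words, calling configure without a `:queue` key fails immediately rather than silently picking a default tube.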
new(host, queue)
# File lib/quebert/backend/beanstalk.rb, line 20
def initialize(host, queue)
  @host = host
  @queue = queue
  @queues = []
end

Public Instance Methods

drain!()

Deletes every ready, delayed, and buried job. Intended for testing purposes; the author notes there is probably a better way to do this.

# File lib/quebert/backend/beanstalk.rb, line 40
def drain!
  while peek(:ready) do
    reserve_without_controller.delete
  end
  while peek(:delayed) do
    reserve_without_controller.delete
  end
  while peek(:buried) do
    kick
    reserve_without_controller.delete
  end
end
put(job)
# File lib/quebert/backend/beanstalk.rb, line 56
def put(job)
  tube = beanstalkd_tubes[job.queue || queue]
  tube.put(job.to_json,
    :pri => job.priority,
    :delay => job.delay,
    :ttr => job.ttr + TTR_BUFFER)
end
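To see what put hands to the tube without a running beanstalkd, the sketch below replays the same logic against in-memory stand-ins. `FakeJob`, `RecordingTube`, `put_job`, and the 5-second `SKETCH_TTR_BUFFER` are all assumptions made for this example; they are not Quebert or Beaneater classes.

```ruby
require "json"

# Hypothetical buffer value; the real TTR_BUFFER is not shown on this page.
SKETCH_TTR_BUFFER = 5

# Minimal stand-in for a Quebert job (hypothetical).
FakeJob = Struct.new(:queue, :priority, :delay, :ttr, :payload) do
  def to_json(*args)
    { "payload" => payload }.to_json(*args)
  end
end

# Minimal stand-in for a Beaneater tube (hypothetical).
class RecordingTube
  attr_reader :calls

  def initialize
    @calls = []
  end

  # Records the body and options instead of talking to beanstalkd.
  def put(body, options = {})
    @calls << [body, options]
  end
end

# Same shape as Backend::Beanstalk#put, with dependencies passed in.
def put_job(tubes, default_queue, job)
  tube = tubes[job.queue || default_queue] # the job's own queue wins
  tube.put(job.to_json,
    :pri => job.priority,
    :delay => job.delay,
    :ttr => job.ttr + SKETCH_TTR_BUFFER)
end

tubes = Hash.new { |hash, name| hash[name] = RecordingTube.new }
put_job(tubes, "default", FakeJob.new(nil, 100, 0, 10, "hello"))
put_job(tubes, "default", FakeJob.new("mail", 100, 30, 10, "hi"))

p tubes.keys
p tubes["mail"].calls.first.last
```

A job without its own queue lands in the backend's default tube, and the TTR sent to the tube is always the job's TTR plus the buffer.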
reserve(timeout=nil)
# File lib/quebert/backend/beanstalk.rb, line 35
def reserve(timeout=nil)
  Controller::Beanstalk.new(reserve_without_controller(timeout))
end
reserve_without_controller(timeout=nil)
# File lib/quebert/backend/beanstalk.rb, line 30
def reserve_without_controller(timeout=nil)
  watch_tubes
  beanstalkd_tubes.reserve(timeout)
end

Private Instance Methods

beanstalkd_connection()
# File lib/quebert/backend/beanstalk.rb, line 70
def beanstalkd_connection
  @beanstalkd_connection ||= Beaneater.new(host)
end
beanstalkd_tubes()
# File lib/quebert/backend/beanstalk.rb, line 74
def beanstalkd_tubes
  beanstalkd_connection.tubes
end
default_tube()
# File lib/quebert/backend/beanstalk.rb, line 66
def default_tube
  @default_tube ||= beanstalkd_tubes[queue]
end
queues()
# File lib/quebert/backend/beanstalk.rb, line 86
def queues
  @queues.empty? ? [queue] : @queues
end
watch_tubes()
# File lib/quebert/backend/beanstalk.rb, line 78
def watch_tubes
  if queues != @watched_tube_names
    @watched_tube_names = queues
    logger.info "Watching beanstalkd queues #{@watched_tube_names.inspect}"
    beanstalkd_tubes.watch!(*@watched_tube_names)
  end
end
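Taken together, queues and watch_tubes fall back to the single default queue when no list has been assigned, and only re-issue the watch when the list changes. A minimal sketch of that behavior, using a hypothetical `WatchSketch` class that records calls instead of connecting to beanstalkd:

```ruby
# Hypothetical stand-in that mirrors the queues/watch_tubes logic;
# `watch_calls` records what would have been passed to tubes.watch!.
class WatchSketch
  attr_writer :queues
  attr_reader :watch_calls

  def initialize(queue)
    @queue = queue
    @queues = []
    @watch_calls = []
  end

  def watch_tubes
    if queues != @watched_tube_names
      @watched_tube_names = queues
      @watch_calls << @watched_tube_names # stands in for tubes.watch!(*names)
    end
  end

  private

  # Falls back to the single default queue when no list has been assigned.
  def queues
    @queues.empty? ? [@queue] : @queues
  end
end

sketch = WatchSketch.new("default")
sketch.watch_tubes                  # first call watches the default queue
sketch.watch_tubes                  # unchanged list, so nothing is recorded
sketch.queues = ["mail", "reports"]
sketch.watch_tubes                  # list changed, so it watches again
p sketch.watch_calls
```

Because reserve_without_controller calls watch_tubes on every reservation, this comparison keeps repeated reserves from re-sending the watch list each time.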