class DisqueJockey::Broker

Public Class Methods

new(nodes = ["127.0.0.1:7711"], *args) click to toggle source
# File lib/disque_jockey/broker.rb, line 5
# Builds a broker around a Disque client.
#
# nodes - node addresses handed to Disque.new as its first argument;
#         defaults to a one-element list, "127.0.0.1:7711".
# args  - any additional arguments, forwarded to Disque.new unchanged.
def initialize(nodes = ["127.0.0.1:7711"], *args)
  # The public instance methods reach Disque through this client.
  @client = Disque.new(nodes, *args)
end

Public Instance Methods

acknowledge(job_id) click to toggle source
# File lib/disque_jockey/broker.rb, line 14
# Sends the ACKJOB command for +job_id+ through the client.
#
# Returns true unless the client's reply is a RuntimeError, in which case
# that error is raised by raise_error_or_return_true.
def acknowledge(job_id)
  raise_error_or_return_true(@client.call('ACKJOB', job_id))
end
fast_acknowledge(job_id) click to toggle source
# File lib/disque_jockey/broker.rb, line 19
# Sends the FASTACK command for +job_id+ through the client.
#
# Returns true unless the client's reply is a RuntimeError, in which case
# that error is raised by raise_error_or_return_true.
def fast_acknowledge(job_id)
  raise_error_or_return_true(@client.call('FASTACK', job_id))
end
fetch_message_from(queue) click to toggle source
# File lib/disque_jockey/broker.rb, line 9
# Fetches from the single given queue and returns the first job.
#
# The client's fetch hands back a collection of jobs; only the first
# element is returned to the caller.
def fetch_message_from(queue)
  jobs = @client.fetch(from: [queue])
  jobs.first
end
publish(*args) click to toggle source
# File lib/disque_jockey/broker.rb, line 24
# Forwards every argument, unchanged, to the client's push and returns
# whatever push returns.
# NOTE(review): the accepted arguments are defined by the Disque client's
# push, which is not visible here - confirm against the client.
def publish(*args)
  @client.push(*args)
end

Private Instance Methods

raise_error_or_return_true(response) click to toggle source

If there is an error acknowledging the job, the Disque client returns an error object instead of raising it, so we raise it here ourselves.

# File lib/disque_jockey/broker.rb, line 33
# Turns an error reply into a raised exception.
#
# response - the value handed back by the client.
#
# Raises +response+ itself when it is a RuntimeError; otherwise returns true.
def raise_error_or_return_true(response)
  raise response if response.is_a?(RuntimeError)

  true
end