module QueueingRabbit

Constants

VERSION

Attributes

client[RW]
logger[RW]

Public Instance Methods

begin_worker_loop() { || ... } click to toggle source
# File lib/queueing_rabbit.rb, line 86
# Runs the worker loop of the current connection.
#
# Each time the connection's +begin_worker_loop+ invokes the block handed
# to it, the caller's block is yielded to without arguments.
#
# Returns whatever the connection's +begin_worker_loop+ returns.
def begin_worker_loop
  conn.begin_worker_loop { yield }
end
conn()
Alias for: connect
connect() click to toggle source
# File lib/queueing_rabbit.rb, line 34
# Returns the stored connection, asking +client+ to connect first when no
# connection has been stored yet.
#
# The check-and-assign runs inside +synchronize+, and the method returns
# whatever +synchronize+ returns (presumably the value of the block, i.e.
# the connection -- confirm against the definition of +synchronize+).
def connect
  synchronize { @connection ||= client.connect }
end
Also aliased as: conn, connection
connected?() click to toggle source
# File lib/queueing_rabbit.rb, line 51
# Tells whether a connection is stored and reports itself as open.
#
# Returns the stored (falsy) value when no connection is set, otherwise the
# result of the connection's +open?+.
def connected?
  return @connection unless @connection

  @connection.open?
end
connection()
Alias for: connect
disconnect() click to toggle source
# File lib/queueing_rabbit.rb, line 42
# Closes the stored connection when +connected?+ is truthy, then always
# forgets it via +drop_connection+. Both steps run inside +synchronize+.
def disconnect
  synchronize do
    @connection.close if connected?
    drop_connection
  end
end
drop_connection() click to toggle source
# File lib/queueing_rabbit.rb, line 55
# Forgets the stored connection without closing it.
#
# Returns nil.
def drop_connection
  @connection = nil
end
enqueue(job, payload = nil, options = {}) click to toggle source
# File lib/queueing_rabbit.rb, line 59
# Publishes +payload+ to the exchange set up for +job+.
#
# job     - object handed to +follow_job_requirements+, which yields the
#           channel and exchange to use.
# payload - message body passed through to +publish_to_exchange+.
# options - publishing options passed through to +publish_to_exchange+.
#
# The channel is closed in an +ensure+ clause so it is not leaked when
# publishing raises; the exception still propagates to the caller.
#
# Returns true.
def enqueue(job, payload = nil, options = {})
  info "enqueueing job #{job}"

  follow_job_requirements(job) do |channel, exchange, _|
    begin
      publish_to_exchange(exchange, payload, options)
    ensure
      channel.close
    end
  end

  true
end
follow_bus_requirements(bus) { |ch, ex| ... } click to toggle source
# File lib/queueing_rabbit.rb, line 106
# Prepares the channel and exchange described by +bus+ and yields them.
#
# A channel is opened with +bus.channel_options+; on that channel the
# exchange +bus.exchange_name+ is defined with +bus.exchange_options+.
# The caller's block then receives the channel and the exchange.
#
# Returns the value of the caller's block, or nil when the block has not
# run by the time this method returns.
def follow_bus_requirements(bus)
  outcome = nil

  conn.open_channel(bus.channel_options) do |channel, _|
    conn.define_exchange(channel, bus.exchange_name, bus.exchange_options) { |exchange| outcome = yield(channel, exchange) }
  end

  outcome
end
follow_job_requirements(job) { |ch, ex, q| ... } click to toggle source
# File lib/queueing_rabbit.rb, line 92
# Prepares the channel, exchange and queue described by +job+ and yields
# all three to the caller's block.
#
# The channel and exchange come from +follow_bus_requirements+. The queue
# +job.queue_name+ is defined with +job.queue_options+; when
# +job.bind_queue?+ is truthy it is bound to the exchange once per entry
# of +job.binding_declarations+ before the block runs.
#
# Returns the value of the caller's block, or nil when the block has not
# run by the time this method returns.
def follow_job_requirements(job)
  outcome = nil

  follow_bus_requirements(job) do |channel, exchange|
    conn.define_queue(channel, job.queue_name, job.queue_options) do |queue|
      # Binding declarations are only read when the job asks for binding.
      bindings = job.bind_queue? ? job.binding_declarations : []
      bindings.each { |binding| conn.bind_queue(queue, exchange, binding) }

      outcome = yield(channel, exchange, queue)
    end
  end

  outcome
end
publish(bus, payload = nil, options = {}) click to toggle source
# File lib/queueing_rabbit.rb, line 70
# Publishes +payload+ to the exchange set up for the event bus +bus+.
#
# bus     - object handed to +follow_bus_requirements+, which yields the
#           channel and exchange to use.
# payload - message body passed through to +publish_to_exchange+.
# options - publishing options passed through to +publish_to_exchange+.
#
# The channel is closed in an +ensure+ clause so it is not leaked when
# publishing raises; the exception still propagates to the caller.
#
# Returns true.
def publish(bus, payload = nil, options = {})
  info "publishing to event bus #{bus}"

  follow_bus_requirements(bus) do |channel, exchange|
    begin
      publish_to_exchange(exchange, payload, options)
    ensure
      channel.close
    end
  end

  true
end
publish_to_exchange(exchange, payload = nil, options = {}) click to toggle source
# File lib/queueing_rabbit.rb, line 81
# Hands +exchange+, +payload+ and +options+ to the connection's +publish+.
#
# The connection's return value is discarded; this method always returns
# true when +publish+ does not raise.
def publish_to_exchange(exchange, payload = nil, options = {})
  conn.publish(exchange, payload, options)

  true
end
purge_queue(job) click to toggle source
# File lib/queueing_rabbit.rb, line 126
# Purges the queue that belongs to +job+.
#
# A channel is opened with +job.channel_options+, the queue
# +job.queue_name+ is defined on it with +job.queue_options+, and the
# connection is asked to purge that queue. The channel is closed from the
# block given to the connection's +purge_queue+.
#
# Returns true.
def purge_queue(job)
  connection.open_channel(job.channel_options) do |channel, _|
    connection.define_queue(channel, job.queue_name, job.queue_options) do |queue|
      connection.purge_queue(queue) do
        channel.close
      end
    end
  end

  true
end
queue_size(job) click to toggle source
# File lib/queueing_rabbit.rb, line 116
# Reports the size of the queue that belongs to +job+.
#
# A channel is opened with +job.channel_options+, the queue
# +job.queue_name+ is defined on it with +job.queue_options+, and the
# connection's +queue_size+ is asked for its size.
#
# The channel is closed in an +ensure+ clause so it is not leaked when
# defining or measuring the queue raises; the exception still propagates.
#
# Returns the size reported by the connection, or 0 when the channel block
# has not run by the time this method returns.
def queue_size(job)
  size = 0

  connection.open_channel(job.channel_options) do |channel, _|
    begin
      queue = connection.define_queue(channel, job.queue_name, job.queue_options)
      size = connection.queue_size(queue)
    ensure
      channel.close
    end
  end

  size
end