module QueueingRabbit
Constants
- VERSION
Attributes
client[RW]
logger[RW]
Public Instance Methods
begin_worker_loop() { || ... }
click to toggle source
# File lib/queueing_rabbit.rb, line 86
#
# Delegates to the connection's own +begin_worker_loop+ and runs the
# caller's block every time the connection yields.
#
# @yield no arguments; invoked from inside the connection's worker loop
# @return whatever +conn.begin_worker_loop+ returns
def begin_worker_loop
  conn.begin_worker_loop do
    yield
  end
end
connect()
click to toggle source
# File lib/queueing_rabbit.rb, line 34
#
# Returns the memoized connection, asking +client+ to connect only when
# no connection has been stored yet. The assignment runs inside
# +synchronize+ (defined outside this listing).
#
# @return the object returned by +client.connect+ (memoized in @connection)
def connect
  synchronize do
    @connection ||= client.connect
  end
end
Also aliased as: conn, connection
connected?()
click to toggle source
# File lib/queueing_rabbit.rb, line 51
#
# Tells whether a connection is stored and reports itself as open.
#
# @return nil/false when @connection is unset, otherwise the result of
#   +@connection.open?+
def connected?
  @connection && @connection.open?
end
disconnect()
click to toggle source
# File lib/queueing_rabbit.rb, line 42
#
# Closes the stored connection when it is open and then forgets it.
# Runs inside +synchronize+ (defined outside this listing).
#
# The stored handle is dropped in an +ensure+ clause: +connect+ memoizes
# with <tt>||=</tt>, so a handle left behind by a failing +close+ would be
# handed out again on the next +connect+. Any exception raised by +close+
# still propagates to the caller.
def disconnect
  synchronize do
    begin
      @connection.close if connected?
    ensure
      drop_connection
    end
  end
end
drop_connection()
click to toggle source
# File lib/queueing_rabbit.rb, line 55
#
# Forgets the stored connection without closing it.
#
# @return [nil]
def drop_connection
  @connection = nil
end
enqueue(job, payload = nil, options = {})
click to toggle source
# File lib/queueing_rabbit.rb, line 59
#
# Logs the job, sets up its channel/exchange/queue through
# +follow_job_requirements+, publishes the payload to the exchange and
# then closes the channel.
#
# @param job     object passed to +follow_job_requirements+; interpolated
#                into the log message
# @param payload message body handed to +publish_to_exchange+
# @param options publishing options handed to +publish_to_exchange+
# @return [true]
def enqueue(job, payload = nil, options = {})
  info "enqueueing job #{job}"

  # The third block argument (the queue) is not needed for publishing.
  follow_job_requirements(job) do |channel, exchange, _|
    publish_to_exchange(exchange, payload, options)
    channel.close
  end

  true
end
follow_bus_requirements(bus) { |ch, ex| ... }
click to toggle source
# File lib/queueing_rabbit.rb, line 106
#
# Opens a channel with the bus's channel options, declares the bus's
# exchange on it and yields both to the caller.
#
# @param bus object responding to +channel_options+, +exchange_name+ and
#            +exchange_options+
# @yield [ch, ex] the opened channel and the declared exchange
# @return the block's return value, or nil when the block was never run
def follow_bus_requirements(bus)
  ret = nil

  # The second value yielded by open_channel is unused here.
  conn.open_channel(bus.channel_options) do |ch, _|
    conn.define_exchange(ch, bus.exchange_name, bus.exchange_options) do |ex|
      ret = yield ch, ex
    end
  end

  ret
end
follow_job_requirements(job) { |ch, ex, q| ... }
click to toggle source
# File lib/queueing_rabbit.rb, line 92
#
# Builds on +follow_bus_requirements+: additionally declares the job's
# queue, binds it to the exchange once per binding declaration when the
# job asks for binding, and yields channel, exchange and queue.
#
# @param job object responding to +queue_name+, +queue_options+,
#            +bind_queue?+ and +binding_declarations+ (and whatever
#            +follow_bus_requirements+ needs)
# @yield [ch, ex, q] the channel, the exchange and the declared queue
# @return the block's return value, or nil when the block was never run
def follow_job_requirements(job)
  ret = nil

  follow_bus_requirements(job) do |ch, ex|
    conn.define_queue(ch, job.queue_name, job.queue_options) do |q|
      if job.bind_queue?
        job.binding_declarations.each { |o| conn.bind_queue(q, ex, o) }
      end

      ret = yield ch, ex, q
    end
  end

  ret
end
publish(bus, payload = nil, options = {})
click to toggle source
# File lib/queueing_rabbit.rb, line 70
#
# Logs the bus, sets up its channel and exchange through
# +follow_bus_requirements+, publishes the payload to the exchange and
# then closes the channel.
#
# @param bus     object passed to +follow_bus_requirements+; interpolated
#                into the log message
# @param payload message body handed to +publish_to_exchange+
# @param options publishing options handed to +publish_to_exchange+
# @return [true]
def publish(bus, payload = nil, options = {})
  info "publishing to event bus #{bus}"

  follow_bus_requirements(bus) do |channel, exchange|
    publish_to_exchange(exchange, payload, options)
    channel.close
  end

  true
end
publish_to_exchange(exchange, payload = nil, options = {})
click to toggle source
# File lib/queueing_rabbit.rb, line 81
#
# Hands the exchange, payload and options to the connection's +publish+.
#
# @param exchange exchange object passed through to +conn.publish+
# @param payload  message body passed through to +conn.publish+
# @param options  publishing options passed through to +conn.publish+
# @return [true] regardless of what +conn.publish+ returns
def publish_to_exchange(exchange, payload = nil, options = {})
  conn.publish(exchange, payload, options)
  true
end
purge_queue(job)
click to toggle source
# File lib/queueing_rabbit.rb, line 126
#
# Opens a channel with the job's channel options, declares the job's queue
# on it and asks the connection to purge that queue. The channel is closed
# from the block given to the connection's +purge_queue+.
#
# @param job object responding to +channel_options+, +queue_name+ and
#            +queue_options+
# @return [true]
def purge_queue(job)
  # The second value yielded by open_channel is unused here.
  connection.open_channel(job.channel_options) do |c, _|
    connection.define_queue(c, job.queue_name, job.queue_options) do |q|
      connection.purge_queue(q) { c.close }
    end
  end

  true
end
queue_size(job)
click to toggle source
# File lib/queueing_rabbit.rb, line 116
#
# Opens a channel with the job's channel options, declares the job's
# queue, asks the connection for that queue's size and closes the channel.
#
# Unlike the other helpers in this listing, +define_queue+ is called
# without a block here and its return value is used as the queue.
#
# @param job object responding to +channel_options+, +queue_name+ and
#            +queue_options+
# @return the value of +connection.queue_size+, or 0 when the
#   +open_channel+ block was never run
def queue_size(job)
  size = 0

  # The second value yielded by open_channel is unused here.
  connection.open_channel(job.channel_options) do |c, _|
    queue = connection.define_queue(c, job.queue_name, job.queue_options)
    size = connection.queue_size(queue)
    c.close
  end

  size
end