class CottonTail::Queue::Bunny
A wrapper around a ::Bunny::Queue that makes it interchangeable with a standard Ruby Queue
Attributes
connection[R]
source[R]
Public Class Methods
call(**opts)
click to toggle source
# File lib/cotton_tail/queue/bunny.rb, line 11 def self.call(**opts) new(**opts) end
new(name:, connection:, manual_ack: false, **opts)
click to toggle source
Calls superclass method
# File lib/cotton_tail/queue/bunny.rb, line 15 def initialize(name:, connection:, manual_ack: false, **opts) super ::Queue.new @connection = connection @source = build_source(name, opts) watch_source manual_ack end
Public Instance Methods
bind(routing_key)
click to toggle source
# File lib/cotton_tail/queue/bunny.rb, line 34 def bind(routing_key) source.bind('amq.topic', routing_key: Route.new(routing_key).binding) end
pop()
click to toggle source
Calls superclass method
# File lib/cotton_tail/queue/bunny.rb, line 29 def pop delivery_info, properties, payload = super Request.new(delivery_info, MessageProperties.new(properties.to_h), payload) end
push(request)
click to toggle source
# File lib/cotton_tail/queue/bunny.rb, line 24 def push(request) bind request.routing_key exchange.publish request.payload, routing_key: request.routing_key end
Private Instance Methods
build_source(name, opts)
click to toggle source
# File lib/cotton_tail/queue/bunny.rb, line 50 def build_source(name, opts) return channel.queue('', **nil_opts(opts)) if name.nil? channel.queue(name, **opts) end
channel()
click to toggle source
# File lib/cotton_tail/queue/bunny.rb, line 56 def channel @channel ||= connection.create_channel end
exchange()
click to toggle source
# File lib/cotton_tail/queue/bunny.rb, line 60 def exchange @exchange ||= channel.exchange('amq.topic') end
nil_opts(opts)
click to toggle source
# File lib/cotton_tail/queue/bunny.rb, line 42 def nil_opts(opts) { exclusive: true }.merge(opts) end
watch_source(manual_ack)
click to toggle source
# File lib/cotton_tail/queue/bunny.rb, line 46 def watch_source(manual_ack) source.subscribe(manual_ack: manual_ack) { |*args| self << args } end