class CottonTail::Queue::Bunny

A wrapper around a ::Bunny::Queue that makes it interchangeable with a standard Ruby Queue.

Attributes

connection[R]
source[R]

Public Class Methods

call(**opts) click to toggle source
# File lib/cotton_tail/queue/bunny.rb, line 11
# Callable-style constructor: forwards every keyword option, untouched,
# to +new+ and returns whatever +new+ returns.
def self.call(**options)
  new(**options)
end
new(name:, connection:, manual_ack: false, **opts) click to toggle source
Calls superclass method
# File lib/cotton_tail/queue/bunny.rb, line 15
# Hands a new ::Queue to the superclass, declares the broker-side queue and
# subscribes to it so that deliveries are appended to this object.
#
# name       - queue name given to build_source; when nil, build_source
#              declares a queue with an empty name instead.
# connection - object used to open the channel (must respond to
#              create_channel).
# manual_ack - forwarded to the subscription as its manual_ack option.
# opts       - remaining keyword options, passed on when declaring the queue.
def initialize(name:, connection:, manual_ack: false, **opts)
  super ::Queue.new

  # Store the connection before building the source: build_source goes
  # through the memoized channel, which is created from the connection.
  @connection = connection
  @source = build_source(name, opts)

  # Subscribe only after @source is set; watch_source reads it.
  watch_source manual_ack
end

Public Instance Methods

bind(routing_key) click to toggle source
# File lib/cotton_tail/queue/bunny.rb, line 34
# Binds the underlying source queue to the 'amq.topic' exchange, using the
# binding pattern that Route derives from +routing_key+.
#
# Returns whatever the source's #bind returns.
def bind(routing_key)
  binding_pattern = Route.new(routing_key).binding
  source.bind('amq.topic', routing_key: binding_pattern)
end
pop() click to toggle source
Calls superclass method
# File lib/cotton_tail/queue/bunny.rb, line 29
# Takes the next entry from the superclass #pop, which is expected to be a
# three-element delivery (delivery info, properties, payload), and wraps it
# in a Request. The properties are converted with #to_h and wrapped in
# MessageProperties first.
def pop
  info, props, body = super
  message_properties = MessageProperties.new(props.to_h)
  Request.new(info, message_properties, body)
end
push(request) click to toggle source
# File lib/cotton_tail/queue/bunny.rb, line 24
# Publishes +request+ through the exchange.
#
# The queue is first bound for the request's routing key, then the
# request's payload is published with that same routing key.
#
# Returns whatever the exchange's #publish returns.
def push(request)
  bind(request.routing_key)
  exchange.publish(request.payload, routing_key: request.routing_key)
end

Private Instance Methods

build_source(name, opts) click to toggle source
# File lib/cotton_tail/queue/bunny.rb, line 50
# Declares the queue on the channel.
#
# With a nil +name+ the queue is declared under an empty name, using
# +opts+ layered over the defaults from nil_opts. Otherwise it is declared
# as +name+ with +opts+ exactly as given.
def build_source(name, opts)
  if name.nil?
    channel.queue('', **nil_opts(opts))
  else
    channel.queue(name, **opts)
  end
end
channel() click to toggle source
# File lib/cotton_tail/queue/bunny.rb, line 56
# Memoized channel: created from the connection on first use and reused
# on every later call.
def channel
  return @channel if @channel

  @channel = connection.create_channel
end
exchange() click to toggle source
# File lib/cotton_tail/queue/bunny.rb, line 60
# Memoized handle on the 'amq.topic' exchange, looked up through the
# channel on first use and reused afterwards.
def exchange
  return @exchange if @exchange

  @exchange = channel.exchange('amq.topic')
end
nil_opts(opts) click to toggle source
# File lib/cotton_tail/queue/bunny.rb, line 42
# Options used when the queue has no name: starts from exclusive: true and
# lets any key present in +opts+ take precedence.
#
# Returns a new Hash; +opts+ is not modified.
def nil_opts(opts)
  defaults = { exclusive: true }
  defaults.merge(opts)
end
watch_source(manual_ack) click to toggle source
# File lib/cotton_tail/queue/bunny.rb, line 46
# Subscribes to the source queue with the given +manual_ack+ setting.
# Every delivery the subscription yields is collected into one array of
# its arguments and appended to this object with <<.
#
# Returns whatever the source's #subscribe returns.
def watch_source(manual_ack)
  source.subscribe(manual_ack: manual_ack) do |*delivery|
    self << delivery
  end
end