class Msgr::Connection
Attributes
config[R]
uri[R]
Public Class Methods
new(uri, config, dispatcher)
click to toggle source
# File lib/msgr/connection.rb, line 11 def initialize(uri, config, dispatcher) @uri = uri @config = config @dispatcher = dispatcher @channels = [] end
Public Instance Methods
bind(routes)
click to toggle source
# File lib/msgr/connection.rb, line 87 def bind(routes) if routes.empty? log(:warn) do "No routes to bound to. Bind will have no effect:\n" \ " #{routes.inspect}" end else bind_all(routes) end end
bindings()
click to toggle source
# File lib/msgr/connection.rb, line 83 def bindings @bindings ||= [] end
channel(prefetch: 1)
click to toggle source
# File lib/msgr/connection.rb, line 38 def channel(prefetch: 1) channel = Msgr::Channel.new(config, connection) channel.prefetch(prefetch) @channels << channel channel end
close()
click to toggle source
# File lib/msgr/connection.rb, line 98 def close @channels.each(&:close) connection.close if @connection log(:debug) { 'Closed.' } end
connect()
click to toggle source
# File lib/msgr/connection.rb, line 34 def connect connection end
connection()
click to toggle source
# File lib/msgr/connection.rb, line 30 def connection @connection ||= ::Bunny.new(config).tap(&:start) end
delete()
click to toggle source
# File lib/msgr/connection.rb, line 57 def delete return if bindings.empty? log(:debug) { "Delete bindings (#{bindings.size})..." } bindings.each(&:delete) end
exchange()
click to toggle source
# File lib/msgr/connection.rb, line 45 def exchange @exchange ||= channel.exchange end
publish(message, opts = {})
click to toggle source
# File lib/msgr/connection.rb, line 22 def publish(message, opts = {}) opts[:routing_key] = opts.delete(:to) if opts[:to] exchange.publish message.to_s, opts.merge(persistent: true) log(:debug) { "Published message to #{opts[:routing_key]}" } end
purge(**kwargs)
click to toggle source
# File lib/msgr/connection.rb, line 65 def purge(**kwargs) return if bindings.empty? log(:debug) { "Purge bindings (#{bindings.size})..." } bindings.each {|b| b.purge(**kwargs) } end
purge_queue(name)
click to toggle source
# File lib/msgr/connection.rb, line 73 def purge_queue(name) # Creating the queue in passive mode ensures that queues that do not exist # won't be created just to purge them. # That requires creating a new channel every time, as exceptions (on # missing queues) invalidate the channel. channel.queue(name, passive: true).purge rescue Bunny::NotFound nil end
release()
click to toggle source
# File lib/msgr/connection.rb, line 49 def release return if bindings.empty? log(:debug) { "Release bindings (#{bindings.size})..." } bindings.each(&:release) end
running?()
click to toggle source
# File lib/msgr/connection.rb, line 18 def running? bindings.any? end
Private Instance Methods
bind_all(routes)
click to toggle source
# File lib/msgr/connection.rb, line 106 def bind_all(routes) routes.each {|route| bindings << Binding.new(self, route, @dispatcher) } end