class Startback::Bus::Bunny::Async
Asynchronous implementation of the bus abstraction, built on RabbitMQ via the 'bunny' gem. You must add the gem to your Gemfile yourself: it is NOT an official startback dependency.
This bus implementation emits events by publishing them to RabbitMQ, using the event type as the exchange name. Listeners may use the `processor` parameter to specify the queue name; otherwise a default “main” queue is used.
Examples:
  # Connects to RabbitMQ using all default options
  #
  # Uses the STARTBACK_BUS_BUNNY_ASYNC_URL environment variable for
  # connection URL if present.
  Startback::Bus::Bunny::Async.new

  # Connects to RabbitMQ using a specific URL
  Startback::Bus::Bunny::Async.new("amqp://rabbituser:rabbitpass@192.168.17.17")
  Startback::Bus::Bunny::Async.new(url: "amqp://rabbituser:rabbitpass@192.168.17.17")

  # Connects to RabbitMQ using specific connection options. See Bunny's own
  # documentation
  Startback::Bus::Bunny::Async.new({
    connection_options: {
      host: "192.168.17.17"
    }
  })
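The routing described in the class overview (one fanout exchange per event type, one queue per `processor`, "main" when none is given) can be illustrated without a broker. The following is a minimal in-memory sketch, not Startback or Bunny code: `ToyBroker` and its methods are hypothetical names, and it assumes a single consumer per queue, which a real RabbitMQ setup does not require.

```ruby
# In-memory model of the routing only. ToyBroker is a hypothetical name;
# nothing here talks to RabbitMQ or uses the bunny gem.
class ToyBroker
  def initialize
    @bindings = Hash.new { |h, k| h[k] = [] }  # exchange name => bound queue names
    @consumers = {}                            # queue name => handler
  end

  # Mirrors listen(type, processor): bind the processor's queue to the
  # exchange named after the event type, defaulting to the "main" queue.
  def listen(type, processor = nil, &handler)
    raise ArgumentError, "A listener must be provided" unless handler
    queue = (processor || "main").to_s
    @bindings[type.to_s] |= [queue]
    @consumers[queue] = handler  # assumption: one consumer per queue
  end

  # Mirrors emit(event): a fanout exchange hands a copy of the payload
  # to every queue bound to it.
  def emit(type, payload)
    @bindings[type.to_s].each { |queue| @consumers.fetch(queue).call(payload) }
  end
end

broker = ToyBroker.new
received = []
broker.listen("user_created", "mailer")  { |body| received << [:mailer, body] }
broker.listen("user_created", "indexer") { |body| received << [:indexer, body] }
broker.listen("user_deleted")            { |body| received << [:main, body] }

broker.emit("user_created", '{"id":1}')
broker.emit("user_deleted", '{"id":2}')
```

In this model both the "mailer" and "indexer" queues receive the `user_created` payload, because each is bound to the same fanout, while the listener that gave no processor receives `user_deleted` through the "main" queue.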
Constants
- DEFAULT_OPTIONS
Attributes
channel[R]
options[R]
Public Class Methods
new(options = {})
Creates a bus instance, using the various options provided to fine-tune behavior.
  # File lib/startback/bus/bunny/async.rb, line 60
  def initialize(options = {})
    options = { url: options } if options.is_a?(String)
    @options = DEFAULT_OPTIONS.merge(options)
    retried = 0
    conn = options[:connection_options] || options[:url]
    try_max_times(10) do
      @bunny = ::Bunny.new(conn)
      @bunny.start
      @channel = @bunny.create_channel
      log(:info, {op: "#{self.class.name}#connect", op_data: conn}, options[:context])
    end
  end
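The constructor wraps the connection attempt in `try_max_times(10)`, whose source is not shown on this page. As a rough illustration of what a bounded-retry helper of that kind might do, here is a sketch under stated assumptions: `retry_at_most` and its `delay:` parameter are hypothetical, and the real helper may differ in which errors it rescues, whether it waits between attempts, and what it logs.

```ruby
# Hypothetical bounded-retry helper. This is NOT the library's
# try_max_times; it only illustrates the general pattern.
def retry_at_most(max, delay: 0)
  attempts = 0
  begin
    attempts += 1
    yield attempts
  rescue StandardError
    # Give up and re-raise once the attempt budget is exhausted.
    raise if attempts >= max
    sleep(delay)
    retry
  end
end

# Simulated connection that only succeeds on the third attempt.
calls = 0
result = retry_at_most(10) do |attempt|
  calls += 1
  raise IOError, "broker not ready" if attempt < 3
  :connected
end
```

A pattern like this lets a process start before its broker is reachable, at the cost of delaying the failure when the broker never comes up.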
Public Instance Methods
emit(event)
  # File lib/startback/bus/bunny/async.rb, line 74
  def emit(event)
    stop_errors(self, "emit", event.context) do
      fanout = channel.fanout(event.type.to_s, fanout_options)
      fanout.publish(event.to_json)
    end
  end
listen(type, processor = nil, listener = nil, &bl)
  # File lib/startback/bus/bunny/async.rb, line 81
  def listen(type, processor = nil, listener = nil, &bl)
    raise ArgumentError, "A listener must be provided" unless listener || bl
    fanout = channel.fanout(type.to_s, fanout_options)
    queue = channel.queue((processor || "main").to_s, queue_options)
    queue.bind(fanout)
    queue.subscribe do |delivery_info, properties, body|
      event = stop_errors(self, "listen") do
        factor_event(body)
      end
      stop_errors(self, "listen", event.context) do
        (listener || bl).call(event)
      end
    end
  end
Protected Instance Methods
factor_event(body)
  # File lib/startback/bus/bunny/async.rb, line 106
  def factor_event(body)
    if options[:event_factory]
      options[:event_factory].call(body)
    else
      Event.json(body, options)
    end
  end
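As the `factor_event` source shows, an `:event_factory` option, when present, is called with the raw message body instead of `Event.json`. A minimal sketch of such a factory follows. `ToyEvent` is a hypothetical stand-in for an application event class, and the JSON shape (`type` and `data` keys) is an assumption, since the output of `event.to_json` is not documented here. The object returned should respond to `context`, because `listen` reads `event.context`.

```ruby
require "json"

# Hypothetical event class standing in for an application-defined event.
ToyEvent = Struct.new(:type, :data, :context)

# A callable suitable for options[:event_factory]: it receives the raw
# message body (assumed here to be a JSON string) and returns an event.
event_factory = lambda do |body|
  parsed = JSON.parse(body)
  ToyEvent.new(parsed.fetch("type"), parsed.fetch("data", {}), nil)
end

event = event_factory.call('{"type":"user_created","data":{"id":1}}')
```

Such a callable would be supplied through the constructor options under the `:event_factory` key, alongside `:url` or `:connection_options`.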
fanout_options()
  # File lib/startback/bus/bunny/async.rb, line 98
  def fanout_options
    options[:fanout_options]
  end
queue_options()
  # File lib/startback/bus/bunny/async.rb, line 102
  def queue_options
    options[:queue_options]
  end