class Startback::Bus::Bunny::Async

Asynchronous implementation of the bus abstraction, on top of RabbitMQ, using the 'bunny' gem (you need to add it to your Gemfile yourself: it is NOT an official startback dependency).

This bus implementation emits events by publishing them to RabbitMQ, using the event type as the exchange name. Listeners may use the `processor` parameter to specify the queue name; otherwise a default “main” queue is used.

Examples:

# Connects to RabbitMQ using all default options
#
# Uses the STARTBACK_BUS_BUNNY_ASYNC_URL environment variable for
# connection URL if present.
Startback::Bus::Bunny::Async.new

# Connects to RabbitMQ using a specific URL
Startback::Bus::Bunny::Async.new("amqp://rabbituser:rabbitpass@192.168.17.17")
Startback::Bus::Bunny::Async.new(url: "amqp://rabbituser:rabbitpass@192.168.17.17")

# Connects to RabbitMQ using specific connection options. See Bunny's own
# documentation
Startback::Bus::Bunny::Async.new({
  connection_options: {
    host: "192.168.17.17"
  }
})

Constants

DEFAULT_OPTIONS

Attributes

channel[R]
options[R]

Public Class Methods

new(options = {}) click to toggle source

Creates a bus instance, using the various options provided to fine-tune behavior.

# File lib/startback/bus/bunny/async.rb, line 60
# Creates a bus instance and opens a channel to RabbitMQ.
#
# `options` is either a connection URL (String) or a Hash that is merged
# over DEFAULT_OPTIONS. The keys read here are:
#
# * `:connection_options` - passed as-is to `::Bunny.new` when set;
# * `:url`                - passed to `::Bunny.new` otherwise;
# * `:context`            - handed to `log` with the connection message.
#
# The connection attempt is wrapped in `try_max_times(10)`. On success,
# `@bunny` holds the started session and `@channel` a channel created on it.
def initialize(options = {})
  options = { url: options } if options.is_a?(String)
  @options = DEFAULT_OPTIONS.merge(options)

  # Read from the merged options rather than from the raw argument, so that
  # values coming from DEFAULT_OPTIONS are honored when the caller does not
  # override them.
  conn = @options[:connection_options] || @options[:url]
  context = @options[:context]

  try_max_times(10) do
    @bunny = ::Bunny.new(conn)
    @bunny.start
    @channel = @bunny.create_channel
    # NOTE(review): `conn` is logged verbatim; when it is a URL carrying
    # userinfo (amqp://user:pass@host) the password ends up in the log.
    # Consider redacting it.
    log(:info, { op: "#{self.class.name}#connect", op_data: conn }, context)
  end
end

Public Instance Methods

emit(event) click to toggle source
# File lib/startback/bus/bunny/async.rb, line 74
# Publishes `event` on the fanout exchange named after `event.type`.
#
# The exchange is obtained from `channel.fanout` using `fanout_options`, and
# the message body is `event.to_json`. Everything runs inside `stop_errors`,
# tagged "emit" and given the event's context.
def emit(event)
  stop_errors(self, "emit", event.context) do
    channel
      .fanout(event.type.to_s, fanout_options)
      .publish(event.to_json)
  end
end
listen(type, processor = nil, listener = nil, &bl) click to toggle source
# File lib/startback/bus/bunny/async.rb, line 81
# Subscribes a listener to events of the given `type`.
#
# A fanout exchange named `type.to_s` is declared with `fanout_options`, a
# queue named after `processor` (or "main" when none is given) is declared
# with `queue_options`, and the queue is bound to the exchange. Each message
# body received is turned into an event through `factor_event` and handed to
# the listener.
#
# type      - the event type, used as exchange name.
# processor - optional queue name (anything responding to `to_s`).
# listener  - optional callable receiving the event; takes precedence over
#             the block when both are given.
# bl        - block receiving the event, used when no `listener` is passed.
#
# Raises ArgumentError when neither `listener` nor a block is provided.
def listen(type, processor = nil, listener = nil, &bl)
  raise ArgumentError, "A listener must be provided" unless listener || bl

  handler = listener || bl
  fanout = channel.fanout(type.to_s, fanout_options)
  queue = channel.queue((processor || "main").to_s, queue_options)
  queue.bind(fanout)
  queue.subscribe do |_delivery_info, _properties, body|
    event = stop_errors(self, "listen") { factor_event(body) }

    # When no event could be built (presumably because `stop_errors`
    # swallowed a decoding failure - confirm against its implementation),
    # there is nothing to dispatch. Bail out instead of calling `#context`
    # on nil outside of any error guard.
    next if event.nil?

    stop_errors(self, "listen", event.context) { handler.call(event) }
  end
end

Protected Instance Methods

factor_event(body) click to toggle source
# File lib/startback/bus/bunny/async.rb, line 106
# Builds an event from a raw message `body`.
#
# When an `:event_factory` is configured in `options`, it is called with the
# body and its result is returned. Otherwise the event is built through
# `Event.json(body, options)`.
def factor_event(body)
  custom_factory = options[:event_factory]
  return custom_factory.call(body) if custom_factory

  Event.json(body, options)
end
fanout_options() click to toggle source
# File lib/startback/bus/bunny/async.rb, line 98
# Options handed to `channel.fanout` when declaring an exchange (see `emit`
# and `listen`).
#
# Returns the `:fanout_options` entry of `options`, or an empty Hash when
# that entry is nil or missing, so that the exchange declaration never
# receives nil in place of an options Hash.
def fanout_options
  options[:fanout_options] || {}
end
queue_options() click to toggle source
# File lib/startback/bus/bunny/async.rb, line 102
# Options handed to `channel.queue` when declaring the listener queue (see
# `listen`).
#
# Returns the `:queue_options` entry of `options`, or an empty Hash when
# that entry is nil or missing, so that the queue declaration never receives
# nil in place of an options Hash.
def queue_options
  options[:queue_options] || {}
end