class Clamour::Bus
Attributes
configuration[R]
@return [Clamour::Configuration]
connection_settings[R]
@return [Hash]
exchange_name[R]
@return [String]
logger[R]
@return [Logger]
Public Class Methods
new(configuration = Clamour.configuration)
click to toggle source
@param [Clamour::Configuration] configuration
# File lib/clamour/bus.rb, line 25
# Builds a bus from a configuration object and stores the values the bus
# later exposes through its readers.
#
# @param [Clamour::Configuration] configuration supplies the RabbitMQ
#   settings, the exchange name and the logger; defaults to
#   Clamour.configuration
def initialize(configuration = Clamour.configuration)
  @configuration = configuration

  rabbit_mq = configuration.rabbit_mq
  @connection_settings = rabbit_mq.to_hash

  @exchange_name = configuration.exchange
  @logger = configuration.logger
end
Public Instance Methods
before_shutdown(&block)
click to toggle source
Registers a SIGINT handler that logs the shutdown and then calls the given block, if one was provided.
# File lib/clamour/bus.rb, line 116
# Installs a SIGINT handler that logs the shutdown and then runs the
# optional block.
#
# NOTE(review): the handler calls the logger from inside the signal trap;
# presumably that is fine for the configured logger, but confirm, since
# some loggers cannot take their lock in trap context.
#
# @yield called with no arguments after logger.info has been invoked
# @return whatever Signal.trap returns
def before_shutdown(&block)
  on_interrupt = proc do
    logger.info 'Shutting down on SIGINT...'
    block.call if block
  end

  Signal.trap('INT', on_interrupt)
end
dump_json(message)
click to toggle source
@param [Clamour::Message] message @return [String]
# File lib/clamour/bus.rb, line 125
# Serialises a message with Oj using the :compat mode.
#
# @param [Clamour::Message] message
# @return [String]
def dump_json(message)
  dump_options = { mode: :compat }
  Oj.dump(message, dump_options)
end
em_publish(message, &block)
click to toggle source
@param [Clamour::Message] message
# File lib/clamour/bus.rb, line 56
# Publishes a message from inside a running reactor. When the connection is
# enabled, a new AMQP connection and channel are opened, the message is
# published as JSON to the durable fanout exchange, and the connection is
# disconnected again; the block is called from the disconnect callback.
# When the connection is disabled, the message is only logged and the block
# is called immediately.
#
# @param [Clamour::Message] message
# @yield called with no arguments once publishing has finished (or was skipped)
def em_publish(message, &block)
  logger.debug "Message #{message.inspect} is going to be published"

  unless configuration.enable_connection?
    logger.debug "Connection is disabled. Message #{message.inspect} is not really published"
    return block && block.call
  end

  AMQP.connect(connection_settings) do |connection|
    AMQP::Channel.new(connection) do |channel|
      channel.fanout(exchange_name, durable: true) do |exchange|
        exchange.publish(dump_json(message), content_type: 'application/json') do
          logger.debug "Message #{message.inspect} is published to #{exchange_name}"
          # Only report completion after the connection has been torn down.
          connection.disconnect { block.call if block }
        end
      end
    end
  end
end
em_subscribe(&block)
click to toggle source
# File lib/clamour/bus.rb, line 78
# Subscribes to the fanout exchange from inside a running reactor. Each
# delivery whose content type is 'application/json' is parsed, wrapped in an
# ActiveSupport::HashWithIndifferentAccess and handed to the block. A SIGINT
# handler is registered that closes the connection and stops the reactor.
# When the connection is disabled, only the SIGINT handler (stopping the
# reactor) is registered.
#
# @yield [message_hash] the decoded message
# @raise [ArgumentError] when no block is given
# @raise [WrongContentTypeError] from the delivery callback, for any other
#   content type
def em_subscribe(&block)
  raise ArgumentError, 'You have to provide a block' unless block

  unless configuration.enable_connection?
    logger.info 'Connection is disabled. Doing nothing...'
    return before_shutdown { EM.stop }
  end

  AMQP.connect(connection_settings) do |connection|
    before_shutdown { connection.close { EM.stop } }

    AMQP::Channel.new(connection) do |channel|
      channel.fanout(exchange_name, durable: true) do |exchange|
        EM.schedule do
          # Server-named, exclusive queue bound to the fanout exchange.
          channel.queue('', exclusive: true) do |queue|
            queue.bind(exchange).subscribe do |header, delivery|
              unless 'application/json' == header.content_type
                raise WrongContentTypeError, "Got #{delivery.inspect} for content type #{header.content_type}"
              end

              message_hash = ActiveSupport::HashWithIndifferentAccess.new(load_json(delivery))
              logger.debug "Got hash #{message_hash}"
              block.call(message_hash)
            end
          end
        end
      end
    end
  end
end
load_json(json)
click to toggle source
@param [String] json @return [Hash]
# File lib/clamour/bus.rb, line 131
# Parses a JSON document received from the bus.
#
# The parse mode is pinned to :strict instead of relying on Oj's
# process-wide default: strict mode only produces JSON-native values, so a
# payload taken off the queue cannot make Oj instantiate Ruby objects, and
# the result no longer depends on Oj.default_options.
#
# @param [String] json
# @return [Hash]
def load_json(json)
  Oj.load(json, mode: :strict)
end
publish(message)
click to toggle source
@param [Clamour::Message] message
# File lib/clamour/bus.rb, line 33
# Publishes a message. If an EventMachine reactor is already running it is
# reused; otherwise a reactor is run for this call and stopped from the
# completion callback of em_publish.
#
# @param [Clamour::Message] message
def publish(message)
  return em_publish(message) if EM.reactor_running?

  EM.run { em_publish(message) { EM.stop } }
end
subscribe(&block)
click to toggle source
# File lib/clamour/bus.rb, line 45
# Subscribes the given block to incoming messages. If an EventMachine
# reactor is already running it is reused; otherwise a reactor is run for
# this call. The block is forwarded unchanged to em_subscribe.
#
# @yield forwarded to em_subscribe
def subscribe(&block)
  return em_subscribe(&block) if EM.reactor_running?

  EM.run { em_subscribe(&block) }
end