class Clamour::Bus

Attributes

configuration[R]

@return [Clamour::Configuration]

connection_settings[R]

@return [Hash]

exchange_name[R]

@return [String]

logger[R]

@return [Logger]

Public Class Methods

new(configuration = Clamour.configuration) click to toggle source

@param [Clamour::Configuration] configuration

# File lib/clamour/bus.rb, line 25
# Builds a bus from a configuration object and caches the values the bus
# reads later: connection settings, exchange name and logger.
#
# @param [Clamour::Configuration] configuration defaults to Clamour.configuration
def initialize(configuration = Clamour.configuration)
  rabbit_mq = configuration.rabbit_mq

  @configuration       = configuration
  @connection_settings = rabbit_mq.to_hash
  @exchange_name       = configuration.exchange
  @logger              = configuration.logger
end

Public Instance Methods

before_shutdown(&block) click to toggle source

Registers a SIGINT handler that logs a shutdown notice and then runs the given block, if any.

# File lib/clamour/bus.rb, line 116
# Installs a SIGINT handler that logs a shutdown notice and then invokes the
# supplied block, when one was given.
#
# NOTE(review): the handler logs from inside a signal trap; confirm the
# configured logger tolerates being called from that context.
#
# @yield optional cleanup to run after the notice has been logged
def before_shutdown(&block)
  on_interrupt = proc do
    logger.info 'Shutting down on SIGINT...'
    block.call if block
  end

  Signal.trap('INT', on_interrupt)
end
dump_json(message) click to toggle source

@param [Clamour::Message] message

@return [String]

# File lib/clamour/bus.rb, line 125
# Serializes a message to JSON using Oj in :compat mode.
#
# @param [Clamour::Message] message
# @return [String]
def dump_json(message)
  dump_options = { mode: :compat }
  Oj.dump(message, dump_options)
end
em_publish(message, &block) click to toggle source

@param [Clamour::Message] message

# File lib/clamour/bus.rb, line 56
# Publishes a message to the fanout exchange; written to be called from
# inside the EventMachine reactor (see #publish).
#
# When the connection is enabled, a fresh AMQP connection is opened, the
# message is published as JSON and the connection is disconnected again;
# the block is invoked from the disconnect callback. When the connection is
# disabled, nothing is sent and the block is invoked immediately.
#
# @param [Clamour::Message] message
# @yield called once publishing is finished (or skipped)
def em_publish(message, &block)
  logger.debug "Message #{message.inspect} is going to be published"

  unless configuration.enable_connection?
    logger.debug "Connection is disabled. Message #{message.inspect} is not really published"
    return block ? block.call : nil
  end

  AMQP.connect(connection_settings) do |connection|
    AMQP::Channel.new(connection) do |channel|
      channel.fanout(exchange_name, durable: true) do |exchange|
        publish_options = { content_type: 'application/json' }
        payload = dump_json(message)

        exchange.publish(payload, publish_options) do
          logger.debug "Message #{message.inspect} is published to #{exchange_name}"
          # Only report completion after the connection has been torn down.
          connection.disconnect { block.call if block }
        end
      end
    end
  end
end
em_subscribe(&block) click to toggle source
# File lib/clamour/bus.rb, line 78
# Subscribes the block to messages from the fanout exchange; written to be
# called from inside the EventMachine reactor (see #subscribe).
#
# When the connection is enabled, an exclusive queue is bound to the
# exchange and each JSON delivery is parsed, wrapped in a
# HashWithIndifferentAccess and passed to the block. A SIGINT handler is
# registered that closes the connection and stops the reactor. When the
# connection is disabled, only a SIGINT handler that stops the reactor is
# registered.
#
# @yield [ActiveSupport::HashWithIndifferentAccess] the decoded message
# @raise [ArgumentError] when no block is given
# @raise [WrongContentTypeError] from the delivery callback, when a
#   delivery's content type is not application/json
def em_subscribe(&block)
  raise ArgumentError, 'You have to provide a block' unless block

  unless configuration.enable_connection?
    logger.info 'Connection is disabled. Doing nothing...'
    return before_shutdown { EM.stop }
  end

  AMQP.connect(connection_settings) do |connection|
    # On SIGINT: close the broker connection first, then stop the reactor.
    before_shutdown { connection.close { EM.stop } }

    AMQP::Channel.new(connection) do |channel|
      channel.fanout(exchange_name, durable: true) do |exchange|
        EM.schedule do
          channel.queue('', exclusive: true) do |queue|
            queue.bind(exchange).subscribe do |header, delivery|
              unless header.content_type == 'application/json'
                raise WrongContentTypeError, "Got #{delivery.inspect} for content type #{header.content_type}"
              end

              message_hash = ActiveSupport::HashWithIndifferentAccess.new(load_json(delivery))
              logger.debug "Got hash #{message_hash}"
              block.call(message_hash)
            end
          end
        end
      end
    end
  end
end
load_json(json) click to toggle source

@param [String] json

@return [Hash]

# File lib/clamour/bus.rb, line 131
# Parses a JSON document into plain Ruby data.
#
# The mode is pinned to :compat, the same mode dump_json writes with, so
# parsing does not depend on Oj's process-wide default mode. That default
# may be :object, which would instantiate classes named in the payload;
# payloads here arrive from the message broker.
#
# @param [String] json
# @return [Hash]
def load_json(json)
  Oj.load(json, mode: :compat)
end
publish(message) click to toggle source

@param [Clamour::Message] message

# File lib/clamour/bus.rb, line 33
# Publishes a message, starting an EventMachine reactor if none is running.
#
# Inside a running reactor the call is handed straight to em_publish.
# Otherwise a reactor is started for this one publish and stopped from
# em_publish's completion block.
#
# @param [Clamour::Message] message
def publish(message)
  return em_publish(message) if EM.reactor_running?

  EM.run { em_publish(message) { EM.stop } }
end
subscribe(&block) click to toggle source
# File lib/clamour/bus.rb, line 45
# Subscribes the block to incoming messages, starting an EventMachine
# reactor if none is running.
#
# Inside a running reactor the block is handed straight to em_subscribe.
# Otherwise em_subscribe is called from within a newly started reactor.
#
# @yield forwarded unchanged to em_subscribe
def subscribe(&block)
  return em_subscribe(&block) if EM.reactor_running?

  EM.run { em_subscribe(&block) }
end