class Emque::Producing::Publisher::RabbitMq

Constants

CHANNEL_POOL
CONFIRM_CHANNEL_POOL
CONN

Public Instance Methods

get_channel(raise_on_failure)
# File lib/emque/producing/publisher/rabbitmq.rb, line 65
# Fetches a channel for publishing.
#
# raise_on_failure - when truthy the channel is taken from
#                    CONFIRM_CHANNEL_POOL, otherwise from CHANNEL_POOL.
#
# The pool is popped with +pop(true)+. If that raises ThreadError (as a
# non-blocking Queue#pop does on an empty queue -- the pools are presumably
# Queues, confirm against their definition), a new channel is created with
# CONN.create_channel instead.
#
# Returns the pooled channel, or the newly created one.
def get_channel(raise_on_failure)
  pool = raise_on_failure ? CONFIRM_CHANNEL_POOL : CHANNEL_POOL
  pool.pop(true)
rescue ThreadError
  CONN.create_channel
end
publish(topic, message_type, message, key = nil, raise_on_failure)
# File lib/emque/producing/publisher/rabbitmq.rb, line 21
# Publishes +message+ to the durable, non-auto-deleting fanout exchange
# named +topic+.
#
# topic            - name of the fanout exchange.
# message_type     - value sent as the message's +:type+ property.
# message          - payload; published as persistent and mandatory with
#                    content type "application/json".
# key              - accepted for interface compatibility; not used here.
# raise_on_failure - when truthy, the channel is put into confirm mode and
#                    the call waits for the confirm before returning.
#
# Returns true when the message is considered sent. Returns false when the
# exchange's on_return handler has fired before this method returns, or when
# confirm mode is on and wait_for_confirms reports failure (a nack).
# The channel is always handed back to the pool matching +raise_on_failure+.
def publish(topic, message_type, message, key = nil, raise_on_failure)
  ch = get_channel(raise_on_failure)

  ch.open if ch.closed?
  begin
    exchange = ch.fanout(topic, :durable => true, :auto_delete => false)

    ch.confirm_select if raise_on_failure
    sent = true

    # A mandatory message that is handed back by the exchange counts as
    # not sent.
    exchange.on_return do |return_info, properties, _content|
      Emque::Producing.logger.warn("App [#{properties[:app_id]}] message was returned from exchange [#{return_info[:exchange]}]")
      sent = false
    end

    exchange.publish(
      message,
      :mandatory => true,
      :persistent => true,
      :type => message_type,
      :app_id => Emque::Producing.configuration.app_name,
      :content_type => "application/json"
    )

    if raise_on_failure && !ch.wait_for_confirms
      Emque::Producing.logger.warn("RabbitMQ Publisher: message was nacked")
      ch.nacked_set.each do |n|
        Emque::Producing.logger.warn("message id: #{n}")
      end
      # A nack means the message was not accepted; report it to the caller
      # instead of only logging it.
      sent = false
    end

    sent
  ensure
    pool = raise_on_failure ? CONFIRM_CHANNEL_POOL : CHANNEL_POOL
    pool << ch unless ch.nil?
  end
end