class Emque::Producing::Publisher::RabbitMq
Constants
- CHANNEL_POOL
- CONFIRM_CHANNEL_POOL
- CONN
Public Instance Methods
get_channel(raise_on_failure)
click to toggle source
# File lib/emque/producing/publisher/rabbitmq.rb, line 65
# Hands out a channel for one publish call.
#
# Takes a channel from CONFIRM_CHANNEL_POOL when +raise_on_failure+ is truthy,
# otherwise from CHANNEL_POOL. The pop is non-blocking (`pop(true)`); if it
# raises ThreadError, a new channel is created on CONN instead.
# NOTE(review): the pools are presumably Queue instances, where ThreadError
# from a non-blocking pop means "empty" -- confirm against their definitions.
#
# @param raise_on_failure [Boolean] selects the confirm pool when truthy
# @return [Object] a pooled channel, or the result of CONN.create_channel
def get_channel(raise_on_failure)
  pool = raise_on_failure ? CONFIRM_CHANNEL_POOL : CHANNEL_POOL
  pool.pop(true)
rescue ThreadError
  CONN.create_channel
end
publish(topic, message_type, message, key = nil, raise_on_failure)
click to toggle source
# File lib/emque/producing/publisher/rabbitmq.rb, line 21
# Publishes +message+ to the durable fanout exchange named +topic+.
#
# A channel is checked out via #get_channel and always handed back to the
# matching pool (CONFIRM_CHANNEL_POOL when +raise_on_failure+ is truthy,
# CHANNEL_POOL otherwise), even if publishing raises.
#
# When +raise_on_failure+ is truthy the channel is put into confirm mode and
# the method waits for the broker's confirmation; a failed confirmation is
# logged and reported as +false+.
#
# @param topic [String] name of the fanout exchange
# @param message_type [String] sent as the message's :type property
# @param message [String] payload; published with content type application/json
# @param key [Object, nil] accepted for interface compatibility; not used here
# @param raise_on_failure [Boolean] enables publisher confirms when truthy
# @return [Boolean] false if the return handler fired before this method
#   returned or (in confirm mode) the confirmation failed; true otherwise
def publish(topic, message_type, message, key = nil, raise_on_failure)
  ch = get_channel(raise_on_failure)
  pool = raise_on_failure ? CONFIRM_CHANNEL_POOL : CHANNEL_POOL

  begin
    # Reopen inside the protected region so the channel goes back to the pool
    # even when reopening fails.
    ch.open if ch.closed?

    exchange = ch.fanout(topic, :durable => true, :auto_delete => false)
    ch.confirm_select if raise_on_failure
    sent = true

    # NOTE(review): this handler presumably runs asynchronously when the
    # broker returns an unroutable mandatory message, so without confirms
    # `sent` may already have been returned -- confirm against the client docs.
    exchange.on_return do |return_info, properties, _content|
      Emque::Producing.logger.warn("App [#{properties[:app_id]}] message was returned from exchange [#{return_info[:exchange]}]")
      sent = false
    end

    exchange.publish(
      message,
      :mandatory => true,
      :persistent => true,
      :type => message_type,
      :app_id => Emque::Producing.configuration.app_name,
      :content_type => "application/json"
    )

    if raise_on_failure && !ch.wait_for_confirms
      Emque::Producing.logger.warn("RabbitMQ Publisher: message was nacked")
      ch.nacked_set.each do |n|
        Emque::Producing.logger.warn("message id: #{n}")
      end
      # A nacked message was not accepted by the broker; report the failure
      # instead of only logging it.
      sent = false
    end

    sent
  ensure
    pool << ch
  end
end