class Banter::Publisher
Attributes
channel[R]
publisher[R]
Public Class Methods
instance()
click to toggle source
# File lib/banter/publisher.rb, line 8
# Returns the shared publisher kept in the @@publisher class variable,
# creating it with default arguments the first time it is requested.
#
# @return [Banter::Publisher] the shared publisher
def self.instance
  return @@publisher unless @@publisher.nil?

  @@publisher = ::Banter::Publisher.new
end
new(exchange = nil, exchange_type = :topic)
click to toggle source
# File lib/banter/publisher.rb, line 20
# Records the exchange settings and resets the batching state. The body
# only assigns instance variables.
#
# @param exchange [String, nil] exchange name; when nil/false the value of
#   Banter::Configuration.exchange_name is used instead
# @param exchange_type [Symbol] stored in @exchange_type (default :topic)
def initialize(exchange = nil, exchange_type = :topic)
  @exchange = exchange || Banter::Configuration.exchange_name
  @exchange_type = exchange_type

  # Publishing starts enabled and unbatched.
  @disabled = @batch_messages = false

  # Nesting level of delay_messages blocks currently open.
  @stack_depth = 0
end
teardown()
click to toggle source
# File lib/banter/publisher.rb, line 15
# Tears down the shared publisher, if one has been created, and forgets it
# so that the next call to .instance builds a fresh one.
#
# @return [nil]
def self.teardown
  # Check the class variable that actually holds the shared publisher
  # (an instance variable on the class itself is never assigned).
  # The instance-level #teardown is private, so it is invoked via `send`.
  @@publisher.send(:teardown) if defined?(@@publisher) && @@publisher
  @@publisher = nil
end
Public Instance Methods
delay_messages() { || ... }
click to toggle source
# File lib/banter/publisher.rb, line 33
# Runs the given block between delay_start and delay_execute.
# delay_execute is always called afterwards, even when the block raises.
#
# @yield the work to run while batching is switched on
# @return [Object] the value of the block
def delay_messages
  begin
    delay_start
    yield
  ensure
    delay_execute
  end
end
enable(value)
click to toggle source
# File lib/banter/publisher.rb, line 28
# Switches publishing on or off by storing the negation of +value+ in
# @disabled.
#
# @param value [Boolean] truthy to enable publishing, falsy to disable it
# @return [Boolean] the new value of @disabled (note: the inverse of +value+)
def enable(value)
  @disabled = !value
end
execute_publish(routing_key, envelope, use_default_exchange=false)
click to toggle source
Calls the AMQP server with normalized data. @param routing_key [String] - routing key that the AMQP server uses. @param envelope [Hash] - normalized data to publish.
# File lib/banter/publisher.rb, line 89
# Publishes one envelope as JSON, starting the publisher first if needed.
#
# If publishing is disabled, or no exchange is available after calling
# #start, the message is handed to Banter::RabbitLogger.failed_publish.
# If the publish call raises, the publisher is torn down and restarted and
# the publish is attempted once more; a second failure is logged through
# failed_publish together with the error message.
#
# @param routing_key [String] routing key passed to the publish call
# @param envelope [Hash] data to publish; envelope[:ts] is sent as the timestamp
# @param use_default_exchange [Boolean] when true, publish through the
#   channel's default exchange instead of the exchange held in @publisher
def execute_publish(routing_key, envelope, use_default_exchange=false)
  start if @publisher.nil?

  if @disabled || @publisher.nil?
    return Banter::RabbitLogger.failed_publish(routing_key, {}, envelope)
  end

  attempts_left = 2
  begin
    target = use_default_exchange ? @channel.default_exchange : @publisher
    target.publish(
      envelope.to_json,
      persistent: true,
      mandatory: true,
      timestamp: envelope[:ts],
      content_type: "application/json",
      routing_key: routing_key
    )
    Banter::RabbitLogger.log_publish(routing_key, envelope)
  # TODO: it is not known which error classes a failed publish can raise, so
  # every StandardError is caught and logged until more is known.
  rescue => error
    Banter::RabbitLogger.log(Logger::WARN, "Error occured on publish: #{error.message}: #{error.inspect}, #{routing_key}: #{envelope.inspect}")
    attempts_left -= 1

    # Rebuild the connection before deciding whether to try again.
    teardown
    start
    retry if attempts_left > 0 && @publisher

    Banter::RabbitLogger.failed_publish(routing_key, { error: error.message }, envelope)
  end
end
publish(context, key, payload)
click to toggle source
# File lib/banter/publisher.rb, line 66
# Serializes a payload into an envelope and either buffers it (while
# batching is active) or publishes it straight away.
#
# @param context [Object] passed through to Banter::Message#serialize
# @param key [String] message key; prefixed with "<exchange>." to form the routing key
# @param payload [Object] passed through to Banter::Message#serialize
def publish(context, key, payload)
  routing_key = "#{@exchange}.#{key}"
  envelope = ::Banter::Message.new.serialize(context, key, payload)

  return add_message(routing_key, envelope) if @batch_messages

  execute_publish(routing_key, envelope)
end
publish_to_queue(queue_name, envelope)
click to toggle source
Special method to publish to specific queues. This is useful for dead-letter queue processing, where the dead letter will want to fire a message to a specific queue, and where the consumer will want to inform the dead-letter queue that it has finished processing a retry. @param queue_name [String] name of the queue to publish to (can be retrieved from headers[:queue] in the dead-letter exchange processor). @param envelope [Hash] hash of contents that the AMQP server returns.
# File lib/banter/publisher.rb, line 82
# Publishes an envelope using the queue name as the routing key, asking
# execute_publish to go through the default exchange.
#
# @param queue_name [String] name of the queue to publish to
# @param envelope [Hash] message contents to publish
def publish_to_queue(queue_name, envelope)
  via_default_exchange = true
  execute_publish(queue_name, envelope, via_default_exchange)
end
start()
click to toggle source
# File lib/banter/publisher.rb, line 40
# Opens the Bunny connection, creates a channel and declares the exchange
# this publisher writes to, then registers a handler for returned messages.
#
# When Banter::Configuration.push_enabled is falsy the publisher is marked
# disabled and nothing is opened. Errors raised while connecting or
# declaring the exchange are reported through ::Banter::Notifier and not
# re-raised; in that case the connection opened by this call is closed
# again so it does not leak.
#
# @return [nil] on the disabled and failure paths
def start
  unless Banter::Configuration.push_enabled
    @disabled = true
    return
  end

  # grab server configuration from initialization file somewhere
  connection = nil
  begin
    connection = @connection = Bunny.new(Banter::Configuration.connection)
    connection.start
    @channel = connection.create_channel
    # @exchange_type names the channel method to call (e.g. :topic).
    @publisher = @channel.send(@exchange_type, @exchange, :durable => true, :auto_delete => false)
  rescue => e
    ::Banter::Notifier.notify(e, parameters: { message: e.message }, environment_name: ENV['RAILS_ENV'])

    # Only close what this call created. Without this, a connection that
    # opened but failed later would be overwritten by the next #start and
    # never closed.
    begin
      connection.close if connection
    rescue => close_error
      Banter::RabbitLogger.log(Logger::WARN, "RabbitMQ connection cleanup failed: #{close_error.message}")
    end
    return
  end

  @publisher.on_return do |return_info, properties, content|
    # contents are already transformed into message that we want to send
    Banter::RabbitLogger.failed_publish(return_info[:routing_key], properties, Hashie::Mash.new(::JSON.parse(content)))
  end
end
Private Instance Methods
add_message(key, envelope)
click to toggle source
# File lib/banter/publisher.rb, line 153
# Logs and appends one [routing key, envelope] pair to the pending batch.
#
# @param key [String] routing key the message will later be published with
# @param envelope [Hash] message contents
# @return [Array] the pending-message buffer
def add_message(key, envelope)
  Banter::RabbitLogger.log(Logger::INFO, "Adding #{key} to delayed publish with #{envelope}")
  @messages.push([key, envelope])
end
delay_execute()
click to toggle source
# File lib/banter/publisher.rb, line 141
# Closes one nesting level of delayed publishing. Only when the outermost
# level is closed are the buffered messages published, the buffer emptied
# and batching switched off.
#
# @return [nil, false] nil while nested levels remain, false after flushing
def delay_execute
  @stack_depth -= 1
  return unless @stack_depth == 0

  Banter::RabbitLogger.log(Logger::INFO, "Publishing all messages from delayed publish: #{@messages.count}")
  @messages.each { |routing_key, envelope| execute_publish(routing_key, envelope) }
  @messages.clear
  @batch_messages = false
end
delay_start()
click to toggle source
# File lib/banter/publisher.rb, line 135
# Opens one nesting level of delayed publishing and switches batching on.
#
# @return [Array] the pending-message buffer
def delay_start
  @batch_messages = true

  # Start a fresh buffer only for the outermost level. Resetting it on
  # every call would drop messages already buffered by an enclosing
  # delay_messages block.
  @messages = [] if @stack_depth == 0 || @messages.nil?

  @stack_depth += 1
  @messages
end
teardown()
click to toggle source
# File lib/banter/publisher.rb, line 126
# Closes the current connection and drops the exchange reference so the
# next publish starts again. Any error raised while closing is logged as a
# warning and not re-raised.
#
# @return [nil]
def teardown
  begin
    @connection.close
  rescue StandardError => error
    Banter::RabbitLogger.log(Logger::WARN, "RabbitMQ teardown failed: #{error.message}: #{error.backtrace}")
  end

  @publisher = nil
end