class Banter::Publisher

Attributes

channel[R]
publisher[R]

Public Class Methods

instance() click to toggle source
# File lib/banter/publisher.rb, line 8
def self.instance
  if @@publisher.nil?
    @@publisher = ::Banter::Publisher.new
  end
  @@publisher
end
new(exchange = nil, exchange_type = :topic) click to toggle source
# File lib/banter/publisher.rb, line 20
def initialize(exchange = nil, exchange_type = :topic)
  @exchange = exchange || Banter::Configuration.exchange_name
  @exchange_type = exchange_type
  @disabled = false
  @batch_messages = false
  @stack_depth = 0
end
teardown() click to toggle source
# File lib/banter/publisher.rb, line 15
def self.teardown
  @@publisher.teardown if @publisher
  @@publisher = nil
end

Public Instance Methods

delay_messages() { || ... } click to toggle source
# File lib/banter/publisher.rb, line 33
def delay_messages
  delay_start
  yield
ensure
  delay_execute
end
enable(value) click to toggle source
# File lib/banter/publisher.rb, line 28
def enable(value)
  @disabled = !value
  @disabled
end
execute_publish(routing_key, envelope, use_default_exchange=false) click to toggle source

Calls the ampq server with normalized data @param routing_key [String] - routing_key that amqp server uses @param envelope [Hash] - normalized data to publish

# File lib/banter/publisher.rb, line 89
def execute_publish(routing_key, envelope, use_default_exchange=false)


   if @publisher.nil?
     start
   end

   if @disabled || @publisher.nil?
     Banter::RabbitLogger.failed_publish(routing_key, {}, envelope)
   else
     tries = 2
     begin
       instance = use_default_exchange ? @channel.default_exchange : @publisher
       instance.publish(envelope.to_json, :persistent => true, :mandatory => true, :timestamp => envelope[:ts], :content_type => "application/json", :routing_key => routing_key)
       Banter::RabbitLogger.log_publish(routing_key, envelope)
     # FIX!!! -thl
     # What kind of errors could be fired from a failure to publish?
     # Should we be more specific?
     # Docs only have errors while connecting, and really not during some sort of long running socket.  For now
     # We'll log until we get more info.
     rescue => e
       Banter::RabbitLogger.log(Logger::WARN, "Error occured on publish: #{e.message}: #{e.inspect}, #{routing_key}: #{envelope.inspect}")
       tries -= 1
       teardown
       start
       if tries > 0 && @publisher
         retry
       else
         Banter::RabbitLogger.failed_publish(routing_key, { error: e.message }, envelope)
       end
     end

   end

 end
publish(context, key, payload) click to toggle source
# File lib/banter/publisher.rb, line 66
def publish(context, key, payload)
  routing_key = "#{@exchange}.#{key}"
  envelope    = ::Banter::Message.new.serialize(context, key, payload)

  if @batch_messages
    add_message(routing_key, envelope)
  else
    execute_publish(routing_key, envelope)
  end
end
publish_to_queue(queue_name, envelope) click to toggle source

Special method to publish to specific queues. This is useful for deadletter queue processing, where the dead letter will want to fire a message to a specific queue, as well at the consumer will want to inform the dead letter queue that it is finished processing a retry @param queue_name [String] name to publish to (can be retrieved from headers[:queue] from dead letter exchange processor) @param envelope [Hash] hash of contents that the amqp server returns

# File lib/banter/publisher.rb, line 82
def publish_to_queue(queue_name, envelope)
  execute_publish(queue_name, envelope, true)
end
start() click to toggle source
# File lib/banter/publisher.rb, line 40
def start
  unless Configuration.push_enabled
    @disabled = true
    return
  end

  # grab server configuration from initialization file somewhere
  begin
    @connection = Bunny.new(Configuration.connection)
    @connection.start

    @channel   = @connection.create_channel
    @publisher = @channel.send(@exchange_type, @exchange, :durable => true, :auto_delete => false)

  rescue => e
    ::Banter::Notifier.notify(e, parameters: { message: e.message }, environment_name: ENV['RAILS_ENV'])
    return
  end

  @publisher.on_return do |return_info, properties, content|
    # contents are already transformed into message that we want to send
    Banter::RabbitLogger.failed_publish(return_info[:routing_key], properties, Hashie::Mash.new(::JSON.parse(content)))
  end

end

Private Instance Methods

add_message(key, envelope) click to toggle source
# File lib/banter/publisher.rb, line 153
def add_message(key, envelope)
  Banter::RabbitLogger.log(Logger::INFO, "Adding #{key} to delayed publish with #{envelope}")
  @messages << [key, envelope]
end
delay_execute() click to toggle source
# File lib/banter/publisher.rb, line 141
def delay_execute
  @stack_depth -= 1
  if @stack_depth == 0
    Banter::RabbitLogger.log(Logger::INFO, "Publishing all messages from delayed publish: #{@messages.count}")
    @messages.each do |key, envelope|
      execute_publish(key, envelope)
    end
    @messages.clear
    @batch_messages = false
  end
end
delay_start() click to toggle source
# File lib/banter/publisher.rb, line 135
def delay_start
  @batch_messages = true
  @stack_depth += 1
  @messages = []
end
teardown() click to toggle source
# File lib/banter/publisher.rb, line 126
def teardown
  begin
    @connection.close
  rescue => e
    Banter::RabbitLogger.log(Logger::WARN, "RabbitMQ teardown failed: #{e.message}: #{e.backtrace}")
  end
  @publisher = nil
end