class Philotic::Publisher

Attributes

connection[RW]
log_message_handler[RW]

Public Class Methods

new(connection) click to toggle source
# File lib/philotic/publisher.rb, line 10
def initialize(connection)
  @connection = connection
end

Public Instance Methods

config() click to toggle source
# File lib/philotic/publisher.rb, line 18
def config
  connection.config
end
logger() click to toggle source
# File lib/philotic/publisher.rb, line 14
def logger
  connection.logger
end
normalize_payload_times(payload) click to toggle source
# File lib/philotic/publisher.rb, line 35
def normalize_payload_times(payload)
  payload.each do |k, v|
    if v.respond_to?(:utc)
      payload[k] = v.utc
    elsif v.respond_to?(:to_utc)
      payload[k] = v.to_utc
    end
  end
end
publish(message) click to toggle source
# File lib/philotic/publisher.rb, line 22
def publish(message)
  metadata = {headers: message.headers}
  metadata.merge!(message.metadata) if message.metadata
  begin
    message.published = _publish(message.payload, metadata)
  rescue => e
    message.publish_error = e
    logger.error e.message
    raise e if config.raise_error_on_publish
  end
  message
end

Private Instance Methods

_publish(payload, metadata = {}) click to toggle source
# File lib/philotic/publisher.rb, line 46
def _publish(payload, metadata = {})
  if config.disable_publish
    log_message_published(:warn, metadata, payload, 'attempted to publish a message when publishing is disabled.')
    return false
  end
  connection.connect!
  unless connection.connected?
    log_message_published(:error, metadata, payload, 'unable to publish message, not connected to RabbitMQ')
    return false
  end
  metadata = merge_metadata(metadata)

  payload = normalize_payload_times(payload)

  connection.exchange.publish(Philotic::Serialization::Serializer.dump(payload, metadata), metadata)
  log_message_published(:debug, metadata, payload, 'published message')
  true
end
log_message_published(severity, metadata, payload, message) click to toggle source
# File lib/philotic/publisher.rb, line 80
def log_message_published(severity, metadata, payload, message)
  if @log_message_handler
    @log_message_handler.call(severity, metadata, payload, message)
  else
    logger.send(severity, "#{message}; metadata:#{metadata}, payload:#{payload.to_json}")
  end
end
merge_metadata(metadata) click to toggle source
# File lib/philotic/publisher.rb, line 65
def merge_metadata(metadata)
  publish_defaults = {}
  Philotic::MESSAGE_OPTIONS.each do |key|
    publish_defaults[key] = config.send(key.to_s)
  end
  metadata           = publish_defaults.merge metadata
  metadata[:headers] ||= {}
  metadata[:headers] = {philotic_firehose: true}.merge(metadata[:headers])
  metadata
end
on_publish_message(&block) click to toggle source
# File lib/philotic/publisher.rb, line 76
def on_publish_message(&block)
  @log_message_handler = block
end