class Philotic::Publisher
Attributes
connection[RW]
log_message_handler[RW]
Public Class Methods
new(connection)
click to toggle source
# File lib/philotic/publisher.rb, line 10
# Builds a publisher bound to the given connection.
#
# @param connection the object stored in @connection; the other methods of
#   this class read config, logger, exchange and connection state from it.
def initialize(connection)
  @connection = connection
end
Public Instance Methods
config()
click to toggle source
# File lib/philotic/publisher.rb, line 18
# Convenience reader: the configuration object is owned by the connection,
# so this simply forwards to it.
#
# @return whatever connection.config returns
def config
  connection.config
end
logger()
click to toggle source
# File lib/philotic/publisher.rb, line 14
# Convenience reader: the logger is owned by the connection, so this simply
# forwards to it.
#
# @return whatever connection.logger returns
def logger
  connection.logger
end
normalize_payload_times(payload)
click to toggle source
# File lib/philotic/publisher.rb, line 35
# Replaces every time-like value in +payload+ with its UTC equivalent.
#
# A value is treated as time-like when it responds to +utc+ (checked first)
# or, failing that, to +to_utc+. All other values are left untouched.
#
# The hash is updated in place and the very same hash object is returned.
# NOTE(review): for core Time objects, +utc+ also converts the receiver
# itself, so the caller's Time instance is affected too.
#
# @param payload [Hash] message payload; modified in place
# @return [Hash] the same +payload+ object
def normalize_payload_times(payload)
  payload.each_pair do |key, value|
    # Preference order matters: +utc+ wins over +to_utc+ when both exist.
    converter = %i[utc to_utc].find { |name| value.respond_to?(name) }
    payload[key] = value.public_send(converter) if converter
  end
end
publish(message)
click to toggle source
# File lib/philotic/publisher.rb, line 22
# Publishes +message+ and records the outcome on the message itself.
#
# The publish metadata starts from the message's headers and is overlaid with
# message.metadata when that is present. The result of _publish is stored in
# message.published. If publishing raises a StandardError, the error is
# stored in message.publish_error, its text is logged at error level, and it
# is re-raised only when config.raise_error_on_publish is truthy.
#
# @param message object exposing headers, metadata, payload, and the writers
#   published= / publish_error=
# @return the same +message+ object
def publish(message)
  # Message-level metadata overrides the default :headers entry on conflict.
  publish_options = { headers: message.headers }.merge(message.metadata || {})

  begin
    message.published = _publish(message.payload, publish_options)
  rescue StandardError => error
    message.publish_error = error
    logger.error(error.message)
    raise error if config.raise_error_on_publish
  end

  message
end
Private Instance Methods
_publish(payload, metadata = {})
click to toggle source
# File lib/philotic/publisher.rb, line 46
# Performs the actual publish to the connection's exchange.
#
# Steps, in order:
# 1. Refuse (log at :warn, return false) when config.disable_publish is set.
# 2. Call connection.connect!; refuse (log at :error, return false) when the
#    connection still reports it is not connected.
# 3. Merge in the default metadata, normalize time values in the payload,
#    serialize, and hand the result to the exchange.
# 4. Log at :debug and return true.
#
# @param payload [Hash] message payload
# @param metadata [Hash] publish options supplied by the caller
# @return [Boolean] true when the message was handed to the exchange,
#   false when publishing was refused
def _publish(payload, metadata = {})
  # Both refusal paths log the untouched arguments and report failure.
  refuse = lambda do |severity, reason|
    log_message_published(severity, metadata, payload, reason)
    false
  end

  return refuse.call(:warn, 'attempted to publish a message when publishing is disabled.') if config.disable_publish

  connection.connect!
  return refuse.call(:error, 'unable to publish message, not connected to RabbitMQ') unless connection.connected?

  full_metadata = merge_metadata(metadata)
  normalized_payload = normalize_payload_times(payload)

  # Fetch the exchange before serializing to keep the original evaluation order.
  exchange = connection.exchange
  body = Philotic::Serialization::Serializer.dump(normalized_payload, full_metadata)
  exchange.publish(body, full_metadata)

  log_message_published(:debug, full_metadata, normalized_payload, 'published message')
  true
end
log_message_published(severity, metadata, payload, message)
click to toggle source
# File lib/philotic/publisher.rb, line 80
# Reports a publish-related event.
#
# When a custom handler is stored in @log_message_handler it receives all
# four arguments and nothing is written to the logger. Otherwise a single
# line is sent to the logger at the requested severity, with the payload
# rendered through +to_json+.
#
# @param severity [Symbol] logger method to invoke (e.g. :debug, :warn, :error)
# @param metadata [Hash] publish metadata to report
# @param payload message payload to report; must respond to +to_json+ when
#   no custom handler is installed
# @param message [String] human-readable description of the event
# @return the handler's return value, or the logger call's return value
def log_message_published(severity, metadata, payload, message)
  handler = @log_message_handler
  return handler.call(severity, metadata, payload, message) if handler

  logger.send(severity, "#{message}; metadata:#{metadata}, payload:#{payload.to_json}")
end
merge_metadata(metadata)
click to toggle source
# File lib/philotic/publisher.rb, line 65
# Builds the final publish metadata.
#
# Defaults are read from config, one reader call per entry in
# Philotic::MESSAGE_OPTIONS, and then overlaid with the caller's +metadata+.
# The resulting :headers hash always carries +philotic_firehose: true+ unless
# the caller's headers set that key themselves.
#
# @param metadata [Hash] caller-supplied publish options; not modified
# @return [Hash] a new hash combining the defaults, +metadata+ and headers
def merge_metadata(metadata)
  defaults = Philotic::MESSAGE_OPTIONS.each_with_object({}) do |option, collected|
    collected[option] = config.send(option.to_s)
  end

  merged = defaults.merge(metadata)
  # Caller-provided headers win over the firehose default on key conflicts.
  merged[:headers] = { philotic_firehose: true }.merge(merged[:headers] || {})
  merged
end
on_publish_message(&block)
click to toggle source
# File lib/philotic/publisher.rb, line 76
# Installs +block+ as the custom publish-event handler.
#
# The block is stored in @log_message_handler, which log_message_published
# calls with (severity, metadata, payload, message) in place of the logger.
# Calling this without a block stores nil, which restores logger output.
#
# @yieldparam severity [Symbol]
# @yieldparam metadata [Hash]
# @yieldparam payload
# @yieldparam message [String]
# @return [Proc, nil] the stored handler
def on_publish_message(&block)
  @log_message_handler = block
end