class Alephant::Publisher::Queue::SQSHelper::Archiver
Attributes
async[R]
log_message_body[R]
log_validator[R]
storage[R]
Public Class Methods
new(storage, opts)
click to toggle source
# File lib/alephant/publisher/queue/sqs_helper/archiver.rb, line 14
#
# Builds an archiver that writes queue messages to +storage+.
#
# @param storage [#put] destination object; it is sent
#   put(key, body, meta) when a message is stored
# @param opts [Hash] optional settings (all keys may be omitted):
#   :async_store         - truthy to store each message on a new Thread
#   :log_archive_message - truthy to use the real message body in log entries
#   :log_validator       - callable given the loggable body; a log entry is
#                          written only when it returns a truthy value.
#                          Defaults to a lambda that always returns true.
def initialize(storage, opts = {})
  @storage          = storage
  @async            = opts[:async_store]
  @log_message_body = opts[:log_archive_message]
  # Fall back to "always log" when no validator is supplied.
  @log_validator    = opts[:log_validator] || ->(_) { true }
end
Public Instance Methods
see(message)
click to toggle source
# File lib/alephant/publisher/queue/sqs_helper/archiver.rb, line 21
#
# Archives +message+ and hands the same message back to the caller.
#
# @param message [Object, nil] the message to archive
# @return [Object, nil] +message+ itself, or nil when +message+ is nil
#   (in which case nothing is stored)
def see(message)
  return if message.nil?

  # The async flag picks between the threaded and the inline store path.
  if async
    async_store(message)
  else
    sync_store(message)
  end

  message
end
Private Instance Methods
async_store(message)
click to toggle source
# File lib/alephant/publisher/queue/sqs_helper/archiver.rb, line 30
#
# Stores +message+ on a newly spawned thread, emitting the
# "AsynchronouslyArchivedData" metric from inside that thread first.
#
# @param message [Object] the message to store
# @return [Thread] the thread performing the store; it is not joined here
def async_store(message)
  Thread.new(message) do |pending|
    logger.metric("AsynchronouslyArchivedData")
    store(pending)
  end
end
body_for(message)
click to toggle source
# File lib/alephant/publisher/queue/sqs_helper/archiver.rb, line 72
#
# Chooses the body text that is used when logging a stored message.
#
# @param message [#body] the message being archived
# @return [Object] message.body when log_message_body is truthy,
#   otherwise a fixed JSON placeholder string
def body_for(message)
  return message.body if log_message_body

  '{ "Message": "No message body available" }'
end
date_key()
click to toggle source
# File lib/alephant/publisher/queue/sqs_helper/archiver.rb, line 76
#
# Formats the current date and hour as "DD-MM-YYYY_HH".
#
# @return [String] e.g. "03-02-2001_04"
def date_key
  current = DateTime.now
  current.strftime("%d-%m-%Y_%H")
end
log_message_parts(id)
click to toggle source
# File lib/alephant/publisher/queue/sqs_helper/archiver.rb, line 65
#
# Builds the two text fragments of a "stored" log line for message +id+.
#
# NOTE(review): the quoted path starts with a literal '#', unlike the key
# produced by storage_key — confirm whether that is intentional.
#
# @param id [Object] message identifier, interpolated into the path
# @return [Array<String>] [method label, single-quoted archive path]
def log_message_parts(id)
  label       = "#{self.class}#store:"
  quoted_path = "'#archive/#{date_key}/#{id}'"
  [label, quoted_path]
end
meta_for(m)
click to toggle source
# File lib/alephant/publisher/queue/sqs_helper/archiver.rb, line 80
#
# Collects the metadata stored alongside an archived message.
#
# @param m [#id, #md5, #queue] the message; m.queue must respond to #url
# @return [Hash{Symbol => Object}] :id, :md5, :logged_at (DateTime.now as a
#   String) and :queue (the queue URL), in that order
def meta_for(m)
  {
    id: m.id,
    md5: m.md5,
    logged_at: DateTime.now.to_s,
    queue: m.queue.url
  }
end
storage_key(id)
click to toggle source
# File lib/alephant/publisher/queue/sqs_helper/archiver.rb, line 61
#
# Builds the storage key for message +id+: "archive/<date_key>/<id>".
#
# @param id [Object] message identifier (converted with to_s)
# @return [String] the key passed to storage.put
def storage_key(id)
  hour_bucket = date_key
  format("archive/%s/%s", hour_bucket, id)
end
store(message)
click to toggle source
# File lib/alephant/publisher/queue/sqs_helper/archiver.rb, line 42
#
# Stores +message+ and, when the log validator approves the loggable body,
# writes a "MessageStored" log entry.
#
# @param message [Object] the message to store
# @return [Object] whatever store_item returns
def store(message)
  # Resolve the loggable body before storing, then store the item.
  loggable_body = body_for(message)
  outcome       = store_item(message)

  if log_validator.call(loggable_body)
    logger.info(
      "event"       => "MessageStored",
      "messageBody" => loggable_body,
      "method"      => "#{self.class}#store"
    )
  end

  outcome
end
store_item(message)
click to toggle source
# File lib/alephant/publisher/queue/sqs_helper/archiver.rb, line 53
#
# Writes +message+ to storage under its archive key. The real message body
# is always written here, independent of the logging options.
#
# @param message [#id, #body] the message to persist
# @return [Object] the result of storage.put
def store_item(message)
  key      = storage_key(message.id)
  payload  = message.body
  metadata = meta_for(message)

  storage.put(key, payload, metadata)
end
sync_store(message)
click to toggle source
# File lib/alephant/publisher/queue/sqs_helper/archiver.rb, line 37
#
# Stores +message+ on the calling thread, emitting the
# "SynchronouslyArchivedData" metric first.
#
# @param message [Object] the message to store
# @return [Object] whatever store returns
def sync_store(message)
  logger.metric("SynchronouslyArchivedData")
  store(message)
end