class SimpleSqs::Processor

Public Instance Methods

process_sqs_message(json_message_body, sqs_message = nil, transaction_safe = true) click to toggle source
# File lib/simple_sqs/processor.rb, line 9
# Dispatches every entry of json_message_body['Events'] to #process, in order.
#
# json_message_body - parsed message body; must respond to ['Events'] with an
#                     enumerable of event hashes.
# sqs_message       - raw SQS message handed through to #process (default nil).
# transaction_safe  - when true AND the ActiveRecord constant is defined, the
#                     whole batch runs inside a single
#                     ActiveRecord::Base.transaction block.
#
# Raises NoMethodError when json_message_body has no 'Events' entry.
def process_sqs_message(json_message_body, sqs_message = nil, transaction_safe = true)
  # Single definition of the dispatch loop shared by both execution paths.
  handle_events = lambda do
    json_message_body['Events'].each { |evt| process(evt, sqs_message) }
  end

  wrap_in_transaction = Object.const_defined?("ActiveRecord") && transaction_safe
  return handle_events.call unless wrap_in_transaction

  # Explicit block (rather than &handle_events) so that anything the
  # transaction helper yields is ignored instead of tripping lambda arity.
  ActiveRecord::Base.transaction { handle_events.call }
end

Private Instance Methods

logger() click to toggle source
# File lib/simple_sqs/processor.rb, line 41
# Returns the logger used by this processor, creating a Logger that writes to
# STDOUT the first time it is needed and reusing it afterwards.
def logger
  return @logger if @logger

  @logger = Logger.new(STDOUT)
end
process(event, sqs_message) click to toggle source
# File lib/simple_sqs/processor.rb, line 24
# Handles one event hash taken from an SQS message body.
#
# The handler class is looked up by event['EventType'] inside
# SIMPLE_SQS_EVENTS_NAMESPACE, instantiated with the event (frozen in place)
# and the raw SQS message, and then asked to #process. The whole step is
# wrapped in a Librato "sqs.process" timing; the lag between now and the
# handler's #timestamp (milliseconds, rounded up) and an event counter are
# reported as well, all tagged with the event type as source.
#
# event       - Hash describing the event; 'EventType' selects the handler.
# sqs_message - raw SQS message passed through to the handler.
#
# A NameError (e.g. an unknown event type) is reported to Raven, logged, and
# NOT re-raised. Other exceptions propagate to the caller.
#
# NOTE(review): NoMethodError is a NameError subclass, so a NoMethodError
# raised from inside a handler's #process is swallowed by this rescue too —
# confirm that is intended.
def process(event, sqs_message)
  logger.debug "Processing SQS event #{event.inspect}, raw message: #{sqs_message.inspect}"
  event_type = event['EventType']

  Librato.timing("sqs.process", source: event_type) do
    # inherit = false: resolve only constants that live in the events
    # namespace itself. The default lookup falls back to Object, which would
    # let a message name any top-level class (File, Kernel, ...) as a handler.
    klass = SIMPLE_SQS_EVENTS_NAMESPACE.const_get(event_type, false)
    sqs_event = klass.new(event.freeze, sqs_message)

    lag = ((Time.now - sqs_event.timestamp) * 1000).ceil
    Librato.measure("sqs.lag", lag, source: event_type)
    Librato.increment("sqs.events", source: event_type)

    sqs_event.process
  end
rescue NameError => e
  Raven.capture_exception(e, extra: {parameters: event, cgi_data: ENV})
  logger.error e
end