class Emque::Consuming::Consumer

Public Instance Methods

process(message)
# File lib/emque/consuming/consumer.rb, line 12
# Consumes a single message by piping it through the :parse and :route
# stages, in that order.
#
# Any StandardError raised along the way is first reported through
# handle_error (together with the message passed in) and then re-raised
# unchanged, so the caller still sees the failure.
#
# message - the message to consume
#
# Returns whatever pipe returns.
def process(message)
  stages = [:parse, :route]

  begin
    pipe(message, :through => stages)
  rescue => e
    handle_error(e, message)
    # Bare raise re-raises the exception currently being handled.
    raise
  end
end

Private Instance Methods

handle_error(e, subject)
# File lib/emque/consuming/consumer.rb, line 36
# Reports an error raised while consuming +subject+.
#
# A context hash is assembled from the consumer's class name, the
# message's current values and original payload, and its topic. The error,
# the context and (when present) the backtrace are written to the logger
# at error level; each configured error handler is then called with the
# exception and the context; finally the context is passed to the
# application instance's notice_error.
#
# e       - the exception being reported
# subject - the message being consumed; must respond to values, original
#           and topic
#
# Returns whatever notice_error returns.
def handle_error(e, subject)
  payload = {
    :current => subject.values,
    :original => subject.original
  }
  context = {
    :consumer => self.class.name,
    :message => payload,
    :topic => subject.topic
  }

  # Always log, regardless of which handlers are configured.
  Emque::Consuming.logger.error("Error consuming message #{e}")
  Emque::Consuming.logger.error(context)
  # An exception that was never raised has no backtrace.
  trace = e.backtrace
  Emque::Consuming.logger.error(trace.join("\n")) if trace

  Emque::Consuming.config.error_handlers.each { |handler| handler.call(e, context) }

  Emque::Consuming.application.instance.notice_error(context)
end
parse(message)
# File lib/emque/consuming/consumer.rb, line 21
# Decodes the message's original payload with Oj, requesting symbol keys,
# and returns the result of message.with(:values => decoded).
#
# message - must respond to original and with
#
# NOTE(review): presumably +with+ returns a copy of the message rather
# than mutating it; confirm against the message class.
def parse(message)
  raw = message.original
  decoded = Oj.load(raw, :symbol_keys => true)

  message.with(:values => decoded)
end
route(message)
# File lib/emque/consuming/consumer.rb, line 28
# Hands the message to the application's router, keyed by the topic and
# type found under :metadata in the message's values.
#
# message - must respond to values, returning a hash-like object with a
#           :metadata entry that itself holds :topic and :type
#
# Raises KeyError (via fetch) when :metadata, :topic or :type is absent.
# Returns whatever the router's route method returns.
def route(message)
  # Resolve the router before touching the message, as the receiver of the
  # call is evaluated ahead of its arguments.
  router = Emque::Consuming.application.router
  metadata = message.values.fetch(:metadata)
  topic = metadata.fetch(:topic)
  type = metadata.fetch(:type)

  router.route(topic, type, message)
end