class Emque::Consuming::Consumer
Public Instance Methods
process(message)
click to toggle source
# File lib/emque/consuming/consumer.rb, line 12 def process(message) pipe(message, :through => [:parse, :route]) rescue => e handle_error(e, message) raise end
Private Instance Methods
handle_error(e, subject)
click to toggle source
# File lib/emque/consuming/consumer.rb, line 36 def handle_error(e, subject) context = { :consumer => self.class.name, :message => { :current => subject.values, :original => subject.original }, :topic => subject.topic } # log the error by default Emque::Consuming.logger.error("Error consuming message #{e}") Emque::Consuming.logger.error(context) Emque::Consuming.logger.error e.backtrace.join("\n") unless e.backtrace.nil? Emque::Consuming.config.error_handlers.each do |handler| handler.call(e, context) end Emque::Consuming.application.instance.notice_error(context) end
parse(message)
click to toggle source
# File lib/emque/consuming/consumer.rb, line 21 def parse(message) message.with( :values => Oj.load(message.original, :symbol_keys => true) ) end
route(message)
click to toggle source
# File lib/emque/consuming/consumer.rb, line 28 def route(message) Emque::Consuming.application.router.route( message.values.fetch(:metadata).fetch(:topic), message.values.fetch(:metadata).fetch(:type), message ) end