module GrapeApe::Consumer::Application

Public Instance Methods

amqp_channel() click to toggle source
# File lib/grape_ape/consumer/application.rb, line 12
# Returns the AMQP channel stored in @channel.
#
# The value is assigned inside #connect's connection callback; until that
# callback has run this returns nil.
#
# @return [Object, nil] the current value of @channel
def amqp_channel
  @channel
end
connect() click to toggle source
# File lib/grape_ape/consumer/application.rb, line 27
# Opens an AMQP connection and subscribes every registered consumer.
#
# Inside the connection callback a channel is created and stored in
# @channel. For each routing key / class pair held in @keys_and_klasses a
# queue named after the key is declared with :auto_delete => true, and each
# delivered message is handed to #handle_message on a new instance of the
# registered class.
#
# When nothing has been registered (@keys_and_klasses is still nil) the
# channel is still created and no queues are subscribed, rather than
# failing with NoMethodError on nil.
#
# @return [Object] whatever AMQP.connect returns
def connect
  AMQP.connect do |connection|
    @channel = AMQP::Channel.new(connection)

    # Fall back to an empty hash so that connecting before any call to
    # #register is a no-op instead of an error.
    (@keys_and_klasses || {}).each do |routing_key, klass|
      queue = @channel.queue(routing_key, :auto_delete => true)
      queue.subscribe do |metadata, payload|
        # A new handler instance is built for every delivered message.
        klass.new.handle_message(metadata, payload)
      end
    end
  end
end
register(routing_key, klass) click to toggle source
# File lib/grape_ape/consumer/application.rb, line 16
# Associates a routing key with the class that handles its messages.
#
# The mapping lives in @keys_and_klasses, which is created on first use.
# Registering the same key again replaces the earlier class.
#
# @param routing_key [Object] key the class is stored under
# @param klass [Object] value stored for that key
# @return [Object] the klass that was just stored
def register(routing_key, klass)
  (@keys_and_klasses ||= {}).store(routing_key, klass)
end
run!() click to toggle source
# File lib/grape_ape/consumer/application.rb, line 21
# Runs the consumer application: calls #start and, from the block handed
# to it, calls #connect.
#
# @return [Object] whatever #start returns
def run!
  start { connect }
end
start() { || ... } click to toggle source
# File lib/grape_ape/consumer/application.rb, line 46
# Calls EM.epoll, then runs the given block inside EM.synchrony.
#
# Before the block is yielded to, handlers for INT and TERM are installed
# that call #stop. The block is optional; without one only the signal
# handlers are set up.
#
# NOTE(review): the handlers run #stop directly from the trap handler;
# confirm that whatever #stop does is safe there.
#
# @yield invoked with no arguments inside the EM.synchrony block
# @return [Object] whatever EM.synchrony returns
def start
  EM.epoll
  EM.synchrony do
    %w[INT TERM].each do |signal|
      trap(signal) { stop }
    end
    yield if block_given?
  end
end
stop() click to toggle source

Stops the running consumers: logs a shutdown message, then stops the EventMachine reactor.

# File lib/grape_ape/consumer/application.rb, line 41
# Logs an info-level shutdown message through #logger, then calls EM.stop.
#
# @return [Object] whatever EM.stop returns
def stop
  message = 'Stopping the consumers...'
  logger.info(message)
  EM.stop
end