module Mumukit::Nuntius::Consumer

Public Class Methods

handle_message(channel, delivery_info, properties, body, &block) click to toggle source
# File lib/mumukit/nuntius/consumer.rb, line 41
def handle_message(channel, delivery_info, properties, body, &block)
  block.call delivery_info, properties, parse_body(body)
  channel.ack delivery_info.delivery_tag
rescue => e
  Mumukit::Nuntius::Logger.warn "Failed to read body: #{e.message} \n #{e.backtrace}"
  channel.nack delivery_info.delivery_tag, false, true
end
negligent_start!(queue_name, &block) click to toggle source
# File lib/mumukit/nuntius/consumer.rb, line 22
def negligent_start!(queue_name, &block)
  start queue_name, queue_name do |_delivery_info, _properties, body|
    begin
      block.call(body)
    rescue => e
      Mumukit::Nuntius::Logger.error "#{queue_name} item couldn't be processed #{e}. body was: #{body}"
    end
  end
end
parse_body(body) click to toggle source
# File lib/mumukit/nuntius/consumer.rb, line 49
def parse_body(body)
  JSON.parse(body).with_indifferent_access
end
start(queue_name, exchange_name, &block) click to toggle source
# File lib/mumukit/nuntius/consumer.rb, line 5
def start(queue_name, exchange_name, &block)
  Mumukit::Nuntius::Logger.info "Attaching to queue #{queue_name}"
  Mumukit::Nuntius::Connection.establish_connection
  channel, exchange = Mumukit::Nuntius::Connection.start_channel(exchange_name)
  queue = channel.queue(queue_name, durable: true)
  queue.bind(exchange)
  channel.prefetch(1)

  begin
    subscribe queue, channel, &block
  rescue Interrupt => _
    Mumukit::Nuntius::Logger.info "Leaving queue #{queue_name}"
  ensure
    channel.close
  end
end
subscribe(queue, channel, &block) click to toggle source
# File lib/mumukit/nuntius/consumer.rb, line 32
def subscribe(queue, channel, &block)
  Mumukit::Nuntius::Logger.debug "Subscribed to queue #{queue}"

  queue.subscribe(manual_ack: true, block: true) do |delivery_info, properties, body|
    Mumukit::Nuntius::Logger.debug "Processing message #{body}"
    handle_message channel, delivery_info, properties, body, &block
  end
end