class Consumers::Base

Constants

QUEUE_NAME

The queue name should be defined here.

Attributes

queue_name[RW]

Public Instance Methods

channel() click to toggle source

Create a channel with the established connection.

# File lib/smart_que/consumers/base.rb, line 27
# Returns the memoized channel, creating a new one from +connection+ when
# none exists yet or when the current one reports +closed?+.
#
# When a closed channel is replaced, the queue memoized by #queue is
# discarded as well: that queue object was obtained from the old channel and
# would otherwise keep being returned even though its channel is gone.
#
# @return [Object] whatever +connection.create_channel+ returns
def channel
  return @channel unless @channel.nil? || @channel.closed?

  # Only drop the memoized queue when replacing an existing (closed) channel;
  # on first creation there is nothing stale to invalidate.
  @queue = nil unless @channel.nil?
  @channel = connection.create_channel
end
config() click to toggle source
# File lib/smart_que/consumers/base.rb, line 35
# Shortcut to the library-wide configuration object.
#
# Delegates to +::SmartQue.config+ and returns its result unchanged.
def config
  ::SmartQue.config
end
connection() click to toggle source

Establish connection to Message Queues.

# File lib/smart_que/consumers/base.rb, line 22
# Returns the result of +::SmartQue.establish_connection+.
#
# NOTE(review): whether that call reuses an existing connection or opens a
# new one on every invocation is not visible from here — confirm before
# calling this in a hot path.
def connection
  ::SmartQue.establish_connection
end
queue() click to toggle source

This method will return the default queue which is present in the message queues. Consumer-specific queues should be defined and implemented in the consumer subclasses.

# File lib/smart_que/consumers/base.rb, line 17
# Returns the queue named +queue_name+, looked up on the current channel.
#
# The queue object is fetched once via +channel.queue+ and cached in
# +@queue+; later calls return the cached object.
def queue
  return @queue if @queue

  @queue = channel.queue(queue_name)
end
start() click to toggle source

Method which kick-starts the consumer process thread.

# File lib/smart_que/consumers/base.rb, line 40
# Subscribes to +queue+ and dispatches every delivery to +run+, then blocks
# in +wait_for_threads+.
#
# For each delivery the payload is parsed as JSON, converted with
# +with_indifferent_access+ and handed to +run+. The value +run+ returns
# decides what happens to the message:
#
# * +:ok+    - the delivery is acknowledged
# * +:retry+ - the delivery is rejected with +true+ as second argument
#              (presumably the broker's requeue flag — confirm against the
#              channel API)
# * anything else (+:error+, +nil+, ...) - rejected with +false+
#
# Any StandardError raised while parsing or running is reported through
# +warn+ and treated as +:error+, so one bad message does not stop the
# subscription and the failure is not lost silently.
def start
  channel.prefetch(10)
  queue.subscribe(manual_ack: true, exclusive: false) do |delivery_info, _metadata, payload|
    status =
      begin
        run(JSON.parse(payload).with_indifferent_access)
      rescue StandardError => e
        # Report the failure instead of swallowing it; the message is still
        # rejected below, exactly as for an explicit :error status.
        warn "#{self.class.name}: failed to process message: #{e.class}: #{e.message}"
        :error
      end

    tag = delivery_info.delivery_tag
    case status
    when :ok    then channel.ack(tag)
    when :retry then channel.reject(tag, true)
    else             channel.reject(tag, false) # :error, nil etc
    end
  end

  wait_for_threads
end
wait_for_threads() click to toggle source
# File lib/smart_que/consumers/base.rb, line 62
# Parks the calling thread so the process stays alive after #start has set
# up its subscription.
def wait_for_threads
  # Kernel#sleep with no argument suspends the current thread indefinitely
  # (until it is explicitly woken or the process is interrupted).
  sleep
end