class Consumers::Base
Constants
- QUEUE_NAME
The queue name should be defined here.
Attributes
queue_name[RW]
Public Instance Methods
channel()
click to toggle source
Create channel with the established connection.
# File lib/smart_que/consumers/base.rb, line 27 def channel # Create new channel if closed if @channel.nil? || @channel.closed? @channel = connection.create_channel end @channel end
config()
click to toggle source
# File lib/smart_que/consumers/base.rb, line 35 def config ::SmartQue.config end
connection()
click to toggle source
Establish connection to Message Queues.
# File lib/smart_que/consumers/base.rb, line 22 def connection ::SmartQue.establish_connection end
queue()
click to toggle source
This method will return the default queue which present in the message queues. Consumer specific queue should be defined and implemented in the consumer sub classes.
# File lib/smart_que/consumers/base.rb, line 17 def queue @queue ||= channel.queue(queue_name) end
start()
click to toggle source
Method which kick start the consumer process thread
# File lib/smart_que/consumers/base.rb, line 40 def start channel.prefetch(10) queue.subscribe(manual_ack: true, exclusive: false) do |delivery_info, metadata, payload| begin body = JSON.parse(payload).with_indifferent_access status = run(body) rescue => e status = :error end if status == :ok channel.ack(delivery_info.delivery_tag) elsif status == :retry channel.reject(delivery_info.delivery_tag, true) else # :error, nil etc channel.reject(delivery_info.delivery_tag, false) end end wait_for_threads end
wait_for_threads()
click to toggle source
# File lib/smart_que/consumers/base.rb, line 62 def wait_for_threads sleep end