class Rmsg::Task

Task handles publishing tasks and processing them.

Public Class Methods

new(params) click to toggle source

When initializing a task handler, the queue will be declared durable, to survive RabbitMQ restarts. @param params [Hash] @option params :rabbit [Rmsg::Rabbit] Example: Rmsg::Rabbit.new @option params :queue [String] Example: ‘messages’

# File lib/rmsg/task.rb, line 9
def initialize(params)
  @rabbit = params[:rabbit]
  @queue = @rabbit.channel.queue(params[:queue], durable: true)
end

Public Instance Methods

publish(message) click to toggle source

Publish a message in the tasks queue. It is marked a persistent to survive RabbitMQ restarts. @param message [Hash] The message to be consumed.

# File lib/rmsg/task.rb, line 17
def publish(message)
  @queue.publish(message.to_json, presistent: true)
end
subscribe() { |message| ... } click to toggle source

Subscribe to the tasks queue. Subscribing happens by continuously blocking the current process. It is specifically designed for long running processes. When receiving INT it will gracefully close. Consumer processes have a prefetch value of 1 for round-robin distribution. Consumer processes will send a manual ack after processing, to avoid losing tasks. @yieldparam message [Hash] The message received, to be processed within the block.

# File lib/rmsg/task.rb, line 28
def subscribe
  @rabbit.channel.prefetch(1)
  begin
    @queue.subscribe(block: true, manual_ack: true) do |delivery_info, metadata, payload|
      message = JSON.parse(payload, symbolize_names: true)
      yield message
      @rabbit.channel.ack(delivery_info.delivery_tag)
    end
  rescue Interrupt => _
    @rabbit.close
  end
end