class QueueWorker

Constants

VERSION

Attributes

stomp[RW]
client[W]
handler[RW]
log[W]
queue[RW]

Public Class Methods

configure() { |self| ... } click to toggle source
# File lib/queue_worker.rb, line 16
def configure
  yield self
end
new(queue_name = nil, log = nil, &block) click to toggle source

@param [String] queue_name the queue to pub/sub @param [#info, warn, error] log @param [Proc] block to process a message received by subscribe

# File lib/queue_worker.rb, line 27
def initialize(queue_name = nil, log = nil, &block)
  @queue = queue_name
  @log = log
  @handler = block || proc { |body| Kernel.const_get(body[:class]).call(body[:args]) }
end
peek(queue_name, size = 1) click to toggle source

Peek at any number messages in the queue

@param [String] queue_name @param [Integer] size specify the number of messages to return

# File lib/queue_worker.rb, line 47
def self.peek(queue_name, size = 1)
  counter = 0
  messages = []
  worker = new(queue_name)
  worker.subscribe_with_timeout(2, size) do |message|
    counter += 1
    messages << JSON.parse(message.body).merge('message-id' => message.headers['message-id'])
    worker.quit if counter == size
  end
  messages
end
publish(queue, *messages) click to toggle source

Publish one or more messages to a queue

@param [String] queue name @param [Array] messages a list of objects that are or can be converted to JSON

# File lib/queue_worker.rb, line 37
def self.publish(queue, *messages)
  worker = new(queue)
  messages.each { |msg| worker.publish(msg) }
  worker.close
end
subscribe(*args, &block) click to toggle source

Start a subscription worker with the given args

# File lib/queue_worker.rb, line 60
def self.subscribe(*args, &block)
  worker = new(*args, &block)
  worker.subscribe
  worker.join
end

Public Instance Methods

call(message) click to toggle source

Handles subscribe callback

Tries to delegate processing of message to a class based on the name of the queue. For example:

If the queue is named "scheduled/default" it will look for a class called Scheduled::Default to
initialize with the message body and then call it's +call+ method

@param [Stomp::Message] message is the container object Stomp gives us for what is really a “frame” or package from the queue

# File lib/queue_worker.rb, line 120
def call(message)
  if message.command == 'MESSAGE'
    handler.call(JSON.parse(message.body, symbolize_names: true))
  end
rescue => e
  log.error(e.message) { "\n#{e.backtrace.inspect}" }
ensure
  ack(message)
  log.info('Processed') { %(#{message.headers['message-id']} from "#{message.headers['destination']}") }
end
client() click to toggle source
# File lib/queue_worker.rb, line 131
def client
  @client ||= Stomp::Client.new(self.class.stomp)
end
log() click to toggle source
# File lib/queue_worker.rb, line 135
def log
  @log ||= Logger.new(STDOUT)
end
publish(message, headers = {}) click to toggle source

Publish a message to a queue

@param [Hash] message - Data to serialize @param [Hash] headers - Additional header options for ActiveMQ

# File lib/queue_worker.rb, line 70
def publish(message, headers = {})
  message = message.to_json unless message.is_a?(String)
  client.publish("/queue/#{queue}", message, { priority: 4, persistent: true }.merge(headers))
end
Also aliased as: push
push(message, headers = {})
Alias for: publish
quit(queue_name = nil) click to toggle source

Unsubscribe from the current queue and close the connection

# File lib/queue_worker.rb, line 107
def quit(queue_name = nil)
  unsubscribe(queue_name)
  close
end
subscribe(queue_name = nil, size = 1, &block) click to toggle source

Subscribe (listen) to a queue

@param [String] queue_name specify the queue name @param [Integer] size specify the number of messages the block may receive without sending ack @param [Proc] block to handle the subscribe callback

# File lib/queue_worker.rb, line 82
def subscribe(queue_name = nil, size = 1, &block)
  callback = block || method(:call)
  client.subscribe("/queue/#{queue_name || queue}", { :ack => 'client', 'activemq.prefetchSize' => size }, &callback)
end
subscribe_with_timeout(duration, size = 1, &block) click to toggle source

Subscribe to a queue for a limited time

@param [Integer] duration to subscribe for before closing connection @param [Integer] size specify the number of messages the block may receive without sending ack @param [Proc] block to handle the subscribe callback

# File lib/queue_worker.rb, line 92
def subscribe_with_timeout(duration, size = 1, &block)
  Timeout::timeout(duration) do
    subscribe(nil, size, &block)
    join
  end
rescue Timeout::Error
  quit
end
unsubscribe(queue_name = nil) click to toggle source

Unsubscribe from the current queue

# File lib/queue_worker.rb, line 102
def unsubscribe(queue_name = nil)
  client.unsubscribe("/queue/#{queue_name || queue}")
end