class Jackhammer::Topic

Constants

QUEUE_NAME_KEY
ROUTING_KEY_KEY

Attributes

queue_config[R]
topic[R]

Public Class Methods

new(name:, queue_config:, options: {})
# File lib/jackhammer/topic.rb, line 6
# Creates the topic via `Jackhammer.channel.topic` and stores the normalized
# queue configuration.
#
# @param name first argument forwarded to `Jackhammer.channel.topic`
# @param queue_config [Array, Hash] queue definitions; passed through
#   `normalize_queue_config` before being stored
# @param options [Hash] second argument forwarded to `Jackhammer.channel.topic`
def initialize(name:, queue_config:, options: {})
  channel = Jackhammer.channel
  @topic = channel.topic(name, options)
  @queue_config = normalize_queue_config(queue_config)
end

Public Instance Methods

publish(message, options)

Clients are expected to supply at least the `routing_key` in `options` for every message they publish.

# File lib/jackhammer/topic.rb, line 17
# Publishes +message+ to the topic, routed through the client middleware.
#
# The options are first run through `Jackhammer.publish_options`; the
# middleware then receives the message and those options and yields the
# (possibly altered) pair that is finally handed to `topic.publish`.
#
# @param message the payload to publish
# @param options publish options given to `Jackhammer.publish_options`
# @return whatever the middleware's `call` returns
def publish(message, options)
  middleware = Jackhammer.client_middleware
  publish_opts = Jackhammer.publish_options(options)
  middleware.call(message, publish_opts) { |payload, final_opts| topic.publish(payload, final_opts) }
end
queues()
# File lib/jackhammer/topic.rb, line 23
# Builds (once) and returns the queue wrappers described by +queue_config+.
#
# For every entry the 'handler', routing-key and queue-name values are pulled
# out; the remaining pairs are passed to `Jackhammer.channel.queue` as queue
# options. When no queue name is configured, one is derived from the routing
# key via `QueueName.from_routing_key`.
#
# Each entry is shallow-copied before keys are removed, so the hashes exposed
# through +queue_config+ (and possibly owned by the caller) stay intact.
#
# @return [Array] one `Queue` per config entry; memoized in @queues
# @raise [InvalidConfigError] from `fetch_and_delete_key` when an entry has no
#   routing key
def queues
  @queues ||= queue_config.map do |entry|
    # Work on a copy: the deletes below must not strip the stored config.
    options = entry.dup
    handler = MessageReceiver.new(options.delete('handler'))
    routing_key = fetch_and_delete_key(options, ROUTING_KEY_KEY)
    queue_name = options.delete(QUEUE_NAME_KEY) || QueueName.from_routing_key(routing_key)
    # Whatever is left after the deletes is forwarded as queue options.
    queue = Jackhammer.channel.queue(queue_name, options)
    Log.info { "'#{queue_name}' configured to subscribe on '#{routing_key}'" }
    Queue.new(topic: topic, queue: queue, handler: handler, routing_key: routing_key)
  end
end
subscribe_queues()
# File lib/jackhammer/topic.rb, line 11
# Calls `subscribe` on every queue returned by `queues`.
#
# @return the collection returned by `queues`
def subscribe_queues
  queues.each { |queue| queue.subscribe }
end

Private Instance Methods

fetch_and_delete_key(options, key)
# File lib/jackhammer/topic.rb, line 53
# Removes +key+ from +options+ and returns its value.
#
# @param options [Hash] hash to remove the key from (mutated in place)
# @param key the key to remove
# @return the removed value
# @raise [InvalidConfigError] when the key is absent or its value is nil/false
def fetch_and_delete_key(options, key)
  value = options.delete(key)
  return value if value

  raise InvalidConfigError, "#{key} not found in #{options.inspect}"
end
normalize_queue_config(config)

`queue_config` can be either:

  • an array of option hashes, each of which may contain a `queue_name` key

or

  • a hash containing `queue_name => options` pairs

# File lib/jackhammer/topic.rb, line 44
# Converts either accepted +queue_config+ shape into an array of option hashes.
#
# * Array form: each element is shallow-copied.
# * Hash form (`name => options`): each options hash is copied with
#   QUEUE_NAME_KEY set to the pair's key.
#
# Copies are returned in both cases so that neither this method nor later
# destructive processing of the entries alters the hashes the caller passed in.
#
# @param config [Array<Hash>, Hash] queue definitions
# @return [Array<Hash>] a new array of new option hashes
def normalize_queue_config(config)
  return config.map(&:dup) if config.is_a?(Array)

  # merge builds a new hash instead of writing the name into the caller's one.
  config.map { |name, options| options.merge(QUEUE_NAME_KEY => name) }
end