class Jackhammer::Topic
Constants
- QUEUE_NAME_KEY
- ROUTING_KEY_KEY
Attributes
queue_config[R]
topic[R]
Public Class Methods
new(name:, queue_config:, options: {})
click to toggle source
# File lib/jackhammer/topic.rb, line 6 def initialize(name:, queue_config:, options: {}) @topic = Jackhammer.channel.topic(name, options) @queue_config = normalize_queue_config(queue_config) end
Public Instance Methods
publish(message, options)
click to toggle source
We're expecting the client to specify at least the routing_key in options for each message published.
# File lib/jackhammer/topic.rb, line 17 def publish(message, options) Jackhammer.client_middleware.call(message, Jackhammer.publish_options(options)) do |msg, opts| topic.publish msg, opts end end
queues()
click to toggle source
# File lib/jackhammer/topic.rb, line 23 def queues return @queues if @queues @queues = queue_config.map do |options| handler = MessageReceiver.new(options.delete('handler')) routing_key = fetch_and_delete_key(options, ROUTING_KEY_KEY) queue_name = options.delete(QUEUE_NAME_KEY) || QueueName.from_routing_key(routing_key) queue = Jackhammer.channel.queue(queue_name, options) Log.info { "'#{queue_name}' configured to subscribe on '#{routing_key}'" } Queue.new(topic: topic, queue: queue, handler: handler, routing_key: routing_key) end end
subscribe_queues()
click to toggle source
# File lib/jackhammer/topic.rb, line 11 def subscribe_queues queues.each(&:subscribe) end
Private Instance Methods
fetch_and_delete_key(options, key)
click to toggle source
# File lib/jackhammer/topic.rb, line 53 def fetch_and_delete_key(options, key) options.delete(key) || fail(InvalidConfigError, "#{key} not found in #{options.inspect}") end
normalize_queue_config(config)
click to toggle source
`queue_config` can be either:
-
an array of options containing `queue_name` key
or
-
a hash containing `queue_name => options` pairs
# File lib/jackhammer/topic.rb, line 44 def normalize_queue_config(config) return config if config.is_a?(Array) config.map do |name, options| options[QUEUE_NAME_KEY] = name options end end