module ManageIQ::Messaging::Kafka::Queue
Constants
- GROUP_FOR_QUEUE_MESSAGES
Private Instance Methods
publish_message_impl(options)
click to toggle source
# File lib/manageiq/messaging/kafka/queue.rb, line 9 def publish_message_impl(options) raise ArgumentError, "Kafka messaging implementation does not take a block" if block_given? raw_publish(*queue_for_publish(options)).wait end
publish_messages_impl(messages)
click to toggle source
# File lib/manageiq/messaging/kafka/queue.rb, line 14 def publish_messages_impl(messages) handles = messages.collect { |msg_options| raw_publish(*queue_for_publish(msg_options)) } handles.each(&:wait) end
subscribe_messages_impl(options, &block)
click to toggle source
# File lib/manageiq/messaging/kafka/queue.rb, line 19 def subscribe_messages_impl(options, &block) topic = address(options) options[:persist_ref] = GROUP_FOR_QUEUE_MESSAGES + topic queue_consumer = consumer(true, options) queue_consumer.subscribe(topic) queue_consumer.each do |message| process_queue_message(queue_consumer, topic, message, &block) end end