module ManageIQ::Messaging::Kafka::Queue

Constants

GROUP_FOR_QUEUE_MESSAGES

Private Instance Methods

publish_message_impl(options) click to toggle source
# File lib/manageiq/messaging/kafka/queue.rb, line 9
def publish_message_impl(options)
  raise ArgumentError, "Kafka messaging implementation does not take a block" if block_given?
  raw_publish(*queue_for_publish(options)).wait
end
publish_messages_impl(messages) click to toggle source
# File lib/manageiq/messaging/kafka/queue.rb, line 14
def publish_messages_impl(messages)
  handles = messages.collect { |msg_options| raw_publish(*queue_for_publish(msg_options)) }
  handles.each(&:wait)
end
subscribe_messages_impl(options, &block) click to toggle source
# File lib/manageiq/messaging/kafka/queue.rb, line 19
def subscribe_messages_impl(options, &block)
  topic = address(options)
  options[:persist_ref] = GROUP_FOR_QUEUE_MESSAGES + topic

  queue_consumer = consumer(true, options)
  queue_consumer.subscribe(topic)
  queue_consumer.each do |message|
    process_queue_message(queue_consumer, topic, message, &block)
  end
end