module ManageIQ::Messaging::Kafka::Topic

Constants

GROUP_FOR_ADHOC_LISTENERS

Private Instance Methods

publish_topic_impl(messages) click to toggle source
# File lib/manageiq/messaging/kafka/topic.rb, line 11
def publish_topic_impl(messages)
  handles = messages.collect { |message| raw_publish(*topic_for_publish(message)) }
  handles.each(&:wait)
end
subscribe_topic_impl(options, &block) click to toggle source
# File lib/manageiq/messaging/kafka/topic.rb, line 16
def subscribe_topic_impl(options, &block)
  topic = address(options)

  options[:persist_ref] = "#{GROUP_FOR_ADHOC_LISTENERS}_#{Time.now.to_i}" unless options[:persist_ref]
  topic_consumer = consumer(false, options)
  topic_consumer.subscribe(topic)
  topic_consumer.each do |message|
    process_topic_message(topic_consumer, topic, message, &block)
  end
end