module ManageIQ::Messaging::Kafka::Topic
Constants
- GROUP_FOR_ADHOC_LISTENERS
Private Instance Methods
publish_topic_impl(messages)
click to toggle source
# File lib/manageiq/messaging/kafka/topic.rb, line 11 def publish_topic_impl(messages) handles = messages.collect { |message| raw_publish(*topic_for_publish(message)) } handles.each(&:wait) end
subscribe_topic_impl(options, &block)
click to toggle source
# File lib/manageiq/messaging/kafka/topic.rb, line 16 def subscribe_topic_impl(options, &block) topic = address(options) options[:persist_ref] = "#{GROUP_FOR_ADHOC_LISTENERS}_#{Time.now.to_i}" unless options[:persist_ref] topic_consumer = consumer(false, options) topic_consumer.subscribe(topic) topic_consumer.each do |message| process_topic_message(topic_consumer, topic, message, &block) end end