module ManageIQ::Messaging::Kafka::Common

Private Instance Methods

address(options) click to toggle source
# File lib/manageiq/messaging/kafka/common.rb, line 57
def address(options)
  if options[:affinity]
    "#{options[:service]}.#{options[:affinity]}"
  else
    options[:service]
  end
end
consumer(beginning, options) click to toggle source
# File lib/manageiq/messaging/kafka/common.rb, line 14
def consumer(beginning, options)
  @consumer&.close
  kafka_client[:"group.id"] = options[:persist_ref]
  kafka_client[:"auto.offset.reset"] = beginning ? 'smallest' : 'largest'
  kafka_client[:"enable.auto.commit"] = !!auto_ack?(options)
  kafka_client[:"session.timeout.ms"] = options[:session_timeout] * 1000 if options[:session_timeout].present?
  kafka_client[:"group.instance.id"] = options[:group_instance_id] if options[:group_instance_id].present?
  @consumer = kafka_client.consumer
end
event_header_keys() click to toggle source
# File lib/manageiq/messaging/kafka/common.rb, line 106
def event_header_keys
  [:sender, :event_type]
end
for_publish(options) click to toggle source
# File lib/manageiq/messaging/kafka/common.rb, line 46
def for_publish(options)
  kafka_opts = {:topic => address(options)}
  kafka_opts[:partition_key] = options[:group_name] if options[:group_name]
  kafka_opts[:headers] = {}
  kafka_opts[:headers][:sender] = options[:sender] if options[:sender]

  body = options[:payload] || ''

  [body, kafka_opts]
end
message_header_keys() click to toggle source
# File lib/manageiq/messaging/kafka/common.rb, line 97
def message_header_keys
  [:sender, :message_type, :class_name]
end
parse_event_headers(headers) click to toggle source
# File lib/manageiq/messaging/kafka/common.rb, line 110
def parse_event_headers(headers)
  return [nil, nil] unless headers.kind_of?(Hash)
  headers.values_at(*event_header_keys)
end
parse_message_headers(headers) click to toggle source
# File lib/manageiq/messaging/kafka/common.rb, line 101
def parse_message_headers(headers)
  return [nil, nil, nil] unless headers.kind_of?(Hash)
  headers.values_at(*message_header_keys)
end
process_queue_message(queue_consumer, queue, message) { |received_message| ... } click to toggle source
# File lib/manageiq/messaging/kafka/common.rb, line 65
def process_queue_message(queue_consumer, queue, message)
  begin
    payload = decode_body(message.headers, message.payload)
    sender, message_type, _class_name = parse_message_headers(message.headers)
    client_headers = message.headers.except(*message_header_keys).with_indifferent_access

    logger.info("Message received: queue(#{queue}), message(#{payload_log(payload)}), sender(#{sender}), type(#{message_type})")
    yield [ManageIQ::Messaging::ReceivedMessage.new(sender, message_type, payload, client_headers, queue_consumer, self)]
    logger.info("Messsage processed")
  rescue StandardError => e
    logger.error("Message processing error: #{e.message}")
    logger.error(e.backtrace.join("\n"))
    raise
  end
end
process_topic_message(topic_consumer, topic, message) { |received_message| ... } click to toggle source
# File lib/manageiq/messaging/kafka/common.rb, line 81
def process_topic_message(topic_consumer, topic, message)
  begin
    payload = decode_body(message.headers, message.payload)
    sender, event_type = parse_event_headers(message.headers)
    client_headers = message.headers.except(*event_header_keys).with_indifferent_access

    logger.info("Event received: topic(#{topic}), event(#{payload_log(payload)}), sender(#{sender}), type(#{event_type})")
    yield ManageIQ::Messaging::ReceivedMessage.new(sender, event_type, payload, client_headers, topic_consumer, self)
    logger.info("Event processed")
  rescue StandardError => e
    logger.error("Event processing error: #{e.message}")
    logger.error(e.backtrace.join("\n"))
    raise
  end
end
producer() click to toggle source
# File lib/manageiq/messaging/kafka/common.rb, line 10
def producer
  @producer ||= kafka_client.producer
end
queue_for_publish(options) click to toggle source
# File lib/manageiq/messaging/kafka/common.rb, line 29
def queue_for_publish(options)
  body, kafka_opts = for_publish(options)
  kafka_opts[:headers][:message_type] = options[:message] if options[:message]
  kafka_opts[:headers][:class_name] = options[:class_name] if options[:class_name]
  kafka_opts[:headers].merge!(options[:headers].except(*message_header_keys)) if options.key?(:headers)

  [body, kafka_opts]
end
raw_publish(body, options) click to toggle source
# File lib/manageiq/messaging/kafka/common.rb, line 24
def raw_publish(body, options)
  options[:payload] = encode_body(options[:headers], body)
  producer.produce(options)
end
topic_for_publish(options) click to toggle source
# File lib/manageiq/messaging/kafka/common.rb, line 38
def topic_for_publish(options)
  body, kafka_opts = for_publish(options)
  kafka_opts[:headers][:event_type] = options[:event] if options[:event]
  kafka_opts[:headers].merge!(options[:headers].except(*event_header_keys)) if options.key?(:headers)

  [body, kafka_opts]
end