class ManageIQ::Messaging::Kafka::Client

Messaging client implementation backed by Kafka. Do not instantiate this class directly; use the ManageIQ::Messaging::Client.open method instead.

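Kafka specific connection options accepted by the open method (as handled by rdkafka_connection_opts in the source shown further down):

:client_ref (mapped to the librdkafka client.id setting)
:hosts (one or more Kafka hosts) or :host (a single host)
:port (port number appended to every host)
:username and :password (mapped to sasl.username and sasl.password)
:encoding (payload encoding, 'yaml' by default)

Any other option is passed through to librdkafka unchanged.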

For additional security options, please refer to github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka and github.com/edenhill/librdkafka/wiki/Using-SASL-with-librdkafka

Kafka specific publish_message options:

Kafka specific subscribe_topic options:

Kafka specific subscribe_messages options:

Without :persist_ref, every topic subscriber receives a copy of each message, but only while it is active. If multiple topic subscribers join with the same :persist_ref, each message is consumed by only one of them, which allows load balancing among the subscribers. In addition, any messages sent while all members of a :persist_ref group are offline are persisted and delivered once any member of the group is back online. Each message is still copied and delivered to subscribers that belong to other :persist_ref groups or to no group.
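The delivery rules described above can be illustrated with a toy in-memory model. This is only a sketch: ToyTopic and its methods are hypothetical names that do not exist in the library, the round-robin choice within a group is an assumption made for illustration (Kafka itself distributes work by partition assignment), and persistence for offline groups is not modeled.

```ruby
# Toy in-memory model of the :persist_ref delivery rules.
# ToyTopic is a hypothetical class, not part of ManageIQ::Messaging.
class ToyTopic
  def initialize
    @groups    = Hash.new { |hash, key| hash[key] = [] } # persist_ref => inboxes
    @ungrouped = []                                       # inboxes without a persist_ref
    @cursor    = Hash.new(0)                              # round-robin position per group
  end

  # Returns the inbox (an Array) that collects this subscriber's messages.
  def subscribe(persist_ref: nil)
    inbox = []
    persist_ref ? @groups[persist_ref] << inbox : @ungrouped << inbox
    inbox
  end

  def publish(message)
    # Subscribers without a group each receive their own copy.
    @ungrouped.each { |inbox| inbox << message }

    # Each group receives the message once, delivered to a single member.
    @groups.each do |ref, inboxes|
      inboxes[@cursor[ref] % inboxes.size] << message
      @cursor[ref] += 1
    end
  end
end

topic    = ToyTopic.new
worker_a = topic.subscribe(persist_ref: "workers")
worker_b = topic.subscribe(persist_ref: "workers")
auditor  = topic.subscribe(persist_ref: "audit")
watcher  = topic.subscribe

%w[m1 m2 m3 m4].each { |message| topic.publish(message) }
```

In this model the two "workers" members split the four messages between them, while the "audit" group and the ungrouped watcher each see all four.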

subscribe_background_job is currently not implemented.

Attributes

encoding[RW]
kafka_client[R]

Public Class Methods

new(options)
# File lib/manageiq/messaging/kafka/client.rb, line 78
def initialize(options)
  @encoding = options[:encoding] || 'yaml'
  require "json" if @encoding == "json"

  ::Rdkafka::Config.logger = logger
  @kafka_client = ::Rdkafka::Config.new(rdkafka_connection_opts(options))
end
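The initializer above defaults the encoding to 'yaml' and loads the json library only when 'json' is requested. How a payload is serialized under each encoding is not shown on this page, so the following is a minimal sketch under that assumption; encode_payload and decode_payload are hypothetical helpers, not methods of this class.

```ruby
require "yaml"
require "json"

# Hypothetical helper: serialize a payload according to the chosen encoding.
def encode_payload(payload, encoding = "yaml")
  case encoding
  when "json" then JSON.generate(payload)
  when "yaml" then YAML.dump(payload)
  else raise ArgumentError, "unsupported encoding: #{encoding}"
  end
end

# Hypothetical helper: reverse of encode_payload.
def decode_payload(raw, encoding = "yaml")
  case encoding
  when "json" then JSON.parse(raw)
  when "yaml" then YAML.safe_load(raw)
  else raise ArgumentError, "unsupported encoding: #{encoding}"
  end
end

payload = {"event" => "vm_started", "id" => 42}
as_yaml = encode_payload(payload)          # default encoding, as in the initializer
as_json = encode_payload(payload, "json")
```

Both forms decode back to the same hash, so the choice of encoding mainly matters for interoperability with consumers written in other languages.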

Public Instance Methods

ack(ack_ref)
# File lib/manageiq/messaging/kafka/client.rb, line 53
def ack(ack_ref)
  # Commit via the ack reference.
  ack_ref.commit
rescue Rdkafka::RdkafkaError => e
  logger.warn("ack failed with error #{e.message}")
  # A no_offset error is only logged; any other error is re-raised.
  raise unless e.message =~ /no_offset/
end

close()
# File lib/manageiq/messaging/kafka/client.rb, line 60
def close
  @producer&.close
  @producer = nil

  @consumer&.close
  @consumer = nil
end

topics()

Lists all topics.

# File lib/manageiq/messaging/kafka/client.rb, line 69
def topics
  native_kafka = producer.instance_variable_get(:@native_kafka)
  Rdkafka::Metadata.new(native_kafka).topics.collect { |topic| topic[:topic_name] }
end

Private Instance Methods

rdkafka_connection_opts(options)
# File lib/manageiq/messaging/kafka/client.rb, line 86
def rdkafka_connection_opts(options)
  # Accept either :hosts or :host and append the port to each entry.
  hosts = Array(options[:hosts] || options[:host])
  hosts.collect! { |host| "#{host}:#{options[:port]}" }

  # Translate client options into their librdkafka setting names.
  result = {:"bootstrap.servers" => hosts.join(',')}
  result[:"client.id"] = options[:client_ref] if options[:client_ref]
  result[:"sasl.username"] = options[:username] if options[:username]
  result[:"sasl.password"] = options[:password] if options[:password]

  # Pass every remaining option through to librdkafka unchanged.
  result.merge(options.except(:port, :host, :hosts, :encoding, :protocol, :client_ref, :username, :password))
end
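
To make the mapping performed by rdkafka_connection_opts concrete, here is a standalone sketch that mirrors its logic outside the class. The name sketch_connection_opts, the host names, and the sample values are assumptions for illustration; the sketch uses Hash#reject instead of Hash#except so that it does not depend on a particular Ruby version or on ActiveSupport.

```ruby
# Options consumed by the client itself rather than passed to librdkafka.
CONSUMED_KEYS = %i[port host hosts encoding protocol client_ref username password].freeze

# Hypothetical standalone copy of the mapping, for illustration only.
def sketch_connection_opts(options)
  hosts = Array(options[:hosts] || options[:host]).map { |host| "#{host}:#{options[:port]}" }

  result = {:"bootstrap.servers" => hosts.join(',')}
  result[:"client.id"]     = options[:client_ref] if options[:client_ref]
  result[:"sasl.username"] = options[:username]   if options[:username]
  result[:"sasl.password"] = options[:password]   if options[:password]

  # Everything else is handed to librdkafka under its original key.
  result.merge(options.reject { |key, _value| CONSUMED_KEYS.include?(key) })
end

opts = sketch_connection_opts(
  :hosts               => %w[kafka1.example.com kafka2.example.com],
  :port                => 9092,
  :client_ref          => "my_client",
  :encoding            => "json",
  :"security.protocol" => "SASL_SSL"
)
```

With these sample values, opts contains bootstrap.servers set to "kafka1.example.com:9092,kafka2.example.com:9092", client.id set to "my_client", and the security.protocol entry passed through untouched; :encoding and :port are dropped from the result.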