class Phobos::Producer::ClassMethods::PublicAPI

Constants

ASYNC_PRODUCER_PARAMS
INTERNAL_PRODUCER_PARAMS
NAMESPACE

Public Instance Methods

async_configs() click to toggle source
# File lib/phobos/producer.rb, line 143
def async_configs
  Phobos.config.producer_hash
        .reject { |k, _| INTERNAL_PRODUCER_PARAMS.include?(k) }
end
async_producer() click to toggle source
# File lib/phobos/producer.rb, line 116
def async_producer
  producer_store[:async_producer]
end
async_producer_shutdown() click to toggle source
# File lib/phobos/producer.rb, line 131
def async_producer_shutdown
  async_producer&.deliver_messages
  async_producer&.shutdown
  producer_store[:async_producer] = nil
end
async_publish(topic:, payload:, key: nil, partition_key: nil, headers: nil) click to toggle source
# File lib/phobos/producer.rb, line 120
def async_publish(topic:, payload:, key: nil, partition_key: nil, headers: nil)
  async_publish_list([{ topic: topic, payload: payload, key: key,
                        partition_key: partition_key, headers: headers }])
end
async_publish_list(messages) click to toggle source
# File lib/phobos/producer.rb, line 125
def async_publish_list(messages)
  producer = async_producer || create_async_producer
  produce_messages(producer, messages)
  producer.deliver_messages unless async_automatic_delivery?
end
configure_kafka_client(kafka_client) click to toggle source

This method configures the kafka client used with publish operations performed by the host class

@param kafka_client [Kafka::Client]

# File lib/phobos/producer.rb, line 70
def configure_kafka_client(kafka_client)
  async_producer_shutdown
  producer_store[:kafka_client] = kafka_client
end
create_async_producer() click to toggle source
# File lib/phobos/producer.rb, line 110
def create_async_producer
  client = kafka_client || configure_kafka_client(Phobos.create_kafka_client(:producer))
  async_producer = client.async_producer(**async_configs)
  producer_store[:async_producer] = async_producer
end
create_sync_producer() click to toggle source
# File lib/phobos/producer.rb, line 79
def create_sync_producer
  client = kafka_client || configure_kafka_client(Phobos.create_kafka_client(:producer))
  sync_producer = client.producer(**regular_configs)
  if Phobos.config.producer_hash[:persistent_connections]
    producer_store[:sync_producer] = sync_producer
  end
  sync_producer
end
kafka_client() click to toggle source
# File lib/phobos/producer.rb, line 75
def kafka_client
  producer_store[:kafka_client]
end
publish(topic:, payload:, key: nil, partition_key: nil, headers: nil) click to toggle source
# File lib/phobos/producer.rb, line 97
def publish(topic:, payload:, key: nil, partition_key: nil, headers: nil)
  publish_list([{ topic: topic, payload: payload, key: key,
                  partition_key: partition_key, headers: headers }])
end
publish_list(messages) click to toggle source
# File lib/phobos/producer.rb, line 102
def publish_list(messages)
  producer = sync_producer || create_sync_producer
  produce_messages(producer, messages)
  producer.deliver_messages
ensure
  producer&.shutdown unless Phobos.config.producer_hash[:persistent_connections]
end
regular_configs() click to toggle source
# File lib/phobos/producer.rb, line 137
def regular_configs
  Phobos.config.producer_hash
        .reject { |k, _| ASYNC_PRODUCER_PARAMS.include?(k) }
        .reject { |k, _| INTERNAL_PRODUCER_PARAMS.include?(k) }
end
sync_producer() click to toggle source
# File lib/phobos/producer.rb, line 88
def sync_producer
  producer_store[:sync_producer]
end
sync_producer_shutdown() click to toggle source
# File lib/phobos/producer.rb, line 92
def sync_producer_shutdown
  sync_producer&.shutdown
  producer_store[:sync_producer] = nil
end

Private Instance Methods

async_automatic_delivery?() click to toggle source
# File lib/phobos/producer.rb, line 160
def async_automatic_delivery?
  async_configs.fetch(:delivery_threshold, 0).positive? ||
    async_configs.fetch(:delivery_interval, 0).positive?
end
produce_messages(producer, messages) click to toggle source
# File lib/phobos/producer.rb, line 150
def produce_messages(producer, messages)
  messages.each do |message|
    partition_key = message[:partition_key] || message[:key]
    producer.produce(message[:payload], topic: message[:topic],
                                        key: message[:key],
                                        headers: message[:headers],
                                        partition_key: partition_key)
  end
end
producer_store() click to toggle source
# File lib/phobos/producer.rb, line 165
def producer_store
  Thread.current[NAMESPACE] ||= {}
end