class ActionMailerKafka::BaseProducer

Constants

BUFFER_SIZE
DELIVERY_INTERVAL
MAX_RETRIES
RETRY_BACKOFF

Public Class Methods

new(kafka_client_info:, transactional_id: Socket.gethostname, logger: nil)
# File lib/action_mailer_kafka/base_producer.rb, line 8
# Creates the Kafka client and the asynchronous producer used by this object.
#
# @param kafka_client_info passed unchanged to ::Kafka.new
#   (NOTE(review): its expected shape is not visible here; confirm against callers)
# @param transactional_id forwarded to the async producer as its
#   transactional_id; defaults to the value of Socket.gethostname
# @param logger optional logger, stored in @logger; may be nil
def initialize(kafka_client_info:, transactional_id: Socket.gethostname, logger: nil)
  @logger = logger

  # Producer settings: batching/retry values come from the class constants,
  # and the producer is requested as idempotent with acks from all replicas.
  producer_options = {
    delivery_threshold: BUFFER_SIZE,
    delivery_interval: DELIVERY_INTERVAL,
    max_retries: MAX_RETRIES,
    retry_backoff: RETRY_BACKOFF,
    idempotent: true,
    required_acks: :all,
    transactional_id: transactional_id
  }

  client = ::Kafka.new(kafka_client_info)
  @kafka_async_producer = client.async_producer(**producer_options)
end

Public Instance Methods

publish(data, message_key, topic)
# File lib/action_mailer_kafka/base_producer.rb, line 26
# Passes one message to the async producer and then asks it to deliver.
#
# @param data the message payload handed to the producer's +produce+
# @param message_key forwarded to +produce+ as the +key:+ keyword
# @param topic forwarded to +produce+ as the +topic:+ keyword
# @return the result of +deliver_messages+; if Kafka::DeliveryFailed is
#   raised, the result of the logger's +error+ call (nil when no logger is set)
def publish(data, message_key, topic)
  producer = @kafka_async_producer
  producer.produce(data, topic: topic, key: message_key)
  producer.deliver_messages
rescue Kafka::DeliveryFailed => delivery_error
  # Delivery failures are logged (when a logger exists) rather than re-raised.
  @logger&.error("Fail to deliver some kafka messages: #{delivery_error}")
end