class ActionMailerKafka::BaseProducer
Constants
- BUFFER_SIZE
- DELIVERY_INTERVAL
- MAX_RETRIES
- RETRY_BACKOFF
Public Class Methods
new( kafka_client_info:, transactional_id: Socket.gethostname, logger: nil )
click to toggle source
# File lib/action_mailer_kafka/base_producer.rb, line 8
#
# Creates a Kafka client from +kafka_client_info+ and keeps an async
# producer built from it in @kafka_async_producer.
#
# @param kafka_client_info [Object] handed unchanged to ::Kafka.new
# @param transactional_id [String] forwarded to the async producer;
#   defaults to Socket.gethostname
# @param logger [Object, nil] optional logger stored in @logger
def initialize(kafka_client_info:, transactional_id: Socket.gethostname, logger: nil)
  @logger = logger

  client = ::Kafka.new(kafka_client_info)

  # Every producer setting in one place; the hash is splatted as keywords.
  producer_options = {
    delivery_threshold: BUFFER_SIZE,
    delivery_interval: DELIVERY_INTERVAL,
    max_retries: MAX_RETRIES,
    retry_backoff: RETRY_BACKOFF,
    idempotent: true,
    required_acks: :all,
    transactional_id: transactional_id
  }

  @kafka_async_producer = client.async_producer(**producer_options)
end
Public Instance Methods
publish(data, message_key, topic)
click to toggle source
# File lib/action_mailer_kafka/base_producer.rb, line 26
#
# Hands +data+ to the async producer under the given key and topic, then
# calls deliver_messages on it.
#
# A Kafka::DeliveryFailed raised by either call is not re-raised; it is
# reported through @logger when a logger was configured.
#
# @param data [Object] payload passed to the producer's #produce
# @param message_key [Object] passed as the +key:+ option
# @param topic [Object] passed as the +topic:+ option
def publish(data, message_key, topic)
  producer = @kafka_async_producer

  begin
    producer.produce(data, key: message_key, topic: topic)
    producer.deliver_messages
  rescue Kafka::DeliveryFailed => delivery_error
    @logger&.error("Fail to deliver some kafka messages: #{delivery_error}")
  end
end