class DeliveryBoy::Instance

This class implements the actual logic of DeliveryBoy. The DeliveryBoy module has a module-level singleton instance.

Attributes

config[R]
logger[R]

Public Class Methods

new(config, logger) click to toggle source
# File lib/delivery_boy/instance.rb, line 6
def initialize(config, logger)
  @config = config
  @logger = logger
  @async_producer = nil
end

Public Instance Methods

buffer_size() click to toggle source
# File lib/delivery_boy/instance.rb, line 43
def buffer_size
  sync_producer.buffer_size
end
clear_buffer() click to toggle source
# File lib/delivery_boy/instance.rb, line 39
def clear_buffer
  sync_producer.clear_buffer
end
deliver(value, topic:, **options) click to toggle source
# File lib/delivery_boy/instance.rb, line 12
def deliver(value, topic:, **options)
  sync_producer.produce(value, topic: topic, **options)
  sync_producer.deliver_messages
rescue
  # Make sure to clear any buffered messages if there's an error.
  clear_buffer

  raise
end
deliver_async!(value, topic:, **options) click to toggle source
# File lib/delivery_boy/instance.rb, line 22
def deliver_async!(value, topic:, **options)
  async_producer.produce(value, topic: topic, **options)
end
deliver_messages() click to toggle source
# File lib/delivery_boy/instance.rb, line 35
def deliver_messages
  sync_producer.deliver_messages
end
produce(value, topic:, **options) click to toggle source
# File lib/delivery_boy/instance.rb, line 31
def produce(value, topic:, **options)
  sync_producer.produce(value, topic: topic, **options)
end
shutdown() click to toggle source
# File lib/delivery_boy/instance.rb, line 26
def shutdown
  sync_producer.shutdown if sync_producer?
  async_producer.shutdown if async_producer?
end

Private Instance Methods

async_producer() click to toggle source
# File lib/delivery_boy/instance.rb, line 61
def async_producer
  # The async producer doesn't have to be per-thread, since all deliveries are
  # performed by a single background thread.
  @async_producer ||= kafka.async_producer(
    max_queue_size: config.max_queue_size,
    delivery_threshold: config.delivery_threshold,
    delivery_interval: config.delivery_interval,
    **producer_options,
  )
end
async_producer?() click to toggle source
# File lib/delivery_boy/instance.rb, line 72
def async_producer?
  !@async_producer.nil?
end
kafka() click to toggle source
# File lib/delivery_boy/instance.rb, line 76
def kafka
  @kafka ||= Kafka.new(
    seed_brokers: config.brokers,
    client_id: config.client_id,
    logger: logger,
    connect_timeout: config.connect_timeout,
    socket_timeout: config.socket_timeout,
    ssl_ca_cert: config.ssl_ca_cert,
    ssl_ca_cert_file_path: config.ssl_ca_cert_file_path,
    ssl_client_cert: config.ssl_client_cert,
    ssl_client_cert_key: config.ssl_client_cert_key,
    ssl_client_cert_key_password: config.ssl_client_cert_key_password,
    ssl_ca_certs_from_system: config.ssl_ca_certs_from_system,
    ssl_verify_hostname: config.ssl_verify_hostname,
    sasl_gssapi_principal: config.sasl_gssapi_principal,
    sasl_gssapi_keytab: config.sasl_gssapi_keytab,
    sasl_plain_authzid: config.sasl_plain_authzid,
    sasl_plain_username: config.sasl_plain_username,
    sasl_plain_password: config.sasl_plain_password,
    sasl_scram_username: config.sasl_scram_username,
    sasl_scram_password: config.sasl_scram_password,
    sasl_scram_mechanism: config.sasl_scram_mechanism,
    sasl_over_ssl: config.sasl_over_ssl,
    sasl_oauth_token_provider: config.sasl_oauth_token_provider
  )
end
producer_options() click to toggle source

Options for both the sync and async producers.

# File lib/delivery_boy/instance.rb, line 104
def producer_options
  {
    required_acks: config.required_acks,
    ack_timeout: config.ack_timeout,
    max_retries: config.max_retries,
    retry_backoff: config.retry_backoff,
    max_buffer_size: config.max_buffer_size,
    max_buffer_bytesize: config.max_buffer_bytesize,
    compression_codec: (config.compression_codec.to_sym if config.compression_codec),
    compression_threshold: config.compression_threshold,
    idempotent: config.idempotent,
    transactional: config.transactional,
    transactional_timeout: config.transactional_timeout,
  }
end
sync_producer() click to toggle source
# File lib/delivery_boy/instance.rb, line 51
def sync_producer
  # We want synchronous producers to be per-thread in order to avoid problems with
  # concurrent deliveries.
  Thread.current[:delivery_boy_sync_producer] ||= kafka.producer(**producer_options)
end
sync_producer?() click to toggle source
# File lib/delivery_boy/instance.rb, line 57
def sync_producer?
  Thread.current.key?(:delivery_boy_sync_producer)
end