class DeliveryBoy::Instance
This class implements the actual logic of DeliveryBoy
. The DeliveryBoy
module has a module-level singleton instance.
Attributes
config[R]
logger[R]
Public Class Methods
new(config, logger)
click to toggle source
# File lib/delivery_boy/instance.rb, line 6 def initialize(config, logger) @config = config @logger = logger @async_producer = nil end
Public Instance Methods
buffer_size()
click to toggle source
# File lib/delivery_boy/instance.rb, line 43 def buffer_size sync_producer.buffer_size end
clear_buffer()
click to toggle source
# File lib/delivery_boy/instance.rb, line 39 def clear_buffer sync_producer.clear_buffer end
deliver(value, topic:, **options)
click to toggle source
# File lib/delivery_boy/instance.rb, line 12 def deliver(value, topic:, **options) sync_producer.produce(value, topic: topic, **options) sync_producer.deliver_messages rescue # Make sure to clear any buffered messages if there's an error. clear_buffer raise end
deliver_async!(value, topic:, **options)
click to toggle source
# File lib/delivery_boy/instance.rb, line 22 def deliver_async!(value, topic:, **options) async_producer.produce(value, topic: topic, **options) end
deliver_messages()
click to toggle source
# File lib/delivery_boy/instance.rb, line 35 def deliver_messages sync_producer.deliver_messages end
produce(value, topic:, **options)
click to toggle source
# File lib/delivery_boy/instance.rb, line 31 def produce(value, topic:, **options) sync_producer.produce(value, topic: topic, **options) end
shutdown()
click to toggle source
# File lib/delivery_boy/instance.rb, line 26 def shutdown sync_producer.shutdown if sync_producer? async_producer.shutdown if async_producer? end
Private Instance Methods
async_producer()
click to toggle source
# File lib/delivery_boy/instance.rb, line 61 def async_producer # The async producer doesn't have to be per-thread, since all deliveries are # performed by a single background thread. @async_producer ||= kafka.async_producer( max_queue_size: config.max_queue_size, delivery_threshold: config.delivery_threshold, delivery_interval: config.delivery_interval, **producer_options, ) end
async_producer?()
click to toggle source
# File lib/delivery_boy/instance.rb, line 72 def async_producer? !@async_producer.nil? end
kafka()
click to toggle source
# File lib/delivery_boy/instance.rb, line 76 def kafka @kafka ||= Kafka.new( seed_brokers: config.brokers, client_id: config.client_id, logger: logger, connect_timeout: config.connect_timeout, socket_timeout: config.socket_timeout, ssl_ca_cert: config.ssl_ca_cert, ssl_ca_cert_file_path: config.ssl_ca_cert_file_path, ssl_client_cert: config.ssl_client_cert, ssl_client_cert_key: config.ssl_client_cert_key, ssl_client_cert_key_password: config.ssl_client_cert_key_password, ssl_ca_certs_from_system: config.ssl_ca_certs_from_system, ssl_verify_hostname: config.ssl_verify_hostname, sasl_gssapi_principal: config.sasl_gssapi_principal, sasl_gssapi_keytab: config.sasl_gssapi_keytab, sasl_plain_authzid: config.sasl_plain_authzid, sasl_plain_username: config.sasl_plain_username, sasl_plain_password: config.sasl_plain_password, sasl_scram_username: config.sasl_scram_username, sasl_scram_password: config.sasl_scram_password, sasl_scram_mechanism: config.sasl_scram_mechanism, sasl_over_ssl: config.sasl_over_ssl, sasl_oauth_token_provider: config.sasl_oauth_token_provider ) end
producer_options()
click to toggle source
Options for both the sync and async producers.
# File lib/delivery_boy/instance.rb, line 104 def producer_options { required_acks: config.required_acks, ack_timeout: config.ack_timeout, max_retries: config.max_retries, retry_backoff: config.retry_backoff, max_buffer_size: config.max_buffer_size, max_buffer_bytesize: config.max_buffer_bytesize, compression_codec: (config.compression_codec.to_sym if config.compression_codec), compression_threshold: config.compression_threshold, idempotent: config.idempotent, transactional: config.transactional, transactional_timeout: config.transactional_timeout, } end
sync_producer()
click to toggle source
# File lib/delivery_boy/instance.rb, line 51 def sync_producer # We want synchronous producers to be per-thread in order to avoid problems with # concurrent deliveries. Thread.current[:delivery_boy_sync_producer] ||= kafka.producer(**producer_options) end
sync_producer?()
click to toggle source
# File lib/delivery_boy/instance.rb, line 57 def sync_producer? Thread.current.key?(:delivery_boy_sync_producer) end