module DeliveryBoy
Constants
- ConfigError
- VERSION
Attributes
Public Class Methods
Return the number of messages in the buffer.
# File lib/delivery_boy.rb, line 90
def buffer_size
  instance.buffer_size
end
Clear any buffered messages generated by the {.produce} or {.produce!} methods.
# File lib/delivery_boy.rb, line 85
def clear_buffer
  instance.clear_buffer
end
The configuration used by DeliveryBoy.
@return [DeliveryBoy::Config]
# File lib/delivery_boy.rb, line 119
def config
  @config ||= DeliveryBoy::Config.new(env: ENV)
rescue KingKonf::ConfigError => e
  raise ConfigError, e.message
end
Configure DeliveryBoy in a block.
  DeliveryBoy.configure do |config|
    config.client_id = "yolo"
  end
@yield [DeliveryBoy::Config]
@return [nil]
# File lib/delivery_boy.rb, line 133
def configure
  yield config
end
Write a message to a specified Kafka topic synchronously.
Keep in mind that the client will block until the message has been delivered.
@param value [String] the message value.
@param topic [String] the topic that the message should be written to.
@param key [String, nil] the message key.
@param partition [Integer, nil] the topic partition that the message should be written to.
@param partition_key [String, nil] a key used to deterministically assign a partition to the message.
@return [nil]
@raise [Kafka::BufferOverflow] if the producer's buffer is full.
@raise [Kafka::DeliveryFailed] if delivery failed for some reason.
# File lib/delivery_boy.rb, line 28
def deliver(value, topic:, **options)
  instance.deliver(value, topic: topic, **options)
end
Like {.deliver_async!}, but handles Kafka::BufferOverflow errors by logging them and continuing; the overflowing message is dropped.
@return [nil]
# File lib/delivery_boy.rb, line 36
def deliver_async(value, topic:, **options)
  deliver_async!(value, topic: topic, **options)
rescue Kafka::BufferOverflow
  logger.error "Message for `#{topic}` dropped due to buffer overflow"
end
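The relationship between a raising method and its log-and-drop wrapper can be sketched in isolation. The following is a minimal illustration and not DeliveryBoy's implementation: `TinyProducer`, `enqueue`, `enqueue!`, and the top-level `BufferOverflow` class are hypothetical stand-ins that need neither Kafka nor the gem.

```ruby
require "logger"
require "stringio"

# Hypothetical stand-in for Kafka::BufferOverflow; not the real class.
class BufferOverflow < StandardError; end

# Hypothetical producer with a bounded in-memory buffer.
class TinyProducer
  attr_reader :buffer

  def initialize(max_buffer_size:, logger:)
    @max_buffer_size = max_buffer_size
    @logger = logger
    @buffer = []
  end

  # Strict variant: raises when the buffer is full.
  def enqueue!(value, topic:)
    raise BufferOverflow, "buffer full" if @buffer.size >= @max_buffer_size

    @buffer << { value: value, topic: topic }
    nil
  end

  # Lenient variant: logs the overflow and drops the message.
  def enqueue(value, topic:)
    enqueue!(value, topic: topic)
  rescue BufferOverflow
    @logger.error "Message for `#{topic}` dropped due to buffer overflow"
    nil
  end
end

log_output = StringIO.new
producer = TinyProducer.new(max_buffer_size: 1, logger: Logger.new(log_output))
producer.enqueue("first", topic: "greetings")  # buffered
producer.enqueue("second", topic: "greetings") # dropped and logged
```

The lenient variant suits fire-and-forget messages where losing one is acceptable; callers that must not lose data would likely prefer the raising variant and handle the error themselves.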
Like {.deliver}, but returns immediately.
The actual delivery takes place in a background thread.
@return [nil]
# File lib/delivery_boy.rb, line 47
def deliver_async!(value, topic:, **options)
  instance.deliver_async!(value, topic: topic, **options)
end
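To make "returns immediately, delivers in a background thread" concrete, here is a rough sketch of the general technique using a queue drained by a worker thread. It is an assumption-laden illustration, not how the underlying Kafka client is written: `BackgroundSender` and its `delivered` list are hypothetical, and "delivery" here just means appending to an array.

```ruby
# Hypothetical async sender: callers enqueue, a worker thread delivers.
class BackgroundSender
  attr_reader :delivered

  def initialize
    @queue = Queue.new
    @delivered = []
    @worker = Thread.new do
      # Queue#pop blocks until a message arrives; nil is the stop signal.
      while (message = @queue.pop)
        @delivered << message
      end
    end
  end

  # Returns immediately; the worker thread picks the message up later.
  def deliver_async!(value, topic:)
    @queue << { value: value, topic: topic }
    nil
  end

  # Sends the stop signal and waits for queued messages to be handled.
  def shutdown
    @queue << nil
    @worker.join
    nil
  end
end

sender = BackgroundSender.new
sender.deliver_async!("hello", topic: "greetings")
sender.shutdown # waits until the worker has drained the queue
```

In this sketch, skipping the shutdown step could leave queued messages unprocessed when the process exits, which is why a shutdown hook matters for asynchronous senders.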
Delivers the items currently in the producer buffer.
@return [nil]
@raise [Kafka::DeliveryFailed] if delivery failed for some reason.
# File lib/delivery_boy.rb, line 80
def deliver_messages
  instance.deliver_messages
end
The logger used by DeliveryBoy.
@return [Logger]
# File lib/delivery_boy.rb, line 106
def logger
  @logger ||= Logger.new($stdout).tap do |logger|
    if config.log_level
      logger.level = Object.const_get("Logger::#{config.log_level.upcase}")
    end
  end
end
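The level lookup in the source above turns a string such as "warn" into the matching constant on Ruby's standard `Logger`. A small standalone sketch of the same idea follows; `build_logger` is a hypothetical helper written for illustration, not part of DeliveryBoy.

```ruby
require "logger"
require "stringio"

# Hypothetical helper: builds a logger whose level comes from a string
# such as "debug" or "warn"; a nil level keeps Logger's default.
def build_logger(io, log_level)
  Logger.new(io).tap do |logger|
    # Logger.const_get("WARN") resolves to Logger::WARN.
    logger.level = Logger.const_get(log_level.upcase) if log_level
  end
end

output = StringIO.new
logger = build_logger(output, "warn")
logger.info "not written, below the configured level"
logger.warn "written"
```

A level name with no matching constant raises a NameError from `const_get`, so a misspelled level surfaces when the logger is first built.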
Like {.produce!}, but handles Kafka::BufferOverflow errors by logging them and continuing; the overflowing message is dropped.
@return [nil]
# File lib/delivery_boy.rb, line 55
def produce(value, topic:, **options)
  produce!(value, topic: topic, **options)
rescue Kafka::BufferOverflow
  logger.error "Message for `#{topic}` dropped due to buffer overflow"
end
Appends the given message to the producer buffer but does not send it until {.deliver_messages} is called.
@param value [String] the message value.
@param topic [String] the topic that the message should be written to.
@param key [String, nil] the message key.
@param partition [Integer, nil] the topic partition that the message should be written to.
@param partition_key [String, nil] a key used to deterministically assign a partition to the message.
@return [nil]
@raise [Kafka::BufferOverflow] if the producer's buffer is full.
# File lib/delivery_boy.rb, line 72
def produce!(value, topic:, **options)
  instance.produce(value, topic: topic, **options)
end
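The buffer-then-flush workflow described for {.produce!}, {.deliver_messages}, {.buffer_size} and {.clear_buffer} can be pictured with a toy model. The class below is a hypothetical stand-in that only reuses those method names; it keeps messages in arrays and makes no claim about how the real producer batches or sends them.

```ruby
# Hypothetical stand-in for the buffer/flush workflow; not DeliveryBoy itself.
class BufferedSender
  attr_reader :delivered

  def initialize
    @buffer = []
    @delivered = []
  end

  # Queue a message without sending it.
  def produce!(value, topic:)
    @buffer << [topic, value]
    nil
  end

  # Number of messages waiting to be sent.
  def buffer_size
    @buffer.size
  end

  # Send everything that is currently buffered, then empty the buffer.
  def deliver_messages
    @delivered.concat(@buffer)
    @buffer.clear
    nil
  end

  # Discard buffered messages without sending them.
  def clear_buffer
    @buffer.clear
    nil
  end
end

sender = BufferedSender.new
sender.produce!("a", topic: "events")
sender.produce!("b", topic: "events")
sender.deliver_messages # both messages leave the buffer in one call
sender.produce!("c", topic: "events")
sender.clear_buffer     # "c" is discarded and never delivered
```

Buffering several messages and flushing once is the usual reason to prefer this pair of calls over one synchronous delivery per message.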
Shut down DeliveryBoy.
Automatically called when the process exits.
@return [nil]
# File lib/delivery_boy.rb, line 99
def shutdown
  instance.shutdown
end
# File lib/delivery_boy.rb, line 137
def test_mode!
  @instance = testing
end
# File lib/delivery_boy.rb, line 141
def testing
  @testing ||= Fake.new
end
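Judging from the source shown here, test_mode! replaces the memoized instance with a fake, so later calls never reach a real backend. The sketch below reproduces that swapping pattern under assumed names: `Courier`, `RealSender`, `FakeSender` and its `messages` hash are hypothetical and do not describe the interface of DeliveryBoy's own `Fake` class.

```ruby
# Hypothetical module mirroring the instance-swapping pattern.
module Courier
  # Stand-in for a sender that would contact a real broker.
  class RealSender
    def deliver(value, topic:)
      raise NotImplementedError, "would talk to a broker here"
    end
  end

  # Stand-in fake that records messages per topic instead of sending them.
  class FakeSender
    attr_reader :messages

    def initialize
      @messages = Hash.new { |hash, topic| hash[topic] = [] }
    end

    def deliver(value, topic:)
      @messages[topic] << value
      nil
    end
  end

  class << self
    def deliver(value, topic:)
      instance.deliver(value, topic: topic)
    end

    # Route all later calls to the fake.
    def test_mode!
      @instance = testing
    end

    # The memoized fake, available for assertions in tests.
    def testing
      @testing ||= FakeSender.new
    end

    private

    # Memoized sender; stays the fake once test_mode! has run.
    def instance
      @instance ||= RealSender.new
    end
  end
end

Courier.test_mode!
Courier.deliver("hello", topic: "greetings")
Courier.testing.messages["greetings"] # the fake recorded the message
```

Because the sender is memoized, switching to test mode once, for example in a test helper, is enough for the whole test run in this sketch.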
Private Class Methods
# File lib/delivery_boy.rb, line 147
def instance
  @instance ||= Instance.new(config, logger)
end