module DeliveryBoy

Constants

ConfigError
VERSION

Attributes

logger[W]

Public Class Methods

buffer_size() click to toggle source

Return the number of messages in the buffer

# File lib/delivery_boy.rb, line 90
def buffer_size
  instance.buffer_size
end
clear_buffer() click to toggle source

Clear any buffered messages generated by {.produce} or {.produce!} methods.

# File lib/delivery_boy.rb, line 85
def clear_buffer
  instance.clear_buffer
end
config() click to toggle source

The configuration used by DeliveryBoy.

@return [DeliveryBoy::Config]

# File lib/delivery_boy.rb, line 119
def config
  @config ||= DeliveryBoy::Config.new(env: ENV)
rescue KingKonf::ConfigError => e
  raise ConfigError, e.message
end
configure() { |config| ... } click to toggle source

Configure DeliveryBoy in a block.

DeliveryBoy.configure do |config|
  config.client_id = "yolo"
end

@yield [DeliveryBoy::Config] @return [nil]

# File lib/delivery_boy.rb, line 133
def configure
  yield config
end
deliver(value, topic:, **options) click to toggle source

Write a message to a specified Kafka topic synchronously.

Keep in mind that the client will block until the message has been delivered.

@param value [String] the message value. @param topic [String] the topic that the message should be written to. @param key [String, nil] the message key. @param partition [Integer, nil] the topic partition that the message should

be written to.

@param partition_key [String, nil] a key used to deterministically assign

a partition to the message.

@return [nil] @raise [Kafka::BufferOverflow] if the producer's buffer is full. @raise [Kafka::DeliveryFailed] if delivery failed for some reason.

# File lib/delivery_boy.rb, line 28
def deliver(value, topic:, **options)
  instance.deliver(value, topic: topic, **options)
end
deliver_async(value, topic:, **options) click to toggle source

Like {.deliver_async!}, but handles Kafka::BufferOverflow errors by logging them and just going on with normal business.

@return [nil]

# File lib/delivery_boy.rb, line 36
def deliver_async(value, topic:, **options)
  deliver_async!(value, topic: topic, **options)
rescue Kafka::BufferOverflow
  logger.error "Message for `#{topic}` dropped due to buffer overflow"
end
deliver_async!(value, topic:, **options) click to toggle source

Like {.deliver}, but returns immediately.

The actual delivery takes place in a background thread.

@return [nil]

# File lib/delivery_boy.rb, line 47
def deliver_async!(value, topic:, **options)
  instance.deliver_async!(value, topic: topic, **options)
end
deliver_messages() click to toggle source

Delivers the items currently in the producer buffer.

@return [nil] @raise [Kafka::DeliveryFailed] if delivery failed for some reason.

# File lib/delivery_boy.rb, line 80
def deliver_messages
  instance.deliver_messages
end
logger() click to toggle source

The logger used by DeliveryBoy.

@return [Logger]

# File lib/delivery_boy.rb, line 106
def logger
  @logger ||= Logger.new($stdout).tap do |logger|
    if config.log_level
      logger.level = Object.const_get("Logger::#{config.log_level.upcase}")
    end
  end
end
produce(value, topic:, **options) click to toggle source

Like {.produce!}, but handles Kafka::BufferOverflow errors by logging them and just going on with normal business.

@return [nil]

# File lib/delivery_boy.rb, line 55
def produce(value, topic:, **options)
  produce!(value, topic: topic, **options)
rescue Kafka::BufferOverflow
  logger.error "Message for `#{topic}` dropped due to buffer overflow"
end
produce!(value, topic:, **options) click to toggle source

Appends the given message to the producer buffer but does not send it until {.deliver_messages} is called.

@param value [String] the message value. @param topic [String] the topic that the message should be written to. @param key [String, nil] the message key. @param partition [Integer, nil] the topic partition that the message should

be written to.

@param partition_key [String, nil] a key used to deterministically assign

a partition to the message.

@return [nil] @raise [Kafka::BufferOverflow] if the producer's buffer is full.

# File lib/delivery_boy.rb, line 72
def produce!(value, topic:, **options)
  instance.produce(value, topic: topic, **options)
end
shutdown() click to toggle source

Shut down DeliveryBoy.

Automatically called when the process exits.

@return [nil]

# File lib/delivery_boy.rb, line 99
def shutdown
  instance.shutdown
end
test_mode!() click to toggle source
# File lib/delivery_boy.rb, line 137
def test_mode!
  @instance = testing
end
testing() click to toggle source
# File lib/delivery_boy.rb, line 141
def testing
  @testing ||= Fake.new
end

Private Class Methods

instance() click to toggle source
# File lib/delivery_boy.rb, line 147
def instance
  @instance ||= Instance.new(config, logger)
end