class LogStashLogger::Device::Kafka

Constants

DEFAULT_BACKOFF
DEFAULT_HOST
DEFAULT_PORT
DEFAULT_PRODUCER
DEFAULT_TOPIC

Attributes

backoff[RW]
hosts[RW]
producer[RW]
topic[RW]

Public Class Methods

new(opts) click to toggle source
# File lib/logstash-logger/device/kafka.rb, line 15
def initialize(opts)
  super
  host = opts[:host] || DEFAULT_HOST
  port = opts[:port] || DEFAULT_PORT
  @hosts = opts[:hosts] || host.split(',').map { |h| "#{h}:#{port}" }
  @topic = opts[:path] || DEFAULT_TOPIC
  @producer = opts[:producer] || DEFAULT_PRODUCER
  @backoff = opts[:backoff] || DEFAULT_BACKOFF
  @buffer_group = @topic
end

Public Instance Methods

connect() click to toggle source
# File lib/logstash-logger/device/kafka.rb, line 26
def connect
  @io = ::Poseidon::Producer.new(@hosts, @producer)
end
with_connection() { || ... } click to toggle source
# File lib/logstash-logger/device/kafka.rb, line 30
def with_connection
  connect unless connected?
  yield
rescue ::Poseidon::Errors::ChecksumError, Poseidon::Errors::UnableToFetchMetadata => e
  log_error(e)
  log_warning("reconnect/retry")
  sleep backoff if backoff
  reconnect
  retry
rescue => e
  log_error(e)
  log_warning("giving up")
  close(flush: false)
end
write_batch(messages, topic = nil) click to toggle source
# File lib/logstash-logger/device/kafka.rb, line 45
def write_batch(messages, topic = nil)
  topic ||= @topic
  with_connection do
    @io.send_messages messages.map { |message| Poseidon::MessageToSend.new(topic, message) }
  end
end
write_one(message, topic = nil) click to toggle source
# File lib/logstash-logger/device/kafka.rb, line 52
def write_one(message, topic = nil)
  write_batch([message], topic)
end