class LogStashLogger::Device::Kafka
Constants
- DEFAULT_BACKOFF
- DEFAULT_HOST
- DEFAULT_PORT
- DEFAULT_PRODUCER
- DEFAULT_TOPIC
Attributes
backoff[RW]
hosts[RW]
producer[RW]
topic[RW]
Public Class Methods
new(opts)
click to toggle source
Calls superclass method
LogStashLogger::Device::Connectable::new
# File lib/logstash-logger/device/kafka.rb, line 15
#
# Configures the Kafka device from an options hash. The hash is also handed
# to the superclass initializer through the bare +super+ call.
#
# Options read here:
#   :hosts    - broker list, stored as given when present
#   :host     - comma-separated host names, consulted only when :hosts is
#               absent (falls back to DEFAULT_HOST)
#   :port     - port appended to every entry derived from :host
#               (falls back to DEFAULT_PORT)
#   :path     - stored as the topic (falls back to DEFAULT_TOPIC)
#   :producer - stored in @producer (falls back to DEFAULT_PRODUCER)
#   :backoff  - stored in @backoff (falls back to DEFAULT_BACKOFF)
def initialize(opts)
  super

  host = opts[:host] || DEFAULT_HOST
  port = opts[:port] || DEFAULT_PORT

  # Trim each comma-separated entry and skip blank ones so inputs such as
  # "a, b" or "a,,b" cannot yield malformed entries like " b:9092" or ":9092".
  @hosts = opts[:hosts] ||
           host.split(',').map(&:strip).reject(&:empty?).map { |h| "#{h}:#{port}" }
  @topic = opts[:path] || DEFAULT_TOPIC
  @producer = opts[:producer] || DEFAULT_PRODUCER
  @backoff = opts[:backoff] || DEFAULT_BACKOFF

  # The buffer group is set to the topic.
  @buffer_group = @topic
end
Public Instance Methods
connect()
click to toggle source
# File lib/logstash-logger/device/kafka.rb, line 26
#
# Creates a new Poseidon producer from the stored host list and producer
# value, and keeps it in @io.
def connect
  broker_list = @hosts
  producer_name = @producer
  @io = ::Poseidon::Producer.new(broker_list, producer_name)
end
with_connection() { || ... }
click to toggle source
# File lib/logstash-logger/device/kafka.rb, line 30
#
# Runs the given block, calling connect first when connected? is false.
#
# Error handling:
# * Poseidon ChecksumError / UnableToFetchMetadata: the error is logged,
#   the method sleeps for +backoff+ when it is set, reconnects, and then
#   starts over from the top. There is no cap on the number of retries.
# * Any other StandardError: the error is logged, "giving up" is logged,
#   and the device is closed with flush: false. The error is not re-raised.
def with_connection
  begin
    connect unless connected?
    yield
  rescue ::Poseidon::Errors::ChecksumError, Poseidon::Errors::UnableToFetchMetadata => kafka_error
    log_error(kafka_error)
    log_warning("reconnect/retry")
    sleep(backoff) if backoff
    reconnect
    retry
  rescue => error
    log_error(error)
    log_warning("giving up")
    close(flush: false)
  end
end
write_batch(messages, topic = nil)
click to toggle source
# File lib/logstash-logger/device/kafka.rb, line 45
#
# Sends every entry of +messages+ through @io in one send_messages call,
# inside with_connection.
#
# messages - collection responding to +map+; each element becomes one
#            Poseidon::MessageToSend
# topic    - destination topic; @topic is used when this is nil or false
def write_batch(messages, topic = nil)
  destination = topic || @topic

  with_connection do
    # Built inside the block so that it runs under with_connection's error
    # handling and is rebuilt on every retry.
    outgoing = messages.map do |payload|
      Poseidon::MessageToSend.new(destination, payload)
    end
    @io.send_messages(outgoing)
  end
end
write_one(message, topic = nil)
click to toggle source
# File lib/logstash-logger/device/kafka.rb, line 52
#
# Sends a single message by wrapping it in a one-element array and
# delegating to write_batch with the same +topic+ argument.
def write_one(message, topic = nil)
  single_batch = [message]
  write_batch(single_batch, topic)
end