class LogStash::Inputs::Lumberjack
Receive events using the Lumberjack protocol.
This input can be used to reliably and securely transport events between Logstash instances. To do so, use the <<plugins-outputs-lumberjack,lumberjack output plugin>> in the sending Logstash instance(s).
It can also be used to receive events from the deprecated github.com/elastic/logstash-forwarder[logstash-forwarder] tool that has been replaced by github.com/elastic/beats/tree/master/filebeat[Filebeat].
Constants
- BUFFERED_QUEUE_SIZE
TODO(sissel): Add CA to authenticate clients with.
- RECONNECT_BACKOFF_SLEEP
Public Instance Methods
I have created this method to make testing a lot easier; mocking multiple levels of blocks is unfriendly, especially with connection-based blocks.
# File lib/logstash/inputs/lumberjack.rb, line 110
def create_event(fields, &block)
  line = fields.delete("line")

  @codec.decode(line, identity(fields)) do |event|
    decorate(event)
    fields.each do |k,v|
      v.force_encoding(Encoding::UTF_8)
      event.set(k,v)
    end
    block.call(event)
  end
end
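As a rough illustration of why a separate `create_event` helps testing, the sketch below drives the same method body with stand-in objects. `StubEvent`, `StubCodec`, and `StubInput` are hypothetical names made up for this example and are not part of Logstash; only the body of `create_event` and `identity` is taken from the listing above.

```ruby
# Hypothetical stand-ins for the real event and codec classes; illustration only.
class StubEvent
  def initialize(message)
    @data = { "message" => message }
  end

  def set(key, value)
    @data[key] = value
  end

  def get(key)
    @data[key]
  end
end

class StubCodec
  # Yields one event per decoded line; the identity argument is ignored here.
  def decode(line, _identity)
    yield StubEvent.new(line)
  end
end

class StubInput
  def initialize
    @codec = StubCodec.new
  end

  # Simplified decorate: the real method adds configured tags and fields.
  def decorate(event)
    event.set("decorated", true)
  end

  def identity(fields)
    [fields["host"], fields["file"]].compact.join("-")
  end

  # Same body as the documented method.
  def create_event(fields, &block)
    line = fields.delete("line")
    @codec.decode(line, identity(fields)) do |event|
      decorate(event)
      fields.each do |k, v|
        v.force_encoding(Encoding::UTF_8)
        event.set(k, v)
      end
      block.call(event)
    end
  end
end

collected = []
# .dup guarantees unfrozen strings, since force_encoding mutates its receiver.
fields = { "line" => "hello".dup, "host" => "web1".dup, "file" => "/var/log/app.log".dup }
StubInput.new.create_event(fields) { |event| collected << event }
```

With this shape a test only needs to pass a single block and inspect what it receives, without simulating a connection.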
# File lib/logstash/inputs/lumberjack.rb, line 47
def register
  require "lumberjack/server"
  require "concurrent"
  require "logstash/circuit_breaker"
  require "logstash/sized_queue_timeout"

  @logger.info("Starting lumberjack input listener", :address => "#{@host}:#{@port}")
  @lumberjack = Lumberjack::Server.new(:address => @host, :port => @port,
    :ssl_certificate => @ssl_certificate, :ssl_key => @ssl_key,
    :ssl_key_passphrase => @ssl_key_passphrase)

  # Create a reusable threadpool, we do not limit the number of connections
  # to the input, the circuit breaker with the timeout should take care
  # of `blocked` threads and prevent logstash to go oom.
  @threadpool = Concurrent::CachedThreadPool.new(:idletime => 15)

  # in 1.5 the main SizeQueue doesnt have the concept of timeout
  # We are using a small plugin buffer to move events to the internal queue
  @buffered_queue = LogStash::SizedQueueTimeout.new(BUFFERED_QUEUE_SIZE)

  @circuit_breaker = LogStash::CircuitBreaker.new("Lumberjack input",
    :exceptions => [LogStash::SizedQueueTimeout::TimeoutError])

  @codec = LogStash::Codecs::IdentityMapCodec.new(@codec)
end
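The comments in `register` mention a small buffer whose push can time out. The following is a minimal sketch of that idea, assuming a plain mutex and condition variable design; `TimeoutQueue` is a hypothetical class written for this example and is not the `LogStash::SizedQueueTimeout` source.

```ruby
# Hypothetical bounded queue with a push timeout; a sketch of the idea only.
class TimeoutQueue
  class TimeoutError < StandardError; end

  def initialize(max_size)
    @max_size = max_size
    @items = []
    @mutex = Mutex.new
    @not_full = ConditionVariable.new
    @not_empty = ConditionVariable.new
  end

  # Blocks while the queue is full; raises TimeoutError once `timeout` seconds pass.
  def push(item, timeout)
    deadline = monotonic_now + timeout
    @mutex.synchronize do
      while @items.size >= @max_size
        remaining = deadline - monotonic_now
        raise TimeoutError, "queue still full after #{timeout}s" if remaining <= 0
        @not_full.wait(@mutex, remaining)
      end
      @items << item
      @not_empty.signal
    end
  end

  # Blocks until an item is available.
  def pop
    @mutex.synchronize do
      @not_empty.wait(@mutex) while @items.empty?
      item = @items.shift
      @not_full.signal
      item
    end
  end

  private

  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

queue = TimeoutQueue.new(1)
queue.push(:first, 0.05)

timed_out = begin
  queue.push(:second, 0.05) # nobody is popping, so this push cannot succeed
  false
rescue TimeoutQueue::TimeoutError
  true
end
```

A timeout error of this kind is what the circuit breaker in `register` is configured to watch for.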
# File lib/logstash/inputs/lumberjack.rb, line 73
def run(output_queue)
  @output_queue = output_queue
  start_buffer_broker

  @codec.eviction_block(method(:flush_event))

  # Accepting new events coming from LSF
  while !stop? do
    # Wrapping the accept call into a CircuitBreaker
    if @circuit_breaker.closed?
      connection = @lumberjack.accept # call that creates a new connection
      next if connection.nil? # if the connection is nil the connection was close.

      invoke(connection) do |event|
        if stop?
          connection.close
          break
        end

        @circuit_breaker.execute { @buffered_queue.push(event, @congestion_threshold) }
      end
    else
      @logger.warn("Lumberjack input: the pipeline is blocked, temporary refusing new connection.")
      sleep(RECONNECT_BACKOFF_SLEEP)
    end
  end
end
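The `closed?` / `execute` pattern used in `run` can be sketched with a deliberately simplified breaker. `TinyBreaker` and `QueueTimeout` are hypothetical names invented for this example; the real `LogStash::CircuitBreaker` likely differs (for instance, this sketch never reopens after a cool-down period).

```ruby
# Simplified, hypothetical breaker; NOT the LogStash::CircuitBreaker implementation.
class TinyBreaker
  class OpenBreaker < StandardError; end

  def initialize(threshold:, exceptions:)
    @threshold = threshold
    @exceptions = exceptions
    @failures = 0
  end

  # The breaker stays closed (work is allowed) until the failure threshold is hit.
  def closed?
    @failures < @threshold
  end

  # Runs the block, counting failures of the watched exception types.
  def execute
    raise OpenBreaker, "breaker is open" unless closed?
    result = yield
    @failures = 0
    result
  rescue *@exceptions
    @failures += 1
    raise
  end
end

# Stand-in for a queue push that times out.
class QueueTimeout < StandardError; end

breaker = TinyBreaker.new(threshold: 2, exceptions: [QueueTimeout])
log = []

4.times do
  begin
    breaker.execute { raise QueueTimeout, "downstream queue is full" }
  rescue QueueTimeout
    log << :timeout
  rescue TinyBreaker::OpenBreaker
    log << :refused
  end
end
```

After two timeouts the sketch refuses further work, which corresponds to the branch in `run` that logs a warning and sleeps instead of accepting a connection.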
# File lib/logstash/inputs/lumberjack.rb, line 101
def stop
  @lumberjack.close
  @codec.flush { |event| flush_event(event) }
end
Private Instance Methods
There is a problem with the way codecs work for this specific input: when the data is decoded, there is no way to attach metadata to the decoded line. The block passed to `@codec.decode` references the `fields` variable, which is available when the proc is created; the problem is that this variable is not available at eviction time, or when we force a flush on the codec before shutting down the input.
Not defining this method would make Logstash lose data, so it is still better to force a flush.
See github.com/elastic/logstash/issues/4289 for more discussion.
# File lib/logstash/inputs/lumberjack.rb, line 139
def flush_event(event)
  decorate(event)
  @output_queue << event
end
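The metadata problem described above can be reproduced with a toy codec. `BufferingCodec` is a hypothetical class written for this sketch (it holds back incomplete lines until flushed), and the two-argument `flush_event` here is a simplified stand-in for the documented method.

```ruby
# Hypothetical buffering codec: emits an event per complete line and keeps
# the unfinished remainder until flush. Illustration only.
class BufferingCodec
  def initialize
    @pending = +""
  end

  def decode(data)
    @pending << data
    while (idx = @pending.index("\n"))
      yield({ "message" => @pending.slice!(0..idx).chomp })
    end
  end

  def flush
    return if @pending.empty?
    yield({ "message" => @pending.dup })
    @pending.clear
  end
end

# Simplified stand-in for flush_event: a method body cannot see the caller's
# local `fields`, so it can only pass the event along as it is.
def flush_event(event, sink)
  sink << event
end

codec = BufferingCodec.new
fields = { "host" => "web1", "file" => "/var/log/app.log" }
events = []

# The decode block closes over `fields`, so complete lines get their metadata.
codec.decode("first line\nsecond li") do |event|
  events << event.merge(fields)
end

# The leftover data is only emitted at flush time, without host or file.
codec.flush { |event| flush_event(event, events) }
```

The flushed event is kept rather than dropped, but it arrives without the `host` and `file` fields, which is the trade-off the note above accepts.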
It uses the host and the file as the differentiator; if nothing is provided, it falls back to an empty string.
# File lib/logstash/inputs/lumberjack.rb, line 126
def identity(fields)
  [fields["host"], fields["file"]].compact.join("-")
end
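To make the fallback concrete, this standalone sketch copies the body of `identity` into a free function and calls it with plain hashes. The sample host and file values are made up for illustration.

```ruby
# Standalone copy of the identity logic; the sample hashes are illustrative only.
def identity(fields)
  [fields["host"], fields["file"]].compact.join("-")
end

both      = identity({ "host" => "web1", "file" => "/var/log/syslog" })
host_only = identity({ "host" => "web1" })
neither   = identity({})

puts both.inspect      # "web1-/var/log/syslog"
puts host_only.inspect # "web1"
puts neither.inspect   # ""
```

Because `compact` drops `nil` entries before joining, a missing field never produces a stray separator.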
# File lib/logstash/inputs/lumberjack.rb, line 145
def invoke(connection, &block)
  @threadpool.post do
    begin
      # If any errors occur in from the events the connection should be closed in the
      # library ensure block and the exception will be handled here
      connection.run do |fields|
        create_event(fields, &block)
      end

      # When too many errors happen inside the circuit breaker it will throw
      # this exception and start refusing connection. The bubbling of theses
      # exceptions make sure that the lumberjack library will close the current
      # connection which will force the client to reconnect and restransmit
      # his payload.
    rescue LogStash::CircuitBreaker::OpenBreaker,
      LogStash::CircuitBreaker::HalfOpenBreaker => e
      logger.warn("Lumberjack input: The circuit breaker has detected a slowdown or stall in the pipeline, the input is closing the current connection and rejecting new connection until the pipeline recover.", :exception => e.class)
    rescue => e # If we have a malformed packet we should handle that so the input doesn't crash completely.
      @logger.error("Lumberjack input: unhandled exception", :exception => e, :backtrace => e.backtrace)
    end
  end
end
# File lib/logstash/inputs/lumberjack.rb, line 168
def start_buffer_broker
  @threadpool.post do
    while !stop?
      @output_queue << @buffered_queue.pop_no_timeout
    end
  end
end
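The broker loop can be approximated with Ruby's built-in queues. In this sketch `SizedQueue` and `Queue` are stand-ins (an assumption) for `LogStash::SizedQueueTimeout` and the pipeline queue, a plain `Thread` replaces the thread pool, and the `:stop` sentinel is a hypothetical substitute for the plugin's `stop?` check.

```ruby
# Bounded buffer between producers and the final queue, using core Ruby classes
# as stand-ins for the Logstash ones.
buffered_queue = SizedQueue.new(2)
output_queue = Queue.new
stop_marker = :stop # hypothetical sentinel; the plugin checks stop? instead

# The broker moves items from the small buffer to the output queue.
broker = Thread.new do
  loop do
    item = buffered_queue.pop
    break if item == stop_marker
    output_queue << item
  end
end

# Producers block on push whenever the buffer already holds two items.
%w[a b c d e].each { |event| buffered_queue.push(event) }
buffered_queue.push(stop_marker)
broker.join

drained = []
drained << output_queue.pop until output_queue.empty?
```

Keeping the buffer small means a stalled consumer is noticed quickly by producers, rather than hidden behind a large backlog.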