class LogStash::Inputs::Unix
Read events over a UNIX socket.
Like ‘stdin` and `file` inputs, each event is assumed to be one line of text.
Can either accept connections from clients or connect to a server, depending on ‘mode`.
Public Class Methods
# File lib/logstash/inputs/unix.rb, line 45 def initialize(*params) super @host_name_field = ecs_select[disabled: 'host', v1: '[host][name]'] @file_path_field = ecs_select[disabled: 'path', v1: '[file][path]'] end
Public Instance Methods
# File lib/logstash/inputs/unix.rb, line 53 def register require "socket" if server? @logger.info("Starting unix input listener", :address => @path, :force_unlink => @force_unlink) begin @server_socket = UNIXServer.new(@path) rescue Errno::EADDRINUSE, IOError if @force_unlink File.unlink(@path) begin @server_socket = UNIXServer.new(@path) return rescue Errno::EADDRINUSE, IOError @logger.error("Could not start UNIX server: address in use", :path => @path) raise end end @logger.error("Could not start UNIX server: address in use", :path => @path) raise end else # client if socket_not_present_retry_interval_seconds < 0 @logger.warn("Value #{socket_not_present_retry_interval_seconds} for socket_not_present_retry_interval_seconds is not valid, using default value of 5 instead") @socket_not_present_retry_interval_seconds = 5 end end end
# File lib/logstash/inputs/unix.rb, line 155 def run(output_queue) if server? @client_threads = [] while !stop? # Start a new thread for each connection. @client_threads << Thread.start(@server_socket.accept) do |s| @logger.debug("Accepted connection", :server => @path) handle_socket(s, output_queue) end end else while !stop? if File.socket?(@path) @client_socket = UNIXSocket.new(@path) @client_socket.extend ::LogStash::Util::SocketPeer @logger.debug("Opened connection", :client => @path) handle_socket(@client_socket, output_queue) else @logger.warn("Socket not present, wait for #{socket_not_present_retry_interval_seconds} seconds for socket to appear", :client => @path) sleep socket_not_present_retry_interval_seconds end end end rescue IOError # if stop is called during @server_socket.accept # the thread running `run` will raise an IOError # We catch IOError here and do nothing, just let the method terminate end
# File lib/logstash/inputs/unix.rb, line 185 def stop if server? File.unlink(@path) @server_socket.close unless @server_socket.nil? else @client_socket.close unless @client_socket.nil? end rescue IOError # if socket with @mode == client was closed by the client, an other call to @client_socket.close # will raise an IOError. We catch IOError here and do nothing, just let logstash terminate @logger.warn("Could not close socket while Logstash is shutting down. Socket already closed by the other party?", :path => @path) end
Private Instance Methods
# File lib/logstash/inputs/unix.rb, line 83 def handle_socket(socket, output_queue) begin hostname = Socket.gethostname while !stop? data = io_interruptable_readpartial(socket, 16384, @data_timeout) if data == :data_timeout # socket not ready after @data_timeout seconds @logger.info("Closing connection after read timeout", :path => @path) return elsif data == :stopping @logger.trace("Shutdown in progress", :path => @path) next # let next loop handle graceful stop end @codec.decode(data) do |event| decorate(event) event.set(@host_name_field, hostname) unless event.include?(@host_name_field) event.set(@file_path_field, @path) unless event.include?(@file_path_field) output_queue << event end end rescue => e if @logger.debug? @logger.debug("Closing connection", :path => @path, :exception => e, :backtrace => e.backtrace) else @logger.info("Closing connection", :path => @path, :exception => e) end end ensure begin socket.close rescue IOError #pass end end
Emulates ‘IO#readpartial` with a timeout and our plugin’s stop-condition, limiting blocking calls to windows of 10s or less to ensure it can be interrupted.
@param readable_io [IO] the IO to read from @param maxlen [Integer] the max bytes to be read @param timeout [Number] the maximum number of seconds to , or -1 to disable timeouts
@return [:data_timeout] if timeout was reached before bytes were available @return [:stopping] if plugin stop-condition was detected before bytes were available @return [String] a non-empty string if bytes became available before the timeout was reached
# File lib/logstash/inputs/unix.rb, line 131 def io_interruptable_readpartial(readable_io, maxlen, timeout) data_timeout_deadline = timeout < 0 ? nil : Time.now + timeout maximum_blocking_seconds = timeout < 0 || timeout > 10 ? 10 : timeout loop do return :stopping if stop? result = readable_io.read_nonblock(maxlen, exception: false) return result if result.kind_of?(String) raise EOFError if result.nil? return :data_timeout if (data_timeout_deadline && data_timeout_deadline < Time.now) IO.select([readable_io], nil, nil, maximum_blocking_seconds) end end
# File lib/logstash/inputs/unix.rb, line 150 def server? @mode == "server" end