class LogStash::Inputs::Tcp

Read events over a TCP socket.

Like stdin and file inputs, each event is assumed to be one line of text.

Can either accept connections from clients or connect to a server, depending on `mode`.

#### Accepting log4j2 logs

Log4j2 can send JSON over a socket, and we can use that combined with our tcp input to accept the logs.

First, we need to configure your application to send logs in JSON over a socket. The following log4j2.xml accomplishes this task.

Note, you will want to change the `host` and `port` settings in this configuration to match your needs.

<Configuration>
  <Appenders>
     <Socket name="Socket" host="localhost" port="12345">
       <JsonLayout compact="true" eventEol="true" />
    </Socket>
  </Appenders>
  <Loggers>
    <Root level="info">
      <AppenderRef ref="Socket"/>
    </Root>
  </Loggers>
</Configuration>

To accept this in Logstash, you will want tcp input and a date filter:

input {
  tcp {
    port => 12345
    codec => json
  }
}

and add a date filter to take log4j2's `timeMillis` field and use it as the event timestamp

filter {
  date {
    match => [ "timeMillis", "UNIX_MS" ]
  }
}

Constants

HOST_FIELD
PORT_FIELD
PROXY_HOST_FIELD
PROXY_PORT_FIELD
SSLSUBJECT_FIELD

Public Class Methods

new(*args) click to toggle source
Calls superclass method
# File lib/logstash/inputs/tcp.rb, line 108
def initialize(*args)
  super(*args)

  # monkey patch TCPSocket and SSLSocket to include socket peer
  TCPSocket.module_eval{include ::LogStash::Util::SocketPeer}
  OpenSSL::SSL::SSLSocket.module_eval{include ::LogStash::Util::SocketPeer}

  # threadsafe socket bookkeeping
  @server_socket = nil
  @client_socket = nil
  @connection_sockets = {}
  @socket_mutex = Mutex.new

  @ssl_context = nil
end

Public Instance Methods

close() click to toggle source
# File lib/logstash/inputs/tcp.rb, line 151
def close
  # see related comment in register: we must make sure to close the server socket here
  # because it is created in the register method and we could be in the context of having
  # register called but never run & stop, only close.
  # catch all rescue nil on close to discard any close errors or invalid socket
  server_socket.close rescue nil
end
register() click to toggle source
# File lib/logstash/inputs/tcp.rb, line 124
def register
  fix_streaming_codecs

  # note that since we are opening a socket in register, we must also make sure we close it
  # in the close method even if we also close it in the stop method since we could have
  # a situation where register is called but not run & stop.

  self.server_socket = new_server_socket if server?
end
run(output_queue) click to toggle source
# File lib/logstash/inputs/tcp.rb, line 134
def run(output_queue)
  if server?
    run_server(output_queue)
  else
    run_client(output_queue)
  end
end
stop() click to toggle source
# File lib/logstash/inputs/tcp.rb, line 142
def stop
  # force close all sockets which will escape any blocking read with a IO exception
  # and any thread using them will exit.
  # catch all rescue nil on close to discard any close errors or invalid socket
  server_socket.close rescue nil
  client_socket.close rescue nil
  connection_sockets.each{|socket| socket.close rescue nil}
end

Private Instance Methods

add_connection_socket(socket) click to toggle source
# File lib/logstash/inputs/tcp.rb, line 371
def add_connection_socket(socket)
  @socket_mutex.synchronize{@connection_sockets[socket] = true}
  socket
end
client_socket() click to toggle source
# File lib/logstash/inputs/tcp.rb, line 359
def client_socket
  @socket_mutex.synchronize{@client_socket}
end
client_socket=(socket) click to toggle source

threadsafe sockets bookkeeping

# File lib/logstash/inputs/tcp.rb, line 355
def client_socket=(socket)
  @socket_mutex.synchronize{@client_socket = socket}
end
client_thread(output_queue, socket) click to toggle source
# File lib/logstash/inputs/tcp.rb, line 263
def client_thread(output_queue, socket)
  Thread.new(output_queue, socket) do |q, s|
    begin
      @logger.debug? && @logger.debug("Accepted connection", :client => s.peer, :server => "#{@host}:#{@port}")
      handle_socket(s, s.peeraddr[3], s.peeraddr[1], q, @codec.clone)
    rescue Interrupted
      s.close rescue nil
    ensure
      @client_threads_lock.synchronize{@client_threads.delete(Thread.current)}
    end
  end
end
connection_sockets() click to toggle source
# File lib/logstash/inputs/tcp.rb, line 380
def connection_sockets
  @socket_mutex.synchronize{@connection_sockets.keys.dup}
end
delete_connection_socket(socket) click to toggle source
# File lib/logstash/inputs/tcp.rb, line 376
def delete_connection_socket(socket)
  @socket_mutex.synchronize{@connection_sockets.delete(socket)}
end
handle_socket(socket, client_address, client_port, output_queue, codec) click to toggle source
# File lib/logstash/inputs/tcp.rb, line 202
def handle_socket(socket, client_address, client_port, output_queue, codec)
  peer = "#{client_address}:#{client_port}"
  first_read = true
  while !stop?
    tbuf = read(socket)
    if @proxy_protocol && first_read
      first_read = false
      orig_buf = tbuf
      pp_hdr, tbuf = tbuf.split("\r\n", 2)

      pp_info = pp_hdr.split(/\s/)
      # PROXY proto clientip proxyip clientport proxyport
      if pp_info[0] != "PROXY"
        @logger.error("invalid proxy protocol header label", :hdr => pp_hdr)
        raise IOError
      else
        proxy_address = pp_info[3]
        proxy_port = pp_info[5]
        client_address = pp_info[2]
        client_port = pp_info[4]
      end
    end
    codec.decode(tbuf) do |event|
      if @proxy_protocol
        event.set(PROXY_HOST_FIELD, proxy_address) unless event.get(PROXY_HOST_FIELD)
        event.set(PROXY_PORT_FIELD, proxy_port) unless event.get(PROXY_PORT_FIELD)
      end
      event.set(HOST_FIELD, client_address) unless event.get(HOST_FIELD)
      event.set(PORT_FIELD, client_port) unless event.get(PORT_FIELD)
      event.set(SSLSUBJECT_FIELD, socket.peer_cert.subject.to_s) if @ssl_enable && @ssl_verify && event.get(SSLSUBJECT_FIELD).nil?

      decorate(event)
      output_queue << event
    end
  end
rescue EOFError
  @logger.debug? && @logger.debug("Connection closed", :client => peer)
rescue Errno::ECONNRESET
  @logger.debug? && @logger.debug("Connection reset by peer", :client => peer)
rescue OpenSSL::SSL::SSLError => e
  # Fixes issue #23
  @logger.error("SSL Error", :exception => e, :backtrace => e.backtrace)
  socket.close rescue nil
rescue => e
  # if plugin is stopping, don't bother logging it as an error
  !stop? && @logger.error("An error occurred. Closing connection", :client => peer, :exception => e, :backtrace => e.backtrace)
ensure
  # catch all rescue nil on close to discard any close errors or invalid socket
  socket.close rescue nil

  codec.respond_to?(:flush) && codec.flush do |event|
    event.set(HOST_FIELD, client_address) unless event.get(HOST_FIELD)
    event.set(PORT_FIELD, client_port) unless event.get(PORT_FIELD)
    event.set(SSLSUBJECT_FIELD, socket.peer_cert.subject.to_s) if @ssl_enable && @ssl_verify && event.get(SSLSUBJECT_FIELD).nil?

    decorate(event)
    output_queue << event
  end
end
load_cert_store() click to toggle source
# File lib/logstash/inputs/tcp.rb, line 304
def load_cert_store
  cert_store = OpenSSL::X509::Store.new
  cert_store.set_default_paths
  if File.directory?(@ssl_cacert)
    cert_store.add_path(@ssl_cacert)
  else
    cert_store.add_file(@ssl_cacert)
  end if @ssl_cacert
  @ssl_extra_chain_certs.each do |cert|
    cert_store.add_file(cert)
  end
  cert_store
end
new_client_socket() click to toggle source
# File lib/logstash/inputs/tcp.rb, line 330
def new_client_socket
  socket = TCPSocket.new(@host, @port)

  if @ssl_enable
    socket = OpenSSL::SSL::SSLSocket.new(socket, ssl_context)
    socket.connect
  end

  @logger.debug? && @logger.debug("Opened connection", :client => "#{socket.peer}")

  socket
rescue OpenSSL::SSL::SSLError => e
  @logger.error("SSL Error", :exception => e, :backtrace => e.backtrace)
  # catch all rescue nil on close to discard any close errors or invalid socket
  socket.close rescue nil
  sleep(1) # prevent hammering peer
  retry
rescue
  # if this exception occured while the plugin is stopping
  # just ignore and exit
  raise unless stop?
end
new_server_socket() click to toggle source
# File lib/logstash/inputs/tcp.rb, line 318
def new_server_socket
  @logger.info("Starting tcp input listener", :address => "#{@host}:#{@port}")
  begin
    socket = TCPServer.new(@host, @port)
  rescue Errno::EADDRINUSE
    @logger.error("Could not start TCP server: Address in use", :host => @host, :port => @port)
    raise
  end

  @ssl_enable ? OpenSSL::SSL::SSLServer.new(socket, ssl_context) : socket
end
read(socket) click to toggle source
# File lib/logstash/inputs/tcp.rb, line 281
def read(socket)
  socket.sysread(16384)
end
run_client(output_queue) click to toggle source
# File lib/logstash/inputs/tcp.rb, line 181
def run_client(output_queue)
  while !stop?
    self.client_socket = new_client_socket
    handle_socket(client_socket, client_socket.peeraddr[3], client_socket.peeraddr[1], output_queue, @codec.clone)
  end
ensure
  # catch all rescue nil on close to discard any close errors or invalid socket
  client_socket.close rescue nil
end
run_server(output_queue) click to toggle source
# File lib/logstash/inputs/tcp.rb, line 161
def run_server(output_queue)
  while !stop?
    begin
      socket = add_connection_socket(server_socket.accept)
      # start a new thread for each connection.
      server_connection_thread(output_queue, socket)
    rescue OpenSSL::SSL::SSLError => e
      # log error, close socket, accept next connection
      @logger.debug? && @logger.debug("SSL Error", :exception => e, :backtrace => e.backtrace)
    rescue => e
      # if this exception occured while the plugin is stopping
      # just ignore and exit
      raise e unless stop?
    end
  end
ensure
  # catch all rescue nil on close to discard any close errors or invalid socket
  server_socket.close rescue nil
end
server?() click to toggle source
# File lib/logstash/inputs/tcp.rb, line 277
def server?
  @mode == "server"
end
server_connection_thread(output_queue, socket) click to toggle source
# File lib/logstash/inputs/tcp.rb, line 191
def server_connection_thread(output_queue, socket)
  Thread.new(output_queue, socket) do |q, s|
    begin
      @logger.debug? && @logger.debug("Accepted connection", :client => s.peer, :server => "#{@host}:#{@port}")
      handle_socket(s, s.peeraddr[3], s.peeraddr[1], q, @codec.clone)
    ensure
      delete_connection_socket(s)
    end
  end
end
server_socket() click to toggle source
# File lib/logstash/inputs/tcp.rb, line 367
def server_socket
  @socket_mutex.synchronize{@server_socket}
end
server_socket=(socket) click to toggle source
# File lib/logstash/inputs/tcp.rb, line 363
def server_socket=(socket)
  @socket_mutex.synchronize{@server_socket = socket}
end
ssl_context() click to toggle source
# File lib/logstash/inputs/tcp.rb, line 285
def ssl_context
  return @ssl_context if @ssl_context

  begin
    @ssl_context = OpenSSL::SSL::SSLContext.new
    @ssl_context.cert = OpenSSL::X509::Certificate.new(File.read(@ssl_cert))
    @ssl_context.key = OpenSSL::PKey::RSA.new(File.read(@ssl_key),@ssl_key_passphrase.value)
    if @ssl_verify
      @ssl_context.cert_store  = load_cert_store
      @ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER|OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT
    end
  rescue => e
    @logger.error("Could not inititalize SSL context", :exception => e, :backtrace => e.backtrace)
    raise e
  end

  @ssl_context
end