class LogStash::Inputs::Tcp
Read events over a TCP socket.
Like stdin and file inputs, each event is assumed to be one line of text.
Can either accept connections from clients or connect to a server, depending on `mode`.
#### Accepting log4j2 logs
Log4j2 can send JSON over a socket, and we can use that combined with our tcp input to accept the logs.
First, we need to configure your application to send logs in JSON over a socket. The following log4j2.xml accomplishes this task.
Note, you will want to change the `host` and `port` settings in this configuration to match your needs.
<Configuration> <Appenders> <Socket name="Socket" host="localhost" port="12345"> <JsonLayout compact="true" eventEol="true" /> </Socket> </Appenders> <Loggers> <Root level="info"> <AppenderRef ref="Socket"/> </Root> </Loggers> </Configuration>
To accept this in Logstash, you will want tcp input and a date filter:
input { tcp { port => 12345 codec => json } }
and add a date filter to take log4j2's `timeMillis` field and use it as the event timestamp
filter { date { match => [ "timeMillis", "UNIX_MS" ] } }
Constants
- HOST_FIELD
- PORT_FIELD
- PROXY_HOST_FIELD
- PROXY_PORT_FIELD
- SSLSUBJECT_FIELD
Public Class Methods
new(*args)
click to toggle source
Calls superclass method
# File lib/logstash/inputs/tcp.rb, line 108 def initialize(*args) super(*args) # monkey patch TCPSocket and SSLSocket to include socket peer TCPSocket.module_eval{include ::LogStash::Util::SocketPeer} OpenSSL::SSL::SSLSocket.module_eval{include ::LogStash::Util::SocketPeer} # threadsafe socket bookkeeping @server_socket = nil @client_socket = nil @connection_sockets = {} @socket_mutex = Mutex.new @ssl_context = nil end
Public Instance Methods
close()
click to toggle source
# File lib/logstash/inputs/tcp.rb, line 151 def close # see related comment in register: we must make sure to close the server socket here # because it is created in the register method and we could be in the context of having # register called but never run & stop, only close. # catch all rescue nil on close to discard any close errors or invalid socket server_socket.close rescue nil end
register()
click to toggle source
# File lib/logstash/inputs/tcp.rb, line 124 def register fix_streaming_codecs # note that since we are opening a socket in register, we must also make sure we close it # in the close method even if we also close it in the stop method since we could have # a situation where register is called but not run & stop. self.server_socket = new_server_socket if server? end
run(output_queue)
click to toggle source
# File lib/logstash/inputs/tcp.rb, line 134 def run(output_queue) if server? run_server(output_queue) else run_client(output_queue) end end
stop()
click to toggle source
# File lib/logstash/inputs/tcp.rb, line 142 def stop # force close all sockets which will escape any blocking read with a IO exception # and any thread using them will exit. # catch all rescue nil on close to discard any close errors or invalid socket server_socket.close rescue nil client_socket.close rescue nil connection_sockets.each{|socket| socket.close rescue nil} end
Private Instance Methods
add_connection_socket(socket)
click to toggle source
# File lib/logstash/inputs/tcp.rb, line 371 def add_connection_socket(socket) @socket_mutex.synchronize{@connection_sockets[socket] = true} socket end
client_socket()
click to toggle source
# File lib/logstash/inputs/tcp.rb, line 359 def client_socket @socket_mutex.synchronize{@client_socket} end
client_socket=(socket)
click to toggle source
threadsafe sockets bookkeeping
# File lib/logstash/inputs/tcp.rb, line 355 def client_socket=(socket) @socket_mutex.synchronize{@client_socket = socket} end
client_thread(output_queue, socket)
click to toggle source
# File lib/logstash/inputs/tcp.rb, line 263 def client_thread(output_queue, socket) Thread.new(output_queue, socket) do |q, s| begin @logger.debug? && @logger.debug("Accepted connection", :client => s.peer, :server => "#{@host}:#{@port}") handle_socket(s, s.peeraddr[3], s.peeraddr[1], q, @codec.clone) rescue Interrupted s.close rescue nil ensure @client_threads_lock.synchronize{@client_threads.delete(Thread.current)} end end end
connection_sockets()
click to toggle source
# File lib/logstash/inputs/tcp.rb, line 380 def connection_sockets @socket_mutex.synchronize{@connection_sockets.keys.dup} end
delete_connection_socket(socket)
click to toggle source
# File lib/logstash/inputs/tcp.rb, line 376 def delete_connection_socket(socket) @socket_mutex.synchronize{@connection_sockets.delete(socket)} end
handle_socket(socket, client_address, client_port, output_queue, codec)
click to toggle source
# File lib/logstash/inputs/tcp.rb, line 202 def handle_socket(socket, client_address, client_port, output_queue, codec) peer = "#{client_address}:#{client_port}" first_read = true while !stop? tbuf = read(socket) if @proxy_protocol && first_read first_read = false orig_buf = tbuf pp_hdr, tbuf = tbuf.split("\r\n", 2) pp_info = pp_hdr.split(/\s/) # PROXY proto clientip proxyip clientport proxyport if pp_info[0] != "PROXY" @logger.error("invalid proxy protocol header label", :hdr => pp_hdr) raise IOError else proxy_address = pp_info[3] proxy_port = pp_info[5] client_address = pp_info[2] client_port = pp_info[4] end end codec.decode(tbuf) do |event| if @proxy_protocol event.set(PROXY_HOST_FIELD, proxy_address) unless event.get(PROXY_HOST_FIELD) event.set(PROXY_PORT_FIELD, proxy_port) unless event.get(PROXY_PORT_FIELD) end event.set(HOST_FIELD, client_address) unless event.get(HOST_FIELD) event.set(PORT_FIELD, client_port) unless event.get(PORT_FIELD) event.set(SSLSUBJECT_FIELD, socket.peer_cert.subject.to_s) if @ssl_enable && @ssl_verify && event.get(SSLSUBJECT_FIELD).nil? decorate(event) output_queue << event end end rescue EOFError @logger.debug? && @logger.debug("Connection closed", :client => peer) rescue Errno::ECONNRESET @logger.debug? && @logger.debug("Connection reset by peer", :client => peer) rescue OpenSSL::SSL::SSLError => e # Fixes issue #23 @logger.error("SSL Error", :exception => e, :backtrace => e.backtrace) socket.close rescue nil rescue => e # if plugin is stopping, don't bother logging it as an error !stop? && @logger.error("An error occurred. Closing connection", :client => peer, :exception => e, :backtrace => e.backtrace) ensure # catch all rescue nil on close to discard any close errors or invalid socket socket.close rescue nil codec.respond_to?(:flush) && codec.flush do |event| event.set(HOST_FIELD, client_address) unless event.get(HOST_FIELD) event.set(PORT_FIELD, client_port) unless event.get(PORT_FIELD) event.set(SSLSUBJECT_FIELD, socket.peer_cert.subject.to_s) if @ssl_enable && @ssl_verify && event.get(SSLSUBJECT_FIELD).nil? decorate(event) output_queue << event end end
load_cert_store()
click to toggle source
# File lib/logstash/inputs/tcp.rb, line 304 def load_cert_store cert_store = OpenSSL::X509::Store.new cert_store.set_default_paths if File.directory?(@ssl_cacert) cert_store.add_path(@ssl_cacert) else cert_store.add_file(@ssl_cacert) end if @ssl_cacert @ssl_extra_chain_certs.each do |cert| cert_store.add_file(cert) end cert_store end
new_client_socket()
click to toggle source
# File lib/logstash/inputs/tcp.rb, line 330 def new_client_socket socket = TCPSocket.new(@host, @port) if @ssl_enable socket = OpenSSL::SSL::SSLSocket.new(socket, ssl_context) socket.connect end @logger.debug? && @logger.debug("Opened connection", :client => "#{socket.peer}") socket rescue OpenSSL::SSL::SSLError => e @logger.error("SSL Error", :exception => e, :backtrace => e.backtrace) # catch all rescue nil on close to discard any close errors or invalid socket socket.close rescue nil sleep(1) # prevent hammering peer retry rescue # if this exception occured while the plugin is stopping # just ignore and exit raise unless stop? end
new_server_socket()
click to toggle source
# File lib/logstash/inputs/tcp.rb, line 318 def new_server_socket @logger.info("Starting tcp input listener", :address => "#{@host}:#{@port}") begin socket = TCPServer.new(@host, @port) rescue Errno::EADDRINUSE @logger.error("Could not start TCP server: Address in use", :host => @host, :port => @port) raise end @ssl_enable ? OpenSSL::SSL::SSLServer.new(socket, ssl_context) : socket end
read(socket)
click to toggle source
# File lib/logstash/inputs/tcp.rb, line 281 def read(socket) socket.sysread(16384) end
run_client(output_queue)
click to toggle source
# File lib/logstash/inputs/tcp.rb, line 181 def run_client(output_queue) while !stop? self.client_socket = new_client_socket handle_socket(client_socket, client_socket.peeraddr[3], client_socket.peeraddr[1], output_queue, @codec.clone) end ensure # catch all rescue nil on close to discard any close errors or invalid socket client_socket.close rescue nil end
run_server(output_queue)
click to toggle source
# File lib/logstash/inputs/tcp.rb, line 161 def run_server(output_queue) while !stop? begin socket = add_connection_socket(server_socket.accept) # start a new thread for each connection. server_connection_thread(output_queue, socket) rescue OpenSSL::SSL::SSLError => e # log error, close socket, accept next connection @logger.debug? && @logger.debug("SSL Error", :exception => e, :backtrace => e.backtrace) rescue => e # if this exception occured while the plugin is stopping # just ignore and exit raise e unless stop? end end ensure # catch all rescue nil on close to discard any close errors or invalid socket server_socket.close rescue nil end
server?()
click to toggle source
# File lib/logstash/inputs/tcp.rb, line 277 def server? @mode == "server" end
server_connection_thread(output_queue, socket)
click to toggle source
# File lib/logstash/inputs/tcp.rb, line 191 def server_connection_thread(output_queue, socket) Thread.new(output_queue, socket) do |q, s| begin @logger.debug? && @logger.debug("Accepted connection", :client => s.peer, :server => "#{@host}:#{@port}") handle_socket(s, s.peeraddr[3], s.peeraddr[1], q, @codec.clone) ensure delete_connection_socket(s) end end end
server_socket()
click to toggle source
# File lib/logstash/inputs/tcp.rb, line 367 def server_socket @socket_mutex.synchronize{@server_socket} end
server_socket=(socket)
click to toggle source
# File lib/logstash/inputs/tcp.rb, line 363 def server_socket=(socket) @socket_mutex.synchronize{@server_socket = socket} end
ssl_context()
click to toggle source
# File lib/logstash/inputs/tcp.rb, line 285 def ssl_context return @ssl_context if @ssl_context begin @ssl_context = OpenSSL::SSL::SSLContext.new @ssl_context.cert = OpenSSL::X509::Certificate.new(File.read(@ssl_cert)) @ssl_context.key = OpenSSL::PKey::RSA.new(File.read(@ssl_key),@ssl_key_passphrase.value) if @ssl_verify @ssl_context.cert_store = load_cert_store @ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER|OpenSSL::SSL::VERIFY_FAIL_IF_NO_PEER_CERT end rescue => e @logger.error("Could not inititalize SSL context", :exception => e, :backtrace => e.backtrace) raise e end @ssl_context end