class Fluent::Papertrail
Constants
- DISCARD_STRING
Placeholder string used as both the host and the port token of a socket key when a record is to be discarded instead of forwarded.
Attributes
sockets[RW]
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_papertrail.rb, line 24
# Applies the plugin configuration after delegating to the superclass, then
# prepares the socket table and the default hostname.
#
# conf - the configuration object, forwarded unchanged to the superclass.
def configure(conf)
  super

  # The socket table starts with one entry: the destination taken from the
  # papertrail_host / papertrail_port settings.
  default_key = form_socket_key(@papertrail_host, @papertrail_port)
  @sockets = { default_key => create_socket(default_key) }

  # FLUENT_HOSTNAME in the environment wins over the configured default.
  @default_hostname = ENV.fetch('FLUENT_HOSTNAME', @default_hostname)
end
create_packet(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_papertrail.rb, line 77
# Builds a SyslogProtocol::Packet from one fluent event.
#
# tag    - event tag; used as the syslog tag when the record has no 'program'.
# time   - event time passed to Time.at; the current time is used when falsy.
# record - hash of event fields; 'hostname', 'facility', 'severity',
#          'program', 'message' and 'log' are consulted.
#
# Returns the populated packet.
def create_packet(tag, time, record)
  SyslogProtocol::Packet.new.tap do |syslog|
    # Each field falls back to a default when the record does not carry it.
    syslog.hostname = record['hostname'] || @default_hostname
    syslog.facility = record['facility'] || 'local0'
    syslog.severity = record['severity'] || 'info'
    syslog.tag = record['program'] || tag
    # 'message' takes precedence over 'log'.
    syslog.content = record['message'] || record['log']
    syslog.time =
      if time
        Time.at(time)
      else
        Time.now
      end
  end
end
create_socket(socket_key)
click to toggle source
# File lib/fluent/plugin/out_papertrail.rb, line 59
# Opens a TCP connection to the destination encoded in socket_key and
# performs an SSL/TLS handshake on top of it.
#
# socket_key - "host:port" string as produced by form_socket_key.
#
# Returns the connected OpenSSL::SSL::SSLSocket, or nil when the connection
# or handshake fails (the failure is logged as a warning, not raised).
def create_socket(socket_key)
  log.info "initializing tcp socket for #{socket_key}"
  tcp = nil
  begin
    host, port = split_socket_key(socket_key)
    tcp = TCPSocket.new(host, port)
    log.debug "enabling ssl for socket #{socket_key}"
    # NOTE(review): no SSLContext is supplied, so certificate verification
    # depends on OpenSSL's defaults -- confirm that is intended.
    ssl = OpenSSL::SSL::SSLSocket.new(tcp)
    # close tcp and ssl socket when either fails
    ssl.sync_close = true
    # initiate SSL/TLS handshake with server
    ssl.connect
    ssl
  rescue => e
    log.warn "failed to create tcp socket #{socket_key}: #{e}"
    # sync_close only takes effect when the SSL socket itself is closed, so
    # the raw TCP socket must be closed here or its descriptor would leak on
    # every failed handshake.
    begin
      tcp.close if tcp && !tcp.closed?
    rescue StandardError
      # best effort: the socket is being abandoned either way
    end
    nil
  end
end
form_socket_key(host, port)
click to toggle source
# File lib/fluent/plugin/out_papertrail.rb, line 50
# Joins a host and a port into the "host:port" string used as the key of the
# socket table. nil parts become empty strings, so two nils give ":".
def form_socket_key(host, port)
  host.to_s + ':' + port.to_s
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_papertrail.rb, line 34
# Serializes one event as a msgpack-encoded [tag, time, record] triple; write
# reads the same triples back through chunk.msgpack_each.
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
pick_socket(record)
click to toggle source
# File lib/fluent/plugin/out_papertrail.rb, line 89
# Chooses the socket key (destination) for a record.
#
# Precedence:
#   1. papertrail host/port annotations on the kubernetes pod
#   2. papertrail host/port annotations on the kubernetes namespace
#   3. the discard key, when the record has a 'kubernetes' entry and
#      @discard_unannotated_pod_logs is set
#   4. the configured @papertrail_host / @papertrail_port
#
# If the chosen key is ":" (neither host nor port present), a warning is
# logged and the discard key is returned instead.
#
# record - hash of event fields.
#
# Returns the socket key string.
def pick_socket(record)
  host_annotation = 'solarwinds_io/papertrail_host'
  port_annotation = 'solarwinds_io/papertrail_port'

  # Looks up a host/port pair under record['kubernetes'][scope]; yields nil
  # unless both annotations are present. The port is only looked up once the
  # host has been found.
  annotated = lambda do |scope|
    found_host = record.dig('kubernetes', scope, host_annotation)
    found_port = found_host && record.dig('kubernetes', scope, port_annotation)
    [found_host, found_port] if found_port
  end

  # The namespace lookup only runs when the pod has no complete pair.
  destination = annotated.call('annotations') || annotated.call('namespace_annotations')

  host, port =
    if destination
      destination
    elsif @discard_unannotated_pod_logs && record.dig('kubernetes')
      # kubernetes log without a destination, and we were told to drop those
      [DISCARD_STRING, DISCARD_STRING]
    else
      # fall back to the pre-configured destination
      [@papertrail_host, @papertrail_port]
    end

  socket_key = form_socket_key(host, port)
  return socket_key unless socket_key == ':'

  # Neither host nor port is known: warn (naming the namespace when the
  # record has one) and discard.
  namespace_name = record.dig('kubernetes', 'namespace_name')
  origin = namespace_name ? " from Kubernetes namespace: \"#{namespace_name}\"" : ''
  log.warn("Received nil socket_configuration#{origin}. Discarding message.")
  form_socket_key(DISCARD_STRING, DISCARD_STRING)
end
send_to_papertrail(packet, socket_key)
click to toggle source
# File lib/fluent/plugin/out_papertrail.rb, line 126
# Writes one assembled syslog packet to the socket registered under
# socket_key.
#
# packet     - object responding to assemble(max_size); the assembled string
#              is limited to @maximum_syslog_packet_size.
# socket_key - "host:port" key into @sockets.
#
# Raises SocketFailureError when no socket is registered for socket_key, or
# when assembling/writing fails. In the second case the broken socket is
# closed and its entry reset to nil so that the next write recreates it.
def send_to_papertrail(packet, socket_key)
  socket = @sockets[socket_key]
  raise SocketFailureError, "Unable to create socket with #{socket_key}" if socket.nil?

  begin
    # send it
    socket.puts(packet.assemble(@maximum_syslog_packet_size))
  rescue => e
    # socket failed, reset to nil to recreate for the next write
    @sockets[socket_key] = nil
    # Close the failed socket as well; merely dropping the reference would
    # keep its descriptor open until garbage collection.
    begin
      socket.close
    rescue StandardError
      # best effort: the socket is already considered dead
    end
    raise SocketFailureError, "Error writing to #{socket_key}: #{e}", e.backtrace
  end
end
split_socket_key(socket_key)
click to toggle source
# File lib/fluent/plugin/out_papertrail.rb, line 54
# Splits a "host:port" socket key back into its two parts (the inverse of
# form_socket_key).
#
# The key is split on its LAST colon, so hosts that themselves contain colons
# (e.g. IPv6 literals such as "::1") stay intact.
#
# socket_key - string produced by form_socket_key.
#
# Returns [host, port] as strings. port is nil when missing or empty; host is
# nil when both parts are empty.
def split_socket_key(socket_key)
  host, separator, port = socket_key.rpartition(':')
  # Without any colon, rpartition puts the whole string in the last slot;
  # treat it as a bare host with no port.
  host, port = port, '' if separator.empty?

  port = nil if port.empty?
  host = nil if host.empty? && port.nil?
  [host, port]
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_papertrail.rb, line 38
# Sends every event in a buffered chunk to its destination.
#
# chunk - object whose msgpack_each yields [tag, time, record] triples.
#
# Events whose destination resolves to the discard key are skipped. For all
# others, a socket is created on demand (also when a previous failure left
# the entry nil) before the packet is sent.
def write(chunk)
  chunk.msgpack_each do |tag, time, record|
    destination = pick_socket(record)
    next if destination == form_socket_key(DISCARD_STRING, DISCARD_STRING)

    # recreate the socket if it's nil
    @sockets[destination] ||= create_socket(destination)
    send_to_papertrail(create_packet(tag, time, record), destination)
  end
end