class Fluent::Plugin::NetflowipfixInput::ParserThread
Public Class Methods
new(udpQueue, queuesleep, eventQueue, cache_ttl, definitions, log)
click to toggle source
# File lib/fluent/plugin/in_netflowipfix.rb, line 247 def initialize(udpQueue, queuesleep, eventQueue, cache_ttl, definitions, log) @udpQueue = udpQueue @queuesleep = queuesleep @eventQueue = eventQueue @log = log @parser_v5 = NetflowipfixInput::ParserNetflowv5.new @parser_v9 = NetflowipfixInput::ParserNetflowv9.new @parser_v10 = NetflowipfixInput::ParserIPfixv10.new @parser_v9.configure(cache_ttl, definitions) @parser_v10.configure(cache_ttl, definitions) end
Public Instance Methods
close()
click to toggle source
# File lib/fluent/plugin/in_netflowipfix.rb, line 265 def close # Garbage collection @parser_v5 = nil @parser_v9 = nil @parser_v10 = nil GC.start end
emit(time, event, host = nil)
click to toggle source
# File lib/fluent/plugin/in_netflowipfix.rb, line 318 def emit(time, event, host = nil) if !host.nil? event["host"] = host end @eventQueue << [time, event] @log.trace "ParserThread::emit #{@eventQueue.length}" end
join()
click to toggle source
# File lib/fluent/plugin/in_netflowipfix.rb, line 273 def join @thread.join end
run()
click to toggle source
# File lib/fluent/plugin/in_netflowipfix.rb, line 277 def run loop do if @udpQueue.length == 0 sleep(@queuesleep) else block = method(:emit) ar = @udpQueue.pop time = ar[0] msg = ar[1] payload = msg["message"] host = msg["sender"] version,_ = payload[0,2].unpack('n') @log.trace "ParserThread::pop #{@udpQueue.length} v#{version}" case version when 5 packet = NetflowipfixInput::Netflow5Packet.read(payload) @parser_v5.handle_v5(host, packet, block) when 9 packet = NetflowipfixInput::Netflow9Packet.read(payload) @parser_v9.handle_v9(host, packet, block) when 10 packet = NetflowipfixInput::Netflow10Packet.read(payload) @parser_v10.handle_v10(host, packet, block) else $log.warn "Unsupported Netflow version v#{version}: #{version.class}" end # case # Free up variables for garbage collection ar = @udpQueue.pop version = nil time = nil msg = nil payload = nil host = nil end end # loop do end
start()
click to toggle source
# File lib/fluent/plugin/in_netflowipfix.rb, line 260 def start @thread = Thread.new(&method(:run)) @log.debug "ParserThread::start" end