class Fluent::Plugin::NetflowipfixInput::ParserThread

Public Class Methods

new(udpQueue, queuesleep, eventQueue, cache_ttl, definitions, log) click to toggle source
# File lib/fluent/plugin/in_netflowipfix.rb, line 247
def initialize(udpQueue, queuesleep, eventQueue, cache_ttl, definitions, log)
        @udpQueue = udpQueue
        @queuesleep = queuesleep
        @eventQueue = eventQueue
        @log = log

        @parser_v5 = NetflowipfixInput::ParserNetflowv5.new
        @parser_v9 = NetflowipfixInput::ParserNetflowv9.new
        @parser_v10 = NetflowipfixInput::ParserIPfixv10.new

        @parser_v9.configure(cache_ttl, definitions)
        @parser_v10.configure(cache_ttl, definitions)
end

Public Instance Methods

close() click to toggle source
# File lib/fluent/plugin/in_netflowipfix.rb, line 265
def close
        # Garbage collection
        @parser_v5 = nil
        @parser_v9 = nil
        @parser_v10 = nil
        GC.start
end
emit(time, event, host = nil) click to toggle source
# File lib/fluent/plugin/in_netflowipfix.rb, line 318
def emit(time, event, host = nil)
        if !host.nil?
                event["host"] = host
        end
        @eventQueue << [time, event]
        @log.trace "ParserThread::emit #{@eventQueue.length}"
end
join() click to toggle source
# File lib/fluent/plugin/in_netflowipfix.rb, line 273
def join
                @thread.join
end
run() click to toggle source
# File lib/fluent/plugin/in_netflowipfix.rb, line 277
def run
        loop do
                if @udpQueue.length == 0
                        sleep(@queuesleep)

                else
                        block = method(:emit)
                        ar = @udpQueue.pop
                        time = ar[0]
                        msg = ar[1]
                        payload = msg["message"]
                        host = msg["sender"]
                        
                        version,_ = payload[0,2].unpack('n')
                        @log.trace "ParserThread::pop #{@udpQueue.length} v#{version}"

                        case version
                                when 5          
                                        packet = NetflowipfixInput::Netflow5Packet.read(payload)
                                        @parser_v5.handle_v5(host, packet, block)
                                when 9
                                        packet = NetflowipfixInput::Netflow9Packet.read(payload)
                                        @parser_v9.handle_v9(host, packet, block)
                                when 10
                                        packet = NetflowipfixInput::Netflow10Packet.read(payload)
                                        @parser_v10.handle_v10(host, packet, block)
                                else
                                        $log.warn "Unsupported Netflow version v#{version}: #{version.class}"
                        end # case

                        # Free up variables for garbage collection
                        ar = @udpQueue.pop
                        version = nil
                        time = nil
                        msg = nil
                        payload = nil
                        host = nil

                end
        end # loop do
end
start() click to toggle source
# File lib/fluent/plugin/in_netflowipfix.rb, line 260
def start
        @thread = Thread.new(&method(:run))
        @log.debug "ParserThread::start"
end