class Fluent::Plugin::NetflowipfixInput

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_netflowipfix.rb, line 94
def configure(conf)
        super
        $log.debug "NetflowipfixInput::configure: #{@bind}:#{@port}"
        @@connections ||=  {}
        if @@connections.nil?
        end
        @@connections[@port] = PortConnection.new(@bind, @port, @tag, @cache_ttl, @definitions, @queuesleep, log)
        log.debug "NetflowipfixInput::configure NB=#{@@connections.length}"  
        @total = 0
end
restartConnections() click to toggle source
# File lib/fluent/plugin/in_netflowipfix.rb, line 134
def restartConnections
                @@connections.each do | port, conn |
                        $log.debug "restart parser #{conn.bind}:#{conn.port}"
                        conn.restartParser                         
                end
                        before = GC.stat(:total_freed_objects)
                        GC.start
                        after = GC.stat(:total_freed_objects)

end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_netflowipfix.rb, line 120
def shutdown
        super
        $log.debug "NetflowipfixInput::shutdown NB=#{@@connections.length}"  
        if @@connections.nil?
        else
                @@connections.each do | port, conn |
                        $log.debug "shutdown listening UDP on #{conn.bind}:#{conn.port}"
                        conn.stop                          
                end
                @@connections = nil
        end

end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/in_netflowipfix.rb, line 105
def start
        super
        
        $log.debug "NetflowipfixInput::start NB=#{@@connections.length}"     
        if @@connections.nil?
        else
                @@connections.each do | port, conn |
                        $log.debug "start listening UDP on #{conn.bind}:#{conn.port}"
                        conn.start                         
                end
        end                  
        
        waitForEvents
end
waitForEvents() click to toggle source
# File lib/fluent/plugin/in_netflowipfix.rb, line 145
                def waitForEvents
                        timeStart = Time.now.getutc.to_i
                        nb = 0
                        loop do
                                        @@connections.each do | port, conn |
                                                if (conn.event_queue_length > 0) 
                                                        $log.trace "waitForEvents: #{conn.bind}:#{conn.port} queue has #{conn.event_queue_length} elements"
                                                        nbq = conn.event_queue_length 
                                                        loop do
                                                                ar = conn.event_pop                     
                                                                time = ar[0]
                                                                record = ar[1]
                                                                router.emit(conn.tag, EventTime.new(time.to_i), record)
                                                                # Free up variables for garbage collection
                                                                ar = nil
                                                                time = nil
                                                                record = nil
                                                                nb = nb + 1
                                                                nbq = nbq - 1
                                                                break if nbq == 0
                                                        end 
                                                end
                                        end
#                                       @log.trace "NetflowipfixInput::waitForEvents ObjectSpace.memsize_of(NetflowipfixInput)=#{ObjectSpace.memsize_of(self)}"
                                        if Time.now.getutc.to_i - timeStart > 600 # 300 = 5 min
                                                restartConnections
                                                timeStart = Time.now.getutc.to_i
                                        end

                                        # Garbage collection
                                        if nb >= 20
                                                nb = 0
#                                               debugSpace
                                        end
                                        before = GC.stat(:total_freed_objects)
                                        GC.start
                                        after = GC.stat(:total_freed_objects)
#                                       $log.trace "waitForEvents: sleep #{@queuesleep}"
                                        sleep(@queuesleep)

                        end

                end