class Fluent::Plugin::NetflowipfixInput::ParserNetflowIpfix

Constants

FIELDS_FOR_COPY_v9_10
NETFLOWIPFIX_FIELD_CATEGORIES

Public Instance Methods

configure(cache_ttl, definitions) click to toggle source

config_param :switched_times_from_uptime, :bool, default: false config_param :versions, :array, default: [5, 9, 10]

# File lib/fluent/plugin/parser_netflow_v9.rb, line 38
                def configure(cache_ttl, definitions)
                        @cache_ttl = cache_ttl


                        @switched_times_from_uptime = false #, :bool, default: false
#                     @versions = [5, 9, 10]
                        @definitions = definitions
                        @missingTemplates = {}
                        @skipUnsupportedField = {}
                end

Private Instance Methods

handle_flowset_data(host, packet, flowset, block, templates, fields, ver) click to toggle source
# File lib/fluent/plugin/parser_netflow_v9.rb, line 142
                                        def handle_flowset_data(host, packet, flowset, block, templates, fields, ver)
                                                template_key = "#{host}|#{packet.source_id}|#{flowset.flowset_id}"
                                                #        $log.warn 'handle_flowset_data template:', template_key
                                                template = templates[template_key]
                                                if !template
                                                        # FIXED: repeating error message adds no value, added a count of missing packet until template is received
                                                        if @missingTemplates[template_key].nil? ||  @missingTemplates[template_key] == 0
                                                                @missingTemplates[template_key] = 1 
                                                                $log.warn 'No matching template for', host: host, source_id: packet.source_id, flowset_id: flowset.flowset_id
                                                        else
                                                                @missingTemplates[template_key] = @missingTemplates[template_key] + 1
                                                        end

                                                        return
                                                end # if !template

                                                # $log.info "v #{packet.version} flowset ", $flowset
                                                length = flowset.flowset_length - 4
                                                #        length = flowset.flowset_length

                                                if packet.version == 9
                                                        # Template shouldn't be longer than the flowset and there should
                                                        # be at most 3 padding bytes
                                                        #          if template.num_bytes > length or ! (length % template.num_bytes).between?(0, 3)
                                                        # warn: v9 Template length doesn't fit cleanly into flowset template_id=1024 template_length=59 flowset_length=120
                                                        # p (124
                                                        # TODO: is this a bug ????
                                                        if template.num_bytes > flowset.flowset_length or ! (length % template.num_bytes).between?(0, 3)
                                                                $log.warn "v9 Template length doesn't fit cleanly into flowset",
                                                                                template_id: flowset.flowset_id, 
                                                                                template_length: template.num_bytes, 
                                                                                flowset_length: length
                                                        #            return
                                                        end

                                                        array = BinData::Array.new(type: template, initial_length: length / template.num_bytes)
                                                elsif packet.version == 10
                                                        #          array = BinData::Array.new(type: template, initial_length: length / template.num_bytes)
                                                        array = BinData::Array.new(type: template, :read_until => :eof)
                                                end


                                                fields = array.read(flowset.flowset_data)
                                                        fields.each do |r|
                                                        #if is_sampler?(r)
                                                        #  sampler_key = "#{host}|#{pdu.source_id}|#{r.flow_sampler_id}"
                                                        #  register_sampler_v9 sampler_key, r
                                                        #  next
                                                        #end

                                                                time = packet.unix_sec  # TODO: pending from netflow plugin:  Fluent::EventTime (see: forV5)
                                                                event = {}

                                                                # Fewer fields in the v9 header
                                                                FIELDS_FOR_COPY_v9_10.each do |f|
                                                                        event[f] = packet[f]
                                                                end

                                                                event['flowset_id'] = flowset.flowset_id

                                                                r.each_pair { |k,v| event[k.to_s] = v }
                                                                # TODO: bug - this causes crashes, need to debug
                                                                unless @switched_times_from_uptime
                                                                        if packet.version == 9
                                                                        #              event['first_switched'] = format_for_switched(msec_from_boot_to_time(event['first_switched'], packet.uptime, time, 0))
                                                                        #              event['last_switched']  = format_for_switched(msec_from_boot_to_time(event['last_switched'] , packet.uptime, time, 0))
                                                                        elsif packet.version == 10
                                                                        #              event['first_switched'] = format_for_switched(msec_from_boot_to_time(event['first_switched'], packet.unix_sec, time, 0))
                                                                        #              event['last_switched']  = format_for_switched(msec_from_boot_to_time(event['last_switched'] , packet.unix_sec, time, 0))
                                                                        end
                                                                end

                                                        #if sampler_id = r['flow_sampler_id']
                                                        #  sampler_key = "#{host}|#{pdu.source_id}|#{sampler_id}"
                                                        #  if sampler = @samplers_v9[sampler_key]
                                                        #    event['sampling_algorithm'] ||= sampler['flow_sampler_mode']
                                                        #    event['sampling_interval'] ||= sampler['flow_sampler_random_interval']
                                                        #  end
                                                        #end

#                                                       block.call(time, event)
#                                                       if (defined?(block)).nil?
#                                                               log.error "*** handle_flowset_data block is blank"
#                                                       else
#                                                       end
                                                          block.call(time, event, host)

                                                end # fields = array.read
                                        end
handle_flowset_options_template(host, pdu, flowset, templates, p_fields) click to toggle source
# File lib/fluent/plugin/parser_netflow_v9.rb, line 117
def handle_flowset_options_template(host, pdu, flowset, templates, p_fields)
        flowset.flowset_data.templates.each do |template|
                catch (:field) do
                # We get this far, we have a list of fields
                key = "#{host}|#{pdu.source_id}|#{template.template_id}"

                fields = []

                NETFLOWIPFIX_FIELD_CATEGORIES.each do |category|
                        template["#{category}_fields"].each do |field|
                                entry = netflowipfix_field_for(field.field_type, field.field_length, p_fields, category, key)
                                throw :field unless entry
                                fields += entry
                        end # do field
                end # do category

                templates[key, @cache_ttl] = BinData::Struct.new(endian: :big, fields: fields)
                # Purge any expired templates
                templates.cleanup!
                end # catch
        end # do templates
end
handle_flowset_template(host, pdu, flowset, templates, p_fields) click to toggle source
# File lib/fluent/plugin/parser_netflow_v9.rb, line 51
def handle_flowset_template(host, pdu, flowset, templates, p_fields)
        #        $log.warn 'handle_flowset_template:', host, ';', pdu.version
        flowset.flowset_data.templates.each do |template|
                # $log.warn 'added template:', template.template_id, ',ver:',pdu.version
                key = "#{host}|#{pdu.source_id}|#{template.template_id}"
                catch (:field) do
                        fields = []
                        template.template_fields.each do |field|
                                # $log.warn 'v9 added field:', field.field_type
                                entry = netflowipfix_field_for(field.field_type, field.field_length, p_fields, key)
                                throw :field unless entry
                                fields += entry
                        end # do field
                        if !@missingTemplates[key].nil? && @missingTemplates[key] > 0
                                $log.warn "Template received after missing #{@missingTemplates[key]} packets",
                                host: host, source_id: pdu.source_id, flowset_id: template.template_id
                                @missingTemplates[key] = 0
                        end
                        # We get this far, we have a list of fields
                        templates[key, @cache_ttl] = BinData::Struct.new(endian: :big, fields: fields)
                        # $log.info("cache_ttl is #{@cache_ttl}")
                        # $log.info("v9 added template,flowset.source_id|template.template_id is #{key}")
                        # Purge any expired templates
                        templates.cleanup!
                end # catch
        end # each do |template|
end
is_sampler?(record) click to toggle source

covers Netflow v9 and v10 (a.k.a IPFIX)

# File lib/fluent/plugin/parser_netflow_v9.rb, line 233
def is_sampler?(record)
        record['flow_sampler_id'] && record['flow_sampler_mode'] && record['flow_sampler_random_interval']
end
netflowipfix_field_for(type, length, p_fields, category='option', key) click to toggle source
# File lib/fluent/plugin/parser_netflow_v9.rb, line 79
def netflowipfix_field_for(type, length, p_fields, category='option', key)
        unless field = p_fields[category][type]
                # TODO?: repeated message, but acceptable now
                # Skip unsupported field type=201 length=4 key="172.17.0.1|0|2049
                fkey = "#{key}|#{type}|#{length}"
                if @skipUnsupportedField[fkey].nil? || @skipUnsupportedField[fkey] == 0
                        $log.warn "Skip unsupported field", type: type, length: length, key:key
                        @skipUnsupportedField[fkey] = 1
                else
                        @skipUnsupportedField[fkey] = @skipUnsupportedField[fkey] + 1
                end
                return [[:skip, nil, {length: length}]]
        end # unless

        unless field.is_a?(Array)
                $log.warn "Skip non-Array definition", fields: field
                return [[:skip, nil, {length: length}]]
        end # unless

        # Small bit of fixup for numeric value, :skip or :string field length, which are dynamic
        case field[0]
                when Integer
                        [[uint_field(length, field[0]), field[1]]]
                when :skip
                        [field + [nil, {length: length}]]
                when :string
                        [field + [{length: length, trim_padding: true}]]
                when "octetArray"
                #          $log.warn "v10_paddingOctets ", type:field[0], name:field[1], len:length
                        oField = octetArray(length)
                        [[oField, field[1]]]
                else
                        [field]
        end # case
end
octetArray(length) click to toggle source
# File lib/fluent/plugin/parser_netflow_v9.rb, line 245
def octetArray(length)
        ("OctetArray" + length.to_s).to_sym
        case length
        when 1,"1"
                ("OctetArray1").to_sym
        when 2,"2"
                ("OctetArray2").to_sym
        else
                $log.error "No octet array of #{length} bytes"
        end
end
uint_field(length, default) click to toggle source
# File lib/fluent/plugin/parser_netflow_v9.rb, line 238
def uint_field(length, default)
        # If length is 4, return :uint32, etc. and use default if length is 0
        # $log.warn        ("uint" + (((length > 0) ? length : default) * 8).to_s)
        ("uint" + (((length > 0) ? length : default) * 8).to_s).to_sym
end