class Fluent::Plugin::NetflowipfixInput::ParserNetflowIpfix
Constants
- FIELDS_FOR_COPY_v9_10
- NETFLOWIPFIX_FIELD_CATEGORIES
Public Instance Methods
configure(cache_ttl, definitions)
click to toggle source
config_param :switched_times_from_uptime, :bool, default: false
config_param :versions, :array, default: [5, 9, 10]
# File lib/fluent/plugin/parser_netflow_v9.rb, line 38
# Stores the parser settings and resets the per-key warning counters.
#
# @param cache_ttl [Object] kept in @cache_ttl; the template handlers pass it
#   as the second index when they store a template
# @param definitions [Object] kept in @definitions; not inspected here
# @return [Hash] the new, empty @skipUnsupportedField hash (last assignment)
def configure(cache_ttl, definitions)
  @cache_ttl, @definitions = cache_ttl, definitions

  # Hard-coded instead of being a config_param (:bool, default: false).
  # @versions = [5, 9, 10]
  @switched_times_from_uptime = false

  # Two independent counters: flowsets seen without a template, and
  # unsupported fields that were already reported.
  @missingTemplates = {}
  @skipUnsupportedField = {}
end
Private Instance Methods
handle_flowset_data(host, packet, flowset, block, templates, fields, ver)
click to toggle source
# File lib/fluent/plugin/parser_netflow_v9.rb, line 142
# Decodes the records of one data flowset with the cached template and hands
# one event hash per record to +block+.
#
# @param host [Object] exporter identifier; first part of the template key
# @param packet [Object] must respond to source_id, version, unix_sec and []
# @param flowset [Object] must respond to flowset_id, flowset_length, flowset_data
# @param block [#call] invoked as block.call(time, event, host) for every record
# @param templates [#[]] template cache keyed by "host|source_id|flowset_id"
# @param fields [Object] unused (the original only overwrote it); kept for callers
# @param ver [Object] unused; kept for callers
# @return [Object, nil] nil when the flowset is skipped, otherwise the record
#   collection returned by the read
def handle_flowset_data(host, packet, flowset, block, templates, fields, ver)
  template_key = "#{host}|#{packet.source_id}|#{flowset.flowset_id}"
  template = templates[template_key]

  unless template
    # Warn only for the first flowset that arrives without a template, then
    # just count; handle_flowset_template reports the total once it shows up.
    missed = @missingTemplates[template_key]
    if missed.nil? || missed == 0
      @missingTemplates[template_key] = 1
      $log.warn 'No matching template for', host: host, source_id: packet.source_id, flowset_id: flowset.flowset_id
    else
      @missingTemplates[template_key] = missed + 1
    end
    return
  end

  record_size = template.num_bytes
  if record_size.zero?
    # A zero-byte record would divide by zero in the v9 branch below.
    # NOTE(review): with :read_until => :eof (v10) such a record presumably
    # never reaches end of input either - confirm against BinData.
    $log.warn 'Ignoring data flowset for zero-length template', host: host, source_id: packet.source_id, flowset_id: flowset.flowset_id
    return
  end

  # flowset_length minus 4, presumably the flowset header size - TODO confirm.
  length = flowset.flowset_length - 4

  if packet.version == 9
    # Template shouldn't be longer than the flowset and there should be at
    # most 3 padding bytes.
    # TODO: is this a bug ???? The first comparison uses the full
    # flowset_length, not +length+; the mismatch is only logged (the early
    # return is disabled), e.g.
    # "v9 Template length doesn't fit cleanly into flowset template_id=1024 template_length=59 flowset_length=120"
    if record_size > flowset.flowset_length || !(length % record_size).between?(0, 3)
      $log.warn "v9 Template length doesn't fit cleanly into flowset", template_id: flowset.flowset_id, template_length: record_size, flowset_length: length
    end
    array = BinData::Array.new(type: template, initial_length: length / record_size)
  elsif packet.version == 10
    array = BinData::Array.new(type: template, :read_until => :eof)
  else
    # Previously fell through with a nil array and raised NoMethodError.
    $log.warn 'Unsupported packet version for data flowset', version: packet.version, host: host, flowset_id: flowset.flowset_id
    return
  end

  records = array.read(flowset.flowset_data)
  records.each do |record|
    # TODO: sampler handling (is_sampler? / register_sampler_v9) is disabled.
    # TODO: pending from netflow plugin: Fluent::EventTime (see: forV5)
    time = packet.unix_sec

    event = {}
    # Fewer fields in the v9 header
    FIELDS_FOR_COPY_v9_10.each { |name| event[name] = packet[name] }
    event['flowset_id'] = flowset.flowset_id
    record.each_pair { |k, v| event[k.to_s] = v }

    # TODO: bug - converting first_switched / last_switched caused crashes and
    # is disabled regardless of @switched_times_from_uptime. Intended code:
    #   v9:  format_for_switched(msec_from_boot_to_time(event['first_switched'], packet.uptime, time, 0))
    #   v10: format_for_switched(msec_from_boot_to_time(event['first_switched'], packet.unix_sec, time, 0))
    # (same for 'last_switched')

    block.call(time, event, host)
  end
end
handle_flowset_options_template(host, pdu, flowset, templates, p_fields)
click to toggle source
# File lib/fluent/plugin/parser_netflow_v9.rb, line 117
# Builds a BinData::Struct for every options template in the flowset and
# stores it in the template cache under "host|source_id|template_id".
#
# @param host [Object] exporter identifier; first part of the cache key
# @param pdu [Object] must respond to source_id
# @param flowset [Object] flowset_data.templates must be enumerable; each
#   template is indexed with "<category>_fields" and responds to template_id
# @param templates [Object] cache supporting templates[key, ttl] = value and cleanup!
# @param p_fields [Object] field definitions handed to netflowipfix_field_for
# @return [Object] whatever flowset.flowset_data.templates.each returns
def handle_flowset_options_template(host, pdu, flowset, templates, p_fields)
  flowset.flowset_data.templates.each do |template|
    key = "#{host}|#{pdu.source_id}|#{template.template_id}"

    # throw :field abandons only the current template.
    catch(:field) do
      fields = []
      NETFLOWIPFIX_FIELD_CATEGORIES.each do |category|
        template["#{category}_fields"].each do |field|
          entry = netflowipfix_field_for(field.field_type, field.field_length, p_fields, category, key)
          throw :field unless entry
          fields.concat(entry)
        end
      end

      # Same bookkeeping as handle_flowset_template: handle_flowset_data counts
      # flowsets that arrived before their template under this key. Without
      # the reset its 'No matching template' warning stays suppressed for the
      # key once the counter is non-zero.
      missed = @missingTemplates && @missingTemplates[key]
      if missed && missed > 0
        $log.warn "Template received after missing #{missed} packets", host: host, source_id: pdu.source_id, flowset_id: template.template_id
        @missingTemplates[key] = 0
      end

      # We get this far, we have a list of fields
      templates[key, @cache_ttl] = BinData::Struct.new(endian: :big, fields: fields)
      # Purge any expired templates
      templates.cleanup!
    end
  end
end
handle_flowset_template(host, pdu, flowset, templates, p_fields)
click to toggle source
# File lib/fluent/plugin/parser_netflow_v9.rb, line 51
# Builds a BinData::Struct for every template in the flowset and stores it in
# the template cache under "host|source_id|template_id".
#
# @param host [Object] exporter identifier; first part of the cache key
# @param pdu [Object] must respond to source_id
# @param flowset [Object] flowset_data.templates must be enumerable; each
#   template responds to template_id and template_fields
# @param templates [Object] cache supporting templates[key, ttl] = value and cleanup!
# @param p_fields [Object] field definitions handed to netflowipfix_field_for
# @return [Object] whatever flowset.flowset_data.templates.each returns
def handle_flowset_template(host, pdu, flowset, templates, p_fields)
  flowset.flowset_data.templates.each do |template|
    key = "#{host}|#{pdu.source_id}|#{template.template_id}"

    # throw :field abandons only the current template.
    catch(:field) do
      struct_fields = template.template_fields.flat_map do |field|
        netflowipfix_field_for(field.field_type, field.field_length, p_fields, key) || throw(:field)
      end

      # handle_flowset_data counts flowsets that arrived before this template;
      # report the total once and start counting from zero again.
      pending = @missingTemplates[key]
      unless pending.nil? || pending <= 0
        $log.warn "Template received after missing #{pending} packets", host: host, source_id: pdu.source_id, flowset_id: template.template_id
        @missingTemplates[key] = 0
      end

      # Reaching this point means every field produced an entry.
      templates[key, @cache_ttl] = BinData::Struct.new(endian: :big, fields: struct_fields)
      # Purge any expired templates
      templates.cleanup!
    end
  end
end
is_sampler?(record)
click to toggle source
Covers NetFlow v9 and v10 (a.k.a. IPFIX).
# File lib/fluent/plugin/parser_netflow_v9.rb, line 233
# Checks whether a record carries all three flow-sampler fields.
#
# @param record [#[]] anything indexable by the string field names
# @return [Object] the first falsy field value (nil or false), otherwise the
#   value of 'flow_sampler_random_interval'
def is_sampler?(record)
  sampler_keys = %w[flow_sampler_id flow_sampler_mode flow_sampler_random_interval]
  # Short-circuits like a chained &&: later keys are not read after a falsy one.
  sampler_keys.inject(true) { |so_far, name| so_far && record[name] }
end
netflowipfix_field_for(type, length, p_fields, category='option', key)
click to toggle source
# File lib/fluent/plugin/parser_netflow_v9.rb, line 79
# Translates one template field into a list of field definitions for
# BinData::Struct (the callers concatenate the returned lists).
#
# Anything that cannot be mapped becomes a :skip entry of the same length:
# an unknown category or type, a definition that is not an Array, or an
# octet array for which octetArray returns no type.
#
# @param type [Object] field type, looked up in p_fields[category]
# @param length [Integer] field length taken from the template
# @param p_fields [#[]] definitions indexed by category, then by type
# @param category [String] definition category, 'option' by default
# @param key [Object] template key; used for the warning and the per-field counter
# @return [Array<Array>] one-element list holding the field definition
def netflowipfix_field_for(type, length, p_fields, category='option', key)
  skip_entry = [[:skip, nil, { length: length }]]

  # A category missing from the definitions used to raise NoMethodError on
  # nil; it is now handled like an unsupported field.
  category_fields = p_fields[category]
  field = category_fields && category_fields[type]

  unless field
    # Warn once per template key / type / length; count repeats afterwards.
    fkey = "#{key}|#{type}|#{length}"
    seen = @skipUnsupportedField[fkey]
    if seen.nil? || seen == 0
      $log.warn "Skip unsupported field", type: type, length: length, key: key
      @skipUnsupportedField[fkey] = 1
    else
      @skipUnsupportedField[fkey] = seen + 1
    end
    return skip_entry
  end

  unless field.is_a?(Array)
    $log.warn "Skip non-Array definition", fields: field
    return skip_entry
  end

  # Small bit of fixup for numeric value, :skip or :string field length, which are dynamic
  case field[0]
  when Integer
    [[uint_field(length, field[0]), field[1]]]
  when :skip
    [field + [nil, { length: length }]]
  when :string
    [field + [{ length: length, trim_padding: true }]]
  when "octetArray"
    octet_type = octetArray(length)
    # octetArray only knows some lengths; for the others it logs an error and
    # yields no type symbol, so skip the bytes instead of emitting a broken
    # definition.
    octet_type.is_a?(Symbol) ? [[octet_type, field[1]]] : skip_entry
  else
    [field]
  end
end
octetArray(length)
click to toggle source
# File lib/fluent/plugin/parser_netflow_v9.rb, line 245
# Maps an octet-array length to the matching type symbol.
#
# Only lengths 1 and 2 (Integer or String) are known here. Any other length
# is logged as an error and yields nil, so callers never receive the logger's
# return value as a "type".
#
# @param length [Integer, String] field length from the template
# @return [Symbol, nil] :OctetArray1, :OctetArray2, or nil when unsupported
def octetArray(length)
  case length
  when 1, "1"
    :OctetArray1
  when 2, "2"
    :OctetArray2
  else
    $log.error "No octet array of #{length} bytes"
    nil
  end
end
uint_field(length, default)
click to toggle source
# File lib/fluent/plugin/parser_netflow_v9.rb, line 238
# Names the unsigned-integer type for a field of the given byte length.
#
# @param length [Integer] byte length from the template; used when above zero
# @param default [Integer] byte length used otherwise
# @return [Symbol] "uint" followed by the bit width, e.g. :uint32 for 4 bytes
def uint_field(length, default)
  byte_count = length > 0 ? length : default
  :"uint#{byte_count * 8}"
end