class Fluent::Plugin::NetflowParser

port from logstash's netflow parser

Constants

FIELDS_FOR_COPY_V9
NETFLOW_V5_HEADER_BYTES
NETFLOW_V5_HEADER_FORMAT
NETFLOW_V5_RECORD_BYTES
NETFLOW_V5_RECORD_FORMAT
NETFLOW_V9_FIELD_CATEGORIES

Public Instance Methods

call(payload, host=nil, &block) click to toggle source
# File lib/fluent/plugin/parser_netflow.rb, line 50
def call(payload, host=nil, &block)
  version,_ = payload[0,2].unpack('n')
  case version
  when 5
    forV5(payload, block)
  when 9
    # TODO: implement forV9
    pdu = Netflow9PDU.read(payload)
    handle_v9(host, pdu, block)
  else
    $log.warn "Unsupported Netflow version v#{version}: #{version.class}"
  end
end
configure(conf) click to toggle source

Cisco NetFlow Export Datagram Format www.cisco.com/c/en/us/td/docs/net_mgmt/netflow_collection_engine/3-6/user/guide/format.html Cisco NetFlow Version 9 Flow-Record Format www.cisco.com/en/US/technologies/tk648/tk362/technologies_white_paper09186a00800a3db9.html

Calls superclass method
# File lib/fluent/plugin/parser_netflow.rb, line 25
def configure(conf)
  super

  @templates = Vash.new()
  @samplers_v9 = Vash.new()
  # Path to default Netflow v9 field definitions
  filename = File.expand_path('../netflow_fields.yaml', __FILE__)

  begin
    @template_fields = YAML.load_file(filename)
  rescue => e
    raise Fluent::ConfigError, "Bad syntax in definitions file #{filename}, error_class = #{e.class.name}, error = #{e.message}"
  end

  # Allow the user to augment/override/rename the supported Netflow fields
  if @definitions
    raise Fluent::ConfigError, "definitions file #{@definitions} doesn't exist" unless File.exist?(@definitions)
    begin
      @template_fields['option'].merge!(YAML.load_file(@definitions))
    rescue => e
      raise Fluent::ConfigError, "Bad syntax in definitions file #{@definitions}, error_class = #{e.class.name}, error = #{e.message}"
    end
  end
end

Private Instance Methods

forV5(payload, block) click to toggle source

V5 records array :records, initial_length: :flow_records do

ip4_addr :ipv4_src_addr # uint32 N
ip4_addr :ipv4_dst_addr # uint32 N
ip4_addr :ipv4_next_hop # uint32 N
uint16   :input_snmp    # n
uint16   :output_snmp   # n
uint32   :in_pkts       # N
uint32   :in_bytes      # N
uint32   :first_switched # N
uint32   :last_switched  # N
uint16   :l4_src_port    # n
uint16   :l4_dst_port    # n
skip     length: 1  # n -> (ignored)
uint8    :tcp_flags #   -> 0x00ff
uint8    :protocol  # n -> 0xff00
uint8    :src_tos   #   -> 0x00ff
uint16   :src_as   # n
uint16   :dst_as   # n
uint8    :src_mask # n -> 0xff00
uint8    :dst_mask #   -> 0x00ff
skip     length: 2 # xx

end

# File lib/fluent/plugin/parser_netflow.rb, line 141
def forV5(payload, block)
  version, flow_records, uptime, unix_sec, unix_nsec, flow_seq_num, engine, sampling = payload.unpack(NETFLOW_V5_HEADER_FORMAT)
  engine_type = (engine & 0xff00) >> 8
  engine_id = engine & 0x00ff
  sampling_algorithm = (sampling & 0b1100000000000000) >> 14
  sampling_interval = sampling & 0b0011111111111111

  time = Fluent::EventTime.new(unix_sec.to_i, unix_nsec.to_i)

  records_bytes = payload.bytesize - NETFLOW_V5_HEADER_BYTES

  if records_bytes / NETFLOW_V5_RECORD_BYTES != flow_records
    $log.warn "bytesize mismatch, records_bytes:#{records_bytes}, records:#{flow_records}"
    return
  end

  format_full = NETFLOW_V5_RECORD_FORMAT * flow_records
  objects = payload[NETFLOW_V5_HEADER_BYTES, records_bytes].unpack(format_full)

  while objects.size > 0
    src_addr, dst_addr, next_hop, input_snmp, output_snmp,
    in_pkts, in_bytes, first_switched, last_switched, l4_src_port, l4_dst_port,
    tcp_flags_16, protocol_src_tos, src_as, dst_as, src_dst_mask = objects.shift(16)
    record = {
      "version" => version,
      "uptime"  => uptime,
      "flow_records" => flow_records,
      "flow_seq_num" => flow_seq_num,
      "engine_type"  => engine_type,
      "engine_id"    => engine_id,
      "sampling_algorithm" => sampling_algorithm,
      "sampling_interval"  => sampling_interval,

      "ipv4_src_addr" => ipv4_addr_to_string(src_addr),
      "ipv4_dst_addr" => ipv4_addr_to_string(dst_addr),
      "ipv4_next_hop" => ipv4_addr_to_string(next_hop),
      "input_snmp"  => input_snmp,
      "output_snmp" => output_snmp,
      "in_pkts"  => in_pkts,
      "in_bytes" => in_bytes,
      "first_switched" => first_switched,
      "last_switched"  => last_switched,
      "l4_src_port" => l4_src_port,
      "l4_dst_port" => l4_dst_port,
      "tcp_flags" => tcp_flags_16 & 0x00ff,
      "protocol" => (protocol_src_tos & 0xff00) >> 8,
      "src_tos"  => (protocol_src_tos & 0x00ff),
      "src_as"   => src_as,
      "dst_as"   => dst_as,
      "src_mask" => (src_dst_mask & 0xff00) >> 8,
      "dst_mask" => (src_dst_mask & 0x00ff)
    }
    unless @switched_times_from_uptime
      record["first_switched"] = format_for_switched(msec_from_boot_to_time(record["first_switched"], uptime, unix_sec, unix_nsec))
      record["last_switched"]  = format_for_switched(msec_from_boot_to_time(record["last_switched"] , uptime, unix_sec, unix_nsec))
    end

    block.call(time, record)
  end
end
format_for_flowMicroSeconds(time) click to toggle source
# File lib/fluent/plugin/parser_netflow.rb, line 93
def format_for_flowMicroSeconds(time)
  time.utc.strftime("%Y-%m-%dT%H:%M:%S.%6NZ".freeze)
end
format_for_flowMilliSeconds(time) click to toggle source
# File lib/fluent/plugin/parser_netflow.rb, line 89
def format_for_flowMilliSeconds(time)
  time.utc.strftime("%Y-%m-%dT%H:%M:%S.%3NZ".freeze)
end
format_for_flowNanoSeconds(time) click to toggle source
# File lib/fluent/plugin/parser_netflow.rb, line 97
def format_for_flowNanoSeconds(time)
  time.utc.strftime("%Y-%m-%dT%H:%M:%S.%9NZ".freeze)
end
format_for_flowSeconds(time) click to toggle source
# File lib/fluent/plugin/parser_netflow.rb, line 85
def format_for_flowSeconds(time)
  time.utc.strftime("%Y-%m-%dT%H:%M:%S".freeze)
end
format_for_switched(time) click to toggle source
# File lib/fluent/plugin/parser_netflow.rb, line 81
def format_for_switched(time)
  time.utc.strftime("%Y-%m-%dT%H:%M:%S.%3NZ".freeze)
end
handle_v9(host, pdu, block) click to toggle source
# File lib/fluent/plugin/parser_netflow.rb, line 202
def handle_v9(host, pdu, block)
  pdu.records.each do |flowset|
    case flowset.flowset_id
    when 0
      handle_v9_flowset_template(host, pdu, flowset)
    when 1
      handle_v9_flowset_options_template(host, pdu, flowset)
    when 256..65535
      handle_v9_flowset_data(host, pdu, flowset, block)
    else
      $log.warn 'Unsupported flowset', flowset_id: flowset.flowset_id
    end
  end
end
handle_v9_flowset_data(host, pdu, flowset, block) click to toggle source
# File lib/fluent/plugin/parser_netflow.rb, line 263
def handle_v9_flowset_data(host, pdu, flowset, block)
  template_key = "#{host}|#{pdu.source_id}|#{flowset.flowset_id}"
  template = @templates[template_key]
  if ! template
    $log.warn 'No matching template for',
              host: host, source_id: pdu.source_id, flowset_id: flowset.flowset_id
    return
  end

  length = flowset.flowset_length - 4

  # Template shouldn't be longer than the flowset and there should
  # be at most 3 padding bytes
  if template.num_bytes > length or ! (length % template.num_bytes).between?(0, 3)
    $log.warn "Template length doesn't fit cleanly into flowset",
              template_id: flowset.flowset_id, template_length: template.num_bytes, flowset_length: length
    return
  end

  array = BinData::Array.new(type: template, initial_length: length / template.num_bytes)

  template_fields = array.read(flowset.flowset_data)
  template_fields.each do |r|
    if is_sampler?(r)
      sampler_key = "#{host}|#{pdu.source_id}|#{r.flow_sampler_id}"
      register_sampler_v9 sampler_key, r
      next
    end

    time = Fluent::EventTime.new(pdu.unix_sec.to_i)
    event = {}

    # Fewer fields in the v9 header
    FIELDS_FOR_COPY_V9.each do |f|
      event[f] = pdu[f]
    end

    event['flowset_id'] = flowset.flowset_id

    r.each_pair do |k, v|
      case k
      when :first_switched
        unless @switched_times_from_uptime
          event[k.to_s] = format_for_switched(msec_from_boot_to_time(v.snapshot, pdu.uptime, time, 0))
        end
      when :last_switched
        unless @switched_times_from_uptime
          event[k.to_s] = format_for_switched(msec_from_boot_to_time(v.snapshot, pdu.uptime, time, 0))
        end
      when :flowStartSeconds
        event[k.to_s] = format_for_flowSeconds(Time.at(v.snapshot, 0))
      when :flowEndSeconds
        event[k.to_s] = format_for_flowSeconds(Time.at(v.snapshot, 0))
      when :flowStartMilliseconds
        divisor = 1_000
        microseconds = (v.snapshot % 1_000) * 1_000
        event[k.to_s] = format_for_flowMilliSeconds(Time.at(v.snapshot / divisor, microseconds))
      when :flowEndMilliseconds
        divisor = 1_000
        microseconds = (v.snapshot % 1_000) * 1_000
        event[k.to_s] = format_for_flowMilliSeconds(Time.at(v.snapshot / divisor, microseconds))
      when :flowStartMicroseconds
        divisor = 1_000_000
        microseconds = (v.snapshot % 1_000_000)
        event[k.to_s] = format_for_flowMicroSeconds(Time.at(v.snapshot / divisor, microseconds))
      when :flowEndMicroseconds
        divisor = 1_000_000
        microseconds = (v.snapshot % 1_000_000)
        event[k.to_s] = format_for_flowMicroSeconds(Time.at(v.snapshot / divisor, microseconds))
      when :flowStartNanoseconds
        divisor = 1_000_000_000
        microseconds = (v.snapshot % 1_000_000_000) / 1_000
        nanoseconds = v.snapshot % 1_000_000_000
        time_with_nano = Time.at(v.snapshot / divisor, microseconds)
        time_with_nano.nsec = nanoseconds
        event[k.to_s]  = format_for_flowNanoSeconds(time_with_nano)
      when :flowEndNanoseconds
        divisor = 1_000_000_000
        microseconds = (v.snapshot % 1_000_000_000) / 1_000
        nanoseconds = v.snapshot % 1_000_000_000
        time_with_nano = Time.at(v.snapshot / divisor, microseconds)
        time_with_nano.nsec = nanoseconds
        event[k.to_s]  = format_for_flowNanoSeconds(time_with_nano)
      else
        event[k.to_s] = v.snapshot
      end
    end

    if sampler_id = r['flow_sampler_id']
      sampler_key = "#{host}|#{pdu.source_id}|#{sampler_id}"
      if sampler = @samplers_v9[sampler_key]
        event['sampling_algorithm'] ||= sampler['flow_sampler_mode']
        event['sampling_interval'] ||= sampler['flow_sampler_random_interval']
      end
    end

    block.call(time, event)
  end
end
handle_v9_flowset_options_template(host, pdu, flowset) click to toggle source
# File lib/fluent/plugin/parser_netflow.rb, line 238
def handle_v9_flowset_options_template(host, pdu, flowset)
  flowset.flowset_data.templates.each do |template|
    catch (:field) do
      template_fields = []

      NETFLOW_V9_FIELD_CATEGORIES.each do |category|
        template["#{category}_fields"].each do |field|
          entry = netflow_field_for(field.field_type, field.field_length, category)
          throw :field unless entry

          template_fields << entry
        end
      end

      # We get this far, we have a list of fields
      key = "#{host}|#{pdu.source_id}|#{template.template_id}"
      @templates[key, @cache_ttl] = BinData::Struct.new(endian: :big, fields: template_fields)
      # Purge any expired templates
      @templates.cleanup!
    end
  end
end
handle_v9_flowset_template(host, pdu, flowset) click to toggle source
# File lib/fluent/plugin/parser_netflow.rb, line 217
def handle_v9_flowset_template(host, pdu, flowset)
  flowset.flowset_data.templates.each do |template|
    catch (:field) do
      template_fields = []
      template.template_fields.each do |field|
        entry = netflow_field_for(field.field_type, field.field_length)
        throw :field unless entry

        template_fields << entry
      end
      # We get this far, we have a list of fields
      key = "#{host}|#{pdu.source_id}|#{template.template_id}"
      @templates[key, @cache_ttl] = BinData::Struct.new(endian: :big, fields: template_fields)
      # Purge any expired templates
      @templates.cleanup!
    end
  end
end
ipv4_addr_to_string(uint32) click to toggle source
# File lib/fluent/plugin/parser_netflow.rb, line 66
def ipv4_addr_to_string(uint32)
  "#{(uint32 & 0xff000000) >> 24}.#{(uint32 & 0x00ff0000) >> 16}.#{(uint32 & 0x0000ff00) >> 8}.#{uint32 & 0x000000ff}"
end
is_sampler?(record) click to toggle source

covers Netflow v9 and v10 (a.k.a IPFIX)

# File lib/fluent/plugin/parser_netflow.rb, line 393
def is_sampler?(record)
  record['flow_sampler_id'] && record['flow_sampler_mode'] && record['flow_sampler_random_interval']
end
msec_from_boot_to_time(msec, uptime, current_unix_time, current_nsec) click to toggle source
# File lib/fluent/plugin/parser_netflow.rb, line 70
def msec_from_boot_to_time(msec, uptime, current_unix_time, current_nsec)
  millis = uptime - msec
  seconds = current_unix_time - (millis / 1000)
  micros = (current_nsec / 1000) - ((millis % 1000) * 1000)
  if micros < 0
    seconds -= 1
    micros += 1000000
  end
  Time.at(seconds, micros)
end
netflow_field_for(type, length, category = 'option'.freeze) click to toggle source
# File lib/fluent/plugin/parser_netflow.rb, line 368
def netflow_field_for(type, length, category = 'option'.freeze)
  unless field = @template_fields[category][type]
    $log.warn "Skip unsupported field", type: type, length: length
    return [:skip, nil, {length: length}]
  end

  unless field.is_a?(Array)
    $log.warn "Skip non-Array definition", field: field
    return [:skip, nil, {length: length}]
  end

  # Small bit of fixup for numeric value, :skip or :string field length, which are dynamic
  case field[0]
  when Integer
    [uint_field(length, field[0]), field[1]]
  when :skip
    field + [nil, {length: length}]
  when :string
    field + [{length: length, trim_padding: true}]
  else
    field
  end
end
register_sampler_v9(key, sampler) click to toggle source
# File lib/fluent/plugin/parser_netflow.rb, line 397
def register_sampler_v9(key, sampler)
  @samplers_v9[key, @cache_ttl] = sampler
  @samplers_v9.cleanup!
end
uint_field(length, default) click to toggle source
# File lib/fluent/plugin/parser_netflow.rb, line 363
def uint_field(length, default)
  # If length is 4, return :uint32, etc. and use default if length is 0
  ("uint" + (((length > 0) ? length : default) * 8).to_s).to_sym
end