class Fluent::GeoFilter

Constants

DEFAULT_CITY
DEFAULT_CONNECTION_TYPE
DEFAULT_CONTINENT
DEFAULT_COUNTRY
DEFAULT_DATABASE_PATH
DEFAULT_DOWNLOAD_URL
DEFAULT_ENABLE_DOWNLOAD
DEFAULT_FIELD_DELIMITER
DEFAULT_FLATTEN
DEFAULT_GEOHIDDEN
DEFAULT_LOCALE
DEFAULT_LOCATION
DEFAULT_LOOKUP_FIELD
DEFAULT_MD5_PATH
DEFAULT_MD5_URL
DEFAULT_OUTPU_FIELD
DEFAULT_POSTAL
DEFAULT_REGISTERED_COUNTRY
DEFAULT_REPRESENTED_COUNTRY
DEFAULT_SUBDIVISIONS
DEFAULT_TRAITS

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_geo.rb, line 99
# Creates the filter instance. No state is set up here: the bare +super+
# simply hands construction over to the superclass initializer.
def initialize
  super
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_geo.rb, line 103
# Applies the plugin configuration.
#
# After the superclass has processed +conf+, the database is optionally
# refreshed via #download_database (when +enable_auto_download+ is truthy)
# and then opened from +@database_path+ into +@database+.
#
# @param conf the configuration object, passed through to the superclass
def configure(conf)
  super

  # Refresh the on-disk database first so the reader below opens the newest copy.
  download_database(@download_url, @md5_url, @database_path, @md5_path) if enable_auto_download

  @database = MaxMindDB.new(@database_path)
end
download_database(download_url, md5_url, database_path, md5_path) click to toggle source
# File lib/fluent/plugin/filter_geo.rb, line 387
# Refreshes the local database file when the published MD5 differs from the
# checksum recorded at +md5_path+.
#
# Steps: read the saved checksum, fetch the published one, and if they differ
# download the gzip archive next to the database, unpack it to a temporary
# file, verify the unpacked file against the published checksum and only then
# move it over +database_path+ and record the new checksum.
#
# Every failure (unreachable URL, corrupt archive, checksum mismatch) is
# logged and leaves the existing database and saved checksum untouched; the
# method never raises for those cases.
#
# @param download_url  location of the gzip-compressed database
# @param md5_url       location of the published MD5 of the unpacked database
# @param database_path local path of the database file to replace
# @param md5_path      local path where the last installed MD5 is recorded
def download_database(download_url, md5_url, database_path, md5_path)
  # database directory
  database_dir = File.dirname database_path
  md5_dir = File.dirname md5_path

  # create database directory if directory does not exist.
  FileUtils.mkdir_p(database_dir) unless File.exist?(database_dir)
  FileUtils.mkdir_p(md5_dir) unless File.exist?(md5_dir)

  # create empty md5 file if file does not exist.
  FileUtils.touch(md5_path) unless File.exist?(md5_path)

  # read saved md5 (stripped, so a stray newline cannot force a re-download)
  current_md5 = nil
  begin
    current_md5 = File.binread(md5_path).strip
    log.info "Current MD5: %s" % current_md5
  rescue => e
    log.warn e.message
  end

  # fetch md5
  # URI.open is the open-uri entry point; plain Kernel#open no longer opens
  # URLs on Ruby 3.0+.
  fetched_md5 = nil
  begin
    URI.open(md5_url, 'rb') do |data|
      fetched_md5 = data.read.to_s.strip
    end
    log.info "Fetched MD5: %s" % fetched_md5
  rescue => e
    log.warn e.message
  end

  # Without a published checksum the download could never be verified, so
  # there is no point in fetching the archive at all.
  return if fetched_md5.nil? || fetched_md5.empty?

  # check md5: nothing to do when the installed database is already current.
  return if current_md5 == fetched_md5

  download_path = File.join(database_dir, File.basename(download_url))
  tmp_database_path = File.join(database_dir, 'tmp_' + File.basename(database_path))

  begin
    # download new database (streamed, so the archive is never held in memory)
    log.info "Download: %s" % download_url
    File.open(download_path, 'wb') do |output|
      URI.open(download_url, 'rb') do |data|
        IO.copy_stream(data, output)
      end
    end
    log.info "Download done: %s" % download_path

    # unzip new database temporarily
    log.info "Unzip: %s" % download_path
    File.open(tmp_database_path, 'wb') do |output|
      Zlib::GzipReader.open(download_path) do |gz|
        IO.copy_stream(gz, output)
      end
    end
    log.info "Unzip done: %s" % tmp_database_path
  rescue => e
    # A failed download or unpack must not fall through to the checksum step,
    # which would otherwise hash a stale or truncated temporary file.
    log.warn e.message
    return
  end

  # check md5 of the unpacked file before it is allowed to replace the database
  temp_md5 = Digest::MD5.file(tmp_database_path).hexdigest
  log.info "New MD5: %s" % temp_md5
  if fetched_md5 == temp_md5
    log.info "Rename: %s to %s" % [tmp_database_path, database_path]
    FileUtils.mv(tmp_database_path, database_path)
    log.info "Rename done: %s to %s" % [tmp_database_path, database_path]

    # record new md5
    log.info "Save: %s" % md5_path
    File.write(md5_path, fetched_md5)
    log.info "Save done: %s" % md5_path
  else
    log.info "MD5 missmatch: Fetched MD5 (%s) != New MD5 (%s) ; " % [fetched_md5, temp_md5]
  end
end
filter(tag, time, record) click to toggle source
# File lib/fluent/plugin/filter_geo.rb, line 113
# Enriches +record+ with geo data for the IP found under +@lookup_field+.
#
# The IP is looked up in +@database+. For every enabled section
# (+@continent+, +@country+, +@city+, +@location+, +@postal+,
# +@registered_country+, +@represented_country+, +@subdivisions+, +@traits+,
# +@connection_type+) the non-nil attributes are collected and written to
# +record+, either flattened through #to_flatten (when +@flatten+ is set) or
# as a JSON string under "<output_field><delimiter><section>". Sections with
# no non-nil attribute are skipped. +geoname_id+ and +metro_code+ are only
# emitted when +@geohidden+ is exactly +true+.
#
# @param tag    event tag (unused here)
# @param time   event time (unused here)
# @param record [Hash] the event record; it is modified in place
# @return [Hash] the same +record+ object
def filter(tag, time, record)
  ip = record[@lookup_field]
  return record if ip.nil?

  geoip = @database.lookup(ip)
  unless geoip.found?
    log.warn "It was not possible to look up the #{ip}."
    return record
  end

  geo_store_section(record, 'continent', geo_named_location_hash(geoip.continent)) if @continent
  geo_store_section(record, 'country', geo_named_location_hash(geoip.country)) if @country
  geo_store_section(record, 'city', geo_named_location_hash(geoip.city)) if @city

  if @location
    location = geoip.location
    location_hash = {}

    # Coordinates are emitted as a [longitude, latitude] pair, and only when
    # both values are present.
    unless location.latitude.nil? || location.longitude.nil?
      location_hash['location'] = [location.longitude, location.latitude]
    end
    if @geohidden == true && !location.metro_code.nil?
      location_hash['metro_code'] = location.metro_code
    end
    location_hash['time_zone'] = location.time_zone unless location.time_zone.nil?

    geo_store_section(record, 'location', location_hash)
  end

  if @postal
    postal_hash = {}
    postal_hash['code'] = geoip.postal.code unless geoip.postal.code.nil?

    geo_store_section(record, 'postal', postal_hash)
  end

  if @registered_country
    geo_store_section(record, 'registered_country', geo_named_location_hash(geoip.registered_country))
  end
  if @represented_country
    geo_store_section(record, 'represented_country', geo_named_location_hash(geoip.represented_country))
  end

  if @subdivisions
    subdivision_hashes = []
    geoip.subdivisions.each do |subdivision|
      subdivision_hash = geo_named_location_hash(subdivision)
      subdivision_hashes.push(subdivision_hash) unless subdivision_hash.empty?
    end

    unless subdivision_hashes.empty?
      if @flatten
        # Each subdivision gets its own numeric path segment; the index counts
        # only the non-empty subdivisions that were kept above.
        subdivision_hashes.each_with_index do |subdivision_hash, index|
          record.merge!(to_flatten(subdivision_hash, [@output_field, 'subdivisions', index.to_s], @field_delimiter))
        end
      else
        record[[@output_field, 'subdivisions'].join(@field_delimiter)] = subdivision_hashes.to_json
      end
    end
  end

  if @traits
    traits = geoip.traits
    traits_hash = {}
    traits_hash['is_anonymous_proxy'] = traits.is_anonymous_proxy unless traits.is_anonymous_proxy.nil?
    traits_hash['is_satellite_provider'] = traits.is_satellite_provider unless traits.is_satellite_provider.nil?

    geo_store_section(record, 'traits', traits_hash)
  end

  # connection_type is a scalar, so it is stored as-is regardless of @flatten.
  if @connection_type && !geoip.connection_type.nil?
    record[[@output_field, 'connection_type'].join(@field_delimiter)] = geoip.connection_type
  end

  record
end

# Internal helper for #filter: collects the non-nil +code+, +geoname_id+
# (only when +@geohidden+ is exactly +true+), +iso_code+ and localized +name+
# of +place+ into a new Hash, in that key order.
def geo_named_location_hash(place)
  place_hash = {}

  place_hash['code'] = place.code unless place.code.nil?
  if @geohidden == true && !place.geoname_id.nil?
    place_hash['geoname_id'] = place.geoname_id
  end
  place_hash['iso_code'] = place.iso_code unless place.iso_code.nil?
  localized_name = place.name(@locale)
  place_hash['name'] = localized_name unless localized_name.nil?

  place_hash
end

# Internal helper for #filter: writes +values+ into +record+ for +section+,
# flattened or as JSON depending on +@flatten+. Does nothing when +values+
# is empty.
def geo_store_section(record, section, values)
  return if values.empty?

  if @flatten
    record.merge!(to_flatten(values, [@output_field, section], @field_delimiter))
  else
    record[[@output_field, section].join(@field_delimiter)] = values.to_json
  end
end

private :geo_named_location_hash, :geo_store_section
to_flatten(hash, stack=[], delimiter='/') click to toggle source
# File lib/fluent/plugin/filter_geo.rb, line 369
# Flattens a nested Hash into a single-level Hash whose keys are the path
# segments joined with +delimiter+.
#
# Values that are exactly a Hash are descended into; every other value
# (arrays included) becomes a leaf. An empty nested Hash contributes no key.
#
# @param hash      [Hash] the (possibly nested) hash to flatten
# @param stack     [Array] path segments to prefix every generated key with
# @param delimiter [String] separator placed between path segments
# @return [Hash] a new flat hash; +hash+ is left unchanged
def to_flatten(hash, stack=[], delimiter='/')
  hash.keys.each_with_object({}) do |key, flat|
    value = hash[key]
    path = stack + [key]

    if value.instance_of?(Hash)
      flat.merge!(to_flatten(value, path, delimiter))
    else
      flat[path.join(delimiter)] = value
    end
  end
end