class Fluent::QQWryOutput

Constants

QQWRY_KEYS
REGEXP_JSON
REGEXP_PLACEHOLDER_SCAN
REGEXP_PLACEHOLDER_SINGLE

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_qqwry.rb, line 30
# Loads the libraries this plugin relies on at construction time (rather than
# at file load), then hands off to the superclass constructor.
def initialize
  %w[qqwry yajl].each { |library| require library }

  super
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_qqwry.rb, line 37
# Builds the record-key => template map from the plugin configuration and
# opens the QQWry database.
#
# Two configuration styles are read, in this order:
#   1. legacy `enable_key_<qqwry_key>` parameters (a warning is logged), and
#   2. `<record>` sections whose pairs are copied into the map verbatim.
#
# @param conf the configuration element handed over by the framework
# @raise [Fluent::ConfigError] on an unsupported qqwry key, a legacy
#   parameter with fewer values than lookup keys, a JSON-looking template
#   that does not parse, or when no tag-related option is set
def configure(conf)
  super

  @map = {}
  list_separator = /\s*,\s*/
  @qqwry_lookup_key = @qqwry_lookup_key.split(list_separator)

  # enable_key_* format (legacy format)
  legacy_keys = conf.keys.select { |name| name =~ /^enable_key_/ }
  legacy_keys.each do |legacy_key|
    qqwry_key = legacy_key.sub('enable_key_', '')
    unless QQWRY_KEYS.include?(qqwry_key)
      raise Fluent::ConfigError, "qqwry: unsupported key #{qqwry_key}"
    end

    # Pair each lookup field with the record key listed at the same position.
    record_keys = conf[legacy_key].split(list_separator)
    @qqwry_lookup_key.zip(record_keys).each do |lookup_field, record_key|
      if record_key.nil?
        raise Fluent::ConfigError, "qqwry: missing value found at '#{legacy_key} #{lookup_field}'"
      end
      @map.store(record_key, "${#{qqwry_key}['#{lookup_field}']}")
    end
  end
  unless legacy_keys.empty?
    log.warn "qqwry: 'enable_key_*' config format is obsoleted. use <record></record> directive for now."
    log.warn "qqwry: for further details referable to https://github.com/fakechris/fluent-plugin-qqwry"
  end

  # <record></record> directive
  conf.elements.each do |element|
    next unless element.name == 'record'

    element.each_pair do |record_key, template|
      element.has_key?(record_key) # to suppress unread configuration warning
      @map[record_key] = template
      next unless template.match(REGEXP_JSON)

      # Templates that look like JSON must still parse once every
      # placeholder has been replaced by an encoded dummy string.
      begin
        dummy_text = Yajl::Encoder.encode('dummy_text')
        Yajl::Parser.parse(template.gsub(REGEXP_PLACEHOLDER_SCAN, dummy_text))
      rescue Yajl::ParseError
        raise Fluent::ConfigError, "qqwry: failed to parse '#{template}' as json."
      end
    end
  end

  # Collect every distinct placeholder used by any template and make sure
  # each one names a supported qqwry key.
  all_templates = @map.values.join
  @placeholder_keys = all_templates.scan(REGEXP_PLACEHOLDER_SCAN).map { |captures| captures[0] }.uniq
  @placeholder_keys.each do |placeholder_key|
    qqwry_key = placeholder_key.match(REGEXP_PLACEHOLDER_SINGLE)[:qqwry_key]
    unless QQWRY_KEYS.include?(qqwry_key)
      raise Fluent::ConfigError, "qqwry: unsupported key #{qqwry_key}"
    end
  end
  @placeholder_expander = PlaceholderExpander.new

  tag_options = [@tag, @remove_tag_prefix, @remove_tag_suffix, @add_tag_prefix, @add_tag_suffix]
  if tag_options.none?
    raise Fluent::ConfigError, "qqwry: required at least one option of 'tag', 'remove_tag_prefix', 'remove_tag_suffix', 'add_tag_prefix', 'add_tag_suffix'."
  end

  @qqwry = QQWry::Database.new(@qqwry_database)
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_qqwry.rb, line 93
# Serializes one event as a msgpack-encoded [tag, time, record] triple.
#
# @param tag the event tag
# @param time the event time
# @param record the event record
# @return the result of calling +to_msgpack+ on the three-element array
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
shutdown() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_qqwry.rb, line 97
# Shutdown hook. Adds no plugin-specific teardown; it only delegates to the
# superclass implementation.
def shutdown
  super
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_qqwry.rb, line 89
# Start hook. Adds no plugin-specific startup work; it only delegates to the
# superclass implementation.
def start
  super
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_qqwry.rb, line 101
# Re-emits every event in the chunk after adding the configured qqwry fields
# to its record. The tag and time are passed through unchanged.
#
# @param chunk an object whose +msgpack_each+ yields (tag, time, record)
def write(chunk)
  chunk.msgpack_each do |tag, time, record|
    enriched = add_qqwry_field(record)
    Fluent::Engine.emit(tag, time, enriched)
  end
end

Private Instance Methods

add_qqwry_field(record) click to toggle source
# File lib/fluent/plugin/out_qqwry.rb, line 108
# Expands every configured template against the geodata looked up for this
# record and stores the results into the record (mutating it in place).
#
# Each template is handled in one of three ways:
#   * it is exactly one placeholder -> the placeholder's raw value is stored;
#   * it looks like JSON            -> placeholders are replaced by their
#                                      JSON encoding and the text is parsed;
#   * anything else                 -> plain textual substitution.
#
# @param record the event record to enrich
# @return the same record object that was passed in
def add_qqwry_field(record)
  addresses = get_address(record)
  placeholder = create_placeholder(geolocate(addresses))

  @map.each_pair do |record_key, template|
    expanded =
      if template.match(REGEXP_PLACEHOLDER_SINGLE)
        placeholder[template]
      elsif template.match(REGEXP_JSON)
        json_text = template.gsub(REGEXP_PLACEHOLDER_SCAN) do |token|
          Yajl::Encoder.encode(placeholder[token])
        end
        parse_json(json_text)
      else
        template.gsub(REGEXP_PLACEHOLDER_SCAN, placeholder)
      end
    record[record_key] = expanded
  end

  record
end
create_placeholder(geodata) click to toggle source
# File lib/fluent/plugin/out_qqwry.rb, line 158
# Resolves each known placeholder key to a concrete value.
#
# A placeholder names a qqwry attribute and a lookup field; the value is
# obtained by calling that attribute on the geodata entry for the field.
# Placeholders that do not parse, or whose field has no geodata, are omitted.
#
# @param geodata mapping of lookup field => lookup result (or nil)
# @return [Hash] placeholder string => resolved value
def create_placeholder(geodata)
  @placeholder_keys.each_with_object({}) do |placeholder_key, resolved|
    parsed = placeholder_key.match(REGEXP_PLACEHOLDER_SINGLE)
    next if parsed.nil?

    geo = geodata[parsed[:record_key]]
    next if geo.nil?

    attribute = parsed[:qqwry_key].to_sym
    resolved[placeholder_key] = geo.send(attribute)
  end
end
geolocate(addresses) click to toggle source
# File lib/fluent/plugin/out_qqwry.rb, line 149
# Queries the qqwry database for every address.
#
# @param addresses mapping of lookup field => IP value (or nil)
# @return [Hash] lookup field => query result; fields whose IP is nil map to
#   nil without the database being queried
def geolocate(addresses)
  addresses.each_with_object({}) do |(field, ip), geodata|
    geodata[field] = (@qqwry.query(ip) unless ip.nil?)
  end
end
get_address(record) click to toggle source
# File lib/fluent/plugin/out_qqwry.rb, line 135
# Extracts the value of every configured lookup key from the record.
#
# A lookup key may be a dotted path ("client.ip"), which is followed through
# nested hashes. The result for a key is nil when any segment of its path is
# missing, or when the path runs into a value that cannot be descended into
# (a scalar or nil part-way along). Such a value is treated as "not found"
# rather than raising, so one oddly shaped record does not abort processing.
#
# @param record [Hash] the event record
# @return [Hash] lookup key (as configured) => value found, or nil
def get_address(record)
  @qqwry_lookup_key.each_with_object({}) do |field, address|
    address[field] = field.split('.').reduce(record) do |node, segment|
      # Stop as soon as the path cannot be followed any further.
      break nil unless node.respond_to?(:key?) && node.key?(segment)

      node[segment]
    end
  end
end
parse_json(message) click to toggle source
# File lib/fluent/plugin/out_qqwry.rb, line 126
# Parses a JSON string.
#
# @param message the JSON text
# @return the parsed value, or nil when the text is not valid JSON (the
#   failure is logged at info level instead of being raised)
def parse_json(message)
  Yajl::Parser.parse(message)
rescue Yajl::ParseError => e
  log.info "qqwry: failed to parse '#{message}' as json.", :error_class => e.class, :error => e.message
  nil
end