class Fluent::QQWryOutput
Constants
- QQWRY_KEYS
- REGEXP_JSON
- REGEXP_PLACEHOLDER_SCAN
- REGEXP_PLACEHOLDER_SINGLE
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_qqwry.rb, line 30
# Loads the libraries this plugin relies on and then runs the superclass
# initializer.
def initialize
  # The requires live inside the initializer, so the libraries are loaded
  # when an instance is created rather than when this file is loaded.
  require 'qqwry'
  require 'yajl'

  super
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_qqwry.rb, line 37
# Reads the plugin configuration and prepares the lookup state.
#
# Steps, in order:
# 1. Splits @qqwry_lookup_key on commas into a list of lookup fields
#    (assumes it holds a comma-separated String at this point -- it is set
#    outside the code shown here; TODO confirm).
# 2. Translates legacy `enable_key_*` options into `${key['field']}`
#    templates stored in @map, and logs a deprecation warning if any exist.
# 3. Copies every key/value of each <record> section into @map, validating
#    values that match REGEXP_JSON by parsing them with placeholders replaced
#    by a dummy JSON string.
# 4. Collects the distinct placeholders used by all templates into
#    @placeholder_keys and rejects those whose key is not in QQWRY_KEYS.
# 5. Requires at least one tag-related option to be set.
# 6. Opens the database at @qqwry_database.
#
# conf - the configuration object handed to the plugin.
#
# Raises Fluent::ConfigError for an unsupported key, a legacy option with
# fewer values than lookup fields, an unparsable JSON template, or when no
# tag option is configured.
def configure(conf)
  super

  @map = {}
  @qqwry_lookup_key = @qqwry_lookup_key.split(/\s*,\s*/)

  # enable_key_* format (legacy format)
  legacy_options = conf.keys.select { |name| name =~ /^enable_key_/ }
  legacy_options.each do |option|
    qqwry_key = option.sub('enable_key_', '')
    unless QQWRY_KEYS.include?(qqwry_key)
      raise Fluent::ConfigError, "qqwry: unsupported key #{qqwry_key}"
    end

    # Values pair up positionally with the lookup fields; a nil partner means
    # the option listed fewer values than there are lookup fields.
    record_keys = conf[option].split(/\s*,\s*/)
    @qqwry_lookup_key.zip(record_keys).each do |lookup_field, record_key|
      if record_key.nil?
        raise Fluent::ConfigError, "qqwry: missing value found at '#{option} #{lookup_field}'"
      end
      @map.store(record_key, "${#{qqwry_key}['#{lookup_field}']}")
    end
  end

  unless legacy_options.empty?
    log.warn "qqwry: 'enable_key_*' config format is obsoleted. use <record></record> directive for now."
    log.warn "qqwry: for further details referable to https://github.com/fakechris/fluent-plugin-qqwry"
  end

  # <record></record> directive
  record_sections = conf.elements.select { |section| section.name == 'record' }
  record_sections.each do |section|
    section.each_pair do |record_key, template|
      section.has_key?(record_key) # to suppress unread configuration warning
      @map[record_key] = template
      next unless template.match(REGEXP_JSON)

      # JSON-looking templates must still be valid JSON once every
      # placeholder is swapped for an encoded dummy string.
      begin
        dummy_text = Yajl::Encoder.encode('dummy_text')
        Yajl::Parser.parse(template.gsub(REGEXP_PLACEHOLDER_SCAN, dummy_text))
      rescue Yajl::ParseError
        raise Fluent::ConfigError, "qqwry: failed to parse '#{template}' as json."
      end
    end
  end

  @placeholder_keys = @map.values.join.scan(REGEXP_PLACEHOLDER_SCAN).map { |hit| hit[0] }.uniq
  @placeholder_keys.each do |placeholder_key|
    qqwry_key = placeholder_key.match(REGEXP_PLACEHOLDER_SINGLE)[:qqwry_key]
    next if QQWRY_KEYS.include?(qqwry_key)

    raise Fluent::ConfigError, "qqwry: unsupported key #{qqwry_key}"
  end
  @placeholder_expander = PlaceholderExpander.new

  tag_options = [@tag, @remove_tag_prefix, @remove_tag_suffix, @add_tag_prefix, @add_tag_suffix]
  if tag_options.none?
    raise Fluent::ConfigError, "qqwry: required at least one option of 'tag', 'remove_tag_prefix', 'remove_tag_suffix', 'add_tag_prefix', 'add_tag_suffix'."
  end

  @qqwry = QQWry::Database.new(@qqwry_database)
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_qqwry.rb, line 93
# Serializes a single event for buffering.
#
# tag, time, record - the event's three components.
#
# Returns the result of calling `to_msgpack` on the [tag, time, record] array.
def format(tag, time, record)
  event = [tag, time, record]
  event.to_msgpack
end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_qqwry.rb, line 97
# Shutdown hook. Adds no behavior of its own; it only delegates to the
# superclass implementation.
def shutdown
  super
end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_qqwry.rb, line 89
# Start hook. Adds no behavior of its own; it only delegates to the
# superclass implementation.
def start
  super
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_qqwry.rb, line 101
# Processes a buffered chunk: every event decoded by `chunk.msgpack_each` has
# its record passed through add_qqwry_field and is then emitted via
# Fluent::Engine with the same tag and time it was buffered with.
#
# chunk - object responding to `msgpack_each`, yielding tag, time and record.
def write(chunk)
  chunk.msgpack_each do |tag, time, record|
    enriched = add_qqwry_field(record)
    Fluent::Engine.emit(tag, time, enriched)
  end
end
Private Instance Methods
add_qqwry_field(record)
click to toggle source
# File lib/fluent/plugin/out_qqwry.rb, line 108
# Expands every template in @map against the lookup results for this record
# and stores the outcome on the record under the mapped key.
#
# Three template shapes are handled:
# * a template that is exactly one placeholder takes the placeholder's value
#   as-is;
# * a template matching REGEXP_JSON has each placeholder replaced by its
#   JSON-encoded value and is then parsed (nil if parsing fails, see
#   parse_json);
# * anything else is treated as a plain string with placeholders substituted.
#
# record - the event record; it is modified in place.
#
# Returns the same record object.
def add_qqwry_field(record)
  addresses = get_address(record)
  placeholder = create_placeholder(geolocate(addresses))

  @map.each do |record_key, template|
    expanded =
      if template.match(REGEXP_PLACEHOLDER_SINGLE)
        placeholder[template]
      elsif template.match(REGEXP_JSON)
        json_text = template.gsub(REGEXP_PLACEHOLDER_SCAN) { |token|
          Yajl::Encoder.encode(placeholder[token])
        }
        parse_json(json_text)
      else
        # Hash form of gsub: each matched placeholder is looked up in the hash.
        template.gsub(REGEXP_PLACEHOLDER_SCAN, placeholder)
      end
    record.store(record_key, expanded)
  end

  record
end
create_placeholder(geodata)
click to toggle source
# File lib/fluent/plugin/out_qqwry.rb, line 158
# Builds the placeholder => value table for one record.
#
# geodata - Hash of lookup field => lookup result (nil when nothing was
#           looked up for that field).
#
# For each configured placeholder, the lookup result for its record key is
# fetched and the method named by its qqwry key is invoked on it. Placeholders
# that do not match REGEXP_PLACEHOLDER_SINGLE, or whose lookup result is nil,
# are left out of the table.
#
# Returns a Hash of placeholder string => resolved value.
def create_placeholder(geodata)
  @placeholder_keys.each_with_object({}) do |placeholder_key, resolved|
    parts = placeholder_key.match(REGEXP_PLACEHOLDER_SINGLE)
    next if parts.nil?

    geo = geodata[parts[:record_key]]
    next if geo.nil?

    resolved.store(placeholder_key, geo.send(parts[:qqwry_key].to_sym))
  end
end
geolocate(addresses)
click to toggle source
# File lib/fluent/plugin/out_qqwry.rb, line 149
# Looks up each address in the database held in @qqwry.
#
# addresses - Hash of lookup field => address (nil when the record had none).
#
# Returns a Hash with the same fields, each mapped to the result of
# `@qqwry.query(address)`, or to nil when the address itself was nil (the
# database is not queried in that case).
def geolocate(addresses)
  addresses.each_with_object({}) do |(field, ip), geodata|
    located = ip.nil? ? nil : @qqwry.query(ip)
    geodata.store(field, located)
  end
end
get_address(record)
click to toggle source
# File lib/fluent/plugin/out_qqwry.rb, line 135
# Extracts the value of every configured lookup field from a record.
#
# Each entry of @qqwry_lookup_key is a dot-separated path ("host" or
# "client.ip"); the path is followed through nested hashes one segment at a
# time.
#
# record - the event record (a Hash, possibly with nested Hashes).
#
# Returns a Hash of lookup field => value found at that path. The value is
# nil when a segment is missing, or when an intermediate value on the path is
# not hash-like (for example the record holds a String or nil where a nested
# Hash was expected). Previously that last case raised NoMethodError.
def get_address(record)
  @qqwry_lookup_key.each_with_object({}) do |field, address|
    value = field.split('.').reduce(record) do |node, segment|
      # Stop descending as soon as the path cannot be followed any further.
      break nil unless node.respond_to?(:has_key?) && node.has_key?(segment)

      node[segment]
    end
    address.store(field, value)
  end
end
parse_json(message)
click to toggle source
# File lib/fluent/plugin/out_qqwry.rb, line 126
# Parses a JSON string with Yajl.
#
# message - the JSON text to parse.
#
# Returns the parsed value, or nil when Yajl raises Yajl::ParseError; in that
# case the failure is logged at info level together with the error class and
# message.
def parse_json(message)
  Yajl::Parser.parse(message)
rescue Yajl::ParseError => e
  log.info "qqwry: failed to parse '#{message}' as json.", :error_class => e.class, :error => e.message
  nil
end