class Fluent::QueryCombinerOutput
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_query_combiner.rb, line 23 def initialize super require 'redis' require 'msgpack' require 'json' require 'rubygems' require 'time' end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_query_combiner.rb, line 32 def configure(conf) super @host = conf.has_key?('host') ? conf['host'] : 'localhost' @port = conf.has_key?('port') ? conf['port'].to_i : 6379 @db_number = conf.has_key?('db_number') ? conf['db_number'].to_i : nil @query_identify = @query_identify.split(',').map { |qid| qid.strip } # functions for time format def create_time_formatter(expr) begin f = eval('lambda {|__arg_time__| ' + expr.gsub("$time", "__arg_time__") + '}') return f rescue SyntaxError raise Fluent::ConfigError, "SyntaxError at time_format `#{expr}`" end end @_time_formatter = create_time_formatter(@time_format) @_time_keys = {} # Create functions for each conditions @_cond_funcs = {} @_replace_keys = { 'catch' => {}, 'dump' => {}, } def get_arguments(eval_str) eval_str.scan(/[\"\']?[a-zA-Z][\w\d\.\-\_]*[\"\']?/).uniq.select{|x| not (x.start_with?('\'') or x.start_with?('\"')) and \ not %w{and or xor not}.include? x } end def parse_replace_expr(element_name, condition_name, str) result = {} str.split(',').each{|cond| before, after = cond.split('=>').map{|var| var.strip} result[before] = after if not (before.length > 0 and after.length > 0) raise Fluent::ConfigError, "SyntaxError at replace condition `#{element_name}`: #{condition_name}" end } if result.none? raise Fluent::ConfigError, "SyntaxError at replace condition `#{element_name}`: #{condition_name}" end result end def create_func(var, expr) begin f_argv = get_arguments(expr) f = eval('lambda {|' + f_argv.join(',') + '| ' + expr + '}') return [f, f_argv] rescue SyntaxError raise Fluent::ConfigError, "SyntaxError at condition `#{var}`: #{expr}" end end conf.elements.select { |element| %w{catch prolong dump release}.include? element.name }.each { |element| element.each_pair { |var, expr| element.has_key?(var) # to suppress unread configuration warning if var == 'condition' formula, f_argv = create_func(var, expr) @_cond_funcs[element.name] = [f_argv, formula] elsif var == 'replace' if %w{catch dump}.include? element.name @_replace_keys[element.name] = parse_replace_expr(element.name, var, expr) else raise Fluent::ConfigError, "`replace` configuration in #{element.name}: only allowed in `catch` and `dump`" end elsif var == 'time' if %w{catch dump}.include? element.name @_time_keys[element.name] = expr else raise Fluent::ConfigError, "`time` configuration in #{element.name}: only allowed in `catch` and `dump`" end else raise Fluent::ConfigError, "Unknown configuration `#{var}` in #{element.name}" end } } if not (@_cond_funcs.has_key?('catch') and @_cond_funcs.has_key?('dump')) raise Fluent::ConfigError, "Must have <catch> and <dump> blocks" end end
create_func(var, expr)
click to toggle source
# File lib/fluent/plugin/out_query_combiner.rb, line 82 def create_func(var, expr) begin f_argv = get_arguments(expr) f = eval('lambda {|' + f_argv.join(',') + '| ' + expr + '}') return [f, f_argv] rescue SyntaxError raise Fluent::ConfigError, "SyntaxError at condition `#{var}`: #{expr}" end end
create_time_formatter(expr)
click to toggle source
functions for time format
# File lib/fluent/plugin/out_query_combiner.rb, line 41 def create_time_formatter(expr) begin f = eval('lambda {|__arg_time__| ' + expr.gsub("$time", "__arg_time__") + '}') return f rescue SyntaxError raise Fluent::ConfigError, "SyntaxError at time_format `#{expr}`" end end
do_catch(qid, record, time)
click to toggle source
# File lib/fluent/plugin/out_query_combiner.rb, line 187 def do_catch(qid, record, time) # replace record keys @_replace_keys['catch'].each_pair { |before, after| record[after] = record[before] record.delete(before) } # add time key if configured if @_time_keys.has_key? 'catch' record[@_time_keys['catch']] = @_time_formatter.call(time) end # save record tryOnRedis 'set', @redis_key_prefix + qid, JSON.dump(record) # update qid's timestamp tryOnRedis 'zadd', @redis_key_prefix, time, qid tryOnRedis 'expire', @redis_key_prefix + qid, @query_ttl end
do_dump(qid, record, time)
click to toggle source
# File lib/fluent/plugin/out_query_combiner.rb, line 213 def do_dump(qid, record, time) if (tryOnRedis 'exists', @redis_key_prefix + qid) # replace record keys @_replace_keys['dump'].each_pair { |before, after| record[after] = record[before] record.delete(before) } # add time key if configured if @_time_keys.has_key? 'dump' record[@_time_keys['dump']] = @_time_formatter.call(time) end # emit catched_record = JSON.load(tryOnRedis('get', @redis_key_prefix + qid)) combined_record = catched_record.merge(record) Fluent::Engine.emit @tag, Fluent::Engine.now, combined_record # remove qid if not @continuous_dump do_release(qid) else # continuous_dump will prolong qid's TTL. tryOnRedis 'zadd', @redis_key_prefix, time, qid tryOnRedis 'expire', @redis_key_prefix + qid, @query_ttl end end end
do_prolong(qid, time)
click to toggle source
# File lib/fluent/plugin/out_query_combiner.rb, line 205 def do_prolong(qid, time) if (tryOnRedis 'exists', @redis_key_prefix + qid) # update qid's timestamp tryOnRedis 'zadd', @redis_key_prefix, time, qid tryOnRedis 'expire', @redis_key_prefix + qid, @query_ttl end end
do_release(qid)
click to toggle source
# File lib/fluent/plugin/out_query_combiner.rb, line 243 def do_release(qid) tryOnRedis 'del', @redis_key_prefix + qid tryOnRedis 'zrem', @redis_key_prefix, qid end
exec_func(record, f_argv, formula)
click to toggle source
# File lib/fluent/plugin/out_query_combiner.rb, line 135 def exec_func(record, f_argv, formula) argv = [] f_argv.each {|v| argv.push(record[v]) } return formula.call(*argv) end
extract_qid(record)
click to toggle source
# File lib/fluent/plugin/out_query_combiner.rb, line 248 def extract_qid(record) qid = [] @query_identify.each { |attr| if record.has_key?(attr) qid.push(record[attr]) else return nil end } qid.join(':') end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_query_combiner.rb, line 183 def format(tag, time, record) [tag, time, record].to_msgpack end
get_arguments(eval_str)
click to toggle source
# File lib/fluent/plugin/out_query_combiner.rb, line 60 def get_arguments(eval_str) eval_str.scan(/[\"\']?[a-zA-Z][\w\d\.\-\_]*[\"\']?/).uniq.select{|x| not (x.start_with?('\'') or x.start_with?('\"')) and \ not %w{and or xor not}.include? x } end
has_all_keys?(record, argv)
click to toggle source
# File lib/fluent/plugin/out_query_combiner.rb, line 126 def has_all_keys?(record, argv) argv.each {|var| if not record.has_key?(var) return false end } true end
parse_replace_expr(element_name, condition_name, str)
click to toggle source
# File lib/fluent/plugin/out_query_combiner.rb, line 67 def parse_replace_expr(element_name, condition_name, str) result = {} str.split(',').each{|cond| before, after = cond.split('=>').map{|var| var.strip} result[before] = after if not (before.length > 0 and after.length > 0) raise Fluent::ConfigError, "SyntaxError at replace condition `#{element_name}`: #{condition_name}" end } if result.none? raise Fluent::ConfigError, "SyntaxError at replace condition `#{element_name}`: #{condition_name}" end result end
shutdown()
click to toggle source
# File lib/fluent/plugin/out_query_combiner.rb, line 162 def shutdown @redis.quit end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_query_combiner.rb, line 143 def start super begin gem "hiredis" @redis = Redis.new( :host => @host, :port => @port, :driver => :hiredis, :thread_safe => true, :db => @db_index ) rescue LoadError @redis = Redis.new( :host => @host, :port => @port, :thread_safe => true, :db => @db_index ) end start_watch end
start_watch()
click to toggle source
# File lib/fluent/plugin/out_query_combiner.rb, line 179 def start_watch @watcher = Thread.new(&method(:watch)) end
tryOnRedis(method, *args)
click to toggle source
# File lib/fluent/plugin/out_query_combiner.rb, line 166 def tryOnRedis(method, *args) tries = 0 begin @redis.send(method, *args) if @redis.respond_to? method rescue Redis::CommandError => e tries += 1 # retry 3 times retry if tries <= @redis_retry $log.warn %Q[redis command retry failed : #{method}(#{args.join(', ')})] raise e.message end end
watch()
click to toggle source
# File lib/fluent/plugin/out_query_combiner.rb, line 288 def watch @last_checked = Fluent::Engine.now tick = @remove_interval while true sleep 0.5 if Fluent::Engine.now - @last_checked >= tick now = Fluent::Engine.now to_expire = now - @query_ttl # Delete expired qids tryOnRedis 'zremrangebyscore', @redis_key_prefix, '-inf', to_expire # Delete buffer_size over qids tryOnRedis 'zremrangebyrank', @redis_key_prefix, 0, -@buffer_size @last_checked = now end end end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_query_combiner.rb, line 260 def write(chunk) begin chunk.msgpack_each do |(tag, time, record)| if (qid = extract_qid record) @_cond_funcs.each_pair { |cond, argv_and_func| argv, func = argv_and_func if exec_func(record, argv, func) case cond when "catch" do_catch(qid, record, time) when "prolong" do_prolong(qid, time) when "dump" do_dump(qid, record, time) when "release" do_release(qid) end break # very important! end } end end end end