class Fluent::QueryCombinerOutput

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_query_combiner.rb, line 23
def initialize
  super
  require 'redis'
  require 'msgpack'
  require 'json'
  require 'rubygems'
  require 'time'
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_query_combiner.rb, line 32
def configure(conf)
  super
  @host = conf.has_key?('host') ? conf['host'] : 'localhost'
  @port = conf.has_key?('port') ? conf['port'].to_i : 6379
  @db_number = conf.has_key?('db_number') ? conf['db_number'].to_i : nil

  @query_identify = @query_identify.split(',').map { |qid| qid.strip }

  # functions for time format
  def create_time_formatter(expr)
    begin
      f = eval('lambda {|__arg_time__| ' + expr.gsub("$time", "__arg_time__") + '}')
      return f
    rescue SyntaxError
      raise Fluent::ConfigError, "SyntaxError at time_format `#{expr}`"
    end
  end
  @_time_formatter = create_time_formatter(@time_format)

  @_time_keys = {}

  # Create functions for each conditions
  @_cond_funcs = {}
  @_replace_keys = {
    'catch' => {},
    'dump' => {},
  }

  def get_arguments(eval_str)
    eval_str.scan(/[\"\']?[a-zA-Z][\w\d\.\-\_]*[\"\']?/).uniq.select{|x|
      not (x.start_with?('\'') or x.start_with?('\"')) and \
      not %w{and or xor not}.include? x
    }
  end

  def parse_replace_expr(element_name, condition_name, str)
    result = {}
    str.split(',').each{|cond|
      before, after = cond.split('=>').map{|var| var.strip}
      result[before] = after
      if not (before.length > 0 and after.length > 0)
        raise Fluent::ConfigError, "SyntaxError at replace condition `#{element_name}`: #{condition_name}"
      end
    }
    if result.none?
      raise Fluent::ConfigError, "SyntaxError at replace condition `#{element_name}`: #{condition_name}"
    end
    result
  end

  def create_func(var, expr)
    begin
      f_argv = get_arguments(expr)
      f = eval('lambda {|' + f_argv.join(',') + '| ' + expr + '}')
      return [f, f_argv]
    rescue SyntaxError
      raise Fluent::ConfigError, "SyntaxError at condition `#{var}`: #{expr}"
    end
  end
  conf.elements.select { |element|
    %w{catch prolong dump release}.include? element.name
  }.each { |element|
    element.each_pair { |var, expr|
      element.has_key?(var)   # to suppress unread configuration warning

      if var == 'condition'
        formula, f_argv = create_func(var, expr)
        @_cond_funcs[element.name] = [f_argv, formula]

      elsif var == 'replace'
        if %w{catch dump}.include? element.name
          @_replace_keys[element.name] = parse_replace_expr(element.name, var, expr)
        else
          raise Fluent::ConfigError, "`replace` configuration in #{element.name}: only allowed in `catch` and `dump`"
        end

      elsif var == 'time'
        if %w{catch dump}.include? element.name
          @_time_keys[element.name] = expr
        else
          raise Fluent::ConfigError, "`time` configuration in #{element.name}: only allowed in `catch` and `dump`"
        end

      else
        raise Fluent::ConfigError, "Unknown configuration `#{var}` in #{element.name}"
      end
    }
  }

  if not (@_cond_funcs.has_key?('catch') and @_cond_funcs.has_key?('dump'))
    raise Fluent::ConfigError, "Must have <catch> and <dump> blocks"
  end
end
create_func(var, expr) click to toggle source
# File lib/fluent/plugin/out_query_combiner.rb, line 82
def create_func(var, expr)
  begin
    f_argv = get_arguments(expr)
    f = eval('lambda {|' + f_argv.join(',') + '| ' + expr + '}')
    return [f, f_argv]
  rescue SyntaxError
    raise Fluent::ConfigError, "SyntaxError at condition `#{var}`: #{expr}"
  end
end
create_time_formatter(expr) click to toggle source

functions for time format

# File lib/fluent/plugin/out_query_combiner.rb, line 41
def create_time_formatter(expr)
  begin
    f = eval('lambda {|__arg_time__| ' + expr.gsub("$time", "__arg_time__") + '}')
    return f
  rescue SyntaxError
    raise Fluent::ConfigError, "SyntaxError at time_format `#{expr}`"
  end
end
do_catch(qid, record, time) click to toggle source
# File lib/fluent/plugin/out_query_combiner.rb, line 187
def do_catch(qid, record, time)
  # replace record keys
  @_replace_keys['catch'].each_pair { |before, after|
    record[after] = record[before]
    record.delete(before)
  }
  # add time key if configured
  if @_time_keys.has_key? 'catch'
    record[@_time_keys['catch']] = @_time_formatter.call(time)
  end

  # save record
  tryOnRedis 'set',    @redis_key_prefix + qid, JSON.dump(record)
  # update qid's timestamp
  tryOnRedis 'zadd', @redis_key_prefix, time, qid
  tryOnRedis 'expire', @redis_key_prefix + qid, @query_ttl
end
do_dump(qid, record, time) click to toggle source
# File lib/fluent/plugin/out_query_combiner.rb, line 213
def do_dump(qid, record, time)
  if (tryOnRedis 'exists', @redis_key_prefix + qid)
    # replace record keys
    @_replace_keys['dump'].each_pair { |before, after|
      record[after] = record[before]
      record.delete(before)
    }

    # add time key if configured
    if @_time_keys.has_key? 'dump'
      record[@_time_keys['dump']] = @_time_formatter.call(time)
    end

    # emit
    catched_record = JSON.load(tryOnRedis('get', @redis_key_prefix + qid))
    combined_record = catched_record.merge(record)
    Fluent::Engine.emit @tag, Fluent::Engine.now, combined_record

    # remove qid
    if not @continuous_dump
      do_release(qid)
    else
      # continuous_dump will prolong qid's TTL.
      tryOnRedis 'zadd', @redis_key_prefix, time, qid
      tryOnRedis 'expire', @redis_key_prefix + qid, @query_ttl
    end

  end
end
do_prolong(qid, time) click to toggle source
# File lib/fluent/plugin/out_query_combiner.rb, line 205
def do_prolong(qid, time)
  if (tryOnRedis 'exists', @redis_key_prefix + qid)
    # update qid's timestamp
    tryOnRedis 'zadd', @redis_key_prefix, time, qid
    tryOnRedis 'expire', @redis_key_prefix + qid, @query_ttl
  end
end
do_release(qid) click to toggle source
# File lib/fluent/plugin/out_query_combiner.rb, line 243
def do_release(qid)
  tryOnRedis 'del', @redis_key_prefix + qid
  tryOnRedis 'zrem', @redis_key_prefix, qid
end
exec_func(record, f_argv, formula) click to toggle source
# File lib/fluent/plugin/out_query_combiner.rb, line 135
def exec_func(record, f_argv, formula)
  argv = []
  f_argv.each {|v|
    argv.push(record[v])
  }
  return formula.call(*argv)
end
extract_qid(record) click to toggle source
# File lib/fluent/plugin/out_query_combiner.rb, line 248
def extract_qid(record)
  qid = []
  @query_identify.each { |attr|
    if record.has_key?(attr)
      qid.push(record[attr])
    else
      return nil
    end
  }
  qid.join(':')
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_query_combiner.rb, line 183
def format(tag, time, record)
  [tag, time, record].to_msgpack
end
get_arguments(eval_str) click to toggle source
# File lib/fluent/plugin/out_query_combiner.rb, line 60
def get_arguments(eval_str)
  eval_str.scan(/[\"\']?[a-zA-Z][\w\d\.\-\_]*[\"\']?/).uniq.select{|x|
    not (x.start_with?('\'') or x.start_with?('\"')) and \
    not %w{and or xor not}.include? x
  }
end
has_all_keys?(record, argv) click to toggle source
# File lib/fluent/plugin/out_query_combiner.rb, line 126
def has_all_keys?(record, argv)
  argv.each {|var|
    if not record.has_key?(var)
      return false
    end
  }
  true
end
parse_replace_expr(element_name, condition_name, str) click to toggle source
# File lib/fluent/plugin/out_query_combiner.rb, line 67
def parse_replace_expr(element_name, condition_name, str)
  result = {}
  str.split(',').each{|cond|
    before, after = cond.split('=>').map{|var| var.strip}
    result[before] = after
    if not (before.length > 0 and after.length > 0)
      raise Fluent::ConfigError, "SyntaxError at replace condition `#{element_name}`: #{condition_name}"
    end
  }
  if result.none?
    raise Fluent::ConfigError, "SyntaxError at replace condition `#{element_name}`: #{condition_name}"
  end
  result
end
shutdown() click to toggle source
# File lib/fluent/plugin/out_query_combiner.rb, line 162
def shutdown
  @redis.quit
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_query_combiner.rb, line 143
def start
  super

  begin
    gem "hiredis"
    @redis = Redis.new(
        :host => @host, :port => @port, :driver => :hiredis,
        :thread_safe => true, :db => @db_index
    )
  rescue LoadError
    @redis = Redis.new(
        :host => @host, :port => @port,
        :thread_safe => true, :db => @db_index
    )
  end

  start_watch
end
start_watch() click to toggle source
# File lib/fluent/plugin/out_query_combiner.rb, line 179
def start_watch
  @watcher = Thread.new(&method(:watch))
end
tryOnRedis(method, *args) click to toggle source
# File lib/fluent/plugin/out_query_combiner.rb, line 166
def tryOnRedis(method, *args)
  tries = 0
  begin
    @redis.send(method, *args) if @redis.respond_to? method
  rescue Redis::CommandError => e
    tries += 1
    # retry 3 times
    retry if tries <= @redis_retry
    $log.warn %Q[redis command retry failed : #{method}(#{args.join(', ')})]
    raise e.message
  end
end
watch() click to toggle source
# File lib/fluent/plugin/out_query_combiner.rb, line 288
def watch
  @last_checked = Fluent::Engine.now
  tick = @remove_interval
  while true
    sleep 0.5
    if Fluent::Engine.now - @last_checked >= tick
      now = Fluent::Engine.now
      to_expire = now - @query_ttl

      # Delete expired qids
      tryOnRedis 'zremrangebyscore', @redis_key_prefix, '-inf', to_expire

      # Delete buffer_size over qids
      tryOnRedis 'zremrangebyrank', @redis_key_prefix, 0, -@buffer_size

      @last_checked = now
    end
  end
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_query_combiner.rb, line 260
def write(chunk)

  begin
    chunk.msgpack_each do |(tag, time, record)|
      if (qid = extract_qid record)

        @_cond_funcs.each_pair { |cond, argv_and_func|
          argv, func = argv_and_func
          if exec_func(record, argv, func)
            case cond
            when "catch"
              do_catch(qid, record, time)
            when "prolong"
              do_prolong(qid, time)
            when "dump"
              do_dump(qid, record, time)
            when "release"
              do_release(qid)
            end
            break   # very important!
          end
        }
      end
    end

  end
end