class Fluent::ElapsedTimeOutput

Attributes

outputs[R]

for test

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_elapsed_time.rb, line 33
def initialize
  super
  @outputs = []
  @elapsed = {}
  @emit_procs = []
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_elapsed_time.rb, line 47
def configure(conf)
  super
  conf.elements.select {|e|
    e.name == 'store'
  }.each {|e|
    type = e['type']
    unless type
      raise ConfigError, "Missing 'type' parameter on <store> directive"
    end
    log.debug "adding store type=#{type.dump}"

    output = Plugin.new_output(type)
    output.configure(e)
    emit_proc = if output.respond_to?(:emit_events)
                  Proc.new {|output, tag, es, _chain| output.emit_events(tag, es)}
                else
                  Proc.new {|output, tag, es, _chain| output.emit(tag, es, NullOutputChain.instance)}
                end
    @emit_procs << emit_proc
    @outputs << output
  }

  case @aggregate
  when 'all'
    raise ConfigError, "out_elapsed_time: `tag` must be specified with aggregate all" if @tag.nil?
  when 'tag'
    raise ConfigError, "out_elapsed_time: `add_tag_prefix` or `remove_tag_prefix` must be specified with aggregate tag" if @add_tag_prefix.nil? and @remove_tag_prefix.nil?
  else
    raise ConfigError, "out_elapsed_time: aggregate allows `tag` or `all`"
  end

  @tag_proc = tag_proc

  @emit_proc =
    if @each == :message
      self.method(:emit_message)
    else
      self.method(:emit_es)
    end
end
elapsed(tag = "elapsed") click to toggle source
# File lib/fluent/plugin/out_elapsed_time.rb, line 43
def elapsed(tag = "elapsed") # default: @tag
  @elapsed[tag] ||= []
end
emit(tag, es, chain) click to toggle source
# File lib/fluent/plugin/out_elapsed_time.rb, line 88
def emit(tag, es, chain)
  @emit_proc.call(tag, es)
  chain.next
end
emit_es(tag, es) click to toggle source
# File lib/fluent/plugin/out_elapsed_time.rb, line 107
def emit_es(tag, es)
  chain = NullOutputChain.instance
  t = Time.now
  @outputs.each_with_index {|output, idx|
    @emit_procs[idx].call(output, tag, es,chain)
  }
  emit_tag = @tag_proc.call(tag)
  elapsed(emit_tag) << (Time.now - t).to_f
end
emit_message(tag, es) click to toggle source
# File lib/fluent/plugin/out_elapsed_time.rb, line 93
def emit_message(tag, es)
  chain = NullOutputChain.instance
  start = Time.now
  es.each do |time, record|
    @outputs.each_with_index {|output, idx|
      @emit_procs[idx].call(output, tag, OneEventStream.new(time, record), chain)
    }
    finish = Time.now
    emit_tag = @tag_proc.call(tag)
    elapsed(emit_tag) << (finish - start).to_f
    start = finish
  end
end
flush_emit() click to toggle source
# File lib/fluent/plugin/out_elapsed_time.rb, line 153
def flush_emit
  flushed_elapsed, @elapsed = @elapsed, initial_elapsed(@elapsed)
  messages = {}
  flushed_elapsed.each do |tag, elapsed|
    num = elapsed.size
    max = num == 0 ? 0 : elapsed.max
    avg = num == 0 ? 0 : elapsed.map(&:to_f).inject(:+) / num.to_f
    messages[tag] = {"max" => max, "avg" => avg, "num" => num}
  end
  messages.each {|tag, message| router.emit(tag, Engine.now, message) }
end
initial_elapsed(prev_elapsed = nil) click to toggle source
# File lib/fluent/plugin/out_elapsed_time.rb, line 117
def initial_elapsed(prev_elapsed = nil)
  return {} if !@zero_emit or prev_elapsed.nil?
  elapsed = {}
  prev_elapsed.keys.each do |tag|
    next if prev_elapsed[tag].empty? # Prohibit to emit anymore
    elapsed[tag] = []
  end
  elapsed
end
run() click to toggle source
# File lib/fluent/plugin/out_elapsed_time.rb, line 142
def run
  @last_checked ||= Engine.now
  while (sleep 0.1)
    now = Engine.now
    if now - @last_checked >= @interval
      flush_emit
      @last_checked = now
    end
  end
end
shutdown() click to toggle source
# File lib/fluent/plugin/out_elapsed_time.rb, line 134
def shutdown
  @outputs.each {|o|
    o.shutdown
  }
  @thread.terminate
  @thread.join
end
start() click to toggle source
# File lib/fluent/plugin/out_elapsed_time.rb, line 127
def start
  @outputs.each {|o|
    o.start
  }
  @thread = Thread.new(&method(:run))
end

Private Instance Methods

tag_proc() click to toggle source
# File lib/fluent/plugin/out_elapsed_time.rb, line 167
def tag_proc
  tag_slice_proc =
    if @remove_tag_slice
      lindex, rindex = @remove_tag_slice.split('..', 2)
      if lindex.nil? or rindex.nil? or lindex !~ /^-?\d+$/ or rindex !~ /^-?\d+$/
        raise Fluent::ConfigError, "out_elapsed_time: remove_tag_slice must be formatted like [num]..[num]"
      end
      l, r = lindex.to_i, rindex.to_i
      Proc.new {|tag| (tags = tag.split('.')[l..r]).nil? ? "" : tags.join('.') }
    else
      Proc.new {|tag| tag }
    end

  rstrip = Proc.new {|str, substr| str.chomp(substr) }
  lstrip = Proc.new {|str, substr| str.start_with?(substr) ? str[substr.size..-1] : str }
  tag_prefix = "#{rstrip.call(@add_tag_prefix, '.')}." if @add_tag_prefix
  tag_suffix = ".#{lstrip.call(@add_tag_suffix, '.')}" if @add_tag_suffix
  tag_prefix_match = "#{rstrip.call(@remove_tag_prefix, '.')}." if @remove_tag_prefix
  tag_suffix_match = ".#{lstrip.call(@remove_tag_suffix, '.')}" if @remove_tag_suffix
  tag_fixed = @tag if @tag
  if tag_prefix_match and tag_suffix_match
    Proc.new {|tag| "#{tag_prefix}#{rstrip.call(lstrip.call(tag_slice_proc.call(tag), tag_prefix_match), tag_suffix_match)}#{tag_suffix}" }
  elsif tag_prefix_match
    Proc.new {|tag| "#{tag_prefix}#{lstrip.call(tag_slice_proc.call(tag), tag_prefix_match)}#{tag_suffix}" }
  elsif tag_suffix_match
    Proc.new {|tag| "#{tag_prefix}#{rstrip.call(tag_slice_proc.call(tag), tag_suffix_match)}#{tag_suffix}" }
  elsif tag_prefix || @remove_tag_slice || tag_suffix
    Proc.new {|tag| "#{tag_prefix}#{tag_slice_proc.call(tag)}#{tag_suffix}" }
  else
    Proc.new {|tag| tag_fixed }
  end
end