class Fluent::EventCounterOutput

Attributes

counts[RW]

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_eventcounter.rb, line 9
def initialize
  super
  require 'redis'
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_eventcounter.rb, line 34
def configure(conf)
  super
  @capture_extra_replace = Regexp.new(@capture_extra_replace) if @capture_extra_replace.length > 0
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_eventcounter.rb, line 65
def format(tag, time, record)
  return '' unless record[@count_key]

  if @capture_extra_if && record[@capture_extra_if]
    extra = record[@capture_extra_if].gsub(@capture_extra_replace, '')
    [tag.gsub(@input_tag_exclude,""), [record[@count_key], extra].compact.join(':')].to_json + "\n"
  else
    [tag.gsub(@input_tag_exclude,""), record[@count_key]].to_json + "\n"
  end
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_eventcounter.rb, line 39
def start
  super
  unless @emit_only
    @redis = begin
      if @redis_sentinels.length > 0
        sentinels = @redis_sentinels.map {|host| {host: host, port: @redis_port} }
        Redis.new(
          url: "redis://#{@redis_master_group_name}",
          sentinels: sentinels,
          password: @redis_password,
          thread_safe: true,
          role: :master
        )
      else
         Redis.new(
          host: @redis_host,
          port: @redis_port,
          password: @redis_password,
          thread_safe: true,
          db: @redis_db_number
        )
      end
    end
  end
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_eventcounter.rb, line 76
def write(chunk)
  counts = Hash.new {|hash, key| hash[key] = Hash.new {|h,k| h[k] = 0 } }
  chunk.open do |io|
    items = io.read.split("\n")
    items.each do |item|
      key, event = JSON.parse(item)
      counts[key][event] += 1
    end
  end

  @redis.pipelined do
    counts.each do |tag,events|
      events.each do |event, c|
        redis_key = [@redis_output_key,tag].join(':')
        @redis.hincrby(redis_key, event, c.to_i)
      end
    end
  end unless @emit_only

  if @emit_only || @debug_emit
    counts.each do |tag, events|
      router.emit(@emit_to, Time.now, tag => events)
    end
  end

end