class Fluent::EventCounterOutput
Attributes
counts[RW]
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_eventcounter.rb, line 9 def initialize super require 'redis' end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_eventcounter.rb, line 34 def configure(conf) super @capture_extra_replace = Regexp.new(@capture_extra_replace) if @capture_extra_replace.length > 0 end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_eventcounter.rb, line 65 def format(tag, time, record) return '' unless record[@count_key] if @capture_extra_if && record[@capture_extra_if] extra = record[@capture_extra_if].gsub(@capture_extra_replace, '') [tag.gsub(@input_tag_exclude,""), [record[@count_key], extra].compact.join(':')].to_json + "\n" else [tag.gsub(@input_tag_exclude,""), record[@count_key]].to_json + "\n" end end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_eventcounter.rb, line 39 def start super unless @emit_only @redis = begin if @redis_sentinels.length > 0 sentinels = @redis_sentinels.map {|host| {host: host, port: @redis_port} } Redis.new( url: "redis://#{@redis_master_group_name}", sentinels: sentinels, password: @redis_password, thread_safe: true, role: :master ) else Redis.new( host: @redis_host, port: @redis_port, password: @redis_password, thread_safe: true, db: @redis_db_number ) end end end end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_eventcounter.rb, line 76 def write(chunk) counts = Hash.new {|hash, key| hash[key] = Hash.new {|h,k| h[k] = 0 } } chunk.open do |io| items = io.read.split("\n") items.each do |item| key, event = JSON.parse(item) counts[key][event] += 1 end end @redis.pipelined do counts.each do |tag,events| events.each do |event, c| redis_key = [@redis_output_key,tag].join(':') @redis.hincrby(redis_key, event, c.to_i) end end end unless @emit_only if @emit_only || @debug_emit counts.each do |tag, events| router.emit(@emit_to, Time.now, tag => events) end end end