module Fluent::FlowcounterSimple
Attributes
last_checked[RW]
Public Class Methods
included(klass)
click to toggle source
# File lib/fluent/plugin/flowcounter_simple.rb, line 7 def self.included(klass) klass.helpers :thread klass.config_param :indicator, :string, :default => 'num' klass.config_param :unit, :string, :default => 'second' klass.config_param :comment, :string, :default => nil end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/flowcounter_simple.rb, line 14 def configure(conf) super @indicator_proc = case @indicator when 'num' then Proc.new { |es| es.size } when 'byte' then Proc.new { |es| count = 0 es.each { |time, record| count += record.to_msgpack.size } count } else raise Fluent::ConfigError, "flowcounter-simple count allows num/byte" end @unit = case @unit when 'second' then :second when 'minute' then :minute when 'hour' then :hour when 'day' then :day else raise Fluent::ConfigError, "flowcounter-simple unit allows second/minute/hour/day" end @tick = case @unit when :second then 1 when :minute then 60 when :hour then 3600 when :day then 86400 else raise Fluent::ConfigError, "@unit must be one of second/minute/hour/day" end @type_str = self.is_a?(Fluent::Plugin::Filter) ? 'filter' : 'out' @output_proc = if @comment Proc.new { |count| "plugin:#{@type_str}_flowcounter_simple\tcount:#{count}\tindicator:#{@indicator}\tunit:#{@unit}\tcomment:#{@comment}" } else Proc.new { |count| "plugin:#{@type_str}_flowcounter_simple\tcount:#{count}\tindicator:#{@indicator}\tunit:#{@unit}" } end @count = 0 @mutex = Mutex.new end
countup(count)
click to toggle source
# File lib/fluent/plugin/flowcounter_simple.rb, line 70 def countup(count) @mutex.synchronize { @count = (@count || 0) + count } end
flush_emit(step)
click to toggle source
# File lib/fluent/plugin/flowcounter_simple.rb, line 76 def flush_emit(step) count, @count = @count, 0 if count > 0 log.info @output_proc.call(count) end end
process_count(tag, es)
click to toggle source
# File lib/fluent/plugin/flowcounter_simple.rb, line 96 def process_count(tag, es) countup(@indicator_proc.call(es)) end
shutdown()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/flowcounter_simple.rb, line 66 def shutdown super end
start()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/flowcounter_simple.rb, line 61 def start super thread_create(:flowcounter_simple_watch, &method(:watch)) end
watch()
click to toggle source
# File lib/fluent/plugin/flowcounter_simple.rb, line 83 def watch # instance variable, and public accessable, for test @last_checked = Fluent::EventTime.now while thread_current_running? sleep 0.1 if Fluent::EventTime.now - @last_checked >= @tick now = Fluent::EventTime.now flush_emit(now - @last_checked) @last_checked = now end end end