class Fluent::IncrementalOutput
Public Class Methods
new()
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_incremental.rb, line 11
#
# Builds the plugin instance.
#
# Assigns "UTF-8" to Encoding.default_internal (a process-wide setting) and
# then hands over to the superclass initializer. The assignment happens first,
# so it is already in effect while +super+ runs.
def initialize
  Encoding.default_internal = "UTF-8"
  super
end
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_incremental.rb, line 16
#
# Validates the plugin settings after the superclass has processed +conf+,
# then fixes the timestamp format and the key under which the timestamp is
# stored.
#
# Checks, in this order:
# * +unit+ must be present and one of year, month, day, hour or min;
# * +incremental_file_path+, +add_tag_prefix+ and +remove_tag_prefix+ must be
#   present and non-empty.
#
# conf:: configuration object, passed through to +super+ unchanged.
#
# Raises ConfigError on the first check that fails.
def configure(conf)
  super

  if @unit.nil? || @unit.empty?
    raise ConfigError, "incremental configure requires 'unit'"
  end
  unless %w[year month day hour min].include?(@unit)
    raise ConfigError, "incremental configure unit allow year or month or day or hour or min"
  end

  # Same presence rule for each mandatory setting; the order decides which
  # error is reported when several are missing.
  [
    ['incremental_file_path', @incremental_file_path],
    ['add_tag_prefix', @add_tag_prefix],
    ['remove_tag_prefix', @remove_tag_prefix]
  ].each do |name, value|
    if value.nil? || value.empty?
      raise ConfigError, "incremental configure requires '#{name}'"
    end
  end

  @time_format = "%Y-%m-%d %H:%M:%S"
  @timestamp_key = "last_updated"
end
emit(tag, es, chain)
click to toggle source
# File lib/fluent/plugin/out_incremental.rb, line 38
#
# Adds the integer-looking string fields of every record in +es+ to the
# running totals kept for +tag+, persists the totals together with the
# current time, and re-emits the totals under the rewritten tag.
#
# tag::   tag of the incoming events; also used as the suffix of the state
#         file name (<incremental_file_path>.<tag>).
# es::    event stream yielding (time, record) pairs.
# chain:: output chain; advanced once per call.
#
# A field is counted only when its value is a String made of an optional sign
# followed by digits and, if +@name_key_pattern+ is set, its key matches that
# pattern. Values of any other type are skipped.
def emit(tag, es, chain)
  # A non-buffered output that overrides #emit has to advance the chain
  # itself, otherwise outputs further down the chain are never invoked.
  chain.next

  filepath = @incremental_file_path + "." + tag
  result = get_result(filepath)

  # Build the optional key filter once per call instead of once per field.
  key_filter = @name_key_pattern.nil? ? nil : /#{@name_key_pattern}/

  es.each do |_time, record|
    record.each do |key, value|
      # Only strings are matched: calling =~ on other objects raises on
      # Rubies without Object#=~, and a numeric-looking Symbol has no #to_i.
      next unless value.is_a?(String) && value =~ /^[+-]?\d+$/
      # The timestamp slot holds a formatted time string when state was
      # loaded from disk; adding an integer to it would raise TypeError.
      next if key == @timestamp_key
      next if key_filter && key !~ key_filter

      result[key] = (result[key] || 0) + value.to_i
    end
  end

  # The timestamp is stored with the totals but is not part of the emitted
  # record, so it is added for the write and removed again afterwards.
  result[@timestamp_key] = Time.now.strftime(@time_format)
  write_file(result, filepath)
  result.delete(@timestamp_key)

  # NOTE(review): gsub replaces every occurrence of @remove_tag_prefix in the
  # tag, not only a leading one -- confirm this is intended.
  Fluent::Engine.emit(tag.gsub(@remove_tag_prefix, @add_tag_prefix), Fluent::Engine.now, result)
end
Private Instance Methods
get_result(filepath)
click to toggle source
# File lib/fluent/plugin/out_incremental.rb, line 66
#
# Loads the running totals stored at +filepath+.
#
# Returns an empty Hash when the file does not exist, is empty, holds no
# timestamp, or was written in an earlier period than the current one
# according to +@unit+ (year, month, day, hour or min). Otherwise returns the
# stored Hash, including its timestamp entry. For any other +@unit+ value the
# stored Hash is returned without a period check.
#
# filepath:: path of the state file.
def get_result(filepath)
  return Hash.new unless File.exist?(filepath)

  # The file holds Marshal output, so read it once and as raw bytes.
  data = File.binread(filepath)
  return Hash.new if data.empty?

  # NOTE(review): Marshal.load must only ever see trusted data; this assumes
  # the state file is written exclusively by this plugin.
  result = Marshal.load(data)

  # Each format keeps every component down to the configured unit, so two
  # times fall in the same period only when all larger components agree too
  # (e.g. the same month of a different year counts as a new period).
  period_format = {
    'year'  => '%Y',
    'month' => '%Y-%m',
    'day'   => '%Y-%m-%d',
    'hour'  => '%Y-%m-%d %H',
    'min'   => '%Y-%m-%d %H:%M'
  }[@unit]
  return result if period_format.nil?

  # Without a timestamp the period of the stored totals is unknown.
  stamp = result[@timestamp_key]
  return Hash.new if stamp.nil?

  saved_at = Time.strptime(stamp, @time_format)
  if Time.now.strftime(period_format) == saved_at.strftime(period_format)
    result
  else
    Hash.new
  end
end
write_file(result,filepath)
click to toggle source
# File lib/fluent/plugin/out_incremental.rb, line 58
#
# Serializes +result+ with Marshal and writes it to +filepath+, replacing any
# previous content.
#
# result::   object to persist (the totals Hash).
# filepath:: destination path.
#
# Returns nil.
def write_file(result, filepath)
  dump = Marshal.dump(result)
  # Marshal output is binary, so the file is opened in binary mode to keep
  # newline or encoding conversion away from the bytes. The block form of
  # File.open closes the handle itself.
  File.open(filepath, 'wb') { |file| file.write(dump) }
  nil
end