class Fluent::TimeSlicedFilterOutput
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_time_sliced_filter.rb, line 16
#
# Validates the configuration and loads the filter script.
#
# After delegating to the superclass, the file at @filter_path is read and
# evaluated with instance_eval on a fresh Object; the value of that
# evaluation is stored in @filter and must respond to `call`.
#
# conf - the configuration object, forwarded unchanged to the superclass.
#
# Raises Fluent::ConfigError when the file does not exist, when evaluating it
# fails (including syntax errors), or when the evaluated value does not
# respond to `call`.
def configure(conf)
  super

  unless File.exist?(@filter_path)
    raise Fluent::ConfigError, "No such file: #{@filter_path}"
  end

  begin
    # NOTE(review): the filter file is executed as arbitrary Ruby code; it
    # must come from a trusted location.
    @filter = Object.new.instance_eval(File.read(@filter_path), @filter_path)
  rescue StandardError, ScriptError => e
    # SyntaxError/LoadError descend from ScriptError, not StandardError, so a
    # bare `rescue` would let a malformed filter file escape as a raw
    # SyntaxError instead of a configuration error.
    raise Fluent::ConfigError, "Invalid filter: #{@filter_path}: #{e}"
  end

  unless @filter.respond_to?(:call)
    raise Fluent::ConfigError, "`call` method not implemented in filter: #{@filter_path}"
  end
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_time_sliced_filter.rb, line 34
#
# Serializes a single event for buffering.
#
# Calls filter_record with the event (its return value is ignored), then
# returns the msgpack encoding of the [tag, time, record] triple.
def format(tag, time, record)
  # XXX: format_stream is not called
  # https://github.com/fluent/fluentd/blob/v0.10.43/lib/fluent/output.rb#L516
  filter_record(tag, time, record)

  event = [tag, time, record]
  event.to_msgpack
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_time_sliced_filter.rb, line 41
#
# Runs the filter over every event in a chunk and emits the result.
#
# The chunk is decoded once into [tag, time, record] triples. The filter
# receives either the bare records (when @pass_hash_row is set) or the full
# triples. A non-Array filter result is wrapped in a one-element Array.
#
# The filtered records are emitted with the smallest event time found in the
# chunk. When @emit_each_tag is set they are emitted once for every distinct
# tag in the chunk; otherwise only under the tag of the first event.
#
# An empty chunk still invokes the filter but emits nothing.
#
# chunk - an object responding to `msgpack_each`, yielding one
#         [tag, time, record] triple per event.
def write(chunk)
  events = chunk.to_enum(:msgpack_each).to_a

  # Tag and time are taken before the filter runs so that a filter mutating
  # the rows it is handed cannot influence how the result is emitted.
  time = events.map { |_, event_time, _| event_time }.min
  # De-duplicate so each tag receives the filtered records exactly once,
  # rather than once per event carrying that tag.
  tags = events.map { |tag, _, _| tag }.uniq
  tags = tags.first(1) unless @emit_each_tag

  rows = @pass_hash_row ? events.map { |_, _, record| record } : events
  records = @filter.call(rows)
  records = [records] unless records.kind_of?(Array)

  # `tags` is empty for an empty chunk, so nothing is emitted in that case.
  tags.each { |tag| emit_records(tag, time, records) }
end
Private Instance Methods
emit_records(tag, time, records)
click to toggle source
# File lib/fluent/plugin/out_time_sliced_filter.rb, line 59
#
# Emits each Hash in +records+ through Fluent::Engine under the tag
# "<@prefix>.<tag>" with the given +time+. Any element that is not a Hash is
# skipped and reported with a warning on the plugin logger.
def emit_records(tag, time, records)
  records.each do |record|
    unless record.kind_of?(Hash)
      log.warn("Record must be Hash: #{record.inspect} (#{record.class})")
      next
    end

    Fluent::Engine.emit("#{@prefix}.#{tag}", time, record)
  end
end