class Fluent::TimeSlicedFilterOutput

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_time_sliced_filter.rb, line 16
def configure(conf)
  super

  unless File.exist?(@filter_path)
    raise Fluent::ConfigError, "No such file: #{@filter_path}"
  end

  begin
    @filter = Object.new.instance_eval(File.read(@filter_path), @filter_path)
  rescue => e
    raise Fluent::ConfigError, "Invalid filter: #{@filter_path}: #{e}"
  end

  unless @filter.respond_to?(:call)
    raise Fluent::ConfigError, "`call` method not implemented in filter: #{@filter_path}"
  end
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_time_sliced_filter.rb, line 34
def format(tag, time, record)
  # XXX: format_stream is not called
  # https://github.com/fluent/fluentd/blob/v0.10.43/lib/fluent/output.rb#L516
  filter_record(tag, time, record)
  [tag, time, record].to_msgpack
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_time_sliced_filter.rb, line 41
def write(chunk)
  enum = chunk.to_enum(:msgpack_each)
  rows = @pass_hash_row ? enum.map {|_, _, record| record } : enum.to_a
  records = @filter.call(rows)
  records = [records] unless records.kind_of?(Array)
  time = enum.map {|_, time, _| time }.min

  if @emit_each_tag
    tags = enum.map {|tag, _, _| tag }
    tags.each {|tag| emit_records(tag, time, records) }
  else
    tag = enum.first[0]
    emit_records(tag, time, records)
  end
end

Private Instance Methods

emit_records(tag, time, records) click to toggle source
# File lib/fluent/plugin/out_time_sliced_filter.rb, line 59
def emit_records(tag, time, records)
  records.each do |record|
    if record.kind_of?(Hash)
      Fluent::Engine.emit("#{@prefix}.#{tag}", time, record)
    else
      log.warn("Record must be Hash: #{record.inspect} (#{record.class})")
    end
  end
end