class Fluent::Plugin::Filter

Attributes

has_filter_with_time[R]

Public Class Methods

new() click to toggle source
Calls superclass method Fluent::PluginLoggerMixin::new
# File lib/fluent/plugin/filter.rb, line 35
# Builds the filter plugin and records which filtering entry point it uses.
#
# After the superclass initializer runs, the result of
# `has_filter_with_time?` is stored in `@has_filter_with_time` so that
# later calls can read the cached answer instead of inspecting the
# class again.
def initialize
  super
  uses_time_aware_filter = has_filter_with_time?
  @has_filter_with_time = uses_time_aware_filter
end

Public Instance Methods

filter(tag, time, record) click to toggle source
# File lib/fluent/plugin/filter.rb, line 40
# Placeholder for the record-only filtering hook.
#
# This base implementation never returns: it always raises, signalling
# that the plugin was expected to supply its own implementation.
#
# @param tag [Object] tag of the event (unused here)
# @param time [Object] time of the event (unused here)
# @param record [Object] record of the event (unused here)
# @raise [NotImplementedError] always
def filter(tag, time, record)
  raise NotImplementedError.new("BUG: filter plugins MUST implement this method")
end
filter_stream(tag, es) click to toggle source
# File lib/fluent/plugin/filter.rb, line 48
# Runs every event of +es+ through the plugin's filter method and collects
# the surviving events into a fresh MultiEventStream.
#
# Which method is used depends on `@has_filter_with_time`, read once before
# iterating:
# * falsy  -> `filter(tag, time, record)`; the original time is kept and the
#   event is added only when the returned record is truthy.
# * truthy -> `filter_with_time(tag, time, record)`; the event is added only
#   when both the returned time and the returned record are truthy.
#
# A StandardError raised while handling one event is passed to
# `router.emit_error_event` together with the original tag, time and record;
# that event is skipped and iteration continues.
#
# @param tag [Object] tag passed through to the filter method
# @param es [#each] source yielding (time, record) pairs
# @return [MultiEventStream] the newly built stream
def filter_stream(tag, es)
  filtered_es = MultiEventStream.new

  unless @has_filter_with_time
    es.each do |time, record|
      begin
        kept_record = filter(tag, time, record)
        filtered_es.add(time, kept_record) if kept_record
      rescue => error
        router.emit_error_event(tag, time, record, error)
      end
    end
    return filtered_es
  end

  es.each do |time, record|
    begin
      kept_time, kept_record = filter_with_time(tag, time, record)
      filtered_es.add(kept_time, kept_record) if kept_time && kept_record
    rescue => error
      router.emit_error_event(tag, time, record, error)
    end
  end
  filtered_es
end
filter_with_time(tag, time, record) click to toggle source
# File lib/fluent/plugin/filter.rb, line 44
# Placeholder for the time-aware filtering hook.
#
# This base implementation never returns: it always raises, signalling
# that the plugin was expected to supply its own implementation.
#
# @param tag [Object] tag of the event (unused here)
# @param time [Object] time of the event (unused here)
# @param record [Object] record of the event (unused here)
# @raise [NotImplementedError] always
def filter_with_time(tag, time, record)
  raise NotImplementedError.new("BUG: filter plugins MUST implement this method")
end

Private Instance Methods

has_filter_with_time?() click to toggle source
# File lib/fluent/plugin/filter.rb, line 74
# Inspects the methods defined directly on the receiver's class (inherited
# ones are excluded by `instance_methods(false)`) to decide which filtering
# hook the plugin provides.
#
# @return [nil] when the class defines `filter_stream` itself
# @return [false] when the class defines only `filter`
# @return [true] when the class defines only `filter_with_time`
# @raise [RuntimeError] when the class defines both `filter` and
#   `filter_with_time`
# @raise [NotImplementedError] when the class defines neither
def has_filter_with_time?
  implemented_methods = self.class.instance_methods(false)
  # Plugins that override `filter_stream` don't need the check,
  # because they may not call `filter` or `filter_with_time`
  # for example fluentd/lib/fluent/plugin/filter_record_transformer.rb
  return nil if implemented_methods.include?(:filter_stream)

  defines_filter = implemented_methods.include?(:filter)
  defines_filter_with_time = implemented_methods.include?(:filter_with_time)

  if defines_filter && defines_filter_with_time
    # Defining both hooks is ambiguous, so it is rejected outright.
    raise "BUG: Filter plugins MUST be implemented either `filter` or `filter_with_time`"
  elsif defines_filter
    false
  elsif defines_filter_with_time
    true
  else
    raise NotImplementedError, "BUG: Filter plugins MUST be implemented either `filter` or `filter_with_time`"
  end
end