class Fluent::Plugin::MapSupport

Public Class Methods

new(map, plugin) click to toggle source
# File lib/fluent/plugin/map_support.rb, line 22
      def initialize(map, plugin)
        @map = map
        @plugin = plugin
        @checker = Fluent::Plugin::Parser::TimeoutChecker.new(@plugin.timeout)
        @checker.start

        if plugin.is_a?(Fluent::Plugin::Filter)
          singleton_class.module_eval(<<-CODE)
          def map_func(time, record)
            #{@map}
          end
        CODE
          class << self
            alias_method :generate_tuples, :generate_tuples_filter
            alias_method :do_map, :do_map_filter
          end
        elsif plugin.is_a?(Fluent::Plugin::Output)
          singleton_class.module_eval(<<-CODE)
          def map_func(tag, time, record)
            #{@map}
          end
        CODE
          class << self
            alias_method :generate_tuples, :generate_tuples_output
            alias_method :do_map, :do_map_output
          end
        end
      end

Public Instance Methods

do_map(tag, es) click to toggle source
# File lib/fluent/plugin/map_support.rb, line 51
def do_map(tag, es)
  # This method will be overwritten in #initailize.
end
do_map_filter(tag, es) click to toggle source
# File lib/fluent/plugin/map_support.rb, line 55
def do_map_filter(tag, es)
  tuples = generate_tuples(tag, es)

  tag_output_es = Hash.new{|h, key| h[key] = Fluent::MultiEventStream.new}
  tuples.each do |time, record|
    if time == nil || record == nil
      raise SyntaxError.new
    end
    tag_output_es[tag].add(time, record)
    @plugin.log.trace { [tag, time, record].inspect }
  end
  tag_output_es
end
do_map_output(tag, es) click to toggle source
# File lib/fluent/plugin/map_support.rb, line 69
def do_map_output(tag, es)
  tuples = generate_tuples(tag, es)

  tag_output_es = Hash.new{|h, key| h[key] = Fluent::MultiEventStream.new}
  tuples.each do |tag, time, record|
    if time == nil || record == nil
      raise SyntaxError.new
    end
    tag_output_es[tag].add(time, record)
    @plugin.log.trace { [tag, time, record].inspect }
  end
  tag_output_es
end
generate_tuples() click to toggle source
# File lib/fluent/plugin/map_support.rb, line 83
def generate_tuples
  # This method will be overwritten in #initailize.
end
generate_tuples_filter(tag, es) click to toggle source
# File lib/fluent/plugin/map_support.rb, line 87
def generate_tuples_filter(tag, es)
  tuples = []
  es.each {|time, record|
    timeout_block do
      new_tuple = map_func(time, record)
      tuples.concat new_tuple
    end
  }
  tuples
end
generate_tuples_output(tag, es) click to toggle source
# File lib/fluent/plugin/map_support.rb, line 98
def generate_tuples_output(tag, es)
  tuples = []
  es.each {|time, record|
    timeout_block do
      new_tuple = map_func(tag, time, record)
      tuples.concat new_tuple
    end
  }
  tuples
end
stop() click to toggle source
# File lib/fluent/plugin/map_support.rb, line 119
def stop
  @checker.stop
end
timeout_block() { || ... } click to toggle source
# File lib/fluent/plugin/map_support.rb, line 109
def timeout_block
  begin
    @checker.execute {
      yield
    }
  rescue Timeout::Error
    @plugin.log.error {"Timeout: #{Time.at(time)} #{tag} #{record.inspect}"}
  end
end