class Fluent::RecordModifierFilter

Constants

BUILTIN_CONFIGURATIONS
HOSTNAME_PLACEHOLDERS

Public Instance Methods

configure(conf)
Calls superclass method
# File lib/fluent/plugin/filter_record_modifier.rb, line 31
# Validates the plugin configuration and builds @map, the key => DynamicExpander
# table that filter_stream applies to every record.
#
# Reads @char_encoding, @remove_keys and @whitelist_keys (presumably populated
# by the superclass `configure` from declared parameters -- TODO confirm).
#
# Side effects visible here:
# * @map             - one DynamicExpander per top-level (deprecated) or <record> key
# * @has_tag_parts   - true when any configured value mentions 'tag_parts'
# * @from_enc/@to_enc and a singleton `change_encoding` method when
#   @char_encoding is set ("FROM" or "FROM:TO")
# * @remove_keys / @whitelist_keys are turned from comma separated strings
#   into arrays of stripped key names
#
# Raises ConfigError when the removed 'include_tag_key' parameter is present,
# or when both remove_keys and whitelist_keys are given.
def configure(conf)
  super

  if conf.has_key?('include_tag_key')
    raise ConfigError, "include_tag_key and tag_key parameters are removed. Use 'tag ${tag}' in <record> section"
  end

  @map = {}
  @has_tag_parts = false

  # Deprecated style: record keys written directly at the top level.
  conf.each_pair do |k, v|
    next if BUILTIN_CONFIGURATIONS.include?(k)

    check_config_placeholders(k, v)
    conf.has_key?(k) # to suppress unread configuration warning
    $log.warn "top level definition is deprecated. Please put parameters inside <record>: '#{k} #{v}'"
    # Keep in step with the <record> handling below so that filter_stream
    # also splits the tag for top-level values that reference tag_parts.
    @has_tag_parts = true if v.include?('tag_parts')
    @map[k] = DynamicExpander.new(k, v)
  end

  if @char_encoding
    from, to = @char_encoding.split(':', 2)
    @from_enc = Encoding.find(from)
    @to_enc = Encoding.find(to) if to

    # With a target encoding the values are transcoded; otherwise they are
    # only re-tagged with the source encoding.
    m = if @to_enc
          method(:convert_encoding)
        else
          method(:set_encoding)
        end

    # Bind the chosen strategy once, as a per-instance `change_encoding`.
    (class << self; self; end).module_eval do
      define_method(:change_encoding, m)
    end
  end

  conf.elements.select { |element| element.name == 'record' }.each do |element|
    element.each_pair do |k, v|
      check_config_placeholders(k, v)
      element.has_key?(k) # to suppress unread configuration warning
      @has_tag_parts = true if v.include?('tag_parts')
      @map[k] = DynamicExpander.new(k, v)
    end
  end

  if @remove_keys && @whitelist_keys
    raise Fluent::ConfigError, "remove_keys and whitelist_keys are exclusive with each other."
  elsif @remove_keys
    @remove_keys = @remove_keys.split(',').map(&:strip)
  elsif @whitelist_keys
    @whitelist_keys = @whitelist_keys.split(',').map(&:strip)
  end

  # Collect DynamicExpander related garbage instructions
  GC.start
end
filter_stream(tag, es)
# File lib/fluent/plugin/filter_record_modifier.rb, line 86
# Applies the configured modifications to every event of +es+ and returns
# them in a new MultiEventStream.
#
# For each (time, record) pair:
# 1. every entry of @map is expanded and stored in the record (in place),
# 2. keys listed in @remove_keys are deleted, or -- when @whitelist_keys is
#    set instead -- the record is replaced by a hash holding only those keys,
# 3. change_encoding is applied when @char_encoding is set.
#
# The tag is split on '.' only when @has_tag_parts is set; otherwise the
# expanders receive nil for tag_parts.
def filter_stream(tag, es)
  result = MultiEventStream.new
  parts = @has_tag_parts ? tag.split('.') : nil

  es.each do |time, record|
    @map.each_pair do |key, expander|
      record[key] = expander.expand(tag, time, record, parts)
    end

    if @remove_keys
      @remove_keys.each { |key| record.delete(key) }
    elsif @whitelist_keys
      # Build a fresh hash instead of deleting from the record while iterating.
      record = record.each_with_object({}) do |(key, val), kept|
        kept[key] = val if @whitelist_keys.include?(key)
      end
    end

    record = change_encoding(record) if @char_encoding
    result.add(time, record)
  end

  result
end

Private Instance Methods

check_config_placeholders(k, v)
# File lib/fluent/plugin/filter_record_modifier.rb, line 138
# Rejects configuration values that still use one of the removed hostname
# placeholders listed in HOSTNAME_PLACEHOLDERS.
#
# k - configuration key, used only in the error message
# v - configuration value to scan (must respond to include?)
#
# Raises ConfigError naming the first placeholder found in +v+.
def check_config_placeholders(k, v)
  HOSTNAME_PLACEHOLDERS.each do |ph|
    next unless v.include?(ph)

    # "\#{...}" is escaped on purpose: the message shows the literal Ruby
    # interpolation the user should write in the configuration.
    raise ConfigError, %Q!#{ph} placeholder in #{k} is removed. Use "\#{Socket.gethostname}" instead.!
  end
end
convert_encoding(value)
# File lib/fluent/plugin/filter_record_modifier.rb, line 125
# Recursively transcodes every String reachable from +value+ from @from_enc
# to @to_enc, replacing invalid and undefined characters.
#
# Strings are converted in place. A frozen String cannot be mutated, so it is
# converted on a copy and the copy is written back into the enclosing Hash or
# Array. Hash keys are left untouched. Binary (ASCII-8BIT) strings are first
# tagged as @from_enc.
#
# Returns the converted String, or the same Hash/Array that was passed in;
# returns nil for any other kind of value.
def convert_encoding(value)
  case value
  when String
    # force_encoding/encode! would raise FrozenError on a frozen string.
    value = value.dup if value.frozen?
    value.force_encoding(@from_enc) if value.encoding == Encoding::BINARY
    value.encode!(@to_enc, @from_enc, :invalid => :replace, :undef => :replace)
  when Hash
    value.each_pair do |k, v|
      converted = convert_encoding(v)
      # Only a frozen string yields a different object; store the copy.
      value[k] = converted if v.is_a?(String) && !converted.equal?(v)
    end
  when Array
    value.each_with_index do |v, i|
      converted = convert_encoding(v)
      value[i] = converted if v.is_a?(String) && !converted.equal?(v)
    end
  end
end
set_encoding(value)
# File lib/fluent/plugin/filter_record_modifier.rb, line 115
# Recursively re-tags every String reachable from +value+ with @from_enc
# (bytes are not changed, only the encoding label).
#
# Strings are re-tagged in place. A frozen String cannot be mutated, so a
# re-tagged copy is written back into the enclosing Hash or Array. Hash keys
# are left untouched.
#
# Returns the re-tagged String, or the same Hash/Array that was passed in;
# returns nil for any other kind of value.
def set_encoding(value)
  case value
  when String
    # force_encoding would raise FrozenError on a frozen string.
    value = value.dup if value.frozen?
    value.force_encoding(@from_enc)
  when Hash
    value.each_pair do |k, v|
      tagged = set_encoding(v)
      # Only a frozen string yields a different object; store the copy.
      value[k] = tagged if v.is_a?(String) && !tagged.equal?(v)
    end
  when Array
    value.each_with_index do |v, i|
      tagged = set_encoding(v)
      value[i] = tagged if v.is_a?(String) && !tagged.equal?(v)
    end
  end
end