class Fluent::Plugin::SplitBySizeFilter

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_split_by_size.rb, line 50
# Runs the superclass configuration and then derives the per-record size
# budget used when splitting events.
#
# Reads @max_event_size, which is not assigned in this method (presumably
# populated by the superclass configuration -- confirm against the plugin's
# config parameters).
#
# @param conf the plugin configuration, forwarded unchanged to the superclass
def configure(conf)
  super
  # Bytes held back from every record so the id field can be added afterwards.
  id_field_allowance = 200
  @max_record_size = @max_event_size - id_field_allowance
end
filter_stream(tag, es) click to toggle source
# File lib/fluent/plugin/filter_split_by_size.rb, line 55
# Rebuilds the event stream so that every record is split by split_event,
# with the id field (@id_field) copied onto each resulting piece.
#
# Events whose id field is missing (or nil), or whose processing raises, are
# reported through router.emit_error_event and contribute nothing further to
# the returned stream.
#
# @param tag the tag of the incoming events, passed along with error events
# @param es the incoming event stream; must yield (time, record) pairs
# @return [MultiEventStream] a new stream holding the split records
def filter_stream(tag, es)
  new_es = MultiEventStream.new
  es.each do |time, record|
    begin
      id = record[@id_field]
      raise KeyNotFoundError.new(@id_field, record) if id.nil?

      # Split a copy that omits the id instead of deleting it from the
      # incoming record: if anything below raises, the error event must still
      # carry the id that identifies the failing record.
      payload = record.reject { |key, _value| key == @id_field }
      split_event(payload).each do |rec|
        rec[@id_field] = id
        new_es.add(time, rec)
      end
    rescue => e
      router.emit_error_event(tag, time, record, e)
    end
  end
  new_es
end
size_of_values(record) click to toggle source
# File lib/fluent/plugin/filter_split_by_size.rb, line 73
# Measures a record as the byte length of the string Yajl.dump produces for it.
#
# @param record the record to measure
# @return [Integer] number of bytes in the serialized record
def size_of_values(record)
  serialized = Yajl.dump(record)
  serialized.bytesize
end
split_event(record) click to toggle source
# File lib/fluent/plugin/filter_split_by_size.rb, line 77
# Recursively breaks a record into pieces whose size_of_values does not
# exceed @max_record_size.
#
# A record that already fits is returned as the only element. A record that
# is too large is halved with split_into(2) and each half is processed again.
# A too-large record that cannot be halved any further (at most one key) is
# logged as a warning and dropped.
#
# @param record the record to split; must respond to keys, values and split_into
# @return [Array] the pieces that fit; empty when the record was dropped
def split_event(record)
  return [record] unless size_of_values(record) > @max_record_size

  if record.keys.count <= 1
    log.warn('Key/Value pair is too large: "%s:%s". Max size is: %s, dropping!' % [record.keys[0], record.values[0], @max_record_size])
    return []
  end

  record.split_into(2).flat_map { |half| split_event(half) }
end
split_into(divisions) click to toggle source
# File lib/fluent/plugin/filter_split_by_size.rb, line 96
# Deals the receiver's key/value pairs round-robin into hashes.
#
# The n-th pair (counting from zero) lands in the hash at index
# n % divisions. Hashes are created only when a pair reaches them, so the
# result has fewer than `divisions` entries when the receiver has fewer
# pairs, and is empty for an empty receiver.
#
# @param divisions [Integer] number of hashes to distribute the pairs over
# @return [Array<Hash>] the partial hashes, in round-robin order
def split_into(divisions)
  each_with_index.each_with_object([]) do |(pair, position), buckets|
    bucket = (buckets[position % divisions] ||= {})
    bucket[pair[0]] = pair[1]
  end
end