class Fluent::Plugin::SplitBySizeFilter
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
# File lib/fluent/plugin/filter_split_by_size.rb, line 50
# Runs the superclass configuration, then derives the per-record size budget
# from @max_event_size.
#
# @param conf the configuration object handed to the superclass
# @return [Integer] the computed @max_record_size
def configure(conf)
  super

  # Bytes held back from every record so the id field can be added afterwards.
  id_field_headroom = 200
  @max_record_size = @max_event_size - id_field_headroom
end
filter_stream(tag, es)
click to toggle source
# File lib/fluent/plugin/filter_split_by_size.rb, line 55
# Splits every event of +es+ into size-bounded pieces and returns them in a
# new MultiEventStream. Each piece carries the id found under @id_field in
# the source record.
#
# Events whose id is missing (nil) and events that raise while being split
# are not added to the result; they are handed to router.emit_error_event
# together with the exception.
#
# @param tag the tag passed through to emit_error_event
# @param es the incoming event stream; must yield (time, record) pairs
# @return [MultiEventStream] the stream of split events
def filter_stream(tag, es)
  new_es = MultiEventStream.new
  es.each do |time, record|
    begin
      id = record[@id_field]
      raise KeyNotFoundError.new(@id_field, record) if id.nil?

      # Split a shallow copy: the incoming record must stay intact so that a
      # failure below still reports the complete record (id included).
      payload = record.dup
      payload.delete(@id_field)

      split_event(payload).each do |piece|
        piece[@id_field] = id
        new_es.add(time, piece)
      end
    rescue => e
      router.emit_error_event(tag, time, record, e)
    end
  end
  new_es
end
size_of_values(record)
click to toggle source
# File lib/fluent/plugin/filter_split_by_size.rb, line 73
# Measures +record+ as the number of bytes in its Yajl.dump output. Despite
# the method name, the whole serialized record is counted (keys included),
# not only the values.
#
# @param record the object to serialize
# @return [Integer] byte length of the serialized form
def size_of_values(record)
  serialized = Yajl.dump(record)
  serialized.bytesize
end
split_event(record)
click to toggle source
# File lib/fluent/plugin/filter_split_by_size.rb, line 77
# Recursively breaks +record+ into pieces whose size_of_values does not
# exceed @max_record_size.
#
# A record that already fits is returned as the only element. A larger
# record with more than one key is divided in two with split_into(2) and each
# half is processed again. A larger record with at most one key cannot be
# divided any further: a warning is logged and nothing is returned for it.
#
# @param record the record to split; must respond to keys, values and split_into
# @return [Array] the pieces that fit (empty when the record was dropped)
def split_event(record)
  return [record] unless size_of_values(record) > @max_record_size

  if record.keys.count <= 1
    details = [record.keys.first, record.values.first, @max_record_size]
    log.warn('Key/Value pair is too large: "%s:%s". Max size is: %s, dropping!' % details)
    return []
  end

  record.split_into(2).flat_map { |half| split_event(half) }
end
split_into(divisions)
click to toggle source
# File lib/fluent/plugin/filter_split_by_size.rb, line 96
# Deals the receiver's key/value pairs round-robin into at most +divisions+
# hashes: the first pair goes to slot 0, the second to slot 1, and so on,
# wrapping around after slot divisions - 1.
#
# The receiver must be enumerable as key/value pairs; split_event calls this
# on the record itself.
#
# @param divisions [Integer] number of slots to distribute over
# @return [Array<Hash>] the filled slots, in slot order (empty for an empty receiver)
def split_into(divisions)
  each_with_index.each_with_object([]) do |((key, value), position), buckets|
    slot = position % divisions
    buckets[slot] ||= {}
    buckets[slot][key] = value
  end
end