class Fluent::Plugin::KinesisStreamsAggregatedOutput

Constants

BatchRequestLimitCount
BatchRequestLimitSize
RequestType

Public Instance Methods

configure(conf)
# File lib/fluent/plugin/out_kinesis_streams_aggregated.rb, line 32
def configure(conf)
  # Let the base output class process the shared configuration first.
  super
  @partition_key_generator = create_partition_key_generator
  # Reserve room in both size budgets for the aggregation envelope and
  # the partition key (see #offset); compute the reservation once.
  reserved = offset
  @batch_request_max_size -= reserved
  @max_record_size -= reserved
end
format(tag, time, record)
# File lib/fluent/plugin/out_kinesis_streams_aggregated.rb, line 39
# Format one event for buffering. The formatter call stays inside the
# block so any error it raises is handled by format_for_api.
def format(tag, time, record)
  format_for_api { [@data_formatter.call(tag, time, record)] }
end
offset()
# File lib/fluent/plugin/out_kinesis_streams_aggregated.rb, line 60
# Bytes to subtract from the configured size limits: the fixed
# aggregation envelope plus two bytes per partition-key character.
# Memoized after the first call.
def offset
  @offset ||= begin
    key_bytes = @partition_key_generator.call.size * 2
    AggregateOffset + key_bytes
  end
end
write(chunk)
# File lib/fluent/plugin/out_kinesis_streams_aggregated.rb, line 45
# Flush a buffer chunk: each batch is aggregated into a single Kinesis
# record under one partition key and sent via PutRecords.
def write(chunk)
  stream_name = extract_placeholders(@stream_name, chunk)
  write_records_batch(chunk, stream_name) do |batch|
    partition_key = @partition_key_generator.call
    # Each batch entry is a one-element array; unwrap the data payloads.
    payload = aggregator.aggregate(batch.map(&:first), partition_key)
    entry = { partition_key: partition_key, data: payload }
    client.put_records(stream_name: stream_name, records: [entry])
  end
end

Private Instance Methods

create_partition_key_generator()
# File lib/fluent/plugin/out_kinesis_streams_aggregated.rb, line 70
# Build a zero-arg callable producing the partition key: the configured
# fixed key when one is set, otherwise a fresh 128-bit random hex string
# on every call. The ivar is read inside the lambda so later changes to
# @fixed_partition_key are still honored at call time.
def create_partition_key_generator
  @fixed_partition_key.nil? ? -> { SecureRandom.hex(16) } : -> { @fixed_partition_key }
end
size_of_values(record)
# File lib/fluent/plugin/out_kinesis_streams_aggregated.rb, line 66
# Per-record size accounting: the base class's size plus the fixed
# per-record aggregation overhead. Bare `super` forwards `record`.
def size_of_values(record)
  RecordOffset + super
end