class FluentPluginKinesisAggregation::OutputFilter

Constants

DEFAULT_BUFFER_TYPE
FLUENTD_MAX_BUFFER_SIZE

200 is an arbitrary margin: larger than the envelope overhead and big enough to store the partition/hash key table in AggregatedRecords. Note that you shouldn’t really ever set the buffer this high, since the write is likely to fail if anyone else is writing to the shard at the same time.

KPL_MAGIC_NUMBER

See https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md for the KPL aggregation format.

NAME
PUT_RECORD_MAX_DATA_SIZE

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_kinesis-aggregation.rb, line 88
# Validates the configured buffer chunk size against the Kinesis limits.
#
# Runs Fluentd's compat_parameters conversion for the buffer and inject
# sections, lets the superclass parse the configuration, then checks the
# resulting chunk limit.
#
# @param conf plugin configuration element
# @raise [Fluent::ConfigError] if the chunk limit exceeds
#   FLUENTD_MAX_BUFFER_SIZE, since such chunks could never be written
def configure(conf)
  compat_parameters_convert(conf, :buffer, :inject)
  super

  chunk_limit = @buffer.chunk_limit_size

  if chunk_limit > FLUENTD_MAX_BUFFER_SIZE
    raise Fluent::ConfigError, "Kinesis buffer_chunk_limit is set to more than the 1mb shard limit (i.e. you won't be able to write your chunks!)"
  end

  # Not fatal, but one producer using over a third of the shard's
  # per-second budget leaves little headroom for other writers.
  if chunk_limit > FLUENTD_MAX_BUFFER_SIZE / 3
    log.warn 'Kinesis buffer_chunk_limit is set at more than 1/3 of the per second shard limit (1mb). This is not good if you have many producers.'
  end
end
format(tag, time, record) click to toggle source
# File lib/fluent/plugin/out_kinesis-aggregation.rb, line 106
# Serialises one event as a standalone AggregatedRecord holding one Record.
#
# The event is enriched via inject_values_to_record, dumped to JSON with
# Yajl and forced to binary. The Record points at partition_key_index 1,
# because proto3 drops a 0 index from the encoding as a default value. #write
# places the real partition key at slot 1 of the table.
#
# @return [String] the protobuf-encoded AggregatedRecord bytes
def format(tag, time, record)
  enriched = inject_values_to_record(tag, time, record)
  payload = Yajl.dump(enriched).b

  single = Record.new(
    partition_key_index: 1,
    data: payload
  )
  AggregatedRecord.encode(AggregatedRecord.new(records: [single]))
end
start() click to toggle source
Calls superclass method
# File lib/fluent/plugin/out_kinesis-aggregation.rb, line 101
# Runs the superclass start-up, then builds the Kinesis client.
# Returns whatever #load_client returns.
def start
  super()
  load_client
end
write(chunk) click to toggle source
# File lib/fluent/plugin/out_kinesis-aggregation.rb, line 117
# Sends one buffered chunk to Kinesis as a single KPL-aggregated record.
#
# The chunk holds the concatenated byte strings returned by #format, one
# encoded AggregatedRecord per event. A header AggregatedRecord carrying the
# partition key table is prepended, and the result is wrapped by
# #kpl_aggregation_pack.
#
# Chunks larger than FLUENTD_MAX_BUFFER_SIZE bytes are logged and dropped
# instead of raised, because retrying them would fail the same way.
#
# @param chunk buffer chunk; only #read is used
def write(chunk)
  records = chunk.read

  # Measure bytes, not characters. The limit applies to payload bytes, and
  # String#length undercounts whenever the chunk data is tagged with a
  # multi-byte encoding. An undercount would let an oversized record through
  # to a put_record call that can never succeed.
  records_size = records.bytesize
  if records_size > FLUENTD_MAX_BUFFER_SIZE
    log.error "Can't emit aggregated #{@stream_name} stream record of length #{records_size} (more than #{FLUENTD_MAX_BUFFER_SIZE})"
    return # do not throw, since we can't retry
  end

  partition_key = @fixed_partition_key || SecureRandom.uuid

  # confusing magic. Because of the format of protobuf records,
  # it's valid (in this case) to concatenate the AggregatedRecords
  # to form one AggregatedRecord, since we only have a repeated field
  # in records.
  #
  # ALSO, since we use google's protobuf stuff (much better
  # memory usage due to C extension), we're stuck on proto3.
  # Unfortunately, KPL uses proto2 form, and partition_key_index
  # is a required field. If we set it to 0 in proto3, though,
  # it's helpfully ignored in the serialisation (default!).
  # Therefore we have to pass a partition_key_index of 1,
  # and put two things in our partition_key_table.
  message = AggregatedRecord.encode(AggregatedRecord.new(
    partition_key_table: ['a', partition_key]
  )) + records

  @client.put_record(
    stream_name: @stream_name,
    data: kpl_aggregation_pack(message),
    partition_key: partition_key
  )
end

Private Instance Methods

kpl_aggregation_pack(message) click to toggle source
# File lib/fluent/plugin/out_kinesis-aggregation.rb, line 153
# Wraps an encoded AggregatedRecord in the KPL aggregation envelope: the
# magic number (packed to 4 bytes), the message bytes, then the 16-byte MD5
# digest of the message.
#
# With an explicit width, 'A' pads or truncates to that width. An MD5 digest
# is always 16 bytes, and 'A*' copies the message verbatim (pack does not
# strip trailing spaces).
#
# @param message [String] serialised AggregatedRecord bytes
# @return [String] binary string to send as the record's data
def kpl_aggregation_pack(message)
  checksum = Digest::MD5.digest(message)
  envelope = [KPL_MAGIC_NUMBER, message, checksum]
  envelope.pack('A4A*A16')
end
load_client() click to toggle source

This code is unchanged from github.com/awslabs/aws-fluent-plugin-kinesis

# File lib/fluent/plugin/out_kinesis-aggregation.rb, line 160
# Builds the Aws::Kinesis::Client used by #write and stores it in @client.
#
# Credentials are chosen in this order:
#   1. static keys (@aws_key_id and @aws_sec_key),
#   2. a shared-credentials profile (@profile, optional @credentials_path),
#   3. an assumed role (@role_arn, optional @external_id; 1-hour session).
# If none is configured, no credentials option is set here. Presumably the
# SDK then falls back to its default provider chain (confirm).
#
# @return the newly built Kinesis client
def load_client
  options = { user_agent_suffix: "fluent-#{NAME}" }
  options[:region] = @region if @region

  if @aws_key_id && @aws_sec_key
    options[:access_key_id] = @aws_key_id
    options[:secret_access_key] = @aws_sec_key
  elsif @profile
    shared_opts = { profile_name: @profile }
    shared_opts[:path] = @credentials_path if @credentials_path
    options[:credentials] = Aws::SharedCredentials.new(shared_opts)
  elsif @role_arn
    # NOTE(review): the STS client only sees the options collected so far
    # (user agent suffix and region). It gets neither the debug logger nor
    # the http_proxy applied further down.
    sts_client = Aws::STS::Client.new(options)
    options[:credentials] = Aws::AssumeRoleCredentials.new(
      client: sts_client,
      role_arn: @role_arn,
      role_session_name: "fluent-plugin-kinesis-aggregation",
      external_id: @external_id,
      duration_seconds: 60 * 60
    )
  end

  if @debug
    options[:logger] = Logger.new(log.out)
    options[:log_level] = :debug
  end

  options[:http_proxy] = @http_proxy if @http_proxy

  @client = Aws::Kinesis::Client.new(options)
end