class Fluent::Plugin::KinesisStreamsOutput
Constants
- BatchRequestLimitCount
- BatchRequestLimitSize
- RequestType
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
Fluent::Plugin::KinesisHelper::API::BatchRequest#configure
# File lib/fluent/plugin/out_kinesis_streams.rb, line 30
#
# Runs the superclass configuration and then prepares the partition-key
# formatter used later by #format.
#
# @param conf the configuration object handed to the plugin; it is passed
#   through unchanged to the superclass implementation
def configure(conf)
  # Bare `super` forwards the original arguments to the parent implementation.
  super

  # Build the key formatter once, after the parent has finished configuring.
  @key_formatter = key_formatter_create
end
format(tag, time, record)
click to toggle source
# File lib/fluent/plugin/out_kinesis_streams.rb, line 35
#
# Produces the formatted representation of one event by handing a
# [data, key] pair to format_for_api.
#
# @param tag    the event tag, passed to @data_formatter
# @param time   the event time, passed to @data_formatter
# @param record the event record, passed to both formatters
# @return whatever format_for_api returns for the yielded pair
def format(tag, time, record)
  format_for_api do
    # Data is formatted before the key, matching the original evaluation order.
    [
      @data_formatter.call(tag, time, record),
      @key_formatter.call(record)
    ]
  end
end
write(chunk)
click to toggle source
# File lib/fluent/plugin/out_kinesis_streams.rb, line 43
#
# Sends the contents of a chunk through client.put_records, one call per batch
# yielded by write_records_batch.
#
# @param chunk the buffered chunk; also used to resolve placeholders in
#   @stream_name
# @return whatever write_records_batch returns
def write(chunk)
  # Resolve placeholders once per chunk so every batch targets the same stream.
  stream_name = extract_placeholders(@stream_name, chunk)

  write_records_batch(chunk, stream_name) do |batch|
    # Each batch entry is a (data, partition_key) pair; turn it into the
    # hash shape passed to put_records.
    entries = batch.map do |(data, partition_key)|
      Hash[:data, data, :partition_key, partition_key]
    end

    client.put_records(stream_name: stream_name, records: entries)
  end
end
Private Instance Methods
key_formatter_create()
click to toggle source
# File lib/fluent/plugin/out_kinesis_streams.rb, line 58
#
# Builds the callable that derives a partition key from a record.
#
# When @partition_key is nil, the returned lambda ignores its argument and
# produces the result of SecureRandom.hex(16) on every call. Otherwise the
# lambda returns record[@partition_key], and raises KeyNotFoundError when the
# record does not contain that key.
#
# @return [Proc] a one-argument lambda mapping a record to its partition key
# @raise [KeyNotFoundError] from the returned lambda, when @partition_key is
#   set and the record has no such key
def key_formatter_create
  # No configured key: the record is unused, so the parameter is underscored.
  return ->(_record) { SecureRandom.hex(16) } if @partition_key.nil?

  lambda do |record|
    # key? (rather than a nil check) lets a present-but-nil value pass through.
    raise KeyNotFoundError.new(@partition_key, record) unless record.key?(@partition_key)

    record[@partition_key]
  end
end