class Fluent::Plugin::KinesisOutput
Public Instance Methods
configure(conf)
click to toggle source
Calls superclass method
Fluent::Plugin::KinesisHelper::API#configure
# File lib/fluent/plugin/kinesis.rb, line 79
# Runs the superclass +configure+ (with the same arguments), then builds the
# per-record formatter lambda and keeps it in @data_formatter.
#
# @param conf the plugin configuration, forwarded to data_formatter_create
def configure(conf)
  super

  @data_formatter = data_formatter_create(conf)
end
formatted_to_msgpack_binary?()
click to toggle source
# File lib/fluent/plugin/kinesis.rb, line 88
# Always answers true.
# NOTE(review): presumably tells Fluentd that formatted chunks are msgpack
# binary (see write_records_batch, which iterates with msgpack_each) — confirm.
#
# @return [Boolean] always true
def formatted_to_msgpack_binary?
  true
end
multi_workers_ready?()
click to toggle source
# File lib/fluent/plugin/kinesis.rb, line 84
# Always answers true.
# NOTE(review): presumably declares the plugin safe to run under multiple
# Fluentd workers — confirm against the Fluentd plugin API.
#
# @return [Boolean] always true
def multi_workers_ready?
  true
end
Private Instance Methods
compressor_create()
click to toggle source
# File lib/fluent/plugin/kinesis.rb, line 122
# Logs the configured @compression value and returns a one-argument lambda
# that compresses a payload accordingly:
#   "zlib" -> Zlib::Deflate.deflate
#   "gzip" -> Gzip.compress
#   anything else -> the payload is returned untouched
#
# @return [Proc] lambda taking the raw data and returning the (possibly
#   compressed) data
def compressor_create
  log.info("@compression: #{@compression}")

  if @compression == "zlib"
    lambda { |payload| Zlib::Deflate.deflate(payload) }
  elsif @compression == "gzip"
    lambda { |payload| Gzip.compress(payload) }
  else
    lambda { |payload| payload }
  end
end
data_formatter_create(conf)
click to toggle source
# File lib/fluent/plugin/kinesis.rb, line 94
# Builds the lambda that converts one (tag, time, record) event into the
# binary payload handed to the compressor.
#
# * Without @data_key: the record gets its injected values, is rendered by the
#   formatter, optionally chomped (@chomp_record), and forced to binary.
# * With @data_key: only record[@data_key] is sent, as a binary string.
#
# @param conf the plugin configuration (not read here; kept for the caller)
# @return [Proc] lambda (tag, time, record) -> compressed binary String
# @raise [InvalidRecordError] (from the lambda) when @data_key is set and the
#   record is not a Hash
# @raise [KeyNotFoundError] (from the lambda) when record[@data_key] is nil
def data_formatter_create(conf)
  formatter = formatter_create
  compressor = compressor_create
  if @data_key.nil?
    if @chomp_record
      ->(tag, time, record) {
        record = inject_values_to_record(tag, time, record)
        # Chomp the trailing separator the formatter appends to each record.
        # This option keeps the output compatible with plugin v2.
        # https://github.com/awslabs/aws-fluent-plugin-kinesis/issues/142
        compressor.call(formatter.format(tag, time, record).chomp.b)
      }
    else
      ->(tag, time, record) {
        record = inject_values_to_record(tag, time, record)
        compressor.call(formatter.format(tag, time, record).b)
      }
    end
  else
    ->(tag, time, record) {
      raise InvalidRecordError, record unless record.is_a? Hash
      raise KeyNotFoundError.new(@data_key, record) if record[@data_key].nil?
      # Debug level with a block: this runs once per event, so the full record
      # must not be written to the info log, and the message string is only
      # built when debug logging is enabled.
      log.debug { "record: #{record}" }
      compressor.call(record[@data_key].to_s.b)
    }
  end
end
format_for_api(&block)
click to toggle source
# File lib/fluent/plugin/kinesis.rb, line 134
# Calls the given block to obtain the converted record, checks its size
# against @max_record_size, and serializes it with +to_msgpack+.
#
# A SkipRecordError raised along the way is logged (truncated) at error level
# and an empty string is returned instead.
#
# @yieldreturn the converted record
# @return [String] msgpack data, or '' when the record is skipped
def format_for_api(&block)
  begin
    payload = block.call
    bytes = size_of_values(payload)
    raise ExceedMaxRecordSizeError.new(bytes, payload) if bytes > @max_record_size

    payload.to_msgpack
  rescue SkipRecordError => skipped
    log.error(truncate(skipped))
    ''
  end
end
msgpack_unpacker(*args)
click to toggle source
# File lib/fluent/plugin/kinesis.rb, line 147
# Forwards all arguments to Fluent::MessagePackFactory.msgpack_unpacker and
# returns whatever it returns.
def msgpack_unpacker(*args)
  factory = Fluent::MessagePackFactory
  factory.msgpack_unpacker(*args)
end
request_type()
click to toggle source
# File lib/fluent/plugin/kinesis.rb, line 164
# Returns the RequestType constant looked up through the receiver's class,
# so each concrete class supplies its own value.
def request_type
  klass = self.class
  klass::RequestType
end
truncate(msg)
click to toggle source
# File lib/fluent/plugin/kinesis.rb, line 168
# Converts +msg+ to a String and limits it to @log_truncate_max_size
# characters. A limit of 0 disables truncation.
#
# @param msg [#to_s] the message (or exception) to render
# @return [String] the full text, or its first @log_truncate_max_size
#   characters when it is longer than the limit
def truncate(msg)
  # Convert once: +to_s+ may be non-trivial and was previously called up to
  # three times per invocation.
  text = msg.to_s
  return text if @log_truncate_max_size == 0 || text.size <= @log_truncate_max_size

  text[0...@log_truncate_max_size]
end
write_records_batch(chunk, stream_name, &block)
click to toggle source
# File lib/fluent/plugin/kinesis.rb, line 154
# Reads the chunk's records through +msgpack_each+, splits them into batches
# with split_to_batches, and sends every batch via batch_request_with_retry,
# passing the caller's block along. Debug lines are logged before and after
# each batch.
#
# @param chunk the buffer chunk to write
# @param stream_name name used as the prefix of the debug log lines
def write_records_batch(chunk, stream_name, &block)
  chunk_id = chunk.dump_unique_id_hex(chunk.unique_id)
  split_to_batches(chunk.to_enum(:msgpack_each)) do |batch, size|
    # sprintf (not +format+) on purpose: +format+ may resolve to an instance
    # method of the plugin rather than Kernel#format.
    started = sprintf("%s: Write chunk %s / %3d records / %4d KB", stream_name, chunk_id, batch.size, size/1024)
    log.debug(started)
    batch_request_with_retry(batch, &block)
    finished = sprintf("%s: Finish writing chunk", stream_name)
    log.debug(finished)
  end
end