class Fluent::Plugin::KinesisOutput

Public Instance Methods

configure(conf) click to toggle source
# File lib/fluent/plugin/kinesis.rb, line 79
# Runs the superclass configuration, then builds the per-record formatting
# lambda and stores it in @data_formatter.
#
# conf - the plugin configuration; passed on implicitly by the bare +super+
#        and explicitly to data_formatter_create.
def configure(conf)
  # Bare super forwards the same arguments (conf) to the parent implementation.
  super
  @data_formatter = data_formatter_create(conf)
end
formatted_to_msgpack_binary?() click to toggle source
# File lib/fluent/plugin/kinesis.rb, line 88
# Always returns true.
#
# NOTE(review): presumably the Fluentd output-plugin hook declaring that
# buffered chunk entries are msgpack binary (format_for_api returns
# +to_msgpack+ output and write_records_batch reads chunks through
# +msgpack_each+) - confirm against the Fluentd plugin API.
def formatted_to_msgpack_binary?
  true
end
multi_workers_ready?() click to toggle source
# File lib/fluent/plugin/kinesis.rb, line 84
# Always returns true.
#
# NOTE(review): presumably the Fluentd hook through which a plugin declares
# that it can run under multiple workers - confirm against the Fluentd
# plugin API.
def multi_workers_ready?
  true
end

Private Instance Methods

compressor_create() click to toggle source
# File lib/fluent/plugin/kinesis.rb, line 122
# Builds the lambda applied to every formatted payload before it is sent.
#
# The algorithm is picked once, from @compression, at the time this method
# runs (the setting is also logged at info level):
#   "zlib"        - Zlib::Deflate.deflate
#   "gzip"        - Gzip.compress
#   anything else - the payload is returned untouched
#
# Returns a one-argument lambda.
def compressor_create
  log.info("@compression: #{@compression}")
  case @compression
  when "zlib" then lambda { |payload| Zlib::Deflate.deflate(payload) }
  when "gzip" then lambda { |payload| Gzip.compress(payload) }
  else lambda { |payload| payload }
  end
end
data_formatter_create(conf) click to toggle source
# File lib/fluent/plugin/kinesis.rb, line 94
# Builds the lambda that converts one event into the binary payload to send.
#
# Two modes, chosen when this method runs:
# * @data_key is nil - the record is passed through inject_values_to_record,
#   rendered by the formatter from formatter_create, optionally chomped
#   (@chomp_record), converted to a binary string and compressed.
# * @data_key is set - the value stored under that key is stringified,
#   converted to a binary string and compressed.
#
# conf - accepted for interface compatibility; not read by this method.
#
# Returns a lambda taking (tag, time, record) and returning the output of the
# compressor built by compressor_create.
# In @data_key mode the lambda raises InvalidRecordError when the record is
# not a Hash and KeyNotFoundError when the key is missing or nil.
def data_formatter_create(conf)
  formatter = formatter_create
  compressor = compressor_create
  if @data_key.nil?
    # Captured once so the option stays fixed for the lifetime of the lambda,
    # exactly as if a dedicated lambda had been selected up front.
    chomp_record = @chomp_record
    ->(tag, time, record) {
      record = inject_values_to_record(tag, time, record)
      formatted = formatter.format(tag, time, record)
      # Formatter output ends with a separator; chomping it keeps the payload
      # format compatible with plugin v2.
      # https://github.com/awslabs/aws-fluent-plugin-kinesis/issues/142
      formatted = formatted.chomp if chomp_record
      compressor.call(formatted.b)
    }
  else
    ->(tag, time, record) {
      raise InvalidRecordError, record unless record.is_a? Hash
      raise KeyNotFoundError.new(@data_key, record) if record[@data_key].nil?
      # Runs for every event: keep it at debug level, build the message lazily
      # and truncate it like the other log output, so normal operation does
      # not dump whole records into the log.
      log.debug { truncate("record: #{record}") }
      compressor.call(record[@data_key].to_s.b)
    }
  end
end
format_for_api(&block) click to toggle source
# File lib/fluent/plugin/kinesis.rb, line 134
# Runs the given block to obtain the converted record, enforces the size
# limit and serializes the result with +to_msgpack+.
#
# block - produces the converted record.
#
# Returns the msgpack string, or '' when the record is skipped.
# A record whose size_of_values exceeds @max_record_size triggers
# ExceedMaxRecordSizeError. Any SkipRecordError raised here or inside the
# block is logged (truncated) at error level and turned into ''; all other
# exceptions propagate.
def format_for_api(&block)
  begin
    payload = block.call
    total_size = size_of_values(payload)
    raise ExceedMaxRecordSizeError.new(total_size, payload) if total_size > @max_record_size
    payload.to_msgpack
  rescue SkipRecordError => skipped
    log.error(truncate(skipped))
    ''
  end
end
msgpack_unpacker(*args) click to toggle source
# File lib/fluent/plugin/kinesis.rb, line 147
# Thin wrapper around Fluent::MessagePackFactory.msgpack_unpacker.
#
# args - forwarded unchanged to the factory.
#
# Returns whatever the factory returns.
def msgpack_unpacker(*args)
  Fluent::MessagePackFactory.msgpack_unpacker(*args)
end
request_type() click to toggle source
# File lib/fluent/plugin/kinesis.rb, line 164
# Returns the RequestType constant looked up through the receiver's own
# class, so the value depends on which concrete class the instance belongs to.
#
# NOTE(review): RequestType is not defined in the visible part of this file;
# presumably each concrete output class supplies it - confirm.
def request_type
  self.class::RequestType
end
truncate(msg) click to toggle source
# File lib/fluent/plugin/kinesis.rb, line 168
# Converts msg to a string and limits it to @log_truncate_max_size characters.
#
# msg - any object; it is converted with +to_s+ exactly once, so the length
#       check and the returned text always refer to the same string.
#
# A limit of 0 disables truncation.
#
# Returns the full string when it fits (or the limit is 0), otherwise its
# first @log_truncate_max_size characters.
def truncate(msg)
  text = msg.to_s
  limit = @log_truncate_max_size
  return text if limit == 0 || text.size <= limit

  text[0...limit]
end
write_records_batch(chunk, stream_name, &block) click to toggle source
# File lib/fluent/plugin/kinesis.rb, line 154
# Sends the contents of a buffer chunk in batches.
#
# chunk       - buffer chunk; read lazily through an enumerator over
#               +msgpack_each+.
# stream_name - used only as a prefix in the debug log lines.
# block       - forwarded to batch_request_with_retry for every batch.
#
# split_to_batches yields each batch together with its size; the size is
# logged in KB using integer division. A debug line is written before and
# after each batch request.
#
# Returns whatever split_to_batches returns.
def write_records_batch(chunk, stream_name, &block)
  chunk_id = chunk.dump_unique_id_hex(chunk.unique_id)
  split_to_batches(chunk.to_enum(:msgpack_each)) do |batch, size|
    # String#% instead of sprintf; Kernel#format is avoided on purpose.
    # NOTE(review): a plugin-level #format method may shadow it - confirm.
    start_message = "%s: Write chunk %s / %3d records / %4d KB" % [stream_name, chunk_id, batch.size, size/1024]
    log.debug(start_message)
    batch_request_with_retry(batch, &block)
    log.debug("%s: Finish writing chunk" % [stream_name])
  end
end