module Fluent::Plugin::KinesisHelper::API::BatchRequest

Public Class Methods

included(mod) click to toggle source
# File lib/fluent/plugin/kinesis_helper/api.rb, line 52
# Hook: when this helper is mixed in, also pull the batch-request
# configuration parameters into the including plugin class.
def self.included(mod)
  mod.send(:include, BatchRequestParams)
end

Public Instance Methods

configure(conf) click to toggle source
Calls superclass method
# File lib/fluent/plugin/kinesis_helper/api.rb, line 56
# Validates and defaults the batch-request limits after Fluentd's own
# configuration step.
#
# @param conf [Fluent::Config::Element] the plugin configuration
# @raise [ConfigError] when a configured limit exceeds the hard limit
#   declared by the concrete plugin class (BatchRequestLimitCount /
#   BatchRequestLimitSize)
def configure(conf)
  super
  if @batch_request_max_count.nil?
    @batch_request_max_count = self.class::BatchRequestLimitCount
  elsif @batch_request_max_count > self.class::BatchRequestLimitCount
    # Fixed typo in the error message: "grater" -> "greater".
    raise ConfigError, "batch_request_max_count can't be greater than #{self.class::BatchRequestLimitCount}."
  end
  if @batch_request_max_size.nil?
    @batch_request_max_size = self.class::BatchRequestLimitSize
  elsif @batch_request_max_size > self.class::BatchRequestLimitSize
    raise ConfigError, "batch_request_max_size can't be greater than #{self.class::BatchRequestLimitSize}."
  end
end
size_of_values(record) click to toggle source
# File lib/fluent/plugin/kinesis_helper/api.rb, line 70
# Total size of the non-nil values in +record+; 0 when the record is
# empty or contains only nils.
def size_of_values(record)
  record.compact.sum(&:size)
end

Private Instance Methods

any_records_shipped?(res) click to toggle source
# File lib/fluent/plugin/kinesis_helper/api.rb, line 124
# True when at least one record in the batch response was accepted
# (i.e. not every result entry failed).
def any_records_shipped?(res)
  failed_count(res) < results(res).size
end
batch_request_with_retry(batch, retry_count=0, backoff: nil) { |batch| ... } click to toggle source
# File lib/fluent/plugin/kinesis_helper/api.rb, line 92
# Issues a batch request via the given block and recursively retries only
# the records the service rejected, sleeping with backoff between attempts.
#
# batch       - the records for one API call (shape depends on request_type)
# retry_count - number of retries already performed (starts at 0)
# backoff:    - Backoff state threaded through the recursive calls
#
# The block receives the batch and must return the service response hash.
def batch_request_with_retry(batch, retry_count=0, backoff: nil, &block)
  backoff ||= Backoff.new
  res = yield(batch)
  if failed_count(res) > 0
    failed_records = collect_failed_records(batch, res)
    if retry_count < @retries_on_batch_request
      # A partially successful request resets the backoff window when
      # @reset_backoff_if_success is enabled.
      backoff.reset if @reset_backoff_if_success and any_records_shipped?(res)
      wait_second = backoff.next
      msg = 'Retrying to request batch. Retry count: %3d, Retry records: %3d, Wait seconds %3.2f' % [retry_count+1, failed_records.size, wait_second]
      log.warn(truncate msg)
      # Increment num_errors to monitor batch request retries from "monitor_agent" or "fluent-plugin-prometheus"
      increment_num_errors if @monitor_num_of_batch_request_retries
      reliable_sleep(wait_second)
      # Resubmit only the failed records; recursion ends when none fail or
      # the retry budget is exhausted.
      batch_request_with_retry(retry_records(failed_records), retry_count+1, backoff: backoff, &block)
    else
      give_up_retries(failed_records)
    end
  end
end
collect_failed_records(records, res) click to toggle source
# File lib/fluent/plugin/kinesis_helper/api.rb, line 128
# Collects a {original:, error_code:, error_message:} hash for every
# result entry that carries an error code.
def collect_failed_records(records, res)
  failed = []
  results(res).each_with_index do |entry, i|
    next unless entry[:error_code]
    # For aggregated streams the whole batch is shipped as one API record,
    # so the original payload is the entire batch, not a single element.
    original = case request_type
               when :streams_aggregated then records
               when :streams, :firehose then records[i]
               end
    failed << {
      original:      original,
      error_code:    entry[:error_code],
      error_message: entry[:error_message],
    }
  end
  failed
end
failed_count(res) click to toggle source
# File lib/fluent/plugin/kinesis_helper/api.rb, line 154
# Number of records the service rejected in this batch response; the
# response field name differs between Kinesis Streams and Firehose.
def failed_count(res)
  key = case request_type
        when :firehose                     then :failed_put_count
        when :streams, :streams_aggregated then :failed_record_count
        end
  res[key]
end
give_up_retries(failed_records) click to toggle source
# File lib/fluent/plugin/kinesis_helper/api.rb, line 172
# Logs every record that still failed after the retry budget was exhausted,
# then either swallows the failure (counting it for monitoring) or raises an
# AWS error so Fluentd returns the chunk to its buffer for another attempt.
def give_up_retries(failed_records)
  failed_records.each {|record|
    log.error(truncate 'Could not put record, Error: %s/%s, Record: %s' % [
      record[:error_code],
      record[:error_message],
      record[:original]
    ])
  }

  if @drop_failed_records_after_batch_request_retries
    # Increment num_errors to monitor batch request failure from "monitor_agent" or "fluent-plugin-prometheus"
    increment_num_errors
  else
    # Raise error and return chunk to Fluentd for retrying
    # Prefer the throttling/availability error class when any record failed
    # with it; otherwise fall back to the generic ServiceError.
    case request_type
    # @see https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html
    # @see https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/Kinesis/Client.html#put_records-instance_method
    # @see https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/Kinesis/Errors.html
    when :streams, :streams_aggregated
      provisioned_throughput_exceeded_records = failed_records.select { |record| record[:error_code] == 'ProvisionedThroughputExceededException' }
      target_failed_record = provisioned_throughput_exceeded_records.first || failed_records.first
      target_error = provisioned_throughput_exceeded_records.empty? ?
        Aws::Kinesis::Errors::ServiceError :
        Aws::Kinesis::Errors::ProvisionedThroughputExceededException
    # @see https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html
    # @see https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/Firehose/Client.html#put_record_batch-instance_method
    # @see https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/Firehose/Errors.html
    when :firehose
      service_unavailable_exception_records = failed_records.select { |record| record[:error_code] == 'ServiceUnavailableException' }
      target_failed_record = service_unavailable_exception_records.first || failed_records.first
      target_error = service_unavailable_exception_records.empty? ?
        Aws::Firehose::Errors::ServiceError :
        Aws::Firehose::Errors::ServiceUnavailableException
    end
    log.error("Raise #{target_failed_record[:error_code]} and return chunk to Fluentd buffer for retrying")
    raise target_error.new(Seahorse::Client::RequestContext.new, target_failed_record[:error_message])
  end
end
increment_num_errors() click to toggle source
# File lib/fluent/plugin/kinesis_helper/api.rb, line 211
# Increments Fluent::Plugin::Output's @num_errors under the lock object
# that matches the running Fluentd version's counter implementation.
def increment_num_errors
  # Prepare Fluent::Plugin::Output instance variables to count errors in this method.
  # These instance variables are initialized here for possible future breaking changes of Fluentd.
  @num_errors ||= 0
  # @see https://github.com/fluent/fluentd/commit/d245454658d16170431d276fcd5849fb0d88ab2b
  lock =
    if Gem::Version.new(Fluent::VERSION) >= Gem::Version.new('1.7.0')
      @counter_mutex ||= Mutex.new
    else
      @counters_monitor ||= Monitor.new
    end
  lock.synchronize { @num_errors += 1 }
end
reliable_sleep(wait_second) click to toggle source

Sleep sometimes returns before the requested duration has elapsed — our guess is that something wakes up the thread — so when that happens we go back to sleep for the remaining time. TODO: find out what is causing the sleep to be cut short and stop it at the source instead.

# File lib/fluent/plugin/kinesis_helper/api.rb, line 115
# Sleeps for +wait_second+ seconds, going back to sleep for the remainder
# whenever sleep returns early (something appears to wake the thread).
def reliable_sleep(wait_second)
  remaining = wait_second
  loop do
    slept = Benchmark.realtime { sleep(remaining) }
    break if slept >= remaining
    log.error("#{Thread.current.object_id} sleep failed expected #{remaining} but slept #{slept}")
    remaining -= slept
  end
end
results(res) click to toggle source
# File lib/fluent/plugin/kinesis_helper/api.rb, line 163
# The per-record result array from a batch response; the response field
# name differs between Kinesis Streams and Firehose.
def results(res)
  key = case request_type
        when :firehose                     then :request_responses
        when :streams, :streams_aggregated then :records
        end
  res[key]
end
retry_records(failed_records) click to toggle source
# File lib/fluent/plugin/kinesis_helper/api.rb, line 145
# Extracts the payloads to resubmit from a list of failed-record hashes.
def retry_records(failed_records)
  case request_type
  when :streams_aggregated
    # The aggregated batch is retried as a single unit.
    failed_records.first[:original]
  when :streams, :firehose
    failed_records.map { |failed| failed[:original] }
  end
end
split_to_batches(records) { |batch, size| ... } click to toggle source
# File lib/fluent/plugin/kinesis_helper/api.rb, line 76
# Splits +records+ into batches that respect both the per-request record
# count limit (@batch_request_max_count) and the per-request byte size
# limit (@batch_request_max_size), yielding each batch with its size.
#
# records - array of records; each record's size is measured via
#           size_of_values.
#
# Yields (batch, size) for every non-empty batch.
def split_to_batches(records, &block)
  batch = []
  size = 0
  records.each do |record|
    record_size = size_of_values(record)
    if batch.size + 1 > @batch_request_max_count or size + record_size > @batch_request_max_size
      # Guard: when a single record alone exceeds the size limit, `batch`
      # is still empty here; the original code yielded an empty batch,
      # which would produce an invalid empty API request.
      yield(batch, size) unless batch.empty?
      batch = []
      size = 0
    end
    batch << record
    size += record_size
  end
  yield(batch, size) unless batch.empty?
end