module Fluent::Plugin::KinesisHelper::API::BatchRequest
Public Class Methods
included(mod)
# File lib/fluent/plugin/kinesis_helper/api.rb, line 52
def self.included(mod)
  mod.include BatchRequestParams
end
Public Instance Methods
configure(conf)
Calls superclass method
# File lib/fluent/plugin/kinesis_helper/api.rb, line 56
def configure(conf)
  super
  if @batch_request_max_count.nil?
    @batch_request_max_count = self.class::BatchRequestLimitCount
  elsif @batch_request_max_count > self.class::BatchRequestLimitCount
    raise ConfigError, "batch_request_max_count can't be greater than #{self.class::BatchRequestLimitCount}."
  end
  if @batch_request_max_size.nil?
    @batch_request_max_size = self.class::BatchRequestLimitSize
  elsif @batch_request_max_size > self.class::BatchRequestLimitSize
    raise ConfigError, "batch_request_max_size can't be greater than #{self.class::BatchRequestLimitSize}."
  end
end
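As a rough illustration of how these checks behave, here is a minimal standalone sketch (not plugin code): the limit constant below is an assumption modelled on the PutRecords API, and ArgumentError stands in for Fluent::ConfigError.

  # Standalone sketch with an assumed limit; ArgumentError stands in for ConfigError.
  BATCH_REQUEST_LIMIT_COUNT = 500  # assumed: maximum records per batch request

  def check_batch_request_max_count(value)
    return BATCH_REQUEST_LIMIT_COUNT if value.nil?  # unset: fall back to the hard limit
    if value > BATCH_REQUEST_LIMIT_COUNT
      raise ArgumentError, "batch_request_max_count can't be greater than #{BATCH_REQUEST_LIMIT_COUNT}."
    end
    value
  end

  check_batch_request_max_count(nil)   # => 500
  check_batch_request_max_count(200)   # => 200
  check_batch_request_max_count(1000)  # raises ArgumentError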
size_of_values(record)
# File lib/fluent/plugin/kinesis_helper/api.rb, line 70
def size_of_values(record)
  record.compact.map(&:size).inject(:+) || 0
end
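A worked example, assuming a record is an array such as [data, partition_key]: nil entries are dropped by compact, and the empty case falls back to 0.

  # Example calls to the method above; the array layout is an assumption.
  size_of_values(["hello world", "pk-1"])  # => 15 (11 + 4 characters)
  size_of_values(["hello world", nil])     # => 11 (nil removed by compact)
  size_of_values([])                       # => 0  (inject returns nil, so || 0 applies)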
Private Instance Methods
any_records_shipped?(res)
# File lib/fluent/plugin/kinesis_helper/api.rb, line 124
def any_records_shipped?(res)
  results(res).size > failed_count(res)
end
batch_request_with_retry(batch, retry_count=0, backoff: nil) { |batch| ... }
# File lib/fluent/plugin/kinesis_helper/api.rb, line 92
def batch_request_with_retry(batch, retry_count=0, backoff: nil, &block)
  backoff ||= Backoff.new
  res = yield(batch)
  if failed_count(res) > 0
    failed_records = collect_failed_records(batch, res)
    if retry_count < @retries_on_batch_request
      backoff.reset if @reset_backoff_if_success and any_records_shipped?(res)
      wait_second = backoff.next
      msg = 'Retrying to request batch. Retry count: %3d, Retry records: %3d, Wait seconds %3.2f' %
        [retry_count+1, failed_records.size, wait_second]
      log.warn(truncate msg)
      # Increment num_errors to monitor batch request retries from "monitor_agent" or "fluent-plugin-prometheus"
      increment_num_errors if @monitor_num_of_batch_request_retries
      reliable_sleep(wait_second)
      batch_request_with_retry(retry_records(failed_records), retry_count+1, backoff: backoff, &block)
    else
      give_up_retries(failed_records)
    end
  end
end
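A hypothetical caller sketch (illustrative names, not the plugin's exact write path): the block performs the actual AWS API call and returns its response, and on partial failure only the failed subset is passed back into the block after a backoff wait.

  # Hypothetical usage; client, @stream_name and the record layout are assumptions.
  batch_request_with_retry(batch) do |b|
    client.put_records(
      stream_name: @stream_name,
      records: b.map { |data, partition_key| { data: data, partition_key: partition_key } }
    )
  end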
collect_failed_records(records, res)
# File lib/fluent/plugin/kinesis_helper/api.rb, line 128
def collect_failed_records(records, res)
  failed_records = []
  results(res).each_with_index do |record, index|
    next unless record[:error_code]
    original = case request_type
      when :streams, :firehose; records[index]
      when :streams_aggregated; records
      end
    failed_records.push(
      original:      original,
      error_code:    record[:error_code],
      error_message: record[:error_message]
    )
  end
  failed_records
end
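Each element of the returned array pairs the original record with the error the service reported for it; roughly (values below are made up for illustration):

  # Illustrative shape of one returned entry.
  {
    original:      ["log line", "pk-1"],  # records[index] for :streams/:firehose, the whole batch for :streams_aggregated
    error_code:    "ProvisionedThroughputExceededException",
    error_message: "Rate exceeded for shard shardId-000000000000 ..."
  }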
failed_count(res)
# File lib/fluent/plugin/kinesis_helper/api.rb, line 154
def failed_count(res)
  failed_field = case request_type
    when :streams;            :failed_record_count
    when :streams_aggregated; :failed_record_count
    when :firehose;           :failed_put_count
    end
  res[failed_field]
end
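In other words, the failure count is read from a request-type-specific field of the response. For example, with plain hashes standing in for the SDK response structs:

  # Illustrative responses; real SDK responses are structs, not hashes.
  streams_res  = { failed_record_count: 2, records: [] }
  firehose_res = { failed_put_count: 0, request_responses: [] }

  streams_res[:failed_record_count]  # => 2  (:streams and :streams_aggregated)
  firehose_res[:failed_put_count]    # => 0  (:firehose)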
give_up_retries(failed_records)
# File lib/fluent/plugin/kinesis_helper/api.rb, line 172
def give_up_retries(failed_records)
  failed_records.each {|record|
    log.error(truncate 'Could not put record, Error: %s/%s, Record: %s' % [
      record[:error_code], record[:error_message], record[:original]
    ])
  }

  if @drop_failed_records_after_batch_request_retries
    # Increment num_errors to monitor batch request failure from "monitor_agent" or "fluent-plugin-prometheus"
    increment_num_errors
  else
    # Raise error and return chunk to Fluentd for retrying
    case request_type
    # @see https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html
    # @see https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/Kinesis/Client.html#put_records-instance_method
    # @see https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/Kinesis/Errors.html
    when :streams, :streams_aggregated
      provisioned_throughput_exceeded_records = failed_records.select { |record| record[:error_code] == 'ProvisionedThroughputExceededException' }
      target_failed_record = provisioned_throughput_exceeded_records.first || failed_records.first
      target_error = provisioned_throughput_exceeded_records.empty? ? Aws::Kinesis::Errors::ServiceError : Aws::Kinesis::Errors::ProvisionedThroughputExceededException
    # @see https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html
    # @see https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/Firehose/Client.html#put_record_batch-instance_method
    # @see https://docs.aws.amazon.com/sdk-for-ruby/v3/api/Aws/Firehose/Errors.html
    when :firehose
      service_unavailable_exception_records = failed_records.select { |record| record[:error_code] == 'ServiceUnavailableException' }
      target_failed_record = service_unavailable_exception_records.first || failed_records.first
      target_error = service_unavailable_exception_records.empty? ? Aws::Firehose::Errors::ServiceError : Aws::Firehose::Errors::ServiceUnavailableException
    end
    log.error("Raise #{target_failed_record[:error_code]} and return chunk to Fluentd buffer for retrying")
    raise target_error.new(Seahorse::Client::RequestContext.new, target_failed_record[:error_message])
  end
end
increment_num_errors()
# File lib/fluent/plugin/kinesis_helper/api.rb, line 211
def increment_num_errors
  # Prepare Fluent::Plugin::Output instance variables to count errors in this method.
  # These instance variables are initialized here for possible future breaking changes of Fluentd.
  @num_errors ||= 0
  # @see https://github.com/fluent/fluentd/commit/d245454658d16170431d276fcd5849fb0d88ab2b
  if Gem::Version.new(Fluent::VERSION) >= Gem::Version.new('1.7.0')
    @counter_mutex ||= Mutex.new
    @counter_mutex.synchronize{ @num_errors += 1 }
  else
    @counters_monitor ||= Monitor.new
    @counters_monitor.synchronize{ @num_errors += 1 }
  end
end
reliable_sleep(wait_second)
Sleep sometimes returns before the requested duration has elapsed; our guess is that something wakes the thread, so we go back to sleep for the remaining time whenever that happens. TODO: find out what is waking the thread and stop it at the source instead.
# File lib/fluent/plugin/kinesis_helper/api.rb, line 115
def reliable_sleep(wait_second)
  loop do
    actual = Benchmark.realtime { sleep(wait_second) }
    break if actual >= wait_second
    log.error("#{Thread.current.object_id} sleep failed expected #{wait_second} but slept #{actual}")
    wait_second -= actual
  end
end
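The early wake-up can be reproduced outside the plugin. The standalone snippet below (not plugin code) shows sleep returning after roughly one second even though five were requested, which is the situation reliable_sleep compensates for:

  # Standalone demonstration of sleep returning early when the thread is woken.
  require 'benchmark'

  t = Thread.new { Benchmark.realtime { sleep(5) } }
  sleep(1)
  t.wakeup       # wake the sleeping thread before its 5 seconds elapse
  puts t.value   # => ~1.0, well short of the requested 5 seconds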
results(res)
# File lib/fluent/plugin/kinesis_helper/api.rb, line 163
def results(res)
  result_field = case request_type
    when :streams;            :records
    when :streams_aggregated; :records
    when :firehose;           :request_responses
    end
  res[result_field]
end
retry_records(failed_records)
# File lib/fluent/plugin/kinesis_helper/api.rb, line 145
def retry_records(failed_records)
  case request_type
  when :streams, :firehose
    failed_records.map{|r| r[:original] }
  when :streams_aggregated
    failed_records.first[:original]
  end
end
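For :streams and :firehose each failed record's :original is retried individually, whereas for :streams_aggregated the whole aggregated batch was stored as a single :original and is resent as-is. Illustratively, for the non-aggregated case:

  # Illustrative input, shaped like the output of collect_failed_records.
  failed_records = [
    { original: ["a", "pk-1"], error_code: "err", error_message: "msg" },
    { original: ["b", "pk-2"], error_code: "err", error_message: "msg" }
  ]

  failed_records.map { |r| r[:original] }  # => [["a", "pk-1"], ["b", "pk-2"]]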
split_to_batches(records) { |batch, size| ... }
# File lib/fluent/plugin/kinesis_helper/api.rb, line 76
def split_to_batches(records, &block)
  batch = []
  size = 0
  records.each do |record|
    record_size = size_of_values(record)
    if batch.size+1 > @batch_request_max_count or size+record_size > @batch_request_max_size
      yield(batch, size)
      batch = []
      size = 0
    end
    batch << record
    size += record_size
  end
  yield(batch, size) if batch.size > 0
end
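The same batching logic can be exercised standalone. The sketch below (not plugin code) uses assumed limits of 3 records and 10 characters per batch in place of @batch_request_max_count / @batch_request_max_size, and plain string length in place of size_of_values:

  # Standalone sketch of the batching loop with assumed limits.
  max_count = 3
  max_size  = 10
  records   = %w[aaaa bbbb cccc dddd]

  batch, size = [], 0
  records.each do |record|
    record_size = record.size
    if batch.size + 1 > max_count || size + record_size > max_size
      puts "flush #{batch.inspect} (#{size})"
      batch, size = [], 0
    end
    batch << record
    size += record_size
  end
  puts "flush #{batch.inspect} (#{size})" unless batch.empty?
  # Prints:
  #   flush ["aaaa", "bbbb"] (8)
  #   flush ["cccc", "dddd"] (8)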