class Racecar::ConsumerSet
Constants
- MAX_POLL_TRIES
Public Class Methods
# File lib/racecar/consumer_set.rb, line 7
# Builds an empty consumer set for the subscriptions listed in +config+.
#
# config       - object responding to +subscriptions+.
# logger       - logger stored for use by the other methods.
# instrumenter - instrumentation sink (defaults to NullInstrumenter).
#
# Raises ArgumentError when +config.subscriptions+ is empty.
def initialize(config, logger, instrumenter = NullInstrumenter)
  @config = config
  @logger = logger
  @instrumenter = instrumenter

  if @config.subscriptions.empty?
    raise ArgumentError, "Subscriptions must not be empty when subscribing"
  end

  # Consumers are created lazily by #current; one slot per subscription.
  @consumers = []
  # Endless round-robin over the subscription indexes.
  subscription_count = @config.subscriptions.size
  @consumer_id_iterator = (0...subscription_count).cycle
  @previous_retries = 0
  @last_poll_read_nil_message = false
end
Public Instance Methods
batch_poll
Collects messages until any of the following occurs:
- max_wait_time_ms has passed
- max_messages have been collected
- a nil message was polled (end of topic, Kafka stalled, etc.)
The messages are from a single topic, but potentially from more than one partition.
Any errors during polling are retried in an exponential backoff fashion. If an error occurs, but there is no time left for a backoff and retry, it will return the already collected messages and only retry on the next call.
# File lib/racecar/consumer_set.rb, line 34
# Collects messages until the time budget is used up, +max_messages+ have
# been gathered, or a poll returns nil.
#
# max_wait_time_ms - total time budget in milliseconds.
# max_messages     - upper bound on the number of messages returned.
#
# Returns an Array of the polled messages (possibly empty).
def batch_poll(max_wait_time_ms = @config.max_wait_time_ms, max_messages = @config.fetch_messages)
  poll_started = Time.now
  time_left_ms = max_wait_time_ms
  # Move on to the next consumer if the previous poll came back empty.
  maybe_select_next_consumer

  collected = []
  until time_left_ms <= 0 || collected.size >= max_messages
    time_left_ms = remaining_time_ms(max_wait_time_ms, poll_started)
    polled = poll_with_retries(time_left_ms)
    break if polled.nil?

    collected << polled
  end
  collected
end
# File lib/racecar/consumer_set.rb, line 60
# Closes every consumer yielded by #each_subscribed.
def close
  each_subscribed { |consumer| consumer.close }
end
# File lib/racecar/consumer_set.rb, line 54
# Commits offsets on every consumer yielded by #each_subscribed, delegating
# the actual commit (and its error handling) to #commit_rescue_no_offset.
def commit
  each_subscribed { |consumer| commit_rescue_no_offset(consumer) }
end
# File lib/racecar/consumer_set.rb, line 64
# Returns the consumer for the currently selected subscription, creating and
# subscribing it on first use and caching it in its slot afterwards.
def current
  slot = @consumer_id_iterator.peek
  cached = @consumers[slot]
  return cached if cached

  fresh = Rdkafka::Config.new(rdkafka_config(current_subscription)).consumer
  # Subscribing is wrapped in a 'join_group' instrumentation event.
  @instrumenter.instrument('join_group') do
    fresh.subscribe current_subscription.topic
  end
  @consumers[slot] = fresh
end
# File lib/racecar/consumer_set.rb, line 74
# Iterates over the consumers that currently exist.
#
# @consumers can contain nil slots: #reset_current_consumer assigns nil to
# the slot of a consumer it has closed. Those slots are skipped so callers
# such as #close and #commit never receive nil.
#
# With a block, yields each existing consumer and returns @consumers.
# Without a block, returns an Enumerator over the existing consumers.
def each_subscribed
  return @consumers.compact.each unless block_given?

  @consumers.each { |consumer| yield consumer unless consumer.nil? }
end
# File lib/racecar/consumer_set.rb, line 82
# Pauses +topic+/+partition+ on the consumer that owns it and seeks that
# consumer to +offset+. Logs and returns nil when no consumer is found for
# the partition.
def pause(topic, partition, offset)
  owner, owned_partitions = find_consumer_by(topic, partition)

  if owner
    owner.pause(owned_partitions)
    # seek is given a message-like object carrying topic, partition and offset.
    owner.seek(OpenStruct.new(topic: topic, partition: partition, offset: offset))
  else
    @logger.info "Attempted to pause #{topic}/#{partition}, but we're not subscribed to it"
    nil
  end
end
# File lib/racecar/consumer_set.rb, line 20
# Polls for a single message by asking #batch_poll for a batch of at most one.
#
# Returns the message, or nil when the batch came back empty.
def poll(max_wait_time_ms = @config.max_wait_time_ms)
  single_message_batch = batch_poll(max_wait_time_ms, 1)
  single_message_batch.first
end
# File lib/racecar/consumer_set.rb, line 94
# Resumes +topic+/+partition+ on the consumer that owns it. Logs and returns
# nil when no consumer is found for the partition.
def resume(topic, partition)
  owner, owned_partitions = find_consumer_by(topic, partition)

  if owner
    owner.resume(owned_partitions)
  else
    @logger.info "Attempted to resume #{topic}/#{partition}, but we're not subscribed to it"
    nil
  end
end
# File lib/racecar/consumer_set.rb, line 50
# Forwards +message+ to the current consumer's store_offset.
def store_offset(message)
  active_consumer = current
  active_consumer.store_offset(message)
end
Subscribe to all topics eagerly, even if there are still messages elsewhere. Usually that's not needed, and Kafka might rebalance if topics are not polled frequently enough.
# File lib/racecar/consumer_set.rb, line 109
# Touches #current once per configured subscription, advancing to the next
# consumer after each, so that every subscription gets its consumer created.
def subscribe_all
  subscription_count = @config.subscriptions.size
  subscription_count.times do |_index|
    current
    select_next_consumer
  end
end
Private Instance Methods
# File lib/racecar/consumer_set.rb, line 202
# Commits on +consumer+, treating a :no_offset error as "nothing to do".
#
# The second commit argument is the negation of @config.synchronous_commits.
# Any Rdkafka::RdkafkaError whose code is not :no_offset is re-raised.
def commit_rescue_no_offset(consumer)
  asynchronous = !@config.synchronous_commits
  consumer.commit(nil, asynchronous)
rescue Rdkafka::RdkafkaError => commit_error
  raise commit_error unless commit_error.code == :no_offset

  @logger.debug "Nothing to commit."
end
# File lib/racecar/consumer_set.rb, line 179
# Returns the configured subscription at the index the consumer iterator
# currently points to.
def current_subscription
  subscriptions = @config.subscriptions
  subscriptions[@consumer_id_iterator.peek]
end
# File lib/racecar/consumer_set.rb, line 167
# Looks for the consumer whose assignment contains +topic+/+partition+.
#
# Returns a two-element result: the consumer and a TopicPartitionList
# holding only the matched partition, or [nil, nil] when nothing matches.
def find_consumer_by(topic, partition)
  each do |candidate|
    partitions = candidate.assignment.to_h[topic]
    next if partitions.nil?

    match = partitions.detect { |assigned| assigned.partition == partition }
    if match
      narrowed = Rdkafka::Consumer::TopicPartitionList.new({ topic => [match] })
      return candidate, narrowed
    end
  end

  [nil, nil]
end
# File lib/racecar/consumer_set.rb, line 192
# Advances to the next consumer only when the last poll recorded a nil
# message, clearing that flag first. Does nothing (returns nil) otherwise.
def maybe_select_next_consumer
  if @last_poll_read_nil_message
    @last_poll_read_nil_message = false
    select_next_consumer
  end
end
polls a message for the current consumer, handling any API edge cases.
# File lib/racecar/consumer_set.rb, line 155
# Polls the current consumer once and records whether a message came back.
#
# On an Rdkafka::RdkafkaError the error is always re-raised; for the codes
# listed below the current consumer is reset first. In every case (success
# or error) @last_poll_read_nil_message ends up true exactly when no message
# was obtained.
#
# Returns the polled message (nil when the poll returned none).
def poll_current_consumer(max_wait_time_ms)
  polled = nil
  polled = current.poll(max_wait_time_ms)
rescue Rdkafka::RdkafkaError => poll_error
  # :max_poll_exceeded, :transport, :not_coordinator # -147, -195, 16
  resettable = %i[max_poll_exceeded transport not_coordinator].include?(poll_error.code)
  reset_current_consumer if resettable
  raise
ensure
  @last_poll_read_nil_message = polled.nil?
end
Polls a single message from the current consumer, retrying errors with exponential backoff. The sleep time is capped by max_wait_time_ms. If there's enough time budget left, it will retry before returning. If there isn't, the retry will only occur on the next call. It tries up to MAX_POLL_TRIES times before passing on the exception.
# File lib/racecar/consumer_set.rb, line 122 def poll_with_retries(max_wait_time_ms) try ||= @previous_retries @previous_retries = 0 started_at ||= Time.now remain_ms = remaining_time_ms(max_wait_time_ms, started_at) wait_ms = try == 0 ? 0 : 50 * (2**try) # 0ms, 100ms, 200ms, 400ms, … if wait_ms >= max_wait_time_ms && remain_ms > 1 @logger.debug "Capping #{wait_ms}ms to #{max_wait_time_ms-1}ms." sleep (max_wait_time_ms-1)/1000.0 remain_ms = 1 elsif try == 0 && remain_ms == 0 @logger.debug "No time remains for polling messages. Will try on next call." return nil elsif wait_ms >= remain_ms @logger.error "Only #{remain_ms}ms left, but want to wait for #{wait_ms}ms before poll. Will retry on next call." @previous_retries = try return nil elsif wait_ms > 0 sleep wait_ms/1000.0 remain_ms -= wait_ms end poll_current_consumer(remain_ms) rescue Rdkafka::RdkafkaError => e try += 1 @instrumenter.instrument("poll_retry", try: try, rdkafka_time_limit: remain_ms, exception: e) @logger.error "(try #{try}/#{MAX_POLL_TRIES}): Error for topic subscription #{current_subscription}: #{e}" raise if try >= MAX_POLL_TRIES retry end
# File lib/racecar/consumer_set.rb, line 209
# Builds the rdkafka settings Hash for one subscription.
#
# The base settings are derived from @config and +subscription+; they are
# then overridden by @config.rdkafka_consumer and, last, by
# subscription.additional_config.
#
# Returns a new Hash keyed by librdkafka option name.
def rdkafka_config(subscription)
  # https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
  offset_reset = subscription.start_from_beginning ? "earliest" : "largest"

  base_settings = {
    "auto.commit.interval.ms" => @config.offset_commit_interval * 1000,
    "auto.offset.reset"       => offset_reset,
    "bootstrap.servers"       => @config.brokers.join(","),
    "client.id"               => @config.client_id,
    "enable.partition.eof"    => false,
    "fetch.max.bytes"         => @config.max_bytes,
    "message.max.bytes"       => subscription.max_bytes_per_partition,
    "fetch.min.bytes"         => @config.fetch_min_bytes,
    "fetch.wait.max.ms"       => @config.max_wait_time_ms,
    "group.id"                => @config.group_id,
    "heartbeat.interval.ms"   => @config.heartbeat_interval * 1000,
    "max.poll.interval.ms"    => @config.max_poll_interval * 1000,
    "queued.min.messages"     => @config.min_message_queue_size,
    "session.timeout.ms"      => @config.session_timeout * 1000,
    "socket.timeout.ms"       => @config.socket_timeout * 1000,
    "statistics.interval.ms"  => @config.statistics_interval_ms
  }

  base_settings
    .merge(@config.rdkafka_consumer)
    .merge(subscription.additional_config)
end
# File lib/racecar/consumer_set.rb, line 234
# Returns how many milliseconds of +limit_ms+ are left, given that the
# budget started at +started_at_time+. Never returns less than 0.
def remaining_time_ms(limit_ms, started_at_time)
  elapsed_ms = ((Time.now - started_at_time) * 1000).round
  left_ms = limit_ms - elapsed_ms
  return 0 if left_ms <= 0

  left_ms
end
# File lib/racecar/consumer_set.rb, line 183
# Closes the consumer in the currently selected slot (if there is one) and
# empties the slot.
def reset_current_consumer
  slot = @consumer_id_iterator.peek
  @logger.info "Resetting consumer with id: #{slot}"

  @consumers[slot]&.close
  @consumers[slot] = nil
end
# File lib/racecar/consumer_set.rb, line 198
# Advances the consumer iterator by one position.
#
# Returns the index the iterator pointed to before advancing.
def select_next_consumer
  previous_slot = @consumer_id_iterator.next
  previous_slot
end