class Karafka::Connection::Listener

A single listener that listens to incoming messages from a single route @note It does not loop on itself - it needs to be executed in a loop @note Listener itself does nothing with the message - it will return to the block

a raw Kafka::FetchedMessage

Public Class Methods

new(consumer_group) click to toggle source

@param consumer_group [Karafka::Routing::ConsumerGroup] consumer group that holds details

on what topics and with what settings should we listen

@return [Karafka::Connection::Listener] listener instance

# File lib/karafka/connection/listener.rb, line 13
def initialize(consumer_group)
  @consumer_group = consumer_group
end

Public Instance Methods

call() click to toggle source

Runs prefetch callbacks and executes the main listener fetch loop

# File lib/karafka/connection/listener.rb, line 18
def call
  Karafka.monitor.instrument(
    'connection.listener.before_fetch_loop',
    consumer_group: @consumer_group,
    client: client
  )
  fetch_loop
end

Private Instance Methods

client() click to toggle source

@return [Karafka::Connection::Client] wrapped kafka consuming client for a given topic

consumption
# File lib/karafka/connection/listener.rb, line 66
def client
  @client ||= Client.new(@consumer_group)
end
fetch_loop() click to toggle source

Opens connection, gets messages and calls a block for each of the incoming messages @note We catch all the errors here, so they don't affect other listeners (or this one)

so we will be able to listen and consume other incoming messages.
Since it is run inside Karafka::Connection::ActorCluster - catching all the exceptions
won't crash the whole cluster. Here we mostly focus on catching the exceptions related to
Kafka connections / Internet connection issues / Etc. Business logic problems should not
propagate this far
# File lib/karafka/connection/listener.rb, line 36
def fetch_loop
  # @note What happens here is a delegation of processing to a proper processor based
  #   on the incoming messages characteristics
  client.fetch_loop do |raw_data, type|
    Karafka.monitor.instrument('connection.listener.fetch_loop')

    case type
    when :message
      MessageDelegator.call(@consumer_group.id, raw_data)
    when :batch
      BatchDelegator.call(@consumer_group.id, raw_data)
    end
  end
  # This is on purpose - see the notes for this method
  # rubocop:disable Lint/RescueException
rescue Exception => e
  Karafka.monitor.instrument('connection.listener.fetch_loop.error', caller: self, error: e)
  # rubocop:enable Lint/RescueException
  # We can stop client without a problem, as it will reinitialize itself when running the
  # `fetch_loop` again
  @client.stop
  # We need to clear the consumers cache for current connection when fatal error happens and
  # we reset the connection. Otherwise for consumers with manual offset management, the
  # persistence might have stored some data that would be reprocessed
  Karafka::Persistence::Consumers.clear
  sleep(@consumer_group.reconnect_timeout) && retry
end