class Karafka::Connection::Listener
A single listener that listens to incoming messages from a single route.
@note It does not loop on itself - it needs to be executed in a loop.
@note The listener itself does nothing with the message - it passes a raw Kafka::FetchedMessage to the block.
Public Class Methods
new(consumer_group)
@param consumer_group [Karafka::Routing::ConsumerGroup] consumer group that holds details on which topics we should listen to and with what settings
@return [Karafka::Connection::Listener] listener instance
# File lib/karafka/connection/listener.rb, line 13
def initialize(consumer_group)
  @consumer_group = consumer_group
end
Public Instance Methods
call()
Runs prefetch callbacks and executes the main listener fetch loop
# File lib/karafka/connection/listener.rb, line 18
def call
  Karafka.monitor.instrument(
    'connection.listener.before_fetch_loop',
    consumer_group: @consumer_group,
    client: client
  )
  fetch_loop
end
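The instrumentation call in `call` publishes an event before the fetch loop starts, so that subscribers can react to it. A minimal sketch of that publish/subscribe idea follows. `TinyMonitor` is a hypothetical stand-in written for this example and is not Karafka's monitor; only the event name and payload keys are taken from the listing above.

```ruby
# Hypothetical, minimal stand-in for a monitor. Karafka's real monitor is
# not used here; this only illustrates the publish/subscribe idea.
class TinyMonitor
  def initialize
    # Each event id maps to the list of blocks subscribed to it
    @subscribers = Hash.new { |hash, key| hash[key] = [] }
  end

  # Registers a block to run whenever the given event is published
  def subscribe(event_id, &block)
    @subscribers[event_id] << block
  end

  # Passes the payload to every subscriber of the event
  def instrument(event_id, payload = {})
    @subscribers[event_id].each { |subscriber| subscriber.call(payload) }
  end
end

monitor = TinyMonitor.new
seen = []

monitor.subscribe('connection.listener.before_fetch_loop') do |payload|
  seen << payload[:consumer_group]
end

# 'example_group' is a placeholder value, not a real consumer group object
monitor.instrument(
  'connection.listener.before_fetch_loop',
  consumer_group: 'example_group',
  client: nil
)
```

After the `instrument` call, `seen` holds the consumer group passed in the payload.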
Private Instance Methods
client()
@return [Karafka::Connection::Client] wrapped Kafka consuming client for a given topic consumption
# File lib/karafka/connection/listener.rb, line 66
def client
  @client ||= Client.new(@consumer_group)
end
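The `||=` in `client` memoizes the client: it is built on the first call and reused afterwards. A small sketch of that behaviour, using hypothetical `FakeClient` and `FakeListener` classes that are not part of Karafka (the accessor is public here only so the example can call it directly):

```ruby
# Hypothetical stand-in that counts how many instances were built
class FakeClient
  @built = 0

  class << self
    attr_accessor :built
  end

  def initialize(group)
    self.class.built += 1
    @group = group
  end
end

# Hypothetical listener that memoizes its client the same way
class FakeListener
  def initialize(group)
    @group = group
  end

  # Builds the client once and returns the same instance on later calls
  def client
    @client ||= FakeClient.new(@group)
  end
end

listener = FakeListener.new('example_group')
first = listener.client
second = listener.client
```

Both calls return the same object, and `FakeClient.built` stays at 1.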
fetch_loop()
Opens the connection, fetches messages and calls a block for each incoming message.
@note We catch all errors here, so they don't affect other listeners (or this one) and we can keep listening to and consuming other incoming messages. Since it runs inside Karafka::Connection::ActorCluster, catching all exceptions won't crash the whole cluster. Here we mostly focus on catching exceptions related to Kafka connections, Internet connection issues, etc. Business logic problems should not propagate this far.
# File lib/karafka/connection/listener.rb, line 36
def fetch_loop
  # @note What happens here is a delegation of processing to a proper processor based
  #   on the incoming messages characteristics
  client.fetch_loop do |raw_data, type|
    Karafka.monitor.instrument('connection.listener.fetch_loop')

    case type
    when :message
      MessageDelegator.call(@consumer_group.id, raw_data)
    when :batch
      BatchDelegator.call(@consumer_group.id, raw_data)
    end
  end
  # This is on purpose - see the notes for this method
  # rubocop:disable Lint/RescueException
rescue Exception => e
  Karafka.monitor.instrument('connection.listener.fetch_loop.error', caller: self, error: e)
  # rubocop:enable Lint/RescueException
  # We can stop client without a problem, as it will reinitialize itself when running the
  # `fetch_loop` again
  @client.stop
  # We need to clear the consumers cache for current connection when fatal error happens and
  # we reset the connection. Otherwise for consumers with manual offset management, the
  # persistence might have stored some data that would be reprocessed
  Karafka::Persistence::Consumers.clear
  sleep(@consumer_group.reconnect_timeout) && retry
end
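The error handling in `fetch_loop` follows a rescue, stop, sleep, retry pattern. The sketch below isolates that pattern. `FlakyClient` and `resilient_fetch` are hypothetical names made up for this example, and the sketch rescues `StandardError` rather than `Exception`, which the listing above rescues on purpose.

```ruby
# Hypothetical client whose fetch loop fails a fixed number of times
# before it yields a message. It is not part of Karafka.
class FlakyClient
  attr_reader :stops

  def initialize(failures)
    @failures = failures
    @stops = 0
  end

  # Raises until the configured failures are used up, then yields once
  def fetch_loop
    if @failures.positive?
      @failures -= 1
      raise IOError, 'connection lost'
    end

    yield 'payload'
  end

  # Counts how many times the client was stopped
  def stop
    @stops += 1
  end
end

# Runs the fetch loop; on an error it records the error, stops the client,
# waits for the reconnect timeout and runs the method body again.
def resilient_fetch(client, reconnect_timeout, errors)
  client.fetch_loop { |raw_data| return raw_data }
rescue StandardError => e
  errors << e
  client.stop
  # Kernel#sleep returns an Integer, which is always truthy in Ruby,
  # so `retry` always runs after the pause
  sleep(reconnect_timeout) && retry
end

errors = []
client = FlakyClient.new(2)
result = resilient_fetch(client, 0.01, errors)
```

With two configured failures, the method retries twice, stops the client after each failure, and then returns the yielded message.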