module Karafka::Patches::RubyKafka
Patches
for Ruby Kafka gem
Public Instance Methods
consumer_loop() { || ... }
click to toggle source
This patch allows us to inject business logic in between fetches and before the consumer stop, so we can perform stop commit or anything else that we need since ruby-kafka fetch loop does not allow that directly We don't won't to use poll ruby-kafka api as it brings many more problems that we would have to take care of. That way, nothing like that ever happens but we get the control over the stopping process that we need (since we're the once that initiate it for each thread)
Calls superclass method
# File lib/karafka/patches/ruby_kafka.rb, line 15 def consumer_loop super do consumers = Karafka::Persistence::Consumers .current .values .flat_map(&:values) .select { |consumer| consumer.class.respond_to?(:after_fetch) } if Karafka::App.stopping? publish_event(consumers, 'before_stop') Karafka::Persistence::Client.read.stop else publish_event(consumers, 'before_poll') yield publish_event(consumers, 'after_poll') end end end
Private Instance Methods
publish_event(consumers, event_name)
click to toggle source
Notifies consumers about particular events happening @param consumers [Array<Object>] all consumers that want to be notified about an event @param event_name [String] name of the event that happened
# File lib/karafka/patches/ruby_kafka.rb, line 39 def publish_event(consumers, event_name) consumers.each do |consumer| key = "consumers.#{Helpers::Inflector.map(consumer.class.to_s)}.#{event_name}" Karafka::App.monitor.instrument(key, context: consumer) end end