class Karafka::Server
Karafka consuming server class
Constants
- FORCEFUL_EXIT_CODE
System exit code used when the server is terminated forcefully
- SUPERVISION_CHECK_FACTOR
Factor used to calculate how many times we sleep (one supervision check per sleep) before a forceful shutdown
- SUPERVISION_SLEEP
How long we sleep between checks on whether the consumers have shut down
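The way these constants interact can be sketched as follows. This is a minimal illustration, not Karafka's code: the module name `SupervisionMath` is hypothetical, and the numeric values (a 0.1 second sleep and a factor equal to its reciprocal) are assumptions, since this page does not list the actual values. Only the `(timeout * SUPERVISION_CHECK_FACTOR).to_i` expression comes from `stop_supervised`.

```ruby
# Hypothetical module used only for illustration.
module SupervisionMath
  # Assumed values; the real ones are not shown on this page.
  SUPERVISION_SLEEP = 0.1                          # seconds between checks
  SUPERVISION_CHECK_FACTOR = 1 / SUPERVISION_SLEEP # checks per second of timeout

  # Number of polling iterations performed before a forceful shutdown,
  # mirroring (timeout * SUPERVISION_CHECK_FACTOR).to_i in stop_supervised.
  def self.checks(timeout_seconds)
    (timeout_seconds * SUPERVISION_CHECK_FACTOR).to_i
  end
end

checks_for_three_seconds = SupervisionMath.checks(3)
```

Under these assumed values, the total time spent waiting is roughly the number of checks multiplied by `SUPERVISION_SLEEP`, which is why the factor is tied to the sleep interval.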
Attributes
- consumer_groups
Writer for the list of consumer groups that we want to consume in our current process context
- consumer_threads
Set of consuming threads. Each consumer thread contains a single consumer
Public Class Methods
@return [Array<String>] array with names of consumer groups that should be consumed in the current server context
# File lib/karafka/server.rb, line 35
def consumer_groups
  # If not specified, a server will listen on all the topics
  @consumer_groups ||= Karafka::App.consumer_groups.map(&:name).freeze
end
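The memoized default above, combined with the writer attribute, can be sketched in isolation. `ConsumerGroup`, `SketchServer`, `app_consumer_groups`, and the group names are hypothetical stand-ins for illustration and are not part of Karafka.

```ruby
# Hypothetical stand-in for a routing consumer group.
ConsumerGroup = Struct.new(:name)

# Hypothetical class that mimics only the memoization pattern shown above.
class SketchServer
  class << self
    # Writer that lets a caller narrow the groups handled by this process.
    attr_writer :consumer_groups

    # Stand-in for Karafka::App.consumer_groups.
    def app_consumer_groups
      [ConsumerGroup.new('events'), ConsumerGroup.new('payments')]
    end

    # Falls back to every known group name when nothing was assigned.
    def consumer_groups
      @consumer_groups ||= app_consumer_groups.map(&:name).freeze
    end
  end
end

default_groups = SketchServer.consumer_groups
SketchServer.consumer_groups = ['payments']
narrowed_groups = SketchServer.consumer_groups
```

Because of `||=`, a value assigned through the writer takes precedence, and the default list is only computed when nothing has been assigned yet.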
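Runs the app. Registers handlers for SIGINT, SIGQUIT and SIGTERM that trigger a supervised stop, then starts the supervised run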
# File lib/karafka/server.rb, line 26
def run
  process.on_sigint { stop_supervised }
  process.on_sigquit { stop_supervised }
  process.on_sigterm { stop_supervised }
  run_supervised
end
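The callback registration used in `run` can be illustrated with a small stand-in. This is a sketch under assumptions: `SketchProcess` and its `notify` method are hypothetical and do not describe how `Karafka::Process` is implemented. A real wrapper would presumably invoke the callbacks from a signal trap, whereas here they are triggered manually so the example runs without sending signals.

```ruby
# Hypothetical process wrapper that only stores and fires callbacks.
class SketchProcess
  HANDLED_SIGNALS = %i[SIGINT SIGQUIT SIGTERM].freeze

  def initialize
    # Each signal maps to the list of blocks registered for it.
    @callbacks = Hash.new { |hash, key| hash[key] = [] }
  end

  # Defines on_sigint, on_sigquit and on_sigterm registration methods.
  HANDLED_SIGNALS.each do |signal|
    define_method(:"on_#{signal.to_s.downcase}") do |&block|
      @callbacks[signal] << block
    end
  end

  # Runs every callback registered for the given signal name.
  def notify(signal)
    @callbacks[signal].each(&:call)
  end
end

stopped_by = []
process = SketchProcess.new
process.on_sigint { stopped_by << :SIGINT }
process.on_sigterm { stopped_by << :SIGTERM }

# Simulate the arrival of SIGTERM.
process.notify(:SIGTERM)
```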
Private Class Methods
@return [Karafka::Process] process wrapper instance used to catch system signal calls
# File lib/karafka/server.rb, line 43
def process
  Karafka::App.config.internal.process
end
Starts Karafka with supervision
@note We don't need to sleep because Karafka::Fetcher blocks and waits for its loop to finish (which won't happen until we explicitly want to stop)
# File lib/karafka/server.rb, line 50
def run_supervised
  process.supervise
  Karafka::App.run!
  Karafka::App.config.internal.fetcher.call
end
Stops Karafka with supervision (for as long as the shutdown timeout allows). If consumers don't stop within that time frame, they are forced to exit
# File lib/karafka/server.rb, line 58
def stop_supervised
  Karafka::App.stop!

  # See https://github.com/dry-rb/dry-configurable/issues/93
  timeout = Thread.new { Karafka::App.config.shutdown_timeout }.join.value

  # We check from time to time (for the timeout period) if all the threads finished
  # their work and if so, we can just return and normal shutdown process will take place
  (timeout * SUPERVISION_CHECK_FACTOR).to_i.times do
    if consumer_threads.count(&:alive?).zero?
      Thread.new { Karafka.monitor.instrument('app.stopped') }.join
      return
    end

    sleep SUPERVISION_SLEEP
  end

  raise Errors::ForcefulShutdownError
rescue Errors::ForcefulShutdownError => e
  Thread.new { Karafka.monitor.instrument('app.stopping.error', error: e) }.join
  # We're done waiting, let's kill them!
  consumer_threads.each(&:terminate)

  # exit! is not within the instrumentation as it would not trigger due to exit
  Kernel.exit! FORCEFUL_EXIT_CODE
end
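The polling-then-terminate pattern used by `stop_supervised` can be tried out on plain Ruby threads. This is a simplified sketch, not Karafka's implementation: `stop_with_supervision`, its parameters, and the returned symbols are hypothetical. It returns a symbol instead of calling `Kernel.exit!` so that it can run inside a script or test.

```ruby
# Hypothetical helper: waits up to `timeout` seconds for the threads to
# finish, checking every `sleep_interval` seconds, then terminates them.
def stop_with_supervision(threads, timeout:, sleep_interval: 0.05)
  checks = (timeout / sleep_interval).to_i

  checks.times do
    # All threads finished on their own, so a normal shutdown can proceed.
    return :graceful if threads.none?(&:alive?)

    sleep sleep_interval
  end

  # Timeout exhausted: terminate whatever is still running.
  threads.each(&:terminate)
  threads.each(&:join)
  :forced
end

quick = [Thread.new { sleep 0.01 }] # finishes well within the timeout
stuck = [Thread.new { sleep }]      # never finishes on its own

quick_result = stop_with_supervision(quick, timeout: 1)
stuck_result = stop_with_supervision(stuck, timeout: 0.2)
```

As in the original method, a thread that finishes only after the last check is still treated as a forced stop, because liveness is inspected only at the start of each iteration.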