module Karafka::AttributesMap
Both Karafka
and Ruby-Kafka contain a lot of settings that can be applied on multiple levels. In Karafka
that is on consumer group and on the topic level. In Ruby-Kafka it is on consumer, subscription and consumption levels. In order to maintain an order in managing those settings, this module was created. It contains details on what setting where should go and which layer (both on Karafka
and Ruby-Kafka) is responsible for setting it and sending it forward @note Settings presented here cover all the settings that are being used across Karafka
Public Class Methods
What settings should go where in ruby-kafka @return [Hash] hash with proper sections on what to proxy where in Ruby-Kafka @note All other settings will be passed to Kafka.new method invocation.
All elements in this hash are just edge cases
# File lib/karafka/attributes_map.rb, line 17 def api_adapter { consumer: %i[ session_timeout offset_commit_interval offset_commit_threshold offset_retention_time heartbeat_interval fetcher_max_queue_size assignment_strategy ], subscribe: %i[start_from_beginning max_bytes_per_partition], consumption: %i[min_bytes max_bytes max_wait_time], pause: %i[pause_timeout pause_max_timeout pause_exponential_backoff], # All the options that are under kafka config namespace, but are not used # directly with kafka api, but from the Karafka user perspective, they are # still related to kafka. They should not be proxied anywhere ignored: %i[reconnect_timeout automatically_mark_as_consumed] } end
@return [Array<Symbol>] properties that can be set on a per consumer group level @note Note that there are settings directly extracted from the config kafka namespace
I did this that way, so I won't have to repeat same setting keys over and over again Thanks to this solution, if any new setting is available for ruby-kafka, we just need to add it to our configuration class and it will be handled automatically.
# File lib/karafka/attributes_map.rb, line 50 def consumer_group # @note We don't ignore the api_adapter[:ignored] values as they should be ignored # only when proxying details go ruby-kafka. We use ignored fields internally in karafka ignored_settings = api_adapter[:subscribe] defined_settings = api_adapter.values.flatten karafka_settings = %i[batch_fetching] dynamically_proxied = Karafka::Setup::Config.config.kafka.to_h.keys (defined_settings + dynamically_proxied).uniq + karafka_settings - ignored_settings end
@return [Array<Symbol>] properties that can be set on a per topic level
# File lib/karafka/attributes_map.rb, line 35 def topic (api_adapter[:subscribe] + %i[ backend name deserializer responder batch_consuming ]).uniq end