module Karafka::Connection::ApiAdapter

Mapper used to convert our internal settings into ruby-kafka settings based on their API requirements. Since ruby-kafka has more and more options and there are few “levels” on which we have to apply them (despite the fact, that in Karafka you configure all of it in one place), we have to remap it into what ruby-kafka driver requires @note The good thing about Kafka.new method is that it ignores all options that

do nothing. So we don't have to worry about injecting our internal settings
into the client and breaking stuff

Public Class Methods

client(consumer_group) click to toggle source

Builds all the configuration settings for Kafka.new method @param consumer_group [Karafka::Routing::ConsumerGroup] consumer group details @return [Array<Hash>] Array with all the client arguments including hash with all

the settings required by Kafka.new method

@note We return array, so we can inject any arguments we want, in case of changes in the

raw driver
# File lib/karafka/connection/api_adapter.rb, line 22
def client(consumer_group)
  # This one is a default that takes all the settings except special
  # cases defined in the map
  settings = {
    logger: ::Karafka.logger,
    client_id: ::Karafka::App.config.client_id
  }

  kafka_configs.each_key do |setting_name|
    # All options for config adapter should be ignored as we're just interested
    # in what is left, as we want to pass all the options that are "typical"
    # and not listed in the api_adapter special cases mapping. All the values
    # from the api_adapter mapping go somewhere else, not to the client directly
    next if AttributesMap.api_adapter.values.flatten.include?(setting_name)

    # Settings for each consumer group are either defined per consumer group or are
    # inherited from the global/general settings level, thus we don't have to fetch them
    # from the kafka settings as they are already on a consumer group level
    settings[setting_name] = consumer_group.public_send(setting_name)
  end

  settings_hash = sanitize(settings)

  # Normalization for the way Kafka::Client accepts arguments from  0.5.3
  [settings_hash.delete(:seed_brokers), settings_hash]
end
consumer(consumer_group) click to toggle source

Builds all the configuration settings for kafka#consumer method @param consumer_group [Karafka::Routing::ConsumerGroup] consumer group details @return [Hash] all the consumer keyword arguments including hash with all

the settings required by Kafka#consumer
# File lib/karafka/connection/api_adapter.rb, line 53
def consumer(consumer_group)
  settings = { group_id: consumer_group.id }
  settings = fetch_for(:consumer, consumer_group, settings)
  sanitize(settings)
end
consumption(consumer_group) click to toggle source

Builds all the configuration settings for kafka consumer consume_each_batch and

consume_each_message methods

@param consumer_group [Karafka::Routing::ConsumerGroup] consumer group details @return [Hash] hash with all the arguments required by consuming method

including all the settings required by
Kafka::Consumer#consume_each_message and Kafka::Consumer#consume_each_batch method
# File lib/karafka/connection/api_adapter.rb, line 65
def consumption(consumer_group)
  sanitize(
    fetch_for(
      :consumption,
      consumer_group,
      automatically_mark_as_processed: consumer_group.automatically_mark_as_consumed
    )
  )
end
mark_message_as_processed(params) click to toggle source

Remaps topic details taking the topic mapper feature into consideration. @param params [Karafka::Params::Params] params instance @return [Array] array with all the details needed by ruby-kafka to mark message

as processed

@note When default empty topic mapper is used, no need for any conversion as the

internal and external format are exactly the same
# File lib/karafka/connection/api_adapter.rb, line 105
def mark_message_as_processed(params)
  # Majority of users don't use custom topic mappers. No need to change anything when it
  # is a default mapper that does not change anything. Only some cloud providers require
  # topics to be remapped
  return [params.metadata] if Karafka::App.config.topic_mapper.is_a?(
    Karafka::Routing::TopicMapper
  )

  # @note We don't use tap as it is around 13% slower than non-dup version
  dupped = params.metadata.dup
  dupped['topic'] = Karafka::App.config.topic_mapper.outgoing(params.metadata.topic)
  [dupped]
end
pause(topic, partition, consumer_group) click to toggle source

Builds all the configuration settings required by kafka consumer#pause method @param topic [String] topic that we want to pause @param partition [Integer] number partition that we want to pause @param consumer_group [Karafka::Routing::ConsumerGroup] consumer group details @return [Hash] hash with all the details required to pause kafka consumer

# File lib/karafka/connection/api_adapter.rb, line 88
def pause(topic, partition, consumer_group)
  {
    args: [Karafka::App.config.topic_mapper.outgoing(topic), partition],
    kwargs: {
      timeout: consumer_group.pause_timeout,
      max_timeout: consumer_group.pause_max_timeout,
      exponential_backoff: consumer_group.pause_exponential_backoff
    }
  }
end
subscribe(topic) click to toggle source

Builds all the configuration settings for kafka consumer#subscribe method @param topic [Karafka::Routing::Topic] topic that holds details for a given subscription @return [Hash] hash with all the settings required by kafka consumer#subscribe method

# File lib/karafka/connection/api_adapter.rb, line 78
def subscribe(topic)
  settings = fetch_for(:subscribe, topic)
  [Karafka::App.config.topic_mapper.outgoing(topic.name), sanitize(settings)]
end

Private Class Methods

fetch_for(namespace_key, route_layer, preexisting_settings = {}) click to toggle source

Fetches proper settings for a given map namespace @param namespace_key [Symbol] namespace from attributes map config adapter hash @param route_layer [Object] route topic or consumer group @param preexisting_settings [Hash] hash with some preexisting settings that might have

been loaded in a different way
# File lib/karafka/connection/api_adapter.rb, line 126
def fetch_for(namespace_key, route_layer, preexisting_settings = {})
  kafka_configs.each_key do |setting_name|
    # Ignore settings that are not related to our namespace
    next unless AttributesMap.api_adapter[namespace_key].include?(setting_name)

    # Ignore settings that are already initialized
    # In case they are in preexisting settings fetched differently
    next if preexisting_settings.key?(setting_name)

    # Fetch all the settings from a given layer object. Objects can handle the fallback
    # to the kafka settings, so
    preexisting_settings[setting_name] = route_layer.send(setting_name)
  end

  preexisting_settings
end
kafka_configs() click to toggle source

@return [Hash] Kafka config details as a hash

# File lib/karafka/connection/api_adapter.rb, line 152
def kafka_configs
  ::Karafka::App.config.kafka.to_h
end
sanitize(settings) click to toggle source

Removes nil containing keys from the final settings so it can use Kafkas driver

defaults for those

@param settings [Hash] settings that may contain nil values @return [Hash] settings without nil using keys (non of karafka options should be nil)

# File lib/karafka/connection/api_adapter.rb, line 147
def sanitize(settings)
  settings.reject { |_key, value| value.nil? }
end