class Karafka::Persistence::Topics

Local cache for routing topics. We use it to avoid building string instances and remapping the incoming topic name each time a message or message batch is received.

Constants

PERSISTENCE_SCOPE

Thread.current scope under which we store topics data

Public Class Methods

clear()

Clears the whole topics cache for all threads. This is used for in-development code reloading, as we need to get rid of all the preloaded and cached object instances to make it work.

# File lib/karafka/persistence/topics.rb, line 39
def clear
  Thread
    .list
    .select { |thread| thread[PERSISTENCE_SCOPE] }
    .each { |thread| thread[PERSISTENCE_SCOPE].clear }
end
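
The cross-thread clearing above can be sketched in isolation. This is a minimal illustration, not Karafka's code: the `ClearSketch` module, its `SCOPE` key, and the sample cache contents are hypothetical stand-ins, and plain hashes are used in place of `Concurrent::Hash` so the snippet runs without extra gems.

```ruby
module ClearSketch
  # Hypothetical key; the real PERSISTENCE_SCOPE value is not shown in this page
  SCOPE = :clear_sketch_cache

  # Same shape as the documented method: visit every live thread,
  # skip the ones that never built a cache, and empty the rest in place
  def self.clear
    Thread
      .list
      .select { |thread| thread[SCOPE] }
      .each { |thread| thread[SCOPE].clear }
  end
end

# The main thread holds one cached entry
Thread.current[ClearSketch::SCOPE] = { 'group_a' => { 'events' => :topic } }

# A worker thread holds its own cache and stays alive until told to finish,
# so that it is still present in Thread.list when clear runs
ready = Queue.new
done = Queue.new
worker = Thread.new do
  Thread.current[ClearSketch::SCOPE] = { 'group_b' => { 'logs' => :topic } }
  ready << true
  done.pop
end
ready.pop

ClearSketch.clear

main_empty = Thread.current[ClearSketch::SCOPE].empty?
worker_empty = worker[ClearSketch::SCOPE].empty?

done << true
worker.join
```

Note that the hashes are emptied rather than removed, so each thread keeps its cache object and simply repopulates it on the next lookup.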

current()

@return [Concurrent::Hash] hash with all the topics from given groups

# File lib/karafka/persistence/topics.rb, line 16
def current
  Thread.current[PERSISTENCE_SCOPE] ||= Concurrent::Hash.new do |hash, key|
    hash[key] = Concurrent::Hash.new
  end
end
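
As a rough sketch of what the default block buys us: reading a missing group key creates the nested hash on the fly, and each thread gets its own top-level hash. The `CurrentSketch` module and `SCOPE` key below are hypothetical, and a plain `Hash` stands in for `Concurrent::Hash` (an assumption made only so the example runs without the concurrent-ruby gem).

```ruby
module CurrentSketch
  # Hypothetical key standing in for PERSISTENCE_SCOPE
  SCOPE = :current_sketch_cache

  # Lazily builds a per-thread hash whose missing keys
  # are filled with a fresh nested hash on first access
  def self.current
    Thread.current[SCOPE] ||= Hash.new do |hash, key|
      hash[key] = {}
    end
  end
end

# No explicit setup is needed for the 'group_a' level
CurrentSketch.current['group_a']['events'] = :topic_object
main_keys = CurrentSketch.current.keys

# A different thread starts with its own, empty cache
other_keys = Thread.new { CurrentSketch.current.keys }.value
```

Because the storage is scoped to `Thread.current`, entries cached by one thread are not visible to another; every thread warms its cache independently.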

fetch(group_id, raw_topic_name)

@param group_id [String] group id for which we fetch a topic representation

@param raw_topic_name [String] raw topic name (before remapping) for which we fetch a topic representation

@return [Karafka::Routing::Topics] remapped topic representation that can be used further on when working with given parameters

# File lib/karafka/persistence/topics.rb, line 27
def fetch(group_id, raw_topic_name)
  current[group_id][raw_topic_name] ||= begin
    # We map from incoming topic name, as it might be namespaced, etc.
    # @see topic_mapper internal docs
    mapped_topic_name = Karafka::App.config.topic_mapper.incoming(raw_topic_name)
    Routing::Router.find("#{group_id}_#{mapped_topic_name}")
  end
end
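
The memoized lookup can be illustrated with a self-contained sketch. Everything in `FetchSketch` is hypothetical: the `PrefixMapper` (which assumes a `prod.` namespace prefix), the counting `Router`, and the `Cache` wrapper are stand-ins for `Karafka::App.config.topic_mapper`, `Routing::Router`, and the thread-local store, and they only mirror the shape of the method above.

```ruby
module FetchSketch
  # Hypothetical mapper: strips an assumed "prod." namespace from incoming names
  class PrefixMapper
    def incoming(raw_topic_name)
      raw_topic_name.to_s.delete_prefix('prod.')
    end
  end

  # Hypothetical router that counts lookups so the caching effect is visible
  class Router
    attr_reader :lookups

    def initialize(routes)
      @routes = routes
      @lookups = 0
    end

    def find(topic_id)
      @lookups += 1
      @routes.fetch(topic_id)
    end
  end

  # Two-level cache keyed by group id, then by raw (unmapped) topic name
  class Cache
    def initialize(mapper, router)
      @mapper = mapper
      @router = router
      @store = Hash.new { |hash, key| hash[key] = {} }
    end

    def fetch(group_id, raw_topic_name)
      @store[group_id][raw_topic_name] ||= begin
        # Mapping and routing only run on a cache miss
        mapped_topic_name = @mapper.incoming(raw_topic_name)
        @router.find("#{group_id}_#{mapped_topic_name}")
      end
    end
  end
end

router = FetchSketch::Router.new({ 'group_a_events' => :events_topic })
cache = FetchSketch::Cache.new(FetchSketch::PrefixMapper.new, router)

first = cache.fetch('group_a', 'prod.events')
second = cache.fetch('group_a', 'prod.events')
```

Keying the cache by the raw name means a repeated message for the same topic skips both the mapper call and the interpolated string; in this sketch the router is consulted once even though `fetch` is called twice.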