class Karafka::Persistence::Topics
Local cache for routing topics We use it in order not to build string instances and remap incoming topic upon each message / message batches received
Constants
- PERSISTENCE_SCOPE
Thread.current scope under which we store topics data
Public Class Methods
clear()
click to toggle source
Clears the whole topics cache for all the threads This is used for in-development code reloading as we need to get rid of all the preloaded and cached instances of objects to make it work
# File lib/karafka/persistence/topics.rb, line 39 def clear Thread .list .select { |thread| thread[PERSISTENCE_SCOPE] } .each { |thread| thread[PERSISTENCE_SCOPE].clear } end
current()
click to toggle source
@return [Concurrent::Hash] hash with all the topics from given groups
# File lib/karafka/persistence/topics.rb, line 16 def current Thread.current[PERSISTENCE_SCOPE] ||= Concurrent::Hash.new do |hash, key| hash[key] = Concurrent::Hash.new end end
fetch(group_id, raw_topic_name)
click to toggle source
@param group_id [String] group id for which we fetch a topic representation @param raw_topic_name [String] raw topic name (before remapping) for which we fetch a
topic representation
@return [Karafka::Routing::Topics] remapped topic representation that can be used further
on when working with given parameters
# File lib/karafka/persistence/topics.rb, line 27 def fetch(group_id, raw_topic_name) current[group_id][raw_topic_name] ||= begin # We map from incoming topic name, as it might be namespaced, etc. # @see topic_mapper internal docs mapped_topic_name = Karafka::App.config.topic_mapper.incoming(raw_topic_name) Routing::Router.find("#{group_id}_#{mapped_topic_name}") end end