class Karafka::Routing::Topic
Topic
stores all the details on how we should interact with Kafka given topic It belongs to a consumer group as from 0.6 all the topics can work in the same consumer group It is a part of Karafka's DSL
Attributes
Public Class Methods
@param [String, Symbol] name of a topic on which we want to listen @param consumer_group
[Karafka::Routing::ConsumerGroup] owning consumer group of this topic
# File lib/karafka/routing/topic.rb, line 19 def initialize(name, consumer_group) @name = name.to_s @consumer_group = consumer_group @attributes = {} # @note We use identifier related to the consumer group that owns a topic, because from # Karafka 0.6 we can handle multiple Kafka instances with the same process and we can # have same topic name across multiple Kafkas @id = "#{consumer_group.id}_#{@name}" end
Public Instance Methods
Initializes default values for all the options that support defaults if their values are not yet specified. This is need to be done (cannot be lazy loaded on first use) because everywhere except Karafka
server command, those would not be initialized on time - for example for Sidekiq
# File lib/karafka/routing/topic.rb, line 33 def build Karafka::AttributesMap.topic.each { |attr| send(attr) } self end
@return [Class, nil] Class (not an instance) of a responder that should respond from
consumer back to Kafka (useful for piping data flows)
# File lib/karafka/routing/topic.rb, line 40 def responder @responder ||= Karafka::Responders::Builder.new(consumer).build end
@return [Hash] hash with all the topic attributes @note This is being used when we validate the consumer_group
and its topics
# File lib/karafka/routing/topic.rb, line 50 def to_h map = Karafka::AttributesMap.topic.map do |attribute| [attribute, public_send(attribute)] end Hash[map].merge!( id: id, consumer: consumer ) end