class Karafka::BaseConsumer
Base consumer from which all Karafka
consumers should inherit
Attributes
params_batch[RW]
@return [Karafka::Params::ParamsBatch] current params batch
topic[R]
@return [Karafka::Routing::Topic] topic to which a given consumer is subscribed
Public Class Methods
new(topic)
Assigns a topic to the consumer and sets up the consumer functionality
needed to work with that topic's settings
@param topic [Karafka::Routing::Topic]
# File lib/karafka/base_consumer.rb, line 31
def initialize(topic)
  @topic = topic
  Consumers::Includer.call(self)
end
Public Instance Methods
call()
Executes the default consumer flow.
# File lib/karafka/base_consumer.rb, line 37
def call
  process
end
Private Instance Methods
client()
@return [Karafka::Connection::Client] message-consuming client that can be used to
manually commit offsets or to pause / stop the consumer based on the business logic
# File lib/karafka/base_consumer.rb, line 45
def client
  Persistence::Client.read
end
consume()
Method that performs the business logic on data received from Kafka (it consumes
the data)
@note This method needs to be implemented in a subclass. We stub it here as a failover in case
someone forgets to define it or defines it with a typo in the name
# File lib/karafka/base_consumer.rb, line 53
def consume
  raise NotImplementedError, 'Implement this in a subclass'
end
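A minimal sketch of how a subclass might override consume. To keep it runnable without the gem, it uses a stand-in class (SketchBaseConsumer, a hypothetical name) that mirrors only the members documented above. The stand-in assumes that the default flow started by call ends up invoking consume; the page does not show the body of process, so treat that step as an assumption. EventsConsumer, the seen reader, and the hash-shaped batch entries are likewise illustrative and not part of Karafka.

```ruby
# Stand-in for Karafka::BaseConsumer so the sketch runs without the gem.
# It mirrors only the documented members; the real constructor also calls
# Consumers::Includer, which is omitted here.
class SketchBaseConsumer
  attr_accessor :params_batch
  attr_reader :topic

  def initialize(topic)
    @topic = topic
  end

  # Assumption: the default flow eventually invokes #consume.
  def call
    consume
  end

  private

  # Failover stub, as in the documented base class.
  def consume
    raise NotImplementedError, 'Implement this in a subclass'
  end
end

# Hypothetical subclass that collects the payloads of the current batch.
class EventsConsumer < SketchBaseConsumer
  attr_reader :seen

  private

  def consume
    @seen = params_batch.map { |params| params[:payload] }
  end
end

# A Symbol stands in for a Karafka::Routing::Topic instance here.
consumer = EventsConsumer.new(:events)
consumer.params_batch = [{ payload: 'a' }, { payload: 'b' }]
consumer.call
puts consumer.seen.inspect
```

Calling call on the stand-in base class itself raises NotImplementedError, which is the failover behaviour described in the note above.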