class Karafka::BaseConsumer

Base consumer from which all Karafka consumers should inherit

Attributes

params_batch[RW]

@return [Karafka::Params:ParamsBatch] current params batch

topic[R]

@return [Karafka::Routing::Topic] topic to which a given consumer is subscribed

Public Class Methods

new(topic) click to toggle source

Assigns a topic to a consumer and builds up proper consumer functionalities

so that it can cooperate with the topic settings

@param topic [Karafka::Routing::Topic]

# File lib/karafka/base_consumer.rb, line 31
def initialize(topic)
  @topic = topic
  Consumers::Includer.call(self)
end

Public Instance Methods

call() click to toggle source

Executes the default consumer flow.

# File lib/karafka/base_consumer.rb, line 37
def call
  process
end

Private Instance Methods

client() click to toggle source

@return [Karafka::Connection::Client] messages consuming client that can be used to

commit manually offset or pause / stop consumer based on the business logic
# File lib/karafka/base_consumer.rb, line 45
def client
  Persistence::Client.read
end
consume() click to toggle source

Method that will perform business logic and on data received from Kafka (it will consume

the data)

@note This method needs bo be implemented in a subclass. We stub it here as a failover if

someone forgets about it or makes on with typo
# File lib/karafka/base_consumer.rb, line 53
def consume
  raise NotImplementedError, 'Implement this in a subclass'
end