class Karafka::BaseWorker

Worker wrapper for Sidekiq workers

Public Class Methods

base_worker()

Returns the base worker class for the application.

@return [Class] the first worker that inherited from Karafka::BaseWorker. Karafka assumes that it is the base worker for the application.

@raise [Karafka::Errors::BaseWorkerDescentantMissing] raised when the application base worker was not defined.
# File lib/karafka/base_worker.rb, line 15
def base_worker
  @inherited || raise(Errors::BaseWorkerDescentantMissing)
end

inherited(subclass)

@param subclass [Class] subclass of the worker
@return [Class] subclass of the worker that was selected

Calls superclass method
# File lib/karafka/base_worker.rb, line 21
def inherited(subclass)
  super
  @inherited ||= subclass
end
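
The interplay between `inherited` and `base_worker` can be sketched without Karafka loaded. The snippet below is a minimal illustration, not the library's code: `SketchBaseWorker`, `MissingDescendant`, and the subclass names are hypothetical stand-ins. It shows that the first direct subclass is the one remembered, that later direct subclasses do not replace it, and that the memo lives on whichever class received the `inherited` call.

```ruby
# Hypothetical stand-in for Karafka::BaseWorker; all names here are illustrative.
class SketchBaseWorker
  # Stand-in for Karafka::Errors::BaseWorkerDescentantMissing
  MissingDescendant = Class.new(StandardError)

  class << self
    # Returns the remembered subclass or raises when none was defined yet
    def base_worker
      @inherited || raise(MissingDescendant)
    end

    # Ruby calls this hook on the parent each time a direct subclass is created
    def inherited(subclass)
      super
      @inherited ||= subclass
    end
  end
end

# Before any subclass exists, the lookup raises
raised_before_subclassing =
  begin
    SketchBaseWorker.base_worker
    false
  rescue SketchBaseWorker::MissingDescendant
    true
  end

class SketchApplicationWorker < SketchBaseWorker; end
class SketchOtherWorker < SketchBaseWorker; end
class SketchNestedWorker < SketchApplicationWorker; end

first_registered = SketchBaseWorker.base_worker
nested_registered = SketchApplicationWorker.base_worker
```

Because `@inherited` is a class-level instance variable, each class keeps its own memo: in this sketch `SketchNestedWorker` is recorded on `SketchApplicationWorker`, not on `SketchBaseWorker`.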

Public Instance Methods

perform(topic_id, params_batch, metadata)

Executes the logic defined in the Karafka consumer's consume method.

@param topic_id [String] unique topic id used to find the proper topic
@param params_batch [Array<Hash>] array with the messages batch
@param metadata [Hash, nil] hash with all the metadata, or nil if not present

# File lib/karafka/base_worker.rb, line 31
def perform(topic_id, params_batch, metadata)
  consumer = consumer(topic_id, params_batch, metadata)

  Karafka.monitor.instrument(
    'backends.sidekiq.base_worker.perform',
    caller: self,
    consumer: consumer
  ) { consumer.consume }
end
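
The shape of `perform` (build a consumer, then run `consume` inside an instrumented block) can be sketched with plain Ruby objects. Everything below is an assumption made for illustration: `SketchMonitor`, `SketchCountingConsumer`, and `SketchWorker` are hypothetical, and the stand-in monitor simply records the event and returns the block's result, which may differ from what the real `Karafka.monitor` does.

```ruby
# Hypothetical monitor: records each event id with its payload.
class SketchMonitor
  attr_reader :events

  def initialize
    @events = []
  end

  # Runs the block, stores the event, and returns the block's result
  # (the return value is an assumption of this sketch).
  def instrument(event_id, **payload)
    result = yield
    @events << [event_id, payload]
    result
  end
end

# Hypothetical consumer that only tracks whether consume was called.
class SketchCountingConsumer
  attr_reader :consumed

  def initialize
    @consumed = false
  end

  def consume
    @consumed = true
    :done
  end
end

# Hypothetical worker mirroring the structure of perform shown above.
class SketchWorker
  def initialize(monitor, consumer)
    @monitor = monitor
    @consumer = consumer
  end

  def perform
    @monitor.instrument(
      'backends.sidekiq.base_worker.perform',
      caller: self,
      consumer: @consumer
    ) { @consumer.consume }
  end
end

sketch_monitor = SketchMonitor.new
sketch_consumer = SketchCountingConsumer.new
sketch_worker = SketchWorker.new(sketch_monitor, sketch_consumer)
sketch_result = sketch_worker.perform
```

Wrapping the call in a block lets listeners observe the whole `consume` run under a single event name without the worker knowing who is listening.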

Private Instance Methods

consumer(topic_id, params_batch, metadata)

@see `#perform` for exact params descriptions
@param topic_id [String]
@param params_batch [Array<Hash>]
@param metadata [Hash, nil]
@return [Karafka::Consumer] descendant of Karafka::BaseConsumer that matches the topic, with params_batch already assigned (consumer is ready to use)
# File lib/karafka/base_worker.rb, line 49
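def consumer(topic_id, params_batch, metadata)
  # Look up the routing topic and instantiate its consumer class
  topic = Karafka::Routing::Router.find(topic_id)
  consumer = topic.consumer.new(topic)
  # Decode the batch via the topic's interchanger and rebuild the params batch
  consumer.params_batch = Params::Builders::ParamsBatch.from_array(
    topic.interchanger.decode(params_batch),
    topic
  )

  # Batch metadata is assigned only for topics that use batch fetching
  if topic.batch_fetching
    consumer.batch_metadata = Params::Builders::BatchMetadata.from_hash(
      metadata,
      topic
    )
  end

  consumer
end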
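
The build steps above (find the topic, instantiate its consumer, decode the batch, attach metadata only when batch fetching is on) can be sketched with stand-ins. None of the names below are Karafka classes: `SketchTopic`, `SketchJsonInterchanger`, `SketchBuiltConsumer`, and `SketchBuilder` are hypothetical, and the JSON round trip is only an assumed example of what an interchanger might do.

```ruby
require 'json'

# Hypothetical topic record with the attributes the builder reads.
SketchTopic = Struct.new(:id, :consumer, :interchanger, :batch_fetching, keyword_init: true)

# Hypothetical interchanger: serializes the batch to a JSON string and back.
class SketchJsonInterchanger
  def encode(batch)
    JSON.generate(batch)
  end

  def decode(payload)
    JSON.parse(payload)
  end
end

# Hypothetical consumer holding what the builder assigns.
class SketchBuiltConsumer
  attr_reader :topic
  attr_accessor :params_batch, :batch_metadata

  def initialize(topic)
    @topic = topic
  end
end

# Hypothetical router plus builder, mirroring the order of steps in the method above.
module SketchBuilder
  ROUTES = {}

  def self.register(topic)
    ROUTES[topic.id] = topic
  end

  def self.build(topic_id, params_batch, metadata)
    topic = ROUTES.fetch(topic_id)
    consumer = topic.consumer.new(topic)
    consumer.params_batch = topic.interchanger.decode(params_batch)
    # Metadata is attached only for topics that use batch fetching
    consumer.batch_metadata = metadata if topic.batch_fetching
    consumer
  end
end

sketch_interchanger = SketchJsonInterchanger.new
SketchBuilder.register(
  SketchTopic.new(id: 'batched', consumer: SketchBuiltConsumer,
                  interchanger: sketch_interchanger, batch_fetching: true)
)
SketchBuilder.register(
  SketchTopic.new(id: 'single', consumer: SketchBuiltConsumer,
                  interchanger: sketch_interchanger, batch_fetching: false)
)

sketch_payload = sketch_interchanger.encode([{ 'value' => 'a' }, { 'value' => 'b' }])
sketch_batched = SketchBuilder.build('batched', sketch_payload, { 'batch_size' => 2 })
sketch_single = SketchBuilder.build('single', sketch_payload, { 'batch_size' => 2 })
```

In this sketch the consumer for the `single` topic ends up with no batch metadata even though a metadata hash was passed, matching the conditional in the method above.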