class Karafka::BaseWorker
Worker wrapper for Sidekiq workers
Public Class Methods
Returns the base worker class for application.
@return [Class] first worker that inherited from Karafka::BaseWorker
. Karafka
assumes that it is the base worker for an application.
@raise [Karafka::Errors::BaseWorkerDescentantMissing] raised when application
base worker was not defined.
# File lib/karafka/base_worker.rb, line 15 def base_worker @inherited || raise(Errors::BaseWorkerDescentantMissing) end
@param subclass [Class] subclass of the worker @return [Class] subclass of the worker that was selected
# File lib/karafka/base_worker.rb, line 21 def inherited(subclass) super @inherited ||= subclass end
Public Instance Methods
Executes the logic that lies in perform
Karafka
consumer method @param topic_id [String] Unique topic id that we will use to find a proper topic @param params_batch [Array<Hash>] Array with messages batch @param metadata [Hash, nil] hash with all the metadata or nil if not present
# File lib/karafka/base_worker.rb, line 31 def perform(topic_id, params_batch, metadata) consumer = consumer(topic_id, params_batch, metadata) Karafka.monitor.instrument( 'backends.sidekiq.base_worker.perform', caller: self, consumer: consumer ) { consumer.consume } end
Private Instance Methods
@see `#perform` for exact params descriptions @param topic_id [String] @param params_batch [Array<Hash>] @param metadata [Hash, nil] @return [Karafka::Consumer] descendant of Karafka::BaseConsumer that matches the topic
with params_batch assigned already (consumer is ready to use)
# File lib/karafka/base_worker.rb, line 49 def consumer(topic_id, params_batch, metadata) topic = Karafka::Routing::Router.find(topic_id) consumer = topic.consumer.new(topic) consumer.params_batch = Params::Builders::ParamsBatch.from_array( topic.interchanger.decode(params_batch), topic ) if topic.batch_fetching consumer.batch_metadata = Params::Builders::BatchMetadata.from_hash( metadata, topic ) end consumer end