module Karafka::Connection::MessageDelegator
Class that delegates processing of a single received message for which we listen to a proper processor
Public Class Methods
call(group_id, kafka_message)
click to toggle source
Delegates message (does something with it) It will either schedule or run a proper processor action for the incoming message @param group_id [String] group_id of a group from which a given message came @param kafka_message [<Kafka::FetchedMessage>] raw message from kafka @note This should be looped to obtain a constant delegating of new messages
# File lib/karafka/connection/message_delegator.rb, line 14 def call(group_id, kafka_message) topic = Persistence::Topics.fetch(group_id, kafka_message.topic) consumer = Persistence::Consumers.fetch(topic, kafka_message.partition) Karafka.monitor.instrument( 'connection.message_delegator.call', caller: self, consumer: consumer, kafka_message: kafka_message ) do # @note We always get a single message within single delegator, which means that # we don't care if user marked it as a batch consumed or not. consumer.params_batch = Params::Builders::ParamsBatch.from_kafka_messages( [kafka_message], topic ) consumer.call end end