class Phobos::Actions::ProcessBatch
Attributes
metadata[R]
Public Class Methods
new(listener:, batch:, listener_metadata:)
click to toggle source
# File lib/phobos/actions/process_batch.rb, line 10 def initialize(listener:, batch:, listener_metadata:) @listener = listener @batch = batch @listener_metadata = listener_metadata @metadata = listener_metadata.merge( batch_size: batch.messages.count, partition: batch.partition, offset_lag: batch.offset_lag ) end
Public Instance Methods
execute()
click to toggle source
# File lib/phobos/actions/process_batch.rb, line 21 def execute instrument('listener.process_batch', @metadata) do |_metadata| @batch.messages.each do |message| Phobos::Actions::ProcessMessage.new( listener: @listener, message: message, listener_metadata: @listener_metadata ).execute @listener.consumer.trigger_heartbeat end end end