class Phobos::Actions::ProcessBatchInline
Attributes
metadata[R]
Public Class Methods
new(listener:, batch:, metadata:)
click to toggle source
# File lib/phobos/actions/process_batch_inline.rb, line 13
# Builds an action that processes one fetched batch inline.
#
# @param listener the listener that owns the handler class used later
# @param batch the fetched batch; must respond to +messages+, +partition+
#   and +offset_lag+
# @param metadata base metadata (assumed to be a Hash); it is merged into a
#   new hash, so the caller's object is left untouched
def initialize(listener:, batch:, metadata:)
  @listener = listener
  @batch = batch
  # Batch-level details are layered on top of the caller's metadata and the
  # retry counter always starts at zero for a fresh action.
  @metadata = metadata.merge(
    batch_size: batch.messages.count,
    partition: batch.partition,
    offset_lag: batch.offset_lag,
    retry_count: 0
  )
end
Public Instance Methods
execute()
click to toggle source
# File lib/phobos/actions/process_batch_inline.rb, line 26
# Wraps every raw message of the batch once, then hands the wrapped batch to
# +process_batch+. When processing raises a StandardError the failure is
# reported through +handle_error+ and processing is attempted again with the
# same wrapped messages.
#
# NOTE(review): the retry has no upper bound in this method; any waiting
# between attempts presumably happens inside +handle_error+ -- confirm.
def execute
  wrapped_messages = @batch.messages.map(&method(:instantiate_batch_message))

  begin
    process_batch(wrapped_messages)
  rescue StandardError => error
    failure_message = "error processing inline batch, waiting #{backoff_interval}s"
    handle_error(error, 'listener.retry_handler_error_batch', failure_message)
    # Re-enters only the begin block, so the messages are not wrapped again.
    retry
  end
end
Private Instance Methods
instantiate_batch_message(message)
click to toggle source
# File lib/phobos/actions/process_batch_inline.rb, line 40
# Converts one raw message into a Phobos::BatchMessage, copying its key,
# partition, offset and headers and passing its value through
# +force_encoding+ to produce the payload.
#
# @param message raw message responding to +key+, +partition+, +offset+,
#   +value+ and +headers+
# @return [Phobos::BatchMessage]
def instantiate_batch_message(message)
  attributes = {
    key: message.key,
    partition: message.partition,
    offset: message.offset,
    payload: force_encoding(message.value),
    headers: message.headers
  }

  Phobos::BatchMessage.new(**attributes)
end
process_batch(batch)
click to toggle source
# File lib/phobos/actions/process_batch_inline.rb, line 50
# Runs the listener's handler over the batch inside the
# 'listener.process_batch_inline' instrumentation block.
#
# A new handler instance is created for each call. +consume_batch+ is invoked
# from within the handler's +around_consume_batch+ hook, using whatever batch
# and metadata that hook yields.
#
# @param batch the wrapped messages to hand to the handler
def process_batch(batch)
  instrument('listener.process_batch_inline', @metadata) do |_event_metadata|
    consumer = @listener.handler_class.new

    consumer.around_consume_batch(batch, @metadata) do |yielded_batch, yielded_metadata|
      consumer.consume_batch(yielded_batch, yielded_metadata)
    end
  end
end