class Phobos::Listener

rubocop:disable Metrics/ParameterLists, Metrics/ClassLength

Constants

DEFAULT_MAX_BYTES_PER_PARTITION
DELIVERY_OPTS

Attributes

consumer[R]
encoding[R]
group_id[R]
handler_class[R]
id[R]
topic[R]

Public Class Methods

new(handler:, group_id:, topic:, min_bytes: nil, max_wait_time: nil, force_encoding: nil, start_from_beginning: true, backoff: nil, delivery: 'batch', max_bytes_per_partition: DEFAULT_MAX_BYTES_PER_PARTITION, session_timeout: nil, offset_commit_interval: nil, heartbeat_interval: nil, offset_commit_threshold: nil, offset_retention_time: nil) click to toggle source

rubocop:disable Metrics/MethodLength

# File lib/phobos/listener.rb, line 16
def initialize(handler:, group_id:, topic:, min_bytes: nil, max_wait_time: nil,
               force_encoding: nil, start_from_beginning: true, backoff: nil,
               delivery: 'batch', max_bytes_per_partition: DEFAULT_MAX_BYTES_PER_PARTITION,
               session_timeout: nil, offset_commit_interval: nil,
               heartbeat_interval: nil, offset_commit_threshold: nil,
               offset_retention_time: nil)
  @id = SecureRandom.hex[0...6]
  @handler_class = handler
  @group_id = group_id
  @topic = topic
  @backoff = backoff
  @delivery = delivery.to_s
  @subscribe_opts = {
    start_from_beginning: start_from_beginning, max_bytes_per_partition: max_bytes_per_partition
  }
  @kafka_consumer_opts = compact(
    session_timeout: session_timeout, offset_retention_time: offset_retention_time,
    offset_commit_interval: offset_commit_interval, heartbeat_interval: heartbeat_interval,
    offset_commit_threshold: offset_commit_threshold
  )
  @encoding = Encoding.const_get(force_encoding.to_sym) if force_encoding
  @message_processing_opts = compact(min_bytes: min_bytes, max_wait_time: max_wait_time)
  @kafka_client = Phobos.create_kafka_client(:consumer)
  @producer_enabled = @handler_class.ancestors.include?(Phobos::Producer)
end

Public Instance Methods

create_exponential_backoff() click to toggle source
# File lib/phobos/listener.rb, line 73
def create_exponential_backoff
  Phobos.create_exponential_backoff(@backoff)
end
send_heartbeat_if_necessary() click to toggle source
# File lib/phobos/listener.rb, line 81
def send_heartbeat_if_necessary
  raise Phobos::AbortError if should_stop?

  @consumer&.send_heartbeat_if_necessary
end
should_stop?() click to toggle source
# File lib/phobos/listener.rb, line 77
def should_stop?
  @signal_to_stop == true
end
start() click to toggle source

rubocop:enable Metrics/MethodLength

# File lib/phobos/listener.rb, line 43
def start
  @signal_to_stop = false

  start_listener

  begin
    start_consumer_loop
  rescue Kafka::ProcessingError, Phobos::AbortError
    # Abort is an exception to prevent the consumer from committing the offset.
    # Since "listener" had a message being retried while "stop" was called
    # it's wise to not commit the batch offset to avoid data loss. This will
    # cause some messages to be reprocessed
    instrument('listener.retry_aborted', listener_metadata) do
      log_info('Retry loop aborted, listener is shutting down', listener_metadata)
    end
  end
ensure
  stop_listener
end
stop() click to toggle source
# File lib/phobos/listener.rb, line 63
def stop
  return if should_stop?

  instrument('listener.stopping', listener_metadata) do
    log_info('Listener stopping', listener_metadata)
    @consumer&.stop
    @signal_to_stop = true
  end
end

Private Instance Methods

compact(hash) click to toggle source
# File lib/phobos/listener.rb, line 187
def compact(hash)
  hash.delete_if { |_, v| v.nil? }
end
consume_each_batch() click to toggle source
# File lib/phobos/listener.rb, line 137
def consume_each_batch
  @consumer.each_batch(**@message_processing_opts) do |batch|
    batch_processor = Phobos::Actions::ProcessBatch.new(
      listener: self,
      batch: batch,
      listener_metadata: listener_metadata
    )

    batch_processor.execute
    log_debug('Committed offset', batch_processor.metadata)
    return nil if should_stop?
  end
end
consume_each_batch_inline() click to toggle source
# File lib/phobos/listener.rb, line 151
def consume_each_batch_inline
  @consumer.each_batch(**@message_processing_opts) do |batch|
    batch_processor = Phobos::Actions::ProcessBatchInline.new(
      listener: self,
      batch: batch,
      metadata: listener_metadata
    )

    batch_processor.execute
    log_debug('Committed offset', batch_processor.metadata)
    return nil if should_stop?
  end
end
consume_each_message() click to toggle source
# File lib/phobos/listener.rb, line 165
def consume_each_message
  @consumer.each_message(**@message_processing_opts) do |message|
    message_processor = Phobos::Actions::ProcessMessage.new(
      listener: self,
      message: message,
      listener_metadata: listener_metadata
    )

    message_processor.execute
    log_debug('Committed offset', message_processor.metadata)
    return nil if should_stop?
  end
end
create_kafka_consumer() click to toggle source
# File lib/phobos/listener.rb, line 179
def create_kafka_consumer
  configs = Phobos.config.consumer_hash.select do |k|
    Constants::KAFKA_CONSUMER_OPTS.include?(k)
  end
  configs.merge!(@kafka_consumer_opts)
  @kafka_client.consumer(**{ group_id: group_id }.merge(configs))
end
listener_metadata() click to toggle source
# File lib/phobos/listener.rb, line 89
def listener_metadata
  { listener_id: id, group_id: group_id, topic: topic, handler: handler_class.to_s }
end
start_consumer_loop() click to toggle source
# File lib/phobos/listener.rb, line 125
def start_consumer_loop
  # validate batch handling
  case @delivery
  when 'batch'
    consume_each_batch
  when 'inline_batch'
    consume_each_batch_inline
  else
    consume_each_message
  end
end
start_listener() click to toggle source
# File lib/phobos/listener.rb, line 93
def start_listener
  instrument('listener.start', listener_metadata) do
    @consumer = create_kafka_consumer
    @consumer.subscribe(topic, **@subscribe_opts)

    # This is done here because the producer client is bound to the current thread and
    # since "start" blocks a thread might be used to call it
    @handler_class.producer.configure_kafka_client(@kafka_client) if @producer_enabled

    instrument('listener.start_handler', listener_metadata) do
      @handler_class.start(@kafka_client)
    end
    log_info('Listener started', listener_metadata)
  end
end
stop_listener() click to toggle source
# File lib/phobos/listener.rb, line 109
def stop_listener
  instrument('listener.stop', listener_metadata) do
    instrument('listener.stop_handler', listener_metadata) { @handler_class.stop }

    @consumer&.stop

    if @producer_enabled
      @handler_class.producer.async_producer_shutdown
      @handler_class.producer.configure_kafka_client(nil)
    end

    @kafka_client.close
    log_info('Listener stopped', listener_metadata) if should_stop?
  end
end