module WaterDrop::Producer::Async

Component for asynchronous producer operations

Public Instance Methods

produce_async(message) click to toggle source

Produces a message to Kafka and does not wait for results

@param message [Hash] hash that complies with the {Contracts::Message} contract

@return [Rdkafka::Producer::DeliveryHandle] delivery handle that might return the report

@raise [Rdkafka::RdkafkaError] When adding the message to rdkafka’s queue failed @raise [Errors::MessageInvalidError] When provided message details are invalid and the

message could not be sent to Kafka
# File lib/waterdrop/producer/async.rb, line 16
def produce_async(message)
  message = middleware.run(message)
  validate_message!(message)

  @monitor.instrument(
    'message.produced_async',
    producer_id: id,
    message: message
  ) { produce(message) }
rescue *SUPPORTED_FLOW_ERRORS => e
  # We use this syntax here because we want to preserve the original `#cause` when we
  # instrument the error and there is no way to manually assign `#cause` value
  begin
    raise Errors::ProduceError, e.inspect
  rescue Errors::ProduceError => ex
    @monitor.instrument(
      'error.occurred',
      producer_id: id,
      message: message,
      error: ex,
      type: 'message.produce_async'
    )

    raise ex
  end
end
produce_many_async(messages) click to toggle source

Produces many messages to Kafka and does not wait for them to be delivered

@param messages [Array<Hash>] array with messages that comply with the

{Contracts::Message} contract

@return [Array<Rdkafka::Producer::DeliveryHandle>] deliveries handles

@raise [Rdkafka::RdkafkaError] When adding the messages to rdkafka’s queue failed @raise [Errors::MessageInvalidError] When any of the provided messages details are invalid

and the message could not be sent to Kafka
# File lib/waterdrop/producer/async.rb, line 53
def produce_many_async(messages)
  dispatched = []
  messages = middleware.run_many(messages)
  messages.each { |message| validate_message!(message) }

  @monitor.instrument(
    'messages.produced_async',
    producer_id: id,
    messages: messages
  ) do
    with_transaction_if_transactional do
      messages.each do |message|
        dispatched << produce(message)
      end
    end

    dispatched
  end
rescue *SUPPORTED_FLOW_ERRORS => e
  re_raised = Errors::ProduceManyError.new(dispatched, e.inspect)

  @monitor.instrument(
    'error.occurred',
    producer_id: id,
    messages: messages,
    dispatched: dispatched,
    error: re_raised,
    type: 'messages.produce_many_async'
  )

  raise re_raised
end