module WaterDrop::Producer::Sync

Component for synchronous producer operations

Public Instance Methods

produce_many_sync(messages) click to toggle source

Produces many messages to Kafka and waits for them to be delivered

@param messages [Array<Hash>] array with messages that comply with the

{Contracts::Message} contract

@return [Array<Rdkafka::Producer::DeliveryReport>] delivery reports

@raise [Rdkafka::RdkafkaError] When adding the messages to rdkafka’s queue failed @raise [Rdkafka::Producer::WaitTimeoutError] When the timeout has been reached and some

handles are still pending

@raise [Errors::MessageInvalidError] When any of the provided messages details are invalid

and the message could not be sent to Kafka
# File lib/waterdrop/producer/sync.rb, line 59
def produce_many_sync(messages)
  messages = middleware.run_many(messages)
  messages.each { |message| validate_message!(message) }

  dispatched = []

  @monitor.instrument('messages.produced_sync', producer_id: id, messages: messages) do
    with_transaction_if_transactional do
      messages.each do |message|
        dispatched << produce(message)
      end
    end

    dispatched.map! do |handler|
      wait(handler)
    end

    dispatched
  end
rescue *SUPPORTED_FLOW_ERRORS => e
  re_raised = Errors::ProduceManyError.new(dispatched, e.inspect)

  @monitor.instrument(
    'error.occurred',
    producer_id: id,
    messages: messages,
    dispatched: dispatched,
    error: re_raised,
    type: 'messages.produce_many_sync'
  )

  raise re_raised
end
produce_sync(message) click to toggle source

Produces a message to Kafka and waits for it to be delivered

@param message [Hash] hash that complies with the {Contracts::Message} contract

@return [Rdkafka::Producer::DeliveryReport] delivery report

@raise [Rdkafka::RdkafkaError] When adding the message to rdkafka’s queue failed @raise [Rdkafka::Producer::WaitTimeoutError] When the timeout has been reached and the

handle is still pending

@raise [Errors::MessageInvalidError] When provided message details are invalid and the

message could not be sent to Kafka
# File lib/waterdrop/producer/sync.rb, line 18
def produce_sync(message)
  message = middleware.run(message)
  validate_message!(message)

  @monitor.instrument(
    'message.produced_sync',
    producer_id: id,
    message: message
  ) do
    wait(produce(message))
  end
rescue *SUPPORTED_FLOW_ERRORS => e
  # We use this syntax here because we want to preserve the original `#cause` when we
  # instrument the error and there is no way to manually assign `#cause` value
  begin
    raise Errors::ProduceError, e.inspect
  rescue Errors::ProduceError => ex
    @monitor.instrument(
      'error.occurred',
      producer_id: id,
      message: message,
      error: ex,
      type: 'message.produce_sync'
    )

    raise ex
  end
end