module WaterDrop::Producer::Transactions

Transaction-related producer functionality

Constants

CONTRACT

Contract to validate that input for transactional offset storage is correct

NON_RELOADABLE_ERRORS

We should never reload the producer if it was fenced; otherwise we could end up with race conditions

Public Instance Methods

transaction() { || ... }

Creates a transaction.

Karafka transactions work similarly to SQL database transactions, though there are some crucial differences. When you start a transaction, all messages produced during it will be delivered together or will fail together. The difference is that messages from within a single transaction can be delivered and will have a delivery handle, but will then be compacted prior to moving the LSO (last stable offset) forward. This means that not every delivery handle for async dispatches will emit a queue purge error, and none will for sync dispatches, because the delivery has happened. Such messages will nevertheless never be visible to transactional consumers.

Transactions are thread-safe; however, they lock a mutex. This means that for high-throughput transactional message production in multiple threads (for example in Karafka), it may be much better to use a few instances that can work in parallel.

Please note that if a producer is configured as transactional, it cannot produce messages outside of transactions. That is why, by default, all dispatches are wrapped in a transaction: one transaction per single dispatch, and for `produce_many` a single transaction wrapping all message dispatches (not one per message).
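The wrapping behavior described above relies on the mutex ownership check visible in the source of this method. The following is a minimal sketch of that pattern in plain Ruby, not WaterDrop's implementation: `ToyProducer`, its `log`, and its `produce` method are hypothetical names used only for illustration.

```ruby
# Toy stand-in for a transactional producer (hypothetical, not WaterDrop's class).
class ToyProducer
  attr_reader :log

  def initialize
    @mutex = Mutex.new
    @log = []
  end

  # Opens a transaction unless the current thread already holds the mutex,
  # in which case the block simply joins the transaction in progress.
  def transaction
    return yield if @mutex.owned?

    @mutex.synchronize do
      @log << :begin
      result = yield
      @log << :commit
      result
    end
  end

  # Every dispatch goes through #transaction, so a bare call gets its own one.
  def produce(payload)
    transaction { @log << [:produce, payload] }
  end
end

producer = ToyProducer.new

# A bare dispatch is wrapped in its own transaction.
producer.produce('a')

# Dispatches inside an explicit block share that block's single transaction.
producer.transaction do
  producer.produce('b')
  producer.produce('c')
end

producer.log.count(:begin) # => 2
```

Because `Mutex#owned?` is checked before `synchronize`, the nested calls never try to re-lock the mutex, which would otherwise raise a `ThreadError`.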

@param block [Proc] block of code that should run

@return Block result or `nil` in case of early break/return

@example Simple transaction

producer.transaction do
  producer.produce_async(topic: 'topic', payload: 'data')
end

@example Aborted transaction - messages produced won’t be visible to consumers

producer.transaction do
  producer.produce_sync(topic: 'topic', payload: 'data')
  throw(:abort)
end

@example Use the block result (the last delivery handle) to wait for all messages to be acknowledged

handler = producer.transaction do
            producer.produce_async(topic: 'topic', payload: 'data')
          end

handler.wait
# File lib/waterdrop/producer/transactions.rb, line 58
def transaction(&block)
  # This will safely allow us to support one operation transactions so a transactional
  # producer can work without the transactional block if needed
  return yield if @transaction_mutex.owned?

  @transaction_mutex.synchronize do
    transactional_instrument(:finished) do
      with_transactional_error_handling(:begin) do
        transactional_instrument(:started) { client.begin_transaction }
      end

      result, commit = transactional_execute(&block)

      commit || raise(WaterDrop::Errors::AbortTransaction)

      with_transactional_error_handling(:commit) do
        transactional_instrument(:committed) { client.commit_transaction }
      end

      result
    # We need to handle any interrupt including critical in order not to have the transaction
    # running. This will also handle things like `IRB::Abort`
    rescue Exception => e
      # This code is a bit tricky. We have an error and when it happens we try to rollback
      # the transaction. However we may end up in a state where transaction aborting itself
      # produces error. In such case we also want to handle it as fatal and reload client.
      # This is why we catch this here
      begin
        with_transactional_error_handling(:abort) do
          transactional_instrument(:aborted) do
            client.abort_transaction
          end
        end
      rescue StandardError => e
        # If something from rdkafka leaks here, it means there was a non-retryable error that
        # bubbled up. In such cases we reload the underlying client if we are allowed to
        transactional_reload_client_if_needed(e)

        raise
      end

      transactional_reload_client_if_needed(e)

      raise unless e.is_a?(WaterDrop::Errors::AbortTransaction)
    end
  end
end
transaction?()

@return [Boolean] true if we are in an active transaction

# File lib/waterdrop/producer/transactions.rb, line 107
def transaction?
  @transaction_mutex.owned?
end
transaction_mark_as_consumed(consumer, message, offset_metadata = nil)

Marks given message as consumed inside of a transaction.

@param consumer [#consumer_group_metadata_pointer] any consumer from which we can obtain the librdkafka consumer group metadata pointer

@param message [Karafka::Messages::Message] karafka message

@param offset_metadata [String] offset metadata or nil if none

# File lib/waterdrop/producer/transactions.rb, line 124
def transaction_mark_as_consumed(consumer, message, offset_metadata = nil)
  raise Errors::TransactionRequiredError unless @transaction_mutex.owned?

  CONTRACT.validate!(
    {
      consumer: consumer,
      message: message,
      offset_metadata: offset_metadata
    },
    Errors::TransactionalOffsetInvalidError
  )

  details = { message: message, offset_metadata: offset_metadata }

  transactional_instrument(:marked_as_consumed, details) do
    tpl = Rdkafka::Consumer::TopicPartitionList.new
    partition = Rdkafka::Consumer::Partition.new(
      message.partition,
      # +1 because this is the next offset from which we will start processing
      message.offset + 1,
      0,
      offset_metadata
    )

    tpl.add_topic_and_partitions_with_offsets(message.topic, [partition])

    with_transactional_error_handling(:store_offset) do
      client.send_offsets_to_transaction(
        consumer,
        tpl,
        current_variant.max_wait_timeout
      )
    end
  end
end
transactional?()

@return [Boolean] true if this producer is transactional

# File lib/waterdrop/producer/transactions.rb, line 112
def transactional?
  return @transactional if instance_variable_defined?(:'@transactional')

  @transactional = config.kafka.to_h.key?(:'transactional.id')
end
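
The `instance_variable_defined?` check in the method above caches the result even when it is `false`, which a plain `||=` would not do, since `||=` re-evaluates whenever the stored value is falsy. A minimal sketch of the idiom follows; `ToyConfigCheck` and its `lookups` counter are hypothetical and exist only to make the caching observable.

```ruby
# Hypothetical class used only to illustrate the memoization idiom.
class ToyConfigCheck
  attr_reader :lookups

  def initialize(settings)
    @settings = settings
    @lookups = 0
  end

  # Memoizes with instance_variable_defined? so a false result is cached too.
  def transactional?
    return @transactional if instance_variable_defined?(:@transactional)

    @lookups += 1
    @transactional = @settings.key?(:'transactional.id')
  end
end

# No 'transactional.id' key, so the answer is false on every call.
check = ToyConfigCheck.new({ 'bootstrap.servers': 'localhost:9092' })
3.times { check.transactional? }

check.lookups # => 1
```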

Private Instance Methods

transactional_execute() { || ... }

Executes the requested code in a transaction with error handling and ensures that, upon an early break, we roll back the transaction instead of leaving it dangling, which would cause the transactional producer to end up in an error state.

# File lib/waterdrop/producer/transactions.rb, line 174
def transactional_execute
  result = nil
  commit = false

  catch(:abort) do
    result = yield
    commit = true
  end

  [result, commit]
rescue Exception => e
  errored = true

  raise e
ensure
  return [result, commit] unless errored
end
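
The commit-or-abort decision above rests on Ruby's `catch`/`throw`: the `commit` flag is only set when the block runs to completion. The sketch below isolates that part under simplifying assumptions; it deliberately omits the `rescue`/`ensure` handling for errors and early `break`, and `run_with_abort_detection` is a hypothetical name.

```ruby
# Simplified commit/abort detection (illustrative; no early-break handling).
def run_with_abort_detection
  result = nil
  commit = false

  # throw(:abort) inside the block unwinds to here, skipping `commit = true`.
  catch(:abort) do
    result = yield
    commit = true
  end

  [result, commit]
end

committed = run_with_abort_detection { :done }
aborted = run_with_abort_detection { throw(:abort) }

committed # => [:done, true]
aborted   # => [nil, false]
```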
transactional_instrument(key, details = EMPTY_HASH, &block)

Instruments the transactional operation with producer id

@param key [Symbol] transaction operation key

@param details [Hash] additional instrumentation details

@param block [Proc] block to run inside the instrumentation or nothing if not given

# File lib/waterdrop/producer/transactions.rb, line 197
def transactional_instrument(key, details = EMPTY_HASH, &block)
  @monitor.instrument("transaction.#{key}", details.merge(producer_id: id), &block)
end
transactional_reload_client_if_needed(error)

Reloads the underlying client instance if needed and allowed

This should be used only in transactions as only then we can get fatal transactional errors and we can safely reload the client.

@param error [Exception] any error that was raised

@note We only reload on rdkafka errors that are a cause of message dispatch failures. Because we reload on any error whose cause is `Rdkafka::RdkafkaError` (minus exclusions), this can in theory trigger a reload when something else raised such an error inside the transaction, for example Karafka. This is a trade-off. Since any error will cause a rollback anyway, this should not be an issue, putting aside the performance implications of closing and reconnecting.

# File lib/waterdrop/producer/transactions.rb, line 273
def transactional_reload_client_if_needed(error)
  rd_error = error.is_a?(Rdkafka::RdkafkaError) ? error : error.cause

  return unless rd_error.is_a?(Rdkafka::RdkafkaError)
  return unless config.reload_on_transaction_fatal_error
  return if NON_RELOADABLE_ERRORS.include?(rd_error.code)

  @operating_mutex.synchronize do
    @monitor.instrument(
      'producer.reloaded',
      producer_id: id
    ) do
      @client.flush(current_variant.max_wait_timeout)
      purge
      @client.close
      @client = Builder.new.call(self, @config)
    end
  end
end
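
The guard clauses above first look at the error itself and then at its `cause`, so a wrapper error raised while handling an rdkafka error is still recognized. Here is a minimal sketch of that decision in isolation. `ToyKafkaError`, `ToyWrapperError`, `TOY_NON_RELOADABLE`, `reload_needed?`, and `wrap` are all hypothetical stand-ins, and the exclusion list is an assumption made for this sketch.

```ruby
# Hypothetical stand-in for an rdkafka error carrying a symbolic code.
class ToyKafkaError < StandardError
  attr_reader :code

  def initialize(code)
    @code = code
    super("kafka error: #{code}")
  end
end

# Hypothetical higher-level error that wraps the one above.
class ToyWrapperError < StandardError; end

# Codes that should never trigger a reload (assumed list for this sketch).
TOY_NON_RELOADABLE = %i[fenced].freeze

# Returns true only when the error (or its cause) qualifies for a reload.
def reload_needed?(error, reload_enabled: true)
  kafka_error = error.is_a?(ToyKafkaError) ? error : error.cause

  return false unless kafka_error.is_a?(ToyKafkaError)
  return false unless reload_enabled
  return false if TOY_NON_RELOADABLE.include?(kafka_error.code)

  true
end

# Raising inside a rescue block sets #cause on the new exception automatically.
def wrap(code)
  raise ToyKafkaError, code
rescue ToyKafkaError
  begin
    raise ToyWrapperError, 'dispatch failed'
  rescue ToyWrapperError => e
    e
  end
end

reload_needed?(wrap(:timed_out)) # => true
reload_needed?(wrap(:fenced))    # => false
```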
with_transaction_if_transactional() { || ... }

Runs provided code with a transaction wrapper if transactions are enabled. This allows us to simplify the async and sync batch dispatchers because we can ensure that their internal dispatches will be wrapped only with a single transaction and not a transaction per message.

@param block [Proc] code we want to run

# File lib/waterdrop/producer/transactions.rb, line 167
def with_transaction_if_transactional(&block)
  transactional? ? transaction(&block) : yield
end
with_transactional_error_handling(action, allow_abortable: true) { || ... }

Error handling for transactional operations is a bit special. There are three types of errors coming from librdkafka:

- retryable - indicates that a given operation (like offset commit) can be retried after
  a backoff and that it should work as expected later. We retry those a few times
  before finally failing.
- fatal - errors that will not recover no matter what (for example being fenced out)
- abortable - error from which we cannot recover but for which we should abort the
  current transaction.

The code below handles this logic and also publishes the appropriate notifications via our notifications pipeline.

@param action [Symbol] action type

@param allow_abortable [Boolean] should we allow for the abortable flow. This is set to false internally to prevent attempts to abort from failed abort operations

# File lib/waterdrop/producer/transactions.rb, line 216
def with_transactional_error_handling(action, allow_abortable: true)
  attempt ||= 0
  attempt += 1

  yield
rescue ::Rdkafka::RdkafkaError => e
  # Decide if there is a chance to retry given error
  do_retry = e.retryable? && attempt < config.max_attempts_on_transaction_command

  @monitor.instrument(
    'error.occurred',
    producer_id: id,
    caller: self,
    error: e,
    type: "transaction.#{action}",
    retry: do_retry,
    attempt: attempt
  )

  if e.fatal?
    # Reload the client on fatal errors if requested
    transactional_reload_client_if_needed(e)

    raise
  end

  if do_retry
    # Backoff more and more before retries
    sleep((config.wait_backoff_on_transaction_command / 1_000.0) * attempt)

    retry
  end

  if e.abortable? && allow_abortable
    # Always attempt to abort but if aborting fails with an abortable error, do not attempt
    # to abort from abort as this could create an infinite loop
    with_transactional_error_handling(:abort, allow_abortable: false) do
      transactional_instrument(:aborted) { client.abort_transaction }
    end
  end

  raise
end
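
The retryable branch above combines a method-level `rescue` with `retry` and a pause that grows with each attempt. The sketch below shows just that mechanism under stated assumptions: `ToyRetryableError`, `with_retries`, and the injectable `sleeper` are hypothetical, and the fatal and abortable branches are left out.

```ruby
# Hypothetical error type; real errors come from rdkafka and are not modeled here.
class ToyRetryableError < StandardError
  def retryable?
    true
  end
end

# Retries the block with a linearly growing pause, up to max_attempts.
# The sleeper is injectable so the pauses can be observed without waiting.
def with_retries(max_attempts:, backoff_ms:, sleeper: ->(s) { sleep(s) })
  # `retry` re-runs the method body, but local variables keep their values,
  # so the counter survives across attempts.
  attempt ||= 0
  attempt += 1

  yield(attempt)
rescue ToyRetryableError => e
  raise unless e.retryable? && attempt < max_attempts

  sleeper.call((backoff_ms / 1_000.0) * attempt)
  retry
end

pauses = []
calls = 0

result = with_retries(max_attempts: 5, backoff_ms: 100, sleeper: ->(s) { pauses << s }) do |attempt|
  calls += 1
  raise ToyRetryableError, 'try again' if attempt < 3

  :ok
end

result # => :ok
calls  # => 3
```

Passing a recording lambda as `sleeper` keeps the example fast; with the default, the same call would pause 0.1 s and then 0.2 s before succeeding.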