class WaterDrop::Producer

Main WaterDrop messages producer

Constants

EMPTY_HASH

Empty hash to save on memory allocations

SUPPORTED_FLOW_ERRORS

Which of the inline flow errors do we want to intercept and re-bind

Attributes

config[R]

@return [Object] dry-configurable config object

id[R]

@return [String] uuid of the current producer

messages[R]

@return [Array] internal messages buffer

monitor[R]

@return [Object] monitor we want to use

status[R]

@return [Status] producer status object

Public Class Methods

new(&block)

Creates a not-yet-configured instance of the producer

@param block [Proc] configuration block
@return [Producer] producer instance

# File lib/waterdrop/producer.rb, line 40
def initialize(&block)
  @operations_in_progress = Helpers::Counter.new
  @buffer_mutex = Mutex.new
  @connecting_mutex = Mutex.new
  @operating_mutex = Mutex.new
  @transaction_mutex = Mutex.new

  @status = Status.new
  @messages = []

  return unless block

  setup(&block)
end
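
A minimal usage sketch (not part of the source; the broker address and topic below are placeholders, and produce_sync comes from the producer's dispatch API):

# Build and configure a producer in one step via the configuration block
producer = WaterDrop::Producer.new do |config|
  config.deliver = true
  config.kafka = { 'bootstrap.servers': 'localhost:9092' }
end

# Once configured, messages can be dispatched, for example synchronously
producer.produce_sync(topic: 'events', payload: 'hello')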

Public Instance Methods

client()

@return [Rdkafka::Producer] raw rdkafka producer
@note Client is lazily initialized, keeping in mind also the fact of a potential fork that can happen at any time.
@note It is not recommended to fork a producer that is already in use, so in case of bootstrapping a cluster, it is much better to fork configured but not yet used producers.
# File lib/waterdrop/producer.rb, line 77
def client
  return @client if @client && @pid == Process.pid

  # Don't allow to obtain a client reference for a producer that was not configured
  raise Errors::ProducerNotConfiguredError, id if @status.initial?

  @connecting_mutex.synchronize do
    return @client if @client && @pid == Process.pid

    # We undefine all the finalizers, in case it was a fork, so the finalizers from the parent
    # process don't leak
    ObjectSpace.undefine_finalizer(id)

    # We should raise an error when trying to use a producer with client from a fork. Always.
    if @client
      # We need to reset the client, otherwise there might be attempt to close the parent
      # client
      @client = nil
      raise Errors::ProducerUsedInParentProcess, Process.pid
    end

    # Finalizer tracking is needed for handling shutdowns gracefully.
    # I don't expect everyone to remember about closing all the producers all the time, thus
    # this approach is better. Although it is still worth keeping in mind, that this will
    # block GC from removing a no longer used producer unless closed properly but at least
    # won't crash the VM upon closing the process
    ObjectSpace.define_finalizer(id, proc { close })

    @pid = Process.pid
    @client = Builder.new.call(self, @config)

    @status.connected!
    @monitor.instrument('producer.connected', producer_id: id)
  end

  @client
end
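
Given the fork notes above, one pattern is to configure the producer in the parent process and only touch #client (directly or through a produce call) after forking, so the lazy connection is established in the child. A sketch, assuming the usual WaterDrop produce API and a placeholder broker address:

# Configured but never used in the parent, so there is no client to leak into the child
producer = WaterDrop::Producer.new do |config|
  config.kafka = { 'bootstrap.servers': 'localhost:9092' }
end

fork do
  # First use in the child builds the rdkafka client for this PID
  producer.produce_sync(topic: 'events', payload: 'from the child process')
end
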
close(force: false)

Flushes the buffers in a sync way and closes the producer

@param force [Boolean] should we force closing even with outstanding messages after the max wait timeout
# File lib/waterdrop/producer.rb, line 176
def close(force: false)
  @operating_mutex.synchronize do
    return unless @status.active?

    @monitor.instrument(
      'producer.closed',
      producer_id: id
    ) do
      @status.closing!
      @monitor.instrument('producer.closing', producer_id: id)

      # No need for auto-gc if everything got closed by us
      # This should be used only in case a producer was not closed properly and forgotten
      ObjectSpace.undefine_finalizer(id)

      # We save this thread id because we need to bypass the activity verification on the
      # producer for final flush of buffers.
      @closing_thread_id = Thread.current.object_id

      # Wait until all the outgoing operations are done. Only when no one is using the
      # underlying client to run operations can we safely close it
      sleep(0.001) until @operations_in_progress.value.zero?

      # Flush has its own buffer mutex but even if it is blocked, flushing can still happen
      # as we close the client after the flushing (even if blocked by the mutex)
      flush(true)

      # We should not close the client in several threads at the same time
      # It is safe to run it several times but not at exactly the same moment
      # We also mark it as closed only if it was connected, if not, it would trigger a new
      # connection that anyhow would be immediately closed
      if @client
        # Why do we trigger it early instead of just having `#close` do it?
        # The linger.ms time will be ignored for the duration of the call,
        # queued messages will be sent to the broker as soon as possible.
        begin
          @client.flush(current_variant.max_wait_timeout) unless @client.closed?
        # We can safely ignore timeouts here because any left outstanding requests
        # will anyhow force wait on close if not forced.
        # If forced, we will purge the queue and just close
        rescue ::Rdkafka::RdkafkaError, Rdkafka::AbstractHandle::WaitTimeoutError
          nil
        ensure
          # Purge fully the local queue in case of a forceful shutdown just to be sure, that
          # there are no dangling messages. In case flush was successful, there should be
          # none but we do it just in case it timed out
          purge if force
        end

        @client.close

        @client = nil
      end

      # Remove callbacks runners that were registered
      ::Karafka::Core::Instrumentation.statistics_callbacks.delete(@id)
      ::Karafka::Core::Instrumentation.error_callbacks.delete(@id)
      ::Karafka::Core::Instrumentation.oauthbearer_token_refresh_callbacks.delete(@id)

      @status.closed!
    end
  end
end
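
A closing sketch (illustrative, not from the source): a regular close waits for outstanding deliveries, while a forced close purges whatever is still queued after the wait timeout.

# Flush buffers and wait for in-flight deliveries before releasing the client
producer.close

# Or, when shutdown time matters more than undelivered messages
producer.close(force: true)
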
close!()

Closes the producer with forced close after timeout, purging any outgoing data

# File lib/waterdrop/producer.rb, line 241
def close!
  close(force: true)
end
idempotent?()

@return [Boolean] true if current producer is idempotent

# File lib/waterdrop/producer.rb, line 159
def idempotent?
  # Every transactional producer is always idempotent by default
  return true if transactional?
  return @idempotent if instance_variable_defined?(:'@idempotent')

  @idempotent = config.kafka.to_h.key?(:'enable.idempotence')
end
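
Note that the check above is on key presence in the kafka scope of the config, not on the key's value. A sketch (illustrative; the broker address is a placeholder):

producer = WaterDrop::Producer.new do |config|
  config.kafka = {
    'bootstrap.servers': 'localhost:9092',
    'enable.idempotence': true
  }
end

producer.idempotent? # => true
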
middleware()

Returns and caches the middleware object that may be used

@return [WaterDrop::Producer::Middleware]

# File lib/waterdrop/producer.rb, line 169
def middleware
  @middleware ||= config.middleware
end
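
A sketch of wiring in a middleware step, assuming the middleware object exposes an #append method that takes a callable receiving and returning the message hash (this is an assumption about the Middleware API, not shown in this file):

producer.middleware.append(
  lambda do |message|
    # Hypothetical step: stamp every outgoing message with a header
    message[:headers] ||= {}
    message[:headers]['produced-by'] = producer.id
    message
  end
)
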
partition_count(topic)

Fetches and caches the partition count of a topic

@param topic [String] topic for which we want to get the number of partitions
@return [Integer] number of partitions of the requested topic or -1 if number could not be retrieved
@note It uses the underlying `rdkafka-ruby` partition count fetch and cache.

# File lib/waterdrop/producer.rb, line 122
def partition_count(topic)
  client.partition_count(topic.to_s)
end
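
Usage is a single call; thanks to the #to_s above, both strings and symbols work as the topic argument (the topic name is a placeholder):

producer.partition_count('events')  # => e.g. 6
producer.partition_count(:events)   # same topic, symbol form
# => -1 when the partition count could not be retrieved
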
purge()

Purges data from both the buffer queue as well as the librdkafka queue.

@note This is an operation that can cause data loss. Keep that in mind. It will not only purge the internal WaterDrop buffer but will also purge the librdkafka queue and cancel any outgoing message dispatches.
# File lib/waterdrop/producer.rb, line 131
def purge
  @monitor.instrument('buffer.purged', producer_id: id) do
    @buffer_mutex.synchronize do
      @messages = []
    end

    # We should not purge if there is no client initialized
    # It may not be initialized if we created a new producer that never connected to Kafka,
    # used the buffer and purged it. In cases like this the client won't exist
    @connecting_mutex.synchronize do
      @client&.purge
    end
  end
end
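
A sketch in the context of buffered dispatch (illustrative; #buffer comes from the producer's buffering API and the topic is a placeholder):

producer.buffer(topic: 'events', payload: 'queued locally')

# Drop everything that has not reached the brokers yet: the WaterDrop buffer,
# the librdkafka queue and any outgoing dispatches
producer.purge
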
setup(&block)

Sets up the whole configuration and initializes all that is needed

@param block [Block] configuration block

# File lib/waterdrop/producer.rb, line 57
def setup(&block)
  raise Errors::ProducerAlreadyConfiguredError, id unless @status.initial?

  @config = Config
            .new
            .setup(&block)
            .config

  @id = @config.id
  @monitor = @config.monitor
  @contract = Contracts::Message.new(max_payload_size: @config.max_payload_size)
  @default_variant = Variant.new(self, default: true)
  @status.configured!
end
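
Configuration can also be deferred: create the producer without a block and call #setup later, before first use. A sketch (the broker address is a placeholder):

producer = WaterDrop::Producer.new
producer.setup do |config|
  config.kafka = { 'bootstrap.servers': 'localhost:9092' }
end
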
variant(**args)
Alias for: with
with(**args)

Builds the variant alteration and returns it.

@param args [Object] anything `Producer::Variant` initializer accepts
@return [WaterDrop::Producer::Variant] variant proxy to use with alterations

# File lib/waterdrop/producer.rb, line 150
def with(**args)
  ensure_active!

  Variant.new(self, **args)
end
Also aliased as: variant
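
A sketch of a per-call alteration; the max_wait_timeout and topic_config keywords are assumptions taken from the variant attributes referenced elsewhere in this class, and the produce call assumes the variant proxies the regular produce API:

# Hypothetical variant with a longer wait and altered topic-level settings
reliable = producer.with(max_wait_timeout: 30_000, topic_config: { 'acks': 'all' })
reliable.produce_sync(topic: 'events', payload: 'critical message')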

Private Instance Methods

current_variant()

@return [Producer::Context] the variant config. Either custom if built using `#with` or a default one.
# File lib/waterdrop/producer.rb, line 281
def current_variant
  Thread.current[id] || @default_variant
end
ensure_active!()

Ensures that we don’t run any operations when the producer is not configured or when it was already closed

# File lib/waterdrop/producer.rb, line 249
def ensure_active!
  return if @status.active?
  return if @status.closing? && @operating_mutex.owned?

  raise Errors::ProducerNotConfiguredError, id if @status.initial?
  raise Errors::ProducerClosedError, id if @status.closing?
  raise Errors::ProducerClosedError, id if @status.closed?

  # This should never happen
  raise Errors::StatusInvalidError, [id, @status.to_s]
end
produce(message)

Runs the client produce method with a given message

@param message [Hash] message we want to send

# File lib/waterdrop/producer.rb, line 288
def produce(message)
  produce_time ||= monotonic_now

  # This can happen only during flushing on closing, in case like this we don't have to
  # synchronize because we already own the lock
  if @operating_mutex.owned?
    @operations_in_progress.increment
  else
    @operating_mutex.synchronize { @operations_in_progress.increment }
    ensure_active!
  end

  # We only duplicate the message hash if it is needed.
  # It is needed when the user is using a custom settings variant or when a symbol is provided
  # as the topic name. We should never mutate the user input message as it may be a hash that
  # the user is using for some other operations
  if message[:topic].is_a?(Symbol) || !current_variant.default?
    message = message.dup
    # In case someone defines topic as a symbol, we need to convert it into a string as
    # librdkafka does not accept symbols
    message[:topic] = message[:topic].to_s
    message[:topic_config] = current_variant.topic_config
  end

  if transactional?
    transaction { client.produce(**message) }
  else
    client.produce(**message)
  end
rescue SUPPORTED_FLOW_ERRORS.first => e
  # Unless we want to wait and retry and it's a full queue, we raise normally
  raise unless @config.wait_on_queue_full
  raise unless e.code == :queue_full
  # If we're running for longer than the timeout, we need to re-raise the queue full error.
  # This prevents a situation where the cluster is down forever and we just retry and retry
  # in an infinite loop, effectively hanging the processing
  raise unless monotonic_now - produce_time < @config.wait_timeout_on_queue_full

  label = caller_locations(2, 1)[0].label.split(' ').last.split('#').last

  # We use this syntax here because we want to preserve the original `#cause` when we
  # instrument the error and there is no way to manually assign `#cause` value. We want to keep
  # the original cause to maintain the same API across all the errors dispatched to the
  # notifications pipeline.
  begin
    raise Errors::ProduceError, e.inspect
  rescue Errors::ProduceError => e
    # Users can configure this because in pipe-like flows with high throughput, queue full with
    # retry may be used as a throttling system that will backoff and wait.
    # In such scenarios this error notification can be removed and until queue full is
    # retryable, it will not be raised as an error.
    if @config.instrument_on_wait_queue_full
      # We want to instrument on this event even when we restart it.
      # The reason is simple: instrumentation and visibility.
      # We can recover from this, but despite that we should be able to instrument this.
      # If this type of event happens too often, it may indicate that the buffer settings are
      # not well configured.
      @monitor.instrument(
        'error.occurred',
        producer_id: id,
        message: message,
        error: e,
        type: "message.#{label}"
      )
    end

    # We do not poll the producer because polling happens in a background thread
    # It also should not be a frequent case (queue full), hence it's ok to just throttle.
    sleep @config.wait_backoff_on_queue_full / 1_000.0
  end

  @operations_in_progress.decrement
  retry
ensure
  @operations_in_progress.decrement
end
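
The queue-full retry flow above is driven by configuration. The setting names below are the ones referenced in this method; treating them as assignable in the configuration block is an assumption, and the values are arbitrary examples (the broker address is a placeholder):

producer = WaterDrop::Producer.new do |config|
  config.kafka = { 'bootstrap.servers': 'localhost:9092' }
  config.wait_on_queue_full = true
  # Give up and re-raise once retries have been running for this many milliseconds
  config.wait_timeout_on_queue_full = 15_000
  # Sleep this many milliseconds between retries (the sleep above divides by 1_000.0)
  config.wait_backoff_on_queue_full = 100
  # Skip the 'error.occurred' notification on every retry when used as throttling
  config.instrument_on_wait_queue_full = false
end
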
validate_message!(message)

Ensures that the message we want to send out to Kafka is actually valid and that it can be sent there

@param message [Hash] message we want to send
@raise [Karafka::Errors::MessageInvalidError]

# File lib/waterdrop/producer.rb, line 265
def validate_message!(message)
  @contract.validate!(message, Errors::MessageInvalidError)
end
wait(handler)

Waits on a given handler

@param handler [Rdkafka::Producer::DeliveryHandle]

# File lib/waterdrop/producer.rb, line 272
def wait(handler)
  handler.wait(
    # rdkafka max_wait_timeout is in seconds and we use ms
    max_wait_timeout: current_variant.max_wait_timeout / 1_000.0
  )
end