class WaterDrop::Producer
Main WaterDrop messages producer.
Constants
- EMPTY_HASH
Empty hash to save on memory allocations
- SUPPORTED_FLOW_ERRORS
Which of the inline flow errors do we want to intercept and re-bind
Attributes
- config
@return [Object] dry-configurable config object
- id
@return [String] uuid of the current producer
- messages
@return [Array] internal messages buffer
- monitor
@return [Object] monitor we want to use
- status
@return [Status] producer status object
Public Class Methods
Creates a not-yet-configured instance of the producer
@param block [Proc] configuration block
@return [Producer] producer instance
# File lib/waterdrop/producer.rb, line 40
def initialize(&block)
  @operations_in_progress = Helpers::Counter.new
  @buffer_mutex = Mutex.new
  @connecting_mutex = Mutex.new
  @operating_mutex = Mutex.new
  @transaction_mutex = Mutex.new

  @status = Status.new
  @messages = []

  return unless block

  setup(&block)
end
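For context, a minimal construction sketch (the broker address is a placeholder; deliver and kafka are regular WaterDrop settings):

  producer = WaterDrop::Producer.new do |config|
    config.deliver = true
    config.kafka = { 'bootstrap.servers': 'localhost:9092' }
  end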
Public Instance Methods
@return [Rdkafka::Producer] raw rdkafka producer
@note Client is lazily initialized, keeping in mind also the fact that a potential fork can happen at any time.
@note It is not recommended to fork a producer that is already in use, so when bootstrapping a cluster it is much better to fork configured but not yet used producers.
# File lib/waterdrop/producer.rb, line 77
def client
  return @client if @client && @pid == Process.pid

  # Don't allow to obtain a client reference for a producer that was not configured
  raise Errors::ProducerNotConfiguredError, id if @status.initial?

  @connecting_mutex.synchronize do
    return @client if @client && @pid == Process.pid

    # We undefine all the finalizers, in case it was a fork, so the finalizers from the parent
    # process don't leak
    ObjectSpace.undefine_finalizer(id)

    # We should raise an error when trying to use a producer with client from a fork. Always.
    if @client
      # We need to reset the client, otherwise there might be attempt to close the parent
      # client
      @client = nil
      raise Errors::ProducerUsedInParentProcess, Process.pid
    end

    # Finalizer tracking is needed for handling shutdowns gracefully.
    # I don't expect everyone to remember about closing all the producers all the time, thus
    # this approach is better. Although it is still worth keeping in mind, that this will
    # block GC from removing a no longer used producer unless closed properly but at least
    # won't crash the VM upon closing the process
    ObjectSpace.define_finalizer(id, proc { close })

    @pid = Process.pid
    @client = Builder.new.call(self, @config)

    @status.connected!
    @monitor.instrument('producer.connected', producer_id: id)
  end

  @client
end
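A sketch of the fork pattern the notes above recommend: configure the producer in the parent process, fork, and let each child lazily build its own client on first use (topic and payload are placeholders):

  producer = WaterDrop::Producer.new do |config|
    config.kafka = { 'bootstrap.servers': 'localhost:9092' }
  end

  2.times do
    fork do
      # First use in the child lazily builds a fresh rdkafka client bound to the child pid
      producer.produce_sync(topic: 'events', payload: 'hello from a child process')
      producer.close
    end
  end

  Process.waitall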
Flushes the buffers in a sync way and closes the producer
@param force [Boolean] should we force closing even with outstanding messages after the max wait timeout
# File lib/waterdrop/producer.rb, line 176
def close(force: false)
  @operating_mutex.synchronize do
    return unless @status.active?

    @monitor.instrument(
      'producer.closed',
      producer_id: id
    ) do
      @status.closing!
      @monitor.instrument('producer.closing', producer_id: id)

      # No need for auto-gc if everything got closed by us
      # This should be used only in case a producer was not closed properly and forgotten
      ObjectSpace.undefine_finalizer(id)

      # We save this thread id because we need to bypass the activity verification on the
      # producer for final flush of buffers.
      @closing_thread_id = Thread.current.object_id

      # Wait until all the outgoing operations are done. Only when no one is using the
      # underlying client running operations we can close
      sleep(0.001) until @operations_in_progress.value.zero?

      # Flush has its own buffer mutex but even if it is blocked, flushing can still happen
      # as we close the client after the flushing (even if blocked by the mutex)
      flush(true)

      # We should not close the client in several threads the same time
      # It is safe to run it several times but not exactly the same moment
      # We also mark it as closed only if it was connected, if not, it would trigger a new
      # connection that anyhow would be immediately closed
      if @client
        # Why do we trigger it early instead of just having `#close` do it?
        # The linger.ms time will be ignored for the duration of the call,
        # queued messages will be sent to the broker as soon as possible.
        begin
          @client.flush(current_variant.max_wait_timeout) unless @client.closed?
        # We can safely ignore timeouts here because any left outstanding requests
        # will anyhow force wait on close if not forced.
        # If forced, we will purge the queue and just close
        rescue ::Rdkafka::RdkafkaError, Rdkafka::AbstractHandle::WaitTimeoutError
          nil
        ensure
          # Purge fully the local queue in case of a forceful shutdown just to be sure, that
          # there are no dangling messages. In case flush was successful, there should be
          # none but we do it just in case it timed out
          purge if force
        end

        @client.close

        @client = nil
      end

      # Remove callbacks runners that were registered
      ::Karafka::Core::Instrumentation.statistics_callbacks.delete(@id)
      ::Karafka::Core::Instrumentation.error_callbacks.delete(@id)
      ::Karafka::Core::Instrumentation.oauthbearer_token_refresh_callbacks.delete(@id)

      @status.closed!
    end
  end
end
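In application code this usually boils down to closing the producer once on shutdown, for example:

  # Flush buffers and release the underlying rdkafka client when the process exits
  at_exit { producer.close }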
Closes the producer with forced close after timeout, purging any outgoing data
# File lib/waterdrop/producer.rb, line 241
def close!
  close(force: true)
end
@return [Boolean] true if current producer is idempotent
# File lib/waterdrop/producer.rb, line 159
def idempotent?
  # Every transactional producer is idempotent by default always
  return true if transactional?

  return @idempotent if instance_variable_defined?(:'@idempotent')

  @idempotent = config.kafka.to_h.key?(:'enable.idempotence')
end
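Since the check above is based on the presence of the enable.idempotence key in the kafka settings, enabling it looks roughly like this (broker address is a placeholder):

  producer = WaterDrop::Producer.new do |config|
    config.kafka = {
      'bootstrap.servers': 'localhost:9092',
      # Presence of this key is what makes #idempotent? return true
      'enable.idempotence': true
    }
  end

  producer.idempotent? # => true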
Returns and caches the middleware object that may be used
@return [WaterDrop::Producer::Middleware]
# File lib/waterdrop/producer.rb, line 169
def middleware
  @middleware ||= config.middleware
end
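A hedged sketch of how the cached middleware object might be used, assuming it accepts callable steps that receive the message hash and return it (the timestamping step itself is hypothetical):

  require 'time'

  # Hypothetical step that stamps every outgoing message with a header
  stamp = lambda do |message|
    message[:headers] ||= {}
    message[:headers]['produced_at'] = Time.now.utc.iso8601
    message
  end

  producer.middleware.append(stamp)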
Fetches and caches the partition count of a topic
@param topic [String] topic for which we want to get the number of partitions
@return [Integer] number of partitions of the requested topic or -1 if the number could not be retrieved
@note It uses the underlying `rdkafka-ruby` partition count fetch and cache.
# File lib/waterdrop/producer.rb, line 122
def partition_count(topic)
  client.partition_count(topic.to_s)
end
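For example, the cached count can be used to pick a partition deterministically (the topic, key and modulo scheme are illustrative):

  require 'zlib'

  partitions = producer.partition_count('events')

  if partitions.positive?
    # Route all messages of one user to the same partition
    partition = Zlib.crc32('user-123') % partitions
    producer.produce_async(topic: 'events', payload: 'login', partition: partition)
  end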
Purges data from both the buffer queue as well as the librdkafka queue.
@note This is an operation that can cause data loss. Keep that in mind. It will not only purge the internal WaterDrop buffer but will also purge the librdkafka queue and cancel any outgoing message dispatches.
# File lib/waterdrop/producer.rb, line 131
def purge
  @monitor.instrument('buffer.purged', producer_id: id) do
    @buffer_mutex.synchronize do
      @messages = []
    end

    # We should not purge if there is no client initialized
    # It may not be initialized if we created a new producer that never connected to kafka,
    # we used buffer and purged. In cases like this client won't exist
    @connecting_mutex.synchronize do
      @client&.purge
    end
  end
end
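Usage is a single call; a sketch of dropping everything still in flight when delivery no longer matters:

  producer.produce_async(topic: 'events', payload: 'may be dropped')

  # Drop the WaterDrop buffer, the librdkafka queue and any in-flight dispatches
  producer.purge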
Sets up the whole configuration and initializes all that is needed
@param block [Block] configuration block
# File lib/waterdrop/producer.rb, line 57
def setup(&block)
  raise Errors::ProducerAlreadyConfiguredError, id unless @status.initial?

  @config = Config
            .new
            .setup(&block)
            .config

  @id = @config.id
  @monitor = @config.monitor
  @contract = Contracts::Message.new(max_payload_size: @config.max_payload_size)
  @default_variant = Variant.new(self, default: true)
  @status.configured!
end
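The two-step flow (build first, configure later) looks like this; calling #setup a second time raises Errors::ProducerAlreadyConfiguredError:

  producer = WaterDrop::Producer.new

  producer.setup do |config|
    config.id = 'my-app-producer'
    config.kafka = { 'bootstrap.servers': 'localhost:9092' }
  end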
Builds the variant alteration and returns it.
@param args [Object] anything `Producer::Variant` initializer accepts
@return [WaterDrop::Producer::Variant] variant proxy to use with alterations
# File lib/waterdrop/producer.rb, line 150
def with(**args)
  ensure_active!

  Variant.new(self, **args)
end
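A sketch of a variant in use; the topic_config keyword follows the attribute referenced in #produce below, while other accepted keywords depend on the Producer::Variant initializer:

  # Variant that requires acknowledgment from all in-sync replicas for its dispatches
  strict = producer.with(topic_config: { 'acks': 'all' })

  strict.produce_sync(topic: 'events', payload: 'critical event')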
Private Instance Methods
@return [Producer::Context] the variant config. Either custom if built using `#with` or a default one.
# File lib/waterdrop/producer.rb, line 281
def current_variant
  Thread.current[id] || @default_variant
end
Ensures that we don’t run any operations when the producer is not configured or when it was already closed
# File lib/waterdrop/producer.rb, line 249
def ensure_active!
  return if @status.active?
  return if @status.closing? && @operating_mutex.owned?

  raise Errors::ProducerNotConfiguredError, id if @status.initial?
  raise Errors::ProducerClosedError, id if @status.closing?
  raise Errors::ProducerClosedError, id if @status.closed?

  # This should never happen
  raise Errors::StatusInvalidError, [id, @status.to_s]
end
Runs the client produce method with a given message
@param message [Hash] message we want to send
# File lib/waterdrop/producer.rb, line 288
def produce(message)
  produce_time ||= monotonic_now

  # This can happen only during flushing on closing, in case like this we don't have to
  # synchronize because we already own the lock
  if @operating_mutex.owned?
    @operations_in_progress.increment
  else
    @operating_mutex.synchronize { @operations_in_progress.increment }
    ensure_active!
  end

  # We basically only duplicate the message hash only if it is needed.
  # It is needed when user is using a custom settings variant or when symbol is provided as
  # the topic name. We should never mutate user input message as it may be a hash that the
  # user is using for some other operations
  if message[:topic].is_a?(Symbol) || !current_variant.default?
    message = message.dup

    # In case someone defines topic as a symbol, we need to convert it into a string as
    # librdkafka does not accept symbols
    message[:topic] = message[:topic].to_s
    message[:topic_config] = current_variant.topic_config
  end

  if transactional?
    transaction { client.produce(**message) }
  else
    client.produce(**message)
  end
rescue SUPPORTED_FLOW_ERRORS.first => e
  # Unless we want to wait and retry and it's a full queue, we raise normally
  raise unless @config.wait_on_queue_full
  raise unless e.code == :queue_full

  # If we're running for longer than the timeout, we need to re-raise the queue full.
  # This will prevent from situation where cluster is down forever and we just retry and retry
  # in an infinite loop, effectively hanging the processing
  raise unless monotonic_now - produce_time < @config.wait_timeout_on_queue_full

  label = caller_locations(2, 1)[0].label.split(' ').last.split('#').last

  # We use this syntax here because we want to preserve the original `#cause` when we
  # instrument the error and there is no way to manually assign `#cause` value. We want to keep
  # the original cause to maintain the same API across all the errors dispatched to the
  # notifications pipeline.
  begin
    raise Errors::ProduceError, e.inspect
  rescue Errors::ProduceError => e
    # Users can configure this because in pipe-like flows with high throughput, queue full with
    # retry may be used as a throttling system that will backoff and wait.
    # In such scenarios this error notification can be removed and until queue full is
    # retryable, it will not be raised as an error.
    if @config.instrument_on_wait_queue_full
      # We want to instrument on this event even when we restart it.
      # The reason is simple: instrumentation and visibility.
      # We can recover from this, but despite that we should be able to instrument this.
      # If this type of event happens too often, it may indicate that the buffer settings are
      # not well configured.
      @monitor.instrument(
        'error.occurred',
        producer_id: id,
        message: message,
        error: e,
        type: "message.#{label}"
      )
    end

    # We do not poll the producer because polling happens in a background thread
    # It also should not be a frequent case (queue full), hence it's ok to just throttle.
    sleep @config.wait_backoff_on_queue_full / 1_000.0
  end

  @operations_in_progress.decrement
  retry
ensure
  @operations_in_progress.decrement
end
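The queue-full retry path above is driven by the config keys it references; a sketch of enabling it (values are illustrative, and given the division by 1_000.0 above the backoff appears to be expressed in milliseconds):

  producer = WaterDrop::Producer.new do |config|
    config.kafka = { 'bootstrap.servers': 'localhost:9092' }
    # Retry dispatches instead of raising when librdkafka reports a full queue
    config.wait_on_queue_full = true
    # Back off between attempts and give up after the overall timeout
    config.wait_backoff_on_queue_full = 100
    config.wait_timeout_on_queue_full = 10_000
    # Silence the 'error.occurred' notification for retried queue-full events
    config.instrument_on_wait_queue_full = false
  end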
Ensures that the message we want to send out to Kafka is actually valid and that it can be sent there
@param message [Hash] message we want to send
@raise [Karafka::Errors::MessageInvalidError]
# File lib/waterdrop/producer.rb, line 265
def validate_message!(message)
  @contract.validate!(message, Errors::MessageInvalidError)
end
Waits on a given handler
@param handler [Rdkafka::Producer::DeliveryHandle]
# File lib/waterdrop/producer.rb, line 272
def wait(handler)
  handler.wait(
    # rdkafka max_wait_timeout is in seconds and we use ms
    max_wait_timeout: current_variant.max_wait_timeout / 1_000.0
  )
end