class Racecar::Consumer
Constants
- Subscription
Attributes
Public Class Methods
Adds one or more topic subscriptions.
Can be called multiple times in order to subscribe to more topics.
@param topics [String] one or more topics to subscribe to. @param start_from_beginning [Boolean] whether to start from the beginning or the end
of each partition.
@param max_bytes_per_partition [Integer] the maximum number of bytes to fetch from
each partition at a time.
@param additional_config [Hash] Configuration properties for consumer.
See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
@return [nil]
# File lib/racecar/consumer.rb, line 30 def subscribes_to( *topics, start_from_beginning: true, max_bytes_per_partition: 1048576, additional_config: {} ) topics.each do |topic| subscriptions << Subscription.new(topic, start_from_beginning, max_bytes_per_partition, additional_config) end end
# File lib/racecar/consumer.rb, line 14 def subscriptions @subscriptions ||= [] end
Public Instance Methods
# File lib/racecar/consumer.rb, line 42 def configure(producer:, consumer:, instrumenter: NullInstrumenter, config: Racecar.config) @producer = producer @delivery_handles = [] @consumer = consumer @instrumenter = instrumenter @config = config end
Blocks until all messages produced so far have been successfully published. If message delivery finally fails, a Racecar::MessageDeliveryError
is raised. The delivery failed for the reason in the exception. The error can be broker side (e.g. downtime, configuration issue) or specific to the message being sent. The caller must handle the latter cases or run into head of line blocking.
# File lib/racecar/consumer.rb, line 59 def deliver! @delivery_handles ||= [] if @delivery_handles.any? instrumentation_payload = { delivered_message_count: @delivery_handles.size } @instrumenter.instrument('deliver_messages', instrumentation_payload) do @delivery_handles.each do |handle| # rdkafka-ruby checks every wait_timeout seconds if the message was # successfully delivered, up to max_wait_timeout seconds before raising # Rdkafka::AbstractHandle::WaitTimeoutError. librdkafka will (re)try to # deliver all messages in the background, until "config.message_timeout" # (message.timeout.ms) is exceeded. Phrased differently, rdkafka-ruby's # WaitTimeoutError is just informative. # The raising can be avoided if max_wait_timeout below is greater than # config.message_timeout, but config is not available here (without # changing the interface). handle.wait(max_wait_timeout: 60, wait_timeout: 0.1) rescue Rdkafka::AbstractHandle::WaitTimeoutError => e partition = MessageDeliveryError.partition_from_delivery_handle(handle) # ideally we could use the logger passed to the Runner, but it is not # available here. The runner sets it for Rdkafka, though, so we can use # that instead. @config.logger.debug "Still trying to deliver message to (partition #{partition})... (will try up to Racecar.config.message_timeout)" retry rescue Rdkafka::RdkafkaError => e raise MessageDeliveryError.new(e, handle) end end end @delivery_handles.clear end
# File lib/racecar/consumer.rb, line 52 def teardown; end
Protected Instance Methods
# File lib/racecar/consumer.rb, line 120 def heartbeat warn "DEPRECATION WARNING: Manual heartbeats are not supported and not needed with librdkafka." end
github.com/appsignal/rdkafka-ruby#producing-messages
# File lib/racecar/consumer.rb, line 94 def produce(payload, topic:, key: nil, partition_key: nil, headers: nil, create_time: nil) @delivery_handles ||= [] message_size = payload.respond_to?(:bytesize) ? payload.bytesize : 0 instrumentation_payload = { value: payload, headers: headers, key: key, partition_key: partition_key, topic: topic, message_size: message_size, create_time: Time.now, buffer_size: @delivery_handles.size, } @instrumenter.instrument("produce_message", instrumentation_payload) do @delivery_handles << @producer.produce( topic: topic, payload: payload, key: key, partition_key: partition_key, timestamp: create_time, headers: headers, ) end end