class WaterDrop::Clients::Buffered

Client used to buffer messages that we send out in specs and other places.

Attributes

messages[RW]

Public Class Methods

new(*args)

@param args [Object] anything accepted by `Clients::Dummy`

Calls superclass method WaterDrop::Clients::Dummy::new
# File lib/waterdrop/clients/buffered.rb, line 10
def initialize(*args)
  super
  @messages = []
  @topics = Hash.new { |k, v| k[v] = [] }

  @transaction_active = false
  @transaction_messages = []
  @transaction_topics = Hash.new { |k, v| k[v] = [] }
  @transaction_level = 0
end
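
The per-topic buffers above rely on `Hash.new` with a default block, so a topic gets its own array the first time it is touched. The following is a minimal standalone sketch of that pattern only; the topic names and message contents are made up for illustration and nothing here calls into WaterDrop.

```ruby
# Standalone illustration of the default-block pattern used for @topics.
# The block receives the hash itself first and the missing key second.
topics = Hash.new { |hash, key| hash[key] = [] }

# Hypothetical messages, used only for this example
topics['events'] << { topic: 'events', payload: 'a' }
topics['events'] << { topic: 'events', payload: 'b' }

# Reading an unknown topic stores and returns a fresh, empty array
unknown = topics['other']

puts topics['events'].size
puts unknown.inspect
puts topics.keys.inspect
```

One consequence of this design is that merely reading a topic that was never produced to adds that topic as a key with an empty array.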

Public Instance Methods

abort_transaction()

Aborts the transaction

# File lib/waterdrop/clients/buffered.rb, line 70
def abort_transaction
  @transaction_level -= 1
  @transaction_topics.clear
  @transaction_messages.clear
  @transaction_active = false
end
begin_transaction()

Starts the transaction on a given level

# File lib/waterdrop/clients/buffered.rb, line 38
def begin_transaction
  @transaction_level += 1
  @transaction_active = true
end
commit_transaction()

Finishes the given level of the transaction and transfers the messages buffered during the transaction into the main buffers

# File lib/waterdrop/clients/buffered.rb, line 44
def commit_transaction
  @transaction_level -= 1

  # Transfer transactional data on success
  @transaction_topics.each do |topic, messages|
    @topics[topic] += messages
  end

  @messages += @transaction_messages

  @transaction_topics.clear
  @transaction_messages.clear
  @transaction_active = false
end
messages_for(topic)

Returns messages produced to a given topic

@param topic [String]

# File lib/waterdrop/clients/buffered.rb, line 79
def messages_for(topic)
  @topics[topic]
end
produce(message)

“Produces” a message to Kafka: it acknowledges the message locally and adds it to the internal buffer. While a transaction is active, the message goes to the transactional buffer instead.

@param message [Hash] `WaterDrop::Producer#produce_sync` message hash

@return [Dummy::Handle] fake delivery handle that can be materialized into a report

Calls superclass method WaterDrop::Clients::Dummy#produce
# File lib/waterdrop/clients/buffered.rb, line 24
def produce(message)
  if @transaction_active
    @transaction_topics[message.fetch(:topic)] << message
    @transaction_messages << message
  else
    # We pre-validate the message payload, so topic is ensured to be present
    @topics[message.fetch(:topic)] << message
    @messages << message
  end

  super(**message.to_h)
end
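
The interplay between `produce`, `begin_transaction`, `commit_transaction` and `abort_transaction` can be sketched with a small stand-in class. `BufferSketch` and its `@pending` variable are hypothetical names invented for this example; the class copies only the staging logic from the listings above, omits the per-topic index and the transaction level counter, and does not inherit from `Clients::Dummy` or return a delivery handle.

```ruby
# Hypothetical stand-in that mirrors the staging logic shown above.
# It does not use WaterDrop or talk to Kafka.
class BufferSketch
  attr_reader :messages

  def initialize
    @messages = [] # messages visible to the caller
    @pending = []  # messages staged inside an open transaction
    @active = false
  end

  def begin_transaction
    @active = true
  end

  # Stages the message when a transaction is open, buffers it directly otherwise
  def produce(message)
    (@active ? @pending : @messages) << message
  end

  # Moves staged messages into the visible buffer
  def commit_transaction
    @messages += @pending
    @pending.clear
    @active = false
  end

  # Drops staged messages without touching the visible buffer
  def abort_transaction
    @pending.clear
    @active = false
  end
end

buffer = BufferSketch.new
buffer.produce({ topic: 'events', payload: 'outside' })

buffer.begin_transaction
buffer.produce({ topic: 'events', payload: 'dropped' })
buffer.abort_transaction
after_abort = buffer.messages.size

buffer.begin_transaction
buffer.produce({ topic: 'events', payload: 'kept' })
buffer.commit_transaction
after_commit = buffer.messages.map { |message| message[:payload] }

puts after_abort
puts after_commit.inspect
```

In this sketch, a message produced inside an aborted transaction never reaches `messages`, while a committed one is appended after the messages that were already buffered.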
reset()

Clears the internal buffers. Used in between specs so messages do not leak out.

# File lib/waterdrop/clients/buffered.rb, line 85
def reset
  @transaction_level = 0
  @transaction_active = false
  @transaction_topics.clear
  @transaction_messages.clear
  @messages.clear
  @topics.each_value(&:clear)
end
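
Note that `reset` empties the per-topic arrays in place with `each_value(&:clear)` instead of replacing the hash. A minimal standalone sketch of what that implies, using a plain hash and a made-up topic name rather than the real client:

```ruby
# Standalone illustration; `topics` stands in for the client's @topics hash
topics = Hash.new { |hash, key| hash[key] = [] }
topics['events'] << { topic: 'events', payload: 'a' }

# A reference obtained earlier, the way a spec might hold a messages_for result
held = topics['events']

# Same operation as in reset: clear every per-topic array in place
topics.each_value(&:clear)

puts held.empty?
puts topics.key?('events')
```

Because the arrays are cleared and not replaced, a reference held from before the reset points at the same, now empty, array, and the topic keys themselves remain in the hash.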
send_offsets_to_transaction(_consumer, _tpl, _timeout)

Fakes storing the offset in a transactional fashion

@param _consumer [#consumer_group_metadata_pointer] any consumer from which we can obtain the librdkafka consumer group metadata pointer

@param _tpl [Rdkafka::Consumer::TopicPartitionList] consumer tpl for offset storage

@param _timeout [Integer] ms timeout

# File lib/waterdrop/clients/buffered.rb, line 65
def send_offsets_to_transaction(_consumer, _tpl, _timeout)
  nil
end