class Poseidon::MessagesForBroker
Messages that should be sent to a particular broker. @api private
Constants
- ALWAYS_RETRYABLE
We can always retry these errors because they mean none of the Kafka brokers persisted the message.
Attributes
broker_id[R]
messages[R]
Public Class Methods
new(broker_id)
click to toggle source
# File lib/poseidon/messages_for_broker.rb, line 7
#
# Set up an empty batch of messages destined for a single broker.
#
# @param broker_id the identifier of the target broker; stored as-is in
#   @broker_id.
def initialize(broker_id)
  # topic => { partition_id => [messages] }; populated by #add.
  @topics    = {}
  @broker_id = broker_id
  # Flat list of every message added, in insertion order.
  @messages  = []
end
Public Instance Methods
add(message, partition_id)
click to toggle source
Add a message for this broker
# File lib/poseidon/messages_for_broker.rb, line 14
#
# Record a message in this broker's batch.
#
# The message is appended to the flat @messages list and also filed under
# @topics[message.topic][partition_id], creating the intermediate
# containers on first use.
#
# @param message an object responding to #topic.
# @param partition_id the partition key the message is filed under.
# @return [Array] the list of messages accumulated so far for that
#   topic/partition pair.
def add(message, partition_id)
  @messages.push(message)

  partitions = (@topics[message.topic] ||= {})
  (partitions[partition_id] ||= []).push(message)
end
build_protocol_objects(compression_config)
click to toggle source
Build protocol objects for this broker!
# File lib/poseidon/messages_for_broker.rb, line 23
#
# Convert the batched messages into protocol objects, one
# Protocol::MessagesForTopic per topic, each holding one
# Protocol::MessagesForPartition per partition.
#
# @param compression_config an object responding to
#   #compression_codec_for_topic(topic); a falsy result means the
#   message set for that topic is passed through uncompressed.
# @return [Array] one Protocol::MessagesForTopic per tracked topic.
def build_protocol_objects(compression_config)
  @topics.map do |topic, messages_by_partition|
    codec = compression_config.compression_codec_for_topic(topic)

    partition_payloads = messages_by_partition.map do |partition, messages|
      message_set = MessageSet.new(messages)
      # Only compress when a codec is configured for this topic.
      payload = codec ? message_set.compress(codec) : message_set
      Protocol::MessagesForPartition.new(partition, payload)
    end

    Protocol::MessagesForTopic.new(topic, partition_payloads)
  end
end
successfully_sent(producer_response)
click to toggle source
# File lib/poseidon/messages_for_broker.rb, line 42
#
# Work out which of this batch's messages do not need to be retried.
#
# Every partition in the response whose error class is listed in
# ALWAYS_RETRYABLE is logged at debug level and its messages are treated
# as failed. Partitions reporting any other error class (or none) are not
# subtracted here.
#
# @param producer_response an object responding to #topic_response, which
#   yields entries responding to #topic and #partitions; each partition
#   responds to #partition and #error_class.
# @return [Array] @messages minus the messages filed under the failed
#   topic/partition pairs.
def successfully_sent(producer_response)
  failed = []

  producer_response.topic_response.each do |topic_response|
    topic_response.partitions.each do |partition|
      next unless ALWAYS_RETRYABLE.include?(partition.error_class)

      Poseidon.logger.debug { "Received #{partition.error_class} when attempting to send messages to #{topic_response.topic} on #{partition.partition}" }

      # The response may mention a topic/partition this batch never tracked;
      # fall back to empty containers instead of calling [] on nil.
      # concat also avoids splatting a large batch into an argument list.
      failed.concat(@topics.fetch(topic_response.topic, {}).fetch(partition.partition, []))
    end
  end

  @messages - failed
end