class Poseidon::MessageConductor
@api private
Constants
- NO_BROKER
- NO_PARTITION
Public Class Methods
new(cluster_metadata, partitioner)
click to toggle source
Create a new message conductor
@param [Hash<String,TopicMetadata>] topics_metadata
Metadata for all topics this conductor may receive.
@param [Object] partitioner
Custom partitioner
# File lib/poseidon/message_conductor.rb, line 12 def initialize(cluster_metadata, partitioner) @cluster_metadata = cluster_metadata @partitioner = partitioner # Don't always start from partition 0 @partition_counter = rand(65536) end
Public Instance Methods
destination(topic, key = nil)
click to toggle source
Determines which partition a message should be sent to.
@param [String] topic
Topic we are sending this message to
@param [Object] key
Key for this message, may be nil
@return [Integer,Integer]
partition_id and broker_id to which this message should be sent
# File lib/poseidon/message_conductor.rb, line 30 def destination(topic, key = nil) topic_metadata = topic_metadatas[topic] if topic_metadata && topic_metadata.leader_available? partition_id = determine_partition(topic_metadata, key) broker_id = topic_metadata.partition_leader(partition_id) || NO_BROKER else partition_id = NO_PARTITION broker_id = NO_BROKER end return partition_id, broker_id end
Private Instance Methods
determine_partition(topic_metadata, key)
click to toggle source
# File lib/poseidon/message_conductor.rb, line 49 def determine_partition(topic_metadata, key) if key partition_for_keyed_message(topic_metadata, key) else partition_for_keyless_message(topic_metadata) end end
next_partition_counter()
click to toggle source
# File lib/poseidon/message_conductor.rb, line 82 def next_partition_counter @partition_counter += 1 end
partition_for_keyed_message(topic_metadata, key)
click to toggle source
# File lib/poseidon/message_conductor.rb, line 57 def partition_for_keyed_message(topic_metadata, key) partition_count = topic_metadata.partition_count if @partitioner partition_id = @partitioner.call(key, partition_count) if partition_id >= partition_count raise Errors::InvalidPartitionError, "partitioner (#{@partitioner.inspect}) requested #{partition_id} while only #{partition_count} partitions exist" end else partition_id = Zlib::crc32(key) % partition_count end partition_id end
partition_for_keyless_message(topic_metadata)
click to toggle source
# File lib/poseidon/message_conductor.rb, line 72 def partition_for_keyless_message(topic_metadata) partition_count = topic_metadata.available_partition_count if partition_count > 0 topic_metadata.available_partitions[next_partition_counter % partition_count].id else NO_PARTITION end end
topic_metadatas()
click to toggle source
# File lib/poseidon/message_conductor.rb, line 45 def topic_metadatas @cluster_metadata.topic_metadata end