class Poseidon::MessageConductor

@api private

Constants

NO_BROKER
NO_PARTITION

Public Class Methods

new(cluster_metadata, partitioner) click to toggle source

Create a new message conductor

@param [Hash<String,TopicMetadata>] topics_metadata

Metadata for all topics this conductor may receive.

@param [Object] partitioner

Custom partitioner
# File lib/poseidon/message_conductor.rb, line 12
def initialize(cluster_metadata, partitioner)
  @cluster_metadata   = cluster_metadata
  @partitioner        = partitioner

  # Don't always start from partition 0
  @partition_counter  = rand(65536)
end

Public Instance Methods

destination(topic, key = nil) click to toggle source

Determines which partition a message should be sent to.

@param [String] topic

Topic we are sending this message to

@param [Object] key

Key for this message, may be nil

@return [Integer,Integer]

partition_id and broker_id to which this message should be sent
# File lib/poseidon/message_conductor.rb, line 30
def destination(topic, key = nil)
  topic_metadata = topic_metadatas[topic]
  if topic_metadata && topic_metadata.leader_available?
    partition_id = determine_partition(topic_metadata, key)
    broker_id    = topic_metadata.partition_leader(partition_id) || NO_BROKER
  else
    partition_id  = NO_PARTITION
    broker_id     = NO_BROKER
  end

  return partition_id, broker_id
end

Private Instance Methods

determine_partition(topic_metadata, key) click to toggle source
# File lib/poseidon/message_conductor.rb, line 49
def determine_partition(topic_metadata, key)
  if key
    partition_for_keyed_message(topic_metadata, key)
  else
    partition_for_keyless_message(topic_metadata)
  end
end
next_partition_counter() click to toggle source
# File lib/poseidon/message_conductor.rb, line 82
def next_partition_counter
  @partition_counter += 1
end
partition_for_keyed_message(topic_metadata, key) click to toggle source
# File lib/poseidon/message_conductor.rb, line 57
def partition_for_keyed_message(topic_metadata, key)
  partition_count  = topic_metadata.partition_count
  if @partitioner
    partition_id     = @partitioner.call(key, partition_count)

    if partition_id >= partition_count
      raise Errors::InvalidPartitionError, "partitioner (#{@partitioner.inspect}) requested #{partition_id} while only #{partition_count} partitions exist"
    end
  else
    partition_id = Zlib::crc32(key) % partition_count
  end

  partition_id
end
partition_for_keyless_message(topic_metadata) click to toggle source
# File lib/poseidon/message_conductor.rb, line 72
def partition_for_keyless_message(topic_metadata)
  partition_count = topic_metadata.available_partition_count

  if partition_count > 0
    topic_metadata.available_partitions[next_partition_counter % partition_count].id
  else
    NO_PARTITION
  end
end
topic_metadatas() click to toggle source
# File lib/poseidon/message_conductor.rb, line 45
def topic_metadatas
  @cluster_metadata.topic_metadata
end