class Kafka::ConsumerGroup::Assignor
A consumer group partition assignor
Constants
- Partition
Public Class Methods
new(cluster:, strategy:)
click to toggle source
@param cluster [Kafka::Cluster]
@param strategy [Object] an object that implements #call and, optionally,
#protocol_name and #user_data.
# File lib/kafka/consumer_group/assignor.rb, line 15
#
# Stores the collaborators used by the other methods of the assignor.
#
# @param cluster [Kafka::Cluster] queried for the partitions of each topic.
# @param strategy [Object] an object that implements #call and, optionally,
#   #protocol_name and #user_data.
def initialize(cluster:, strategy:)
  @cluster = cluster
  @strategy = strategy
end
Public Instance Methods
assign(members:, topics:)
click to toggle source
Assign the topic partitions to the group members.
@param members [Hash<String, Kafka::Protocol::JoinGroupResponse::Metadata>] a hash
mapping member ids to metadata.
@param topics [Array<String>] the names of the topics whose partitions should be assigned.
@return [Hash<String, Kafka::Protocol::MemberAssignment>] a hash mapping member
ids to assignments.
# File lib/kafka/consumer_group/assignor.rb, line 35
#
# Assign the topic partitions to the group members.
#
# Every member id in +members+ gets a MemberAssignment, even if the strategy
# hands it no partitions.
#
# @param members [Hash] member ids mapped to their metadata; passed through
#   to the strategy unchanged.
# @param topics [Array<String>] topic names whose partitions are assigned.
# @return [Hash] member ids mapped to Protocol::MemberAssignment objects.
# @raise [UnknownTopicOrPartition] if the cluster does not know a topic.
# @raise [KeyError] if the strategy assigns a partition to a member id that
#   is not present in +members+.
def assign(members:, topics:)
  # Expand every topic into one Partition per partition id known to the cluster.
  topic_partitions = topics.flat_map do |topic|
    begin
      partition_ids = @cluster.partitions_for(topic).map(&:partition_id)
    rescue UnknownTopicOrPartition
      # Re-raise with the offending topic name in the message.
      raise UnknownTopicOrPartition, "unknown topic #{topic}"
    end

    partition_ids.map { |partition_id| Partition.new(topic, partition_id) }
  end

  # Start every member with an empty assignment.
  group_assignment = {}
  members.each_key do |member_id|
    group_assignment[member_id] = Protocol::MemberAssignment.new
  end

  @strategy.call(cluster: @cluster, members: members, partitions: topic_partitions).each do |member_id, partitions|
    Array(partitions).each do |partition|
      # fetch (rather than []) so that an id the strategy made up fails with a
      # message naming it instead of a NoMethodError on nil.
      member_assignment = group_assignment.fetch(member_id) do
        raise KeyError, "strategy assigned a partition to unknown member #{member_id}"
      end
      member_assignment.assign(partition.topic, [partition.partition_id])
    end
  end

  group_assignment
rescue Kafka::LeaderNotAvailable
  # NOTE: this retry is unbounded. The whole method is re-run after a one
  # second pause for as long as Kafka::LeaderNotAvailable keeps being raised.
  sleep 1
  retry
end
protocol_name()
click to toggle source
# File lib/kafka/consumer_group/assignor.rb, line 20
#
# Name under which the strategy is identified.
#
# @return [Object] whatever the strategy's #protocol_name returns when the
#   strategy responds to it; otherwise the strategy's class name as a String.
def protocol_name
  @strategy.respond_to?(:protocol_name) ? @strategy.protocol_name : @strategy.class.to_s
end
user_data()
click to toggle source
# File lib/kafka/consumer_group/assignor.rb, line 24
#
# Optional strategy-provided user data.
#
# @return [Object, nil] the result of the strategy's #user_data, or nil when
#   the strategy does not respond to #user_data.
def user_data
  @strategy.user_data if @strategy.respond_to?(:user_data)
end