class Kafka::EC2::MixedInstanceAssignmentStrategy

Attributes

member_id_to_metadata[RW]

metadata is a byte sequence created by Kafka::Protocol::ConsumerGroupProtocol.encode

Public Class Methods

new(cluster:, instance_family_weights: {}, availability_zone_weights: {}, weights: {}, partition_weights: {}) click to toggle source

@param cluster [Kafka::Cluster] @param instance_family_weights [Hash{String => Numeric}, Proc] a hash whose key

is the instance family and whose value is the weight. If the object is a proc,
it must return such a hash, and the proc is called every time the method "assign"
is called.

@param availability_zone_weights [Hash{String => Numeric}, Proc] a hash whose key

is the availability zone and whose value is the weight. If the object is a proc,
it must return such a hash, and the proc is called every time the method "assign"
is called.

@param weights [Hash{String => Hash{String => Numeric}}, Proc] a hash whose key

is the availability zone or the instance family and whose value is a hash like
instance_family_weights or availability_zone_weights. If the object is a proc,
it must return such a hash, and the proc is called every time the method "assign"
is called.
# File lib/kafka/ec2/mixed_instance_assignment_strategy.rb, line 26
# Stores the cluster handle and the weight configuration used later by "assign".
#
# Each weight argument may be a plain hash or a proc; procs are not invoked
# here, they are only stored.
#
# @param cluster [Kafka::Cluster] cluster queried for topic partitions
# @param instance_family_weights [Hash{String => Numeric}, Proc] weight per instance family
# @param availability_zone_weights [Hash{String => Numeric}, Proc] weight per availability zone
# @param weights [Hash{String => Hash{String => Numeric}}, Proc] weight per
#   (availability zone, instance family) combination
# @param partition_weights [Hash, Proc] per-topic hash of partition => weight;
#   partitions without an entry get a weight of 1 when assigning
def initialize(
  cluster:,
  instance_family_weights: {},
  availability_zone_weights: {},
  weights: {},
  partition_weights: {}
)
  @cluster = cluster
  @instance_family_weights, @availability_zone_weights = instance_family_weights, availability_zone_weights
  @weights = weights
  @partition_weights = partition_weights
end

Public Instance Methods

assign(members:, topics:) click to toggle source

Assign the topic partitions to the group members.

@param members [Array<String>] member ids @param topics [Array<String>] topics @return [Hash{String => Protocol::MemberAssignment}] a hash mapping member

ids to assignments.
# File lib/kafka/ec2/mixed_instance_assignment_strategy.rb, line 40
# Distributes every partition of the given topics across the group members,
# giving each member a share of the total partition weight proportional to
# the capacity computed for its instance.
#
# The work happens in two passes:
# 1. Each member receives consecutive partitions (in topic/partition order)
#    for as long as they fit into its weight budget.
# 2. Any partitions left over are handed out one by one to the member with
#    the largest remaining budget.
#
# @param members [Array<String>] member ids
# @param topics [Array<String>] topics
# @return [Hash{String => Protocol::MemberAssignment}] a hash mapping member
#   ids to assignments
# @raise [UnknownTopicOrPartition] re-raised with the topic name in the message
#
# NOTE(review): with an empty members list total_capacity stays at the Integer
# 0, so the division below can raise ZeroDivisionError - confirm callers never
# pass an empty list.
def assign(members:, topics:)
  group_assignment = {}
  instance_id_to_capacity = Hash.new(0)
  instance_id_to_member_ids = Hash.new { |h, k| h[k] = [] }
  total_capacity = 0
  member_id_to_instance_id = {}

  # Weight settings may be procs; resolve them once per call.
  instance_family_to_capacity = @instance_family_weights.is_a?(Proc) ? @instance_family_weights.call() : @instance_family_weights
  az_to_capacity = @availability_zone_weights.is_a?(Proc) ? @availability_zone_weights.call() : @availability_zone_weights
  weights = @weights.is_a?(Proc) ? @weights.call() : @weights
  members.each do |member_id|
    group_assignment[member_id] = Protocol::MemberAssignment.new

    # The member metadata is read as "instance_id,instance_type,az".
    instance_id, instance_type, az = member_id_to_metadata[member_id].split(",")
    instance_id_to_member_ids[instance_id] << member_id
    member_id_to_instance_id[member_id] = instance_id
    # Capacity is added once per member, so an instance running several
    # members accumulates a multiple of its per-member capacity.
    capacity = calculate_capacity(instance_type, az, instance_family_to_capacity, az_to_capacity, weights)
    instance_id_to_capacity[instance_id] += capacity
    total_capacity += capacity
  end

  # Flat list of [topic, partition_id] pairs, grouped by topic in the order given.
  topic_partitions = topics.flat_map do |topic|
    begin
      partitions = @cluster.partitions_for(topic).map(&:partition_id)
    rescue UnknownTopicOrPartition
      # Re-raise with a message that names the offending topic.
      raise UnknownTopicOrPartition, "unknown topic #{topic}"
    end
    Array.new(partitions.count) { topic }.zip(partitions)
  end

  partition_weights = build_partition_weights(topics)
  # Amount of partition weight that one unit of capacity should take on.
  partition_weight_per_capacity = topic_partitions.sum { |topic, partition| partition_weights.dig(topic, partition) } / total_capacity

  # First pass: last_index is a cursor into topic_partitions shared by all
  # members, so each member takes the next consecutive run of partitions.
  last_index = 0
  member_id_to_acceptable_partition_weight = {}
  instance_id_to_total_acceptable_partition_weight = Hash.new(0)
  instance_id_to_capacity.each do |instance_id, capacity|
    member_ids = instance_id_to_member_ids[instance_id]
    member_ids.each do |member_id|
      # The instance's budget is split evenly among the members running on it.
      acceptable_partition_weight = capacity * partition_weight_per_capacity / member_ids.size
      while last_index < topic_partitions.size
        topic, partition = topic_partitions[last_index]
        partition_weight = partition_weights.dig(topic, partition)
        # Stop as soon as the next partition would exceed the remaining budget.
        break if acceptable_partition_weight - partition_weight < 0

        group_assignment[member_id].assign(topic, [partition])
        acceptable_partition_weight -= partition_weight

        last_index += 1
      end

      # Remember the unused budget for the second pass.
      member_id_to_acceptable_partition_weight[member_id] = acceptable_partition_weight
      instance_id_to_total_acceptable_partition_weight[instance_id] += acceptable_partition_weight
    end
  end

  # Second pass: hand out the partitions that did not fit anywhere, one at a
  # time, to the member with the largest remaining budget.
  while last_index < topic_partitions.size
    max_acceptable_partition_weight = member_id_to_acceptable_partition_weight.values.max
    member_ids = member_id_to_acceptable_partition_weight.select { |_, w| w == max_acceptable_partition_weight }.keys
    if member_ids.size == 1
      member_id = member_ids.first
    else
      # Tie-break: prefer the member whose instance has the most budget left in total.
      member_id =  member_ids.max_by { |id| instance_id_to_total_acceptable_partition_weight[member_id_to_instance_id[id]] }
    end
    topic, partition = topic_partitions[last_index]
    group_assignment[member_id].assign(topic, [partition])

    # Budgets may become negative here; they are only used for ranking.
    partition_weight = partition_weights.dig(topic, partition)
    member_id_to_acceptable_partition_weight[member_id] -= partition_weight
    instance_id_to_total_acceptable_partition_weight[member_id_to_instance_id[member_id]] -= partition_weight

    last_index += 1
  end

  group_assignment
rescue Kafka::LeaderNotAvailable
  # Wait one second and restart the whole assignment from scratch.
  # NOTE(review): there is no retry limit; presumably the error comes from
  # @cluster.partitions_for while a leader election is in progress - confirm.
  sleep 1
  retry
end

Private Instance Methods

build_partition_weights(topics) click to toggle source
# File lib/kafka/ec2/mixed_instance_assignment_strategy.rb, line 129
# Builds the topic => {partition => weight} lookup used by "assign".
#
# The configured value (or the result of calling it when it is a proc) is
# copied, and so is the per-topic hash of every requested topic, so that
# setting the default weight never modifies the caller's objects.
#
# @param topics [Array<String>] topics that need a per-partition lookup
# @return [Hash] a copy of the configured weights in which each requested
#   topic maps to a hash that answers 1 for partitions without an entry
def build_partition_weights(topics)
  configured = @partition_weights
  configured = configured.call if configured.is_a?(Proc)

  topics.each_with_object(configured.dup) do |topic, lookup|
    # A topic without configured weights starts from an empty hash.
    per_partition = lookup[topic].dup || {}
    per_partition.default = 1
    lookup[topic] = per_partition
  end
end
calculate_capacity(instance_type, az, instance_family_to_capacity, az_to_capacity, weights) click to toggle source
# File lib/kafka/ec2/mixed_instance_assignment_strategy.rb, line 122
# Returns the capacity of a member as a Float.
#
# A combined entry in weights, keyed either az => family or family => az,
# takes precedence. Without one, the capacity is the product of the
# instance-family weight and the availability-zone weight, each of which
# falls back to 1 when missing.
#
# @param instance_type [String] instance type; the part before the first "." is the family
# @param az [String] availability zone
# @param instance_family_to_capacity [Hash{String => Numeric}] weight per instance family
# @param az_to_capacity [Hash{String => Numeric}] weight per availability zone
# @param weights [Hash{String => Hash{String => Numeric}}] combined weights
# @return [Float] the capacity
def calculate_capacity(instance_type, az, instance_family_to_capacity, az_to_capacity, weights)
  family = instance_type.split(".").first

  combined = weights.dig(az, family)
  combined ||= weights.dig(family, az)
  return combined.to_f if combined

  family_weight = instance_family_to_capacity.fetch(family, 1)
  az_weight = az_to_capacity.fetch(az, 1)
  (family_weight * az_weight).to_f
end