class Kafkat::Command::Drain

Public Instance Methods

build_partitions_by_broker(topic, destination_brokers) click to toggle source

Builds a hash map from broker ID to the number of partitions on that broker, which makes it easy to find the broker with the fewest partitions and so keep the brokers balanced.

# File lib/kafkat/command/drain.rb, line 97
# Tallies how many partition replicas of +topic+ each broker holds.
#
# topic               - object responding to +partitions+; each partition
#                       responds to +replicas+ (a list of broker IDs).
# destination_brokers - broker IDs that must appear in the result even when
#                       they currently hold no replica of the topic.
#
# Returns a Hash of broker ID => replica count. Lookups of unknown broker
# IDs yield 0. Destination brokers are inserted first, in the order given,
# followed by any other brokers in the order their replicas are seen.
def build_partitions_by_broker(topic, destination_brokers)
  counts = destination_brokers.each_with_object(Hash.new(0)) do |broker_id, tally|
    tally[broker_id] = 0
  end

  topic.partitions.each do |partition|
    partition.replicas.each { |broker_id| counts[broker_id] += 1 }
  end

  counts
end
generate_assignments(source_broker, topics, destination_brokers) click to toggle source
# File lib/kafkat/command/drain.rb, line 55
# Builds the reassignments needed to move every replica off +source_broker+.
#
# For each partition that has a replica on +source_broker+, the source
# broker is replaced by the eligible destination broker that currently
# holds the fewest replicas of that topic. The per-topic tally is updated
# after each pick so later picks see the new load. When the source broker
# was first in the replica list, the replacement is put first as well;
# otherwise it is appended.
#
# source_broker       - ID of the broker being drained.
# topics              - Hash whose values respond to +name+ and +partitions+
#                       (the keys are ignored).
# destination_brokers - Array of broker IDs allowed to receive replicas.
#
# Returns an Array of Assignment objects, one per partition that had a
# replica on +source_broker+.
#
# Writes an error to standard error and exits with status 1 when a partition
# has no eligible destination broker, or when the replica count would change.
def generate_assignments(source_broker, topics, destination_brokers)
  assignments = []

  topics.each do |_, t|
    partitions_by_broker = build_partitions_by_broker(t, destination_brokers)

    t.partitions.each do |p|
      next unless p.replicas.include?(source_broker)

      replicas_size = p.replicas.length
      replicas = p.replicas - [source_broker]
      source_broker_is_leader = p.replicas.first == source_broker

      # Subtract the partition's *current* replicas (which include the source
      # broker) so that neither a broker already holding the partition nor
      # the broker being drained can be picked as the replacement.
      potential_broker_ids = destination_brokers - p.replicas
      if potential_broker_ids.empty?
        $stderr.print "ERROR: Not enough destination brokers to reassign topic \"#{t.name}\".\n"
        exit 1
      end

      num_partitions_on_potential_broker =
        partitions_by_broker.select { |id, _| potential_broker_ids.include?(id) }
      assigned_broker_id = num_partitions_on_potential_broker.min_by { |_, num| num }.first

      if source_broker_is_leader
        replicas.unshift(assigned_broker_id)
      else
        replicas << assigned_broker_id
      end
      partitions_by_broker[assigned_broker_id] += 1

      # Sanity check: swapping one broker for another must keep the size.
      if replicas.length != replicas_size
        $stderr.print "ERROR: Number of replicas changes after reassignment topic: #{t.name}, partition: #{p.id} \n"
        exit 1
      end

      assignments << Assignment.new(t.name, p.id, replicas)
    end
  end

  assignments
end
run() click to toggle source

For each partition (of the specified topic) on the source broker, the command assigns the partition to one of the destination brokers that does not already hold it, keeping the partition's other existing brokers so that data movement is minimal. To help distribute data evenly, if more than one destination broker meets the requirement, the command always chooses the broker with the lowest number of partitions of the topic involved.

To find the broker with the lowest number of partitions, the command maintains a hash table with the broker ID as key and the number of partitions as value. The hash table is updated as each assignment is made.

# File lib/kafkat/command/drain.rb, line 20
# Entry point of the drain command.
#
# Reads the source broker ID from the first command-line argument, then
# parses the +--brokers+ (comma-separated destination broker IDs) and
# +--topic+ options. When no topic is given, all topics returned by
# zookeeper are used; when no destination brokers are given, every broker
# returned by zookeeper except the source broker is used.
#
# Prints a message and exits with status 1 when the broker ID is missing or
# is not a non-negative integer, or when a destination broker is not among
# the brokers returned by zookeeper. Otherwise it generates the assignments,
# prints summary counts and hands the assignments to
# +prompt_and_execute_assignments+.
def run
  source_arg = ARGV.shift
  # Validate explicitly: String#to_i would silently turn a non-numeric
  # argument (for example an option flag passed first) into broker 0.
  unless source_arg.to_s =~ /\A\d+\z/
    puts "You must specify a broker ID."
    exit 1
  end
  source_broker = source_arg.to_i

  opts = Trollop.options do
    opt :brokers, "destination broker IDs", type: :string
    opt :topic,   "topic name to reassign", type: :string
  end

  topic_name = opts[:topic]
  topics = topic_name && zookeeper.get_topics([topic_name])
  topics ||= zookeeper.get_topics

  # Fetch the broker list once; it serves both as the default destination
  # set and as the reference for the activity check below.
  active_brokers = zookeeper.get_brokers.values.map(&:id)

  destination_brokers = opts[:brokers] && opts[:brokers].split(',').map(&:to_i)
  # dup because Array#delete mutates and active_brokers is still needed.
  destination_brokers ||= active_brokers.dup
  destination_brokers.delete(source_broker)

  unless (inactive_brokers = destination_brokers - active_brokers).empty?
    print "ERROR: Broker #{inactive_brokers} are not currently active.\n"
    exit 1
  end

  assignments =
    generate_assignments(source_broker, topics, destination_brokers)

  print "Num of topics got from zookeeper: #{topics.length}\n"
  print "Num of partitions in the assignment: #{assignments.size}\n"
  prompt_and_execute_assignments(assignments)
end