class Kafkat::Command::ResignForce
Public Instance Methods
run()
click to toggle source
# File lib/kafkat/command/resign-rewrite.rb, line 12 def run broker_id = ARGV[0] && ARGV.shift.to_i if broker_id.nil? puts "You must specify a broker ID." exit 1 end opts = Trollop.options do opt :force, "force" end print "This operation rewrites leaderships in ZK to exclude broker '#{broker_id}'.\n" print "WARNING: This is a last resort. Try the 'shutdown' command first!\n\n".red return unless agree("Proceed (y/n)?") brokers = zookeeper.get_brokers topics = zookeeper.get_topics force = opts[:force] ops = {} topics.each do |_, t| t.partitions.each do |p| next if p.leader != broker_id alternates = p.isr.reject { |i| i == broker_id } new_leader_id = alternates.sample if !new_leader_id && !force print "Partition #{t.name}-#{p.id} has no other ISRs!\n" exit 1 end new_leader_id ||= -1 ops[p] = new_leader_id end end print "\n" print "Summary of the new assignments:\n\n" print "Partition\tLeader\n" ops.each do |p, lid| print justify("#{p.topic_name}-#{p.id}") print justify(lid.to_s) print "\n" end begin print "\nStarting.\n" ops.each do |p, lid| retryable(tries: 3, on: Interface::Zookeeper::WriteConflictError) do zookeeper.write_leader(p, lid) end end rescue Interface::Zookeeper::WriteConflictError => e print "Failed to update leaderships in ZK. Try re-running.\n\n" exit 1 end print "Done.\n" end