class Kafkat::Command::VerifyReplicas

Public Instance Methods

print_mismatched_partitions(partition_replica_size, partition_replica_size_stat, print_details, print_summary)
run()
# File lib/kafkat/command/verify-replicas.rb, line 9
# Command entry point: parses the CLI options, resolves the topics to inspect
# (the comma-separated --topics list, or every topic when none is given),
# collects replica counts via verify_replicas and hands them to
# print_mismatched_partitions.
#
# Returns whatever print_mismatched_partitions returns.
def run
  opts = Trollop.options do
    opt :topics, "topic names", type: :string
    opt :broker, "broker ID", type: :string
    opt :print_details, "show replica size of mismatched partitions", default: false
    opt :print_summary, "show summary of mismatched partitions", default: false
  end

  show_details = opts[:print_details]
  # The summary is forced on unless details were requested; in that case the
  # --print-summary flag decides.
  show_summary = show_details ? opts[:print_summary] : true

  requested = opts[:topics]
  topics = zookeeper.get_topics(requested.split(',')) if requested
  # No names given (or the lookup returned nil/false): fall back to all topics.
  topics ||= zookeeper.get_topics

  # The broker ID arrives as a string; leave it untouched when absent.
  broker_opt = opts[:broker]
  broker = broker_opt && broker_opt.to_i

  sizes, size_stats = verify_replicas(broker, topics)
  print_mismatched_partitions(sizes, size_stats, show_details, show_summary)
end
verify_replicas(broker, topics)
# File lib/kafkat/command/verify-replicas.rb, line 34
# Tallies replica counts for every partition of the given topics.
#
# broker - when truthy, only partitions whose replica list includes this
#          value are counted; when nil/false every partition is counted.
# topics - enumerable yielding (key, topic) pairs; each topic responds to
#          #name and #partitions, each partition to #id and #replicas.
#
# Returns a two-element array:
#   [0] { topic name => { partition id => replica count } }
#   [1] { topic name => { replica count => number of partitions with it } }
# Every topic gets an entry in both hashes, even if no partition matched.
def verify_replicas(broker, topics)
  sizes_by_topic = {}
  histogram_by_topic = {}

  topics.each do |_key, topic|
    sizes = sizes_by_topic[topic.name] = {}
    histogram = histogram_by_topic[topic.name] = {}

    topic.partitions.each do |partition|
      replicas = partition.replicas
      count = replicas.length

      # With a broker filter active, skip partitions not hosted on it.
      hosted = !broker || replicas.include?(broker)
      next unless hosted

      histogram[count] = histogram.fetch(count, 0) + 1
      sizes[partition.id] = count
    end
  end

  [sizes_by_topic, histogram_by_topic]
end