class Kafkat::Interface::Zookeeper
Attributes
zk_path[R]
Public Class Methods
new(config)
click to toggle source
# File lib/kafkat/interface/zookeeper.rb, line 12
# Creates the interface from a configuration object.
#
# @param config [#zk_path] object whose +zk_path+ value is kept in +@zk_path+
def initialize(config)
  @zk_path = config.zk_path
end
Public Instance Methods
get_broker(id)
click to toggle source
# File lib/kafkat/interface/zookeeper.rb, line 38
# Looks up a single broker by id.
#
# Reads the node at +broker_path(id)+, parses its data as JSON and builds a
# Broker from the 'host' and 'port' entries.
#
# @param id broker id used to build the node path
# @return [Broker]
# @raise [NotFoundError] when the read raises ZK::Exceptions::NoNode
def get_broker(id)
  # zk.get answers with [data, stat]; only the data is needed here.
  raw, _stat = zk.get(broker_path(id))
  info = JSON.parse(raw)
  Broker.new(id, info['host'], info['port'])
rescue ZK::Exceptions::NoNode
  raise NotFoundError
end
get_broker_ids()
click to toggle source
# File lib/kafkat/interface/zookeeper.rb, line 16
# Lists the children of the brokers node.
#
# @return whatever +zk.children+ yields for +brokers_path+
def get_broker_ids
  zk.children(brokers_path)
end
get_brokers(ids=nil)
click to toggle source
# File lib/kafkat/interface/zookeeper.rb, line 20
# Fetches several brokers concurrently, one thread per id.
#
# @param ids [Array, nil] broker ids (each converted with +to_i+); when nil
#   the children of +brokers_path+ are used
# @return [Hash] integer broker id => result of +get_broker+; ids whose lookup
#   raised a StandardError are simply absent
def get_brokers(ids=nil)
  ids ||= zk.children(brokers_path)
  found = {}

  workers = ids.map do |raw_id|
    broker_id = raw_id.to_i
    Thread.new do
      begin
        found[broker_id] = get_broker(broker_id)
      rescue StandardError
        # Best effort: a broker that cannot be read is left out of the result.
      end
    end
  end

  workers.each(&:join)
  found
end
get_controller()
click to toggle source
# File lib/kafkat/interface/zookeeper.rb, line 117
# Resolves the broker named by the controller node.
#
# Parses the data at +controller_path+ as JSON and passes its 'brokerid'
# entry to +get_broker+.
#
# @return the result of +get_broker+ for that id
# @raise [NotFoundError] when ZK::Exceptions::NoNode is raised
def get_controller
  payload = JSON.parse(zk.get(controller_path).first)
  get_broker(payload['brokerid'])
rescue ZK::Exceptions::NoNode
  raise NotFoundError
end
get_topic(name)
click to toggle source
# File lib/kafkat/interface/zookeeper.rb, line 80
# Loads one topic together with the state of each of its partitions.
#
# The topic node and the list of partition ids are read first; each
# partition's state node is then read in its own thread through the pool.
#
# @param name topic name used to build the node paths
# @return [Topic] built from +name+ and its partitions sorted by id
# @raise [NotFoundError] when any read raises ZK::Exceptions::NoNode
def get_topic(name)
  topic_node = topic_path(name)
  partitions_node = topic_partitions_path(name)

  topic_data = pool.with_connection { |conn| conn.get(topic_node).first }
  partition_ids = pool.with_connection { |conn| conn.children(partitions_node) }
  assignments = JSON.parse(topic_data)

  workers = partition_ids.map do |raw_id|
    partition_id = raw_id.to_i
    Thread.new do
      state_node = topic_partition_state_path(name, partition_id)
      state_data = pool.with_connection { |conn| conn.get(state_node).first }
      state = JSON.parse(state_data)
      replicas = assignments['partitions'][partition_id.to_s]
      Partition.new(name, partition_id, replicas, state['leader'], state['isr'])
    end
  end

  # Thread#value joins the worker and re-raises anything it raised, so a
  # missing state node still reaches the rescue clause below.
  partitions = workers.map(&:value).sort_by(&:id)
  Topic.new(name, partitions)
rescue ZK::Exceptions::NoNode
  raise NotFoundError
end
get_topic_names()
click to toggle source
# File lib/kafkat/interface/zookeeper.rb, line 48
# Lists the children of the topics node.
#
# @return whatever +zk.children+ yields for +topics_path+
def get_topic_names
  zk.children(topics_path)
end
get_topics(names=nil)
click to toggle source
# File lib/kafkat/interface/zookeeper.rb, line 52
# Loads several topics concurrently, one thread per topic name.
#
# If any lookup raises a StandardError, the collected errors are printed to
# STDERR and the process exits with status 1.
#
# @param names [Array, nil] topic names; when nil the children of
#   +topics_path+ are read through the pool
# @return [Hash] topic name => result of +get_topic+
def get_topics(names=nil)
  names = pool.with_connection { |cnx| cnx.children(topics_path) } if names.nil?

  failures = {}
  topics = {}

  workers = names.map do |topic_name|
    Thread.new do
      begin
        topics[topic_name] = get_topic(topic_name)
      rescue => err
        failures[topic_name] = err
      end
    end
  end
  workers.each(&:join)

  return topics if failures.empty?

  STDERR.print "ERROR: zk cmds failed on get_topics: \n#{failures.values.join("\n")}\n"
  exit 1
end
write_leader(partition, broker_id)
click to toggle source
# File lib/kafkat/interface/zookeeper.rb, line 126
# Rewrites the 'leader' entry of a partition's state node.
#
# The state is read together with its stat, the 'leader' key is replaced and
# the result is written back conditionally on the version that was read.
#
# @param partition [#topic_name, #id] partition whose state node is updated
# @param broker_id value stored under 'leader'
# @return [nil]
# @raise [ChangedDuringUpdateError] when the node changed between the read and
#   the write
def write_leader(partition, broker_id)
  path = topic_partition_state_path(partition.topic_name, partition.id)
  string, stat = zk.get(path)

  partition_json = JSON.parse(string)
  partition_json['leader'] = broker_id
  new_string = JSON.dump(partition_json)

  # Guard kept for clients that signal a failed write with a falsy result.
  raise ChangedDuringUpdateError unless zk.set(path, new_string, version: stat.version)
rescue ZK::Exceptions::BadVersion
  # The zk gem documents a version mismatch on set as a BadVersion exception;
  # report it as the error this method is meant to raise.
  raise ChangedDuringUpdateError
end
Private Instance Methods
broker_path(id)
click to toggle source
# File lib/kafkat/interface/zookeeper.rb, line 153
# Path of the node for a single broker.
#
# @param id broker id appended to '/brokers/ids/'
# @return [String]
def broker_path(id)
  format('/brokers/ids/%s', id)
end
brokers_path()
click to toggle source
# File lib/kafkat/interface/zookeeper.rb, line 149
# Path of the node whose children are listed to obtain broker ids.
#
# @return [String]
def brokers_path
  '/brokers/ids'
end
controller_path()
click to toggle source
# File lib/kafkat/interface/zookeeper.rb, line 173
# Path of the node read by +get_controller+.
#
# @return [String]
def controller_path
  '/controller'
end
pool()
click to toggle source
# File lib/kafkat/interface/zookeeper.rb, line 141
# Memoized pool created by +ZK.new_pool+ for +zk_path+.
#
# The pool is built on first use with min_clients 10, max_clients 300 and
# timeout 1, then reused on every later call.
def pool
  @pool ||= ZK.new_pool(zk_path, min_clients: 10, max_clients: 300, timeout: 1)
end
topic_partition_state_path(name, id)
click to toggle source
# File lib/kafkat/interface/zookeeper.rb, line 169
# Path of the state node for one partition of a topic.
#
# @param name topic name
# @param id partition id
# @return [String]
def topic_partition_state_path(name, id)
  format('/brokers/topics/%s/partitions/%s/state', name, id)
end
topic_partitions_path(name)
click to toggle source
# File lib/kafkat/interface/zookeeper.rb, line 165
# Path of the node whose children are a topic's partition ids.
#
# @param name topic name
# @return [String]
def topic_partitions_path(name)
  format('/brokers/topics/%s/partitions', name)
end
topic_path(name)
click to toggle source
# File lib/kafkat/interface/zookeeper.rb, line 161
# Path of the node for a single topic.
#
# @param name topic name appended to '/brokers/topics/'
# @return [String]
def topic_path(name)
  format('/brokers/topics/%s', name)
end
topics_path()
click to toggle source
# File lib/kafkat/interface/zookeeper.rb, line 157
# Path of the node whose children are listed to obtain topic names.
#
# @return [String]
def topics_path
  '/brokers/topics'
end
zk()
click to toggle source
# File lib/kafkat/interface/zookeeper.rb, line 145
# Memoized client created by +ZK.new+ for +zk_path+.
#
# The client is built on first use and reused on every later call.
def zk
  return @zk if @zk

  @zk = ZK.new(zk_path)
end