class DruidConfig::Cluster
Class to initialize the connection to Zookeeper
Public Class Methods
Initialize the client to perform the queries
Parameters:
- zk_uri
-
String with the URI or URIs (separated by commas) of Zookeeper
- options
-
Hash with options:
- discovery_path: String with the discovery path of Druid
# File lib/druid_config/cluster.rb, line 20
# Create the shared client and point this class's base URI at the
# coordinator endpoint.
#
# zk_uri  - passed through unchanged to DruidConfig::Client.new
# options - passed through unchanged to DruidConfig::Client.new
def initialize(zk_uri, options)
  # The client is stored on the DruidConfig module
  DruidConfig.client = DruidConfig::Client.new(zk_uri, options)
  # Counter used to check the number of retries on error
  @retries = 0
  # Base URI for every query: coordinator address followed by the API path
  coordinator_uri = "#{DruidConfig.client.coordinator}"\
                    "druid/coordinator/#{DruidConfig::Version::API_VERSION}"
  self.class.base_uri(coordinator_uri)
end
Public Instance Methods
Close connection with zookeeper
# File lib/druid_config/cluster.rb, line 36
# Close the connection by delegating to the shared client's close!.
def close!
  shared_client = DruidConfig.client
  shared_client.close!
end
Return all datasources
Returns:
Array of Datasource initialized.
# File lib/druid_config/cluster.rb, line 136
# Build one DataSource entity per entry returned by '/datasources?simple'.
#
# Each entity receives the raw entry plus the load_status value whose key
# equals the entry's 'name' (nil when there is no such key).
def datasources
  status_by_name = load_status
  secure_query do
    listing = self.class.get('/datasources?simple')
    listing.map do |entry|
      matching = status_by_name.select { |name, _| name == entry['name'] }
      DruidConfig::Entities::DataSource.new(entry, matching.values.first)
    end
  end
end
Return the default datasource. This datasource has no metadata associated with it; it is only used to read and apply the default rules.
# File lib/druid_config/cluster.rb, line 165
# Build a DataSource entity named after DataSource::DEFAULT_DATASOURCE,
# with empty 'properties' and an empty Hash as its second argument.
def default_datasource
  attributes = {
    'name' => DruidConfig::Entities::DataSource::DEFAULT_DATASOURCE,
    'properties' => {}
  }
  empty_status = {}
  DruidConfig::Entities::DataSource.new(attributes, empty_status)
end
Return failed completed tasks
# File lib/druid_config/cluster.rb, line 345
# Keep only the entries of complete_tasks that answer true to failed?.
def failed_tasks
  complete_tasks.select { |task| task.failed? }
end
Return the leader of the Druid cluster
# File lib/druid_config/cluster.rb, line 67
# Fetch '/leader' and return the body of the response.
def leader
  secure_query do
    response = self.class.get('/leader')
    response.body
  end
end
Load queue of the cluster
# File lib/druid_config/cluster.rb, line 85
# Fetch the '/loadqueue' endpoint.
#
# params - query string appended after the '?' (defaults to '')
def load_queue(params = '')
  secure_query do
    endpoint = "/loadqueue?#{params}"
    self.class.get(endpoint)
  end
end
Load status of the cluster
# File lib/druid_config/cluster.rb, line 76
# Fetch the '/loadstatus' endpoint.
#
# params - query string appended after the '?' (defaults to '')
def load_status(params = '')
  secure_query do
    endpoint = "/loadstatus?#{params}"
    self.class.get(endpoint)
  end
end
Return a Hash with metadata of datasources
# File lib/druid_config/cluster.rb, line 97
# Fetch the '/metadata/datasources' endpoint.
#
# params - query string appended after the '?' (defaults to '')
def metadata_datasources(params = '')
  secure_query do
    endpoint = "/metadata/datasources?#{params}"
    self.class.get(endpoint)
  end
end
Return a Hash with metadata of segments
Parameters:
- data_source
-
String with the name of the data source
- segment
-
(Optional) Segment to search
# File lib/druid_config/cluster.rb, line 114
# Fetch segment metadata for a data source.
#
# data_source - String with the name of the data source
# segment     - (Optional) String. When empty or 'full' it is appended as
#               the query string of the segments listing; any other value
#               is appended as a path component after '/segments/'.
def metadata_datasources_segments(data_source, segment = '')
  end_point = "/metadata/datasources/#{data_source}/segments"
  secure_query do
    # The URL is built from +segment+; the previous code referenced an
    # undefined +params+ here and raised NameError on every call.
    if segment.empty? || segment == 'full'
      self.class.get("#{end_point}?#{segment}")
    else
      self.class.get("#{end_point}/#{segment}")
    end
  end
end
URIs of the physical servers in the cluster
Returns:
Array of strings
# File lib/druid_config/cluster.rb, line 238
# Unique hosts of the objects returned by servers.
# The list is memoized in @physical_servers after the first computation.
def physical_servers
  secure_query do
    @physical_servers ||= begin
      hosts = servers.map { |server| server.host }
      hosts.uniq
    end
  end
end
URIs of the physical workers in the cluster
# File lib/druid_config/cluster.rb, line 290
# Unique hosts of the objects returned by workers.
# The list is memoized in @physical_workers after the first computation.
def physical_workers
  @physical_workers ||= begin
    hosts = workers.map { |worker| worker.host }
    hosts.uniq
  end
end
Reset the client
# File lib/druid_config/cluster.rb, line 43
# Reset the shared client, then recompute the base URI from the client's
# coordinator address.
def reset!
  DruidConfig.client.reset!
  refreshed_uri = "#{DruidConfig.client.coordinator}"\
                  "druid/coordinator/#{DruidConfig::Version::API_VERSION}"
  self.class.base_uri(refreshed_uri)
end
Return the rules applied to a cluster
# File lib/druid_config/cluster.rb, line 178
# Collect every rule returned by '/rules' into a RuleCollection.
#
# The response is iterated as (datasource, rules) pairs; each raw rule is
# parsed together with its datasource and appended to the collection.
def rules
  collection = DruidConfig::Entities::RuleCollection.new
  secure_query do
    self.class.get('/rules').each do |datasource, ds_rules|
      ds_rules.each do |raw_rule|
        parsed = DruidConfig::Entities::Rule.parse(raw_rule, datasource)
        collection << parsed
      end
    end
  end
  # The collection is returned even if nothing was appended
  collection
end
Return all nodes of the cluster
Returns:
Array of node Objects
# File lib/druid_config/cluster.rb, line 221
# Build one Node entity per entry returned by '/servers?simple'.
#
# Each Node receives the raw entry plus the load_queue('simple') value
# whose key equals the entry's 'host' (nil when there is no such key).
def servers
  secure_query do
    pending = load_queue('simple')
    self.class.get('/servers?simple').map do |info|
      node_queue = pending.select { |host, _| host == info['host'] }
      DruidConfig::Entities::Node.new(info, node_queue.values.first)
    end
  end
end
Available services in the cluster
Returns:
Hash with the format:
{ server: [ services ], server2: [ services ], ... }
# File lib/druid_config/cluster.rb, line 366
# Map every host to the list of services found on it.
#
# Returns a Hash of the form { host => [:realtime, :historical,
# :middleManager] }, memoized in @services after the first call.
def services
  return @services if @services
  services = {}
  # Every physical node gets an entry, even if no service is found on it
  physical_nodes.each { |node| services[node] = [] }
  # Load services. Entries are created on demand: a host that is not in
  # physical_nodes (e.g. a worker-only host) previously hit nil and raised
  # NoMethodError.
  realtimes.map(&:host).uniq.each { |host| (services[host] ||= []) << :realtime }
  historicals.map(&:host).uniq.each { |host| (services[host] ||= []) << :historical }
  physical_workers.each { |host| (services[host] ||= []) << :middleManager }
  # Memoize and return
  @services = services
end
Return success completed tasks
# File lib/druid_config/cluster.rb, line 352
# Keep only the entries of complete_tasks that answer true to success?.
def success_tasks
  complete_tasks.select { |task| task.success? }
end
Find a task
# File lib/druid_config/cluster.rb, line 321
# Look up a task by delegating to DruidConfig::Entities::Task.find.
#
# id - identifier passed through unchanged to Task.find
def task(id)
  task_class = DruidConfig::Entities::Task
  task_class.find(id)
end
Return all tiers defined in the cluster
Returns:
Array of Tier instances
# File lib/druid_config/cluster.rb, line 200
# Build one Tier entity per distinct tier value found among the servers.
#
# Each Tier receives the tier value and the nodes whose tier equals it.
def tiers
  current_nodes = servers
  secure_query do
    tier_names = current_nodes.map { |node| node.tier }.uniq
    tier_names.map do |tier_name|
      members = current_nodes.select { |node| node.tier == tier_name }
      DruidConfig::Entities::Tier.new(tier_name, members)
    end
  end
end
Return all Workers (MiddleManager) of the cluster
Returns:
Array of Workers
# File lib/druid_config/cluster.rb, line 273
# Build one Worker entity per entry returned by '/workers'.
#
# The request runs inside query_overlord and secure_query (both defined
# elsewhere). If the inner block never assigns, an empty Array is returned.
def workers
  found = []
  query_overlord do
    secure_query do
      raw_workers = self.class.get('/workers')
      found = raw_workers.map { |raw| DruidConfig::Entities::Worker.new(raw) }
    end
  end
  found
end