class DruidConfig::ZK

Class that connects to ZooKeeper and retrieves information about the nodes in the cluster.

Constants

COORDINATOR

Coordinator service

OVERLORD
SERVICES

Public Class Methods

new(uri, opts = {}) click to toggle source

Initialize variables and call register

Parameters:

uri

URI of ZooKeeper

opts

Hash with options:

- discovery_path: Custom URL of discovery path for Druid
# File lib/druid_config/zk.rb, line 31
# Set up the ZooKeeper client and the internal bookkeeping, then call
# register to load the current state.
#
# Parameters:
#   uri::  URI of ZooKeeper
#   opts:: Hash with options. :discovery_path overrides the default
#          '/discovery' path.
def initialize(uri, opts = {})
  # ZooKeeper connection handle
  @zk = ::ZK.new(uri, chroot: :check)
  # Node lists per service; an unknown key is stored with an empty list
  @registry = Hash.new { |registry, service| registry[service] = [] }
  custom_path = opts[:discovery_path]
  @discovery_path = custom_path || '/discovery'
  # Watch subscriptions keyed by service name
  @watched_services = {}
  @verify_retry = 0
  register
end

Public Instance Methods

check_service(service) click to toggle source

Check a given Druid service. For now we only need to track the coordinator and overlord services. This method creates a watcher on the service to detect changes.

This method gets the available nodes from the ZooKeeper path. For each node returned, it tries to connect to the /status endpoint to check whether the node is available. The available nodes are then stored in @registry.

Parameters:

service

String with the service to check

# File lib/druid_config/zk.rb, line 209
# Refresh the registry entry of a single service.
#
# Does nothing when the service is already being watched or is not listed in
# SERVICES. Otherwise it installs a watcher, lists the service's children in
# ZooKeeper, verifies each of them and registers the ones that verified.
#
# Parameters:
#   service:: String with the service to check
def check_service(service)
  # Already watched: nothing to do
  return if @watched_services.include?(service)
  # Only some services are tracked
  return unless SERVICES.include?(service)

  watch_service(service)

  node_names = @zk.children(watch_path(service), watch: true)
  # Keep only the nodes for which verify_node returned a URI
  available = node_names.each_with_object([]) do |name, list|
    uri = verify_node(name, service)
    list << { name: name, uri: uri } if uri
  end

  register_service(service, available)
end
check_services() click to toggle source

Check current services

# File lib/druid_config/zk.rb, line 135
# Synchronise the registry with the services listed under the discovery path:
# unregister the ones that are no longer listed and check every listed one.
def check_services
  $log.info("druid.zk checking services") if $log
  listed = @zk.children(@discovery_path, watch: true)

  # Services known locally but no longer listed in ZooKeeper
  stale = services - listed
  stale.each { |name| unregister_service(name) }

  listed.each { |name| check_service(name) }
end
close!() click to toggle source

Force the ZooKeeper connection to close

# File lib/druid_config/zk.rb, line 57
# Close the ZooKeeper connection by calling close! on the client.
# Returns whatever the client's close! returns.
def close!
  if $log
    $log.info('druid.zk shutting down')
  end
  @zk.close!
end
coordinator() click to toggle source

Return the URI of a random available coordinator. Poor man's load balancing.

# File lib/druid_config/zk.rb, line 66
# Return the URI of a randomly chosen coordinator node.
#
# Delegates to random_node with the COORDINATOR service constant, so the
# result is whatever random_node returns for that service.
def coordinator
  random_node(COORDINATOR)
end
overlord() click to toggle source

Return the URI of a random available overlord. Poor man's load balancing.

# File lib/druid_config/zk.rb, line 74
# Return the URI of a randomly chosen overlord node.
#
# Delegates to random_node with the OVERLORD service constant, so the
# result is whatever random_node returns for that service.
def overlord
  random_node(OVERLORD)
end
random_node(service) click to toggle source

Return a random value of a service

Parameters:

service

String with the name of the service

# File lib/druid_config/zk.rb, line 85
# Pick a random registered node of a service and return its URI.
#
# The lookup uses Hash#fetch instead of Hash#[] so that a registry built with
# a default block does not get an empty entry inserted for a service that was
# only queried; such an entry would make `services` report the service.
#
# Parameters:
#   service:: String with the name of the service
#
# Returns the :uri of one randomly chosen node, or nil when the service has
# no registered nodes.
def random_node(service)
  nodes = @registry.fetch(service, nil)
  return nil if nodes.nil? || nodes.empty?

  # Poor man's load balancing: any registered node will do
  nodes.sample[:uri]
end
register() click to toggle source

Load the data from Zookeeper

# File lib/druid_config/zk.rb, line 44
# Subscribe to the discovery path and load the current service state.
#
# Installs an expired-session handler that calls this method again,
# registers a child-event callback on @discovery_path that re-runs
# check_services, and finally runs check_services once right away.
#
# NOTE(review): every call adds another on_expired_session handler and
# another @zk.register callback; whether earlier ones are discarded by the
# ZK client is not visible here - confirm.
def register
  $log.info('druid.zk register discovery path') if $log
  # Re-run the whole registration when the session expires
  @zk.on_expired_session { register }
  @zk.register(@discovery_path, only: :child) do
    $log.info('druid.zk got event on discovery path') if $log
    check_services
  end
  # Initial load of the services
  check_services
end
register_service(service, brokers) click to toggle source

Register a new service

# File lib/druid_config/zk.rb, line 94
# Store the list of nodes for a service in the registry, replacing any
# previous list. Returns the stored list.
#
# Parameters:
#   service:: name of the service
#   brokers:: list of nodes to store for it
def register_service(service, brokers)
  if $log
    $log.info("druid.zk register", service: service, brokers: brokers)
  end
  @registry.store(service, brokers)
end
services() click to toggle source

Get all available services

# File lib/druid_config/zk.rb, line 233
# List the names of all services currently present in the registry.
def services
  @registry.each_key.to_a
end
to_s() click to toggle source
# File lib/druid_config/zk.rb, line 237
# String representation of this object: the result of calling to_s on the
# @registry hash.
def to_s
  @registry.to_s
end
unregister_service(service) click to toggle source

Unregister a service

# File lib/druid_config/zk.rb, line 103
# Drop a service from the registry and stop watching it.
# Returns the result of unwatch_service.
#
# Parameters:
#   service:: name of the service to remove
def unregister_service(service)
  if $log
    $log.info("druid.zk unregister", service: service)
  end
  @registry.delete(service)
  unwatch_service(service)
end
unwatch_service(service) click to toggle source

Unset a service to watch

# File lib/druid_config/zk.rb, line 126
# Stop watching a service: remove its stored subscription and call
# unregister on it. Returns nil when the service is not being watched.
#
# Parameters:
#   service:: name of the service
def unwatch_service(service)
  if @watched_services.include?(service)
    $log.info("druid.zk unwatch", service: service) if $log
    subscription = @watched_services.delete(service)
    subscription.unregister
  end
end
verify_node(name, service) click to toggle source

Verify whether a Coordinator is available. To check this, the method performs a GET request to the /status endpoint. It retries the connection up to three times, with delays of 0.8, 1.6 and 2.4 seconds.

Parameters:

name

String with the name of the coordinator

service

String with the service

Returns:

URI of the coordinator or false

# File lib/druid_config/zk.rb, line 162
# Verify that a node of a service is reachable.
#
# Reads the node's data from ZooKeeper, parses it as JSON, builds a URI from
# its 'address' and 'port' fields and performs a GET request against
# "<uri>status". Any StandardError raised along the way triggers a retry: up
# to three retries, sleeping 0.8, 1.6 and 2.4 seconds before them.
#
# The retry counter is a local variable so that overlapping verifications
# cannot reset or inflate each other's count through shared instance state.
#
# Parameters:
#   name::    String with the name of the node
#   service:: String with the service
#
# Returns the node URI (String) when /status answers with code 200, and
# false otherwise (non-200 answer or retries exhausted).
def verify_node(name, service)
  attempts = 0
  begin
    $log.info("druid.zk verify", node: name, service: service) if $log
    info = @zk.get("#{watch_path(service)}/#{name}")
    node = JSON.parse(info[0])
    uri = "http://#{node['address']}:#{node['port']}/"
    # Try to get /status
    check = RestClient::Request.execute(
      method: :get, url: "#{uri}status",
      timeout: 5, open_timeout: 5
    )
    $log.info("druid.zk verified", uri: uri, sources: check) if $log
    # Always answer with the URI or an explicit false, never nil
    check.code == 200 ? uri : false
  rescue StandardError
    return false unless attempts < 3

    # Sleep some time and retry
    attempts += 1
    sleep 0.8 * attempts
    retry
  end
end
watch_path(service) click to toggle source

Return the path of a service in Zookeeper.

Parameters:

service

String with the name of the service

# File lib/druid_config/zk.rb, line 192
# Build the ZooKeeper path of a service: the discovery path and the service
# name joined by a slash.
#
# Parameters:
#   service:: String with the name of the service
def watch_path(service)
  format('%s/%s', @discovery_path, service)
end
watch_service(service) click to toggle source

Set a watcher for a service

# File lib/druid_config/zk.rb, line 112
# Install a child-event watcher for a service unless one already exists.
#
# When the watcher fires, the service is unwatched and then checked again
# through check_service. The subscription returned by @zk.register is stored
# in @watched_services under the service name. Returns nil when the service
# is already watched.
#
# Parameters:
#   service:: name of the service to watch
def watch_service(service)
  unless @watched_services.include?(service)
    $log.info("druid.zk watch", service: service) if $log
    path = watch_path(service)
    subscription = @zk.register(path, only: :child) do |event|
      if $log
        $log.info("druid.zk got event on watch path for", service: service, event: event)
      end
      unwatch_service(service)
      check_service(service)
    end
    @watched_services[service] = subscription
  end
end