class RubySkynet::Zookeeper::ServiceRegistry

Constants

IPV4_REG_EXP
ServerInfo

:score: [Integer] Score :servers: [Array<String>] ‘host:port’, ‘host:port’

Public Class Methods

new(params = {}) click to toggle source

Create a service registry See: RubyDoozer::Registry for the parameters

# File lib/ruby_skynet/zookeeper/service_registry.rb, line 18
def initialize(params = {})
  # Registry has the following format
  #  Key: [String] 'name/version/region'
  #  Value: [Array<String>] 'host:port', 'host:port'
  @cache = ThreadSafe::Hash.new
  @notifications_cache = ThreadSafe::Hash.new

  # Keep a list of registered services so that they can be re-registered
  # if the connection is lost
  @services = ThreadSafe::Hash.new

  # Supply block to load the current keys from the Registry
  params[:root] = '/instances'
  params[:ephemeral] = true
  params[:on_connect] = Proc.new do |registry|
    # Re-Register services every time the connection to ZooKeeper is lost
    @services.values.each {|v| register_service(*v)}
  end
  @registry = Zookeeper::Registry.new(params) do |key, value|
    service_info_created(key, value)
  end
  # Register Callbacks
  @registry.on_create {|path, value| service_info_created(path, value) }
  @registry.on_update {|path, value| service_info_updated(path, value) }
  @registry.on_delete {|path|        service_info_deleted(path) }
end

Protected Class Methods

score_for_server(ip_address, local_ip_address) click to toggle source

Returns [Integer] the score for the supplied ip_address Score currently ranges from 0 to 4 with 4 being the best score If the IP address does not match an IP v4 address a DNS lookup will be performed

# File lib/ruby_skynet/zookeeper/service_registry.rb, line 248
def self.score_for_server(ip_address, local_ip_address)
  ip_address = '127.0.0.1' if ip_address == 'localhost'
  score = 0
  # Each matching element adds 1 to the score
  # 192.168.  0.  0
  #               1
  #           1
  #       1
  #   1
  server_match = IPV4_REG_EXP.match(ip_address) || IPV4_REG_EXP.match(Resolv::DNS.new.getaddress(ip_address).to_s)
  if server_match
    local_match = IPV4_REG_EXP.match(local_ip_address)
    score = 0
    (1..4).each do |i|
      break if local_match[i].to_i != server_match[i].to_i
      score += 1
    end
  end
  score
end

Public Instance Methods

deregister_service(name, version, region, hostname, port) click to toggle source

Deregister the supplied service from the Registry

# File lib/ruby_skynet/zookeeper/service_registry.rb, line 65
def deregister_service(name, version, region, hostname, port)
  uuid = "#{hostname}:#{port}-#{$$}-#{name}-#{version}"
  @registry.delete(File.join(uuid,'addr'), false)
  @registry.delete(File.join(uuid,'name'), false)
  @registry.delete(File.join(uuid,'version'), false)
  @registry.delete(File.join(uuid,'region'), false)
  @registry.delete(File.join(uuid,'registered'), false)
  @registry.delete(uuid, false)
  # Remove from local services list
  @services.delete(uuid)
        uuid
end
on_server_removed(server, &block) click to toggle source

Invokes registered callbacks when a specific server is shutdown or terminates Not when a server de-registers itself The callback will only be called once and will need to be re-registered after being called if future callbacks are required for that server

# File lib/ruby_skynet/zookeeper/service_registry.rb, line 101
def on_server_removed(server, &block)
  ((@on_server_removed_callbacks ||= ThreadSafe::Hash.new)[server] ||= ThreadSafe::Array.new) << block
end
register_service(name, version, region, hostname, port) click to toggle source

Register the supplied service at this Skynet Server host and Port Returns the UUID for the service that was created

# File lib/ruby_skynet/zookeeper/service_registry.rb, line 52
def register_service(name, version, region, hostname, port)
  uuid = "#{hostname}:#{port}-#{$$}-#{name}-#{version}"
  @registry[File.join(uuid,'addr')]       = "#{hostname}:#{port}"
  @registry[File.join(uuid,'name')]       = name
  @registry[File.join(uuid,'version')]    = version
  @registry[File.join(uuid,'region')]     = region
  @registry[File.join(uuid,'registered')] = true
  # Add to local services list
  @services[uuid] = [name, version, region, hostname, port]
  uuid
end
servers_for(name, version='*', region=RubySkynet.region) click to toggle source

Returns [Array<String>] a list of servers implementing the requested service

# File lib/ruby_skynet/zookeeper/service_registry.rb, line 79
def servers_for(name, version='*', region=RubySkynet.region)
  if version == '*'
    # Find the highest version for the named service in this region
    version = -1
    @cache.keys.each do |key|
      if match = key.match(/#{name}\/(\d+)\/#{region}/)
        ver = match[1].to_i
        version = ver if ver > version
      end
    end
  end
  servers = if server_infos = @cache["#{name}/#{version}/#{region}"]
    server_infos.first.servers
  end
  raise ServiceUnavailable.new("No servers available for service: #{name} with version: #{version} in region: #{region}") unless servers
  servers
end
to_h() click to toggle source

Returns the Service Registry as a Hash

# File lib/ruby_skynet/zookeeper/service_registry.rb, line 46
def to_h
  @cache.dup
end

Protected Instance Methods

add_server(key, hostname, port) click to toggle source

Add the host to the registry based on it’s score

# File lib/ruby_skynet/zookeeper/service_registry.rb, line 169
def add_server(key, hostname, port)
  server  = "#{hostname}:#{port}"

  server_infos = (@cache[key] ||= ThreadSafe::Array.new)

  # If already present, then nothing to do
  server_info = server_infos.find{|si| si.servers.include?(server)}
  return server_info if server_info

  # Look for the same score with a different server
  score = self.class.score_for_server(hostname, RubySkynet.local_ip_address)
  logger.info "Service: #{key} now running at #{server} with score #{score}"
  if server_info = server_infos.find{|si| si.score == score}
    server_info.servers << server
    return server_info
  end

  # New score
  servers = ThreadSafe::Array.new
  servers << server
  server_info = ServerInfo.new(score, servers)

  # Insert into Array in order of score
  if index = server_infos.find_index {|si| si.score <= score}
    server_infos.insert(index, server_info)
  else
    server_infos << server_info
  end
  server_info
end
remove_server(key, hostname, port, notify) click to toggle source

Remove the host from the registry based Returns the server instance if it was removed

# File lib/ruby_skynet/zookeeper/service_registry.rb, line 202
def remove_server(key, hostname, port, notify)
  server = "#{hostname}:#{port}"
  logger.info "Service: #{key} stopped running at #{server}"
  server_info = nil
  if server_infos = @cache[key]
    server_infos.each do |si|
      if si.servers.delete(server)
        server_info = si
        break
      end
    end

    # Found server
    if server_info
      # Cleanup if no more servers in server list
      server_infos.delete(server_info) if server_info.servers.size == 0

      # Cleanup if no more server infos
      @cache.delete(key) if server_infos.size == 0

      server_removed(server) if notify
    end
  end
  server_info
end
server_removed(server) click to toggle source

Invoke any registered callbacks for the specific server

# File lib/ruby_skynet/zookeeper/service_registry.rb, line 229
def server_removed(server)
  if @on_server_removed_callbacks && (callbacks = @on_server_removed_callbacks.delete(server))
    callbacks.each do |block|
      begin
        logger.debug "Calling callback for server: #{server}"
        block.call(server)
      rescue Exception => exc
        logger.error("Exception during a callback for server: #{server}", exc)
      end
    end
  end
end
service_info_created(path, value=nil) click to toggle source

Service information created

# File lib/ruby_skynet/zookeeper/service_registry.rb, line 124
def service_info_created(path, value=nil)
  logger.trace("service_info_created: #{path}", value)
  # path: "uuid/key"
  uuid, key = path.split('/')

  server = (@notifications_cache[uuid] ||= ThreadSafe::Hash.new)
  server[key] = value

  # Check if service is 'registered' and has all required attributes
  if (server['registered'] == true) && server['addr'] && server['name'] && server['version'] && server['region']
    hostname, port = server['addr'].split(':')
    add_server(File.join(server['name'], server['version'].to_s, server['region']), hostname, port)
  end
end
service_info_deleted(path) click to toggle source
# File lib/ruby_skynet/zookeeper/service_registry.rb, line 108
def service_info_deleted(path)
  logger.trace("service_info_deleted: #{path}")
  # path: "uuid/key"
  uuid, key = path.split('/')
  # If any child of the node is deleted, deregister the service,
  # not just for (key == 'registered')
  server = @notifications_cache.delete(uuid)
  # Handle partially registered services
  if server && server['addr'] && server['name'] && server['version'] && server['region']
    hostname, port = server['addr'].split(':')
    # Service has stopped and needs to be removed
    remove_server(File.join(server['name'], server['version'].to_s, server['region']), hostname, port, true)
  end
end
service_info_updated(path, value=nil) click to toggle source

Service information changed

# File lib/ruby_skynet/zookeeper/service_registry.rb, line 140
def service_info_updated(path, value=nil)
  logger.trace("service_info_updated: #{path}", value)
  # path: "uuid/key"
  uuid, key = path.split('/')

  # Big Assumption: 'registered' event will be received last as true
  if (key == 'registered') && (value == true)
    server = @notifications_cache[uuid]
    if server && server['addr'] && server['name'] && server['version'] && server['region']
      hostname, port = server['addr'].split(':')
      add_server(File.join(server['name'], server['version'].to_s, server['region']), hostname, port)
    else
      logger.warn "Registered notification received for #{uuid} but is missing critical information. Received:", server
    end
  else
    (@notifications_cache[uuid] ||= ThreadSafe::Hash.new)[key] = value
  end
end