class RubySkynet::Doozer::ServiceRegistry

Constants

IPV4_REG_EXP
ServerInfo

Fields — :score: [Integer] the score; :servers: [Array<String>] list of 'host:port' strings, e.g. 'host:port', 'host:port'

Public Class Methods

new() click to toggle source

Create a service registry. See RubyDoozer::Registry for the parameters.

# File lib/ruby_skynet/doozer/service_registry.rb, line 18
# Sets up the local service cache and wires it to the backing registry so
# that every registry change is funnelled through service_info_changed.
def initialize
  # Local cache layout
  #   Key:   [String] 'name/version/region'
  #   Value: [Array<String>] 'host:port', 'host:port'
  @cache = ThreadSafe::Hash.new

  # The block given to the registry is used to load the keys it currently holds
  @registry = Doozer::Registry.new(:root => '/services') { |key, value| service_info_changed(key, value) }

  # Keep the cache in step with later updates and deletions
  @registry.on_update { |path, value| service_info_changed(path, value) }
  @registry.on_delete { |path| service_info_changed(path) }

  # Only some registries (e.g. the Zookeeper one) offer on_create
  if @registry.respond_to?(:on_create)
    @registry.on_create { |path, value| service_info_changed(path, value) }
  end
end

Protected Class Methods

score_for_server(ip_address, local_ip_address) click to toggle source

Returns [Integer] the score for the supplied ip_address. The score currently ranges from 0 to 4, with 4 being the best score. If the IP address does not match an IPv4 address, a DNS lookup will be performed.

# File lib/ruby_skynet/doozer/service_registry.rb, line 206
# Scores how "close" a server address is to the local address.
#
# Each leading IPv4 octet that matches adds 1 to the score, stopping at the
# first octet that differs:
#   192.168.  0.  0
#     1   1   1   1    => 4 when all four octets match
#
# Parameters
#   ip_address [String]
#     IPv4 address or host name of the server. 'localhost' is treated as
#     '127.0.0.1'. Anything that does not match IPV4_REG_EXP is resolved
#     with Resolv::DNS, and any error raised by that lookup propagates.
#   local_ip_address [String]
#     IPv4 address of this host.
#
# Returns [Integer] a score from 0 (no match) to 4 (identical address).
# Returns 0 when either address cannot be matched against IPV4_REG_EXP,
# instead of failing on a nil match for local_ip_address.
def self.score_for_server(ip_address, local_ip_address)
  ip_address = '127.0.0.1' if ip_address == 'localhost'

  # Only fall back to a DNS lookup when the value is not already an IPv4 address
  server_match = IPV4_REG_EXP.match(ip_address) ||
                 IPV4_REG_EXP.match(Resolv::DNS.new.getaddress(ip_address).to_s)
  local_match  = IPV4_REG_EXP.match(local_ip_address)
  return 0 unless server_match && local_match

  # Count the leading octets that are equal
  (1..4).take_while { |i| local_match[i].to_i == server_match[i].to_i }.size
end

Public Instance Methods

deregister_service(name, version, region, hostname, port) click to toggle source

Deregister the supplied service from the Registry

# File lib/ruby_skynet/doozer/service_registry.rb, line 60
# Deletes the registry entry stored under
# 'name/version/region/hostname/port'.
# Returns whatever the registry's delete call returns.
def deregister_service(name, version, region, hostname, port)
  path = "#{name}/#{version}/#{region}/#{hostname}/#{port}"
  @registry.delete(path)
end
on_server_removed(server, &block) click to toggle source

Invokes registered callbacks when a specific server is shut down or terminates, but not when a server de-registers itself. The callback will only be called once and will need to be re-registered after being called if future callbacks are required for that server.

# File lib/ruby_skynet/doozer/service_registry.rb, line 87
# Registers a block to be called when the given server is removed.
#
# Parameters
#   server [String] the key the callbacks are stored under
#   block  the callback to store
#
# Returns the list of callbacks currently registered for that server.
def on_server_removed(server, &block)
  # The callback table is created lazily on first registration
  @on_server_removed_callbacks ||= ThreadSafe::Hash.new
  callbacks = (@on_server_removed_callbacks[server] ||= ThreadSafe::Array.new)
  callbacks << block
end
register_service(name, version, region, hostname, port) click to toggle source

Register the supplied service at this Skynet Server host and Port

# File lib/ruby_skynet/doozer/service_registry.rb, line 42
# Publishes this service instance in the registry under
# 'name/version/region/hostname/port'.
#
# The stored value carries a "Config" section describing the service and
# its address, plus "Registered" => true.
# Returns the value that was stored.
def register_service(name, version, region, hostname, port)
  service_addr = {
    "IPAddress" => hostname,
    "Port"      => port,
    "MaxPort"   => port + 999
  }

  config = {
    # The UUID embeds the current process id
    "UUID"        => "#{hostname}:#{port}-#{Process.pid}-#{name}-#{version}",
    "Name"        => name,
    "Version"     => version.to_s,
    "Region"      => region,
    "ServiceAddr" => service_addr
  }

  path = "#{name}/#{version}/#{region}/#{hostname}/#{port}"
  @registry[path] = { "Config" => config, "Registered" => true }
end
servers_for(name, version='*', region=RubySkynet.region) click to toggle source

Returns [Array<String>] a list of servers implementing the requested service

# File lib/ruby_skynet/doozer/service_registry.rb, line 65
# Returns [Array<String>] the 'host:port' servers of the first entry
# cached for the requested service.
#
# Parameters
#   name    [String] service name
#   version [Integer|String] version to look up. '*' (the default) selects
#           the highest numeric version cached for exactly this name and
#           region.
#   region  [String] region to look in. Defaults to RubySkynet.region.
#
# Raises ServiceUnavailable when no servers are cached for the service.
def servers_for(name, version='*', region=RubySkynet.region)
  if version == '*'
    # Match the whole key and escape name and region, so that another
    # service ('TutorialSvc' for 'Svc') or another region ('Development'
    # for 'Dev') cannot contribute a version number.
    pattern  = /\A#{Regexp.escape(name.to_s)}\/(\d+)\/#{Regexp.escape(region.to_s)}\z/
    versions = @cache.keys.map { |key| (found = pattern.match(key)) && found[1].to_i }.compact
    # -1 never matches a cached key, so the lookup below fails cleanly
    version  = versions.max || -1
  end

  server_infos = @cache["#{name}/#{version}/#{region}"]
  # Tolerate an empty info list instead of calling #servers on nil
  best    = server_infos && server_infos.first
  servers = best && best.servers
  raise ServiceUnavailable.new("No servers available for service: #{name} with version: #{version} in region: #{region}") unless servers
  servers
end
to_h() click to toggle source

Returns the Service Registry as a Hash

# File lib/ruby_skynet/doozer/service_registry.rb, line 37
# Returns a shallow copy of the internal cache, so adding or removing
# keys on the result does not affect the registry's own cache.
def to_h
  snapshot = @cache.dup
  snapshot
end

Protected Instance Methods

add_server(key, hostname, port) click to toggle source

Add the host to the registry based on its score

# File lib/ruby_skynet/doozer/service_registry.rb, line 127
# Records 'hostname:port' as a server for the given cache key.
#
# Servers are grouped into ServerInfo entries by score, and the entries for
# a key are kept ordered from highest score to lowest.
#
# Returns the ServerInfo entry that holds the server.
def add_server(key, hostname, port)
  address = "#{hostname}:#{port}"
  infos   = (@cache[key] ||= ThreadSafe::Array.new)

  # Already known: hand back the entry that holds it
  existing = infos.find { |info| info.servers.include?(address) }
  return existing if existing

  rank = self.class.score_for_server(hostname, RubySkynet.local_ip_address)
  logger.info "Service: #{key} now running at #{address} with score #{rank}"

  # Join an existing entry that has the same score
  same_rank = infos.find { |info| info.score == rank }
  if same_rank
    same_rank.servers << address
    return same_rank
  end

  # First server with this score: create its entry
  created = ServerInfo.new(rank, ThreadSafe::Array.new.push(address))

  # Place it ahead of the first lower-scored entry, or last if there is none
  position = infos.find_index { |info| info.score <= rank }
  infos.insert(position || infos.size, created)
  created
end
remove_server(key, hostname, port, notify) click to toggle source

Remove the host from the registry. Returns the server info entry the host was removed from, or nil if the host was not found.

# File lib/ruby_skynet/doozer/service_registry.rb, line 160
# Drops 'hostname:port' from the cached servers of the given key.
#
# Empty entries, and keys left without entries, are pruned from the cache.
# When notify is truthy and the server was found, the callbacks registered
# for that server are invoked via server_removed.
#
# Returns the entry the server was removed from, or nil when it was not
# cached.
def remove_server(key, hostname, port, notify)
  address = "#{hostname}:#{port}"
  logger.info "Service: #{key} stopped running at #{address}"

  infos = @cache[key]
  return nil unless infos

  # Array#delete returns the removed element, so this stops at the first
  # entry that actually held the server
  owner = infos.find { |info| info.servers.delete(address) }
  return nil unless owner

  # Prune the entry once its server list is empty
  infos.delete(owner) if owner.servers.size == 0
  # Prune the key once it has no entries left
  @cache.delete(key) if infos.size == 0

  server_removed(address) if notify
  owner
end
server_removed(server) click to toggle source

Invoke any registered callbacks for the specific server

# File lib/ruby_skynet/doozer/service_registry.rb, line 187
# Runs, then forgets, every callback registered for the given server.
#
# The callbacks are removed from the table before they are run, so each one
# fires at most once per registration.
#
# A StandardError raised by one callback is logged and does not stop the
# remaining callbacks. Exceptions outside StandardError (SystemExit,
# Interrupt, NoMemoryError, ...) are deliberately not rescued, so process
# shutdown and signals are never swallowed by a callback.
#
# Returns the callbacks that were run, or nil when none were registered.
def server_removed(server)
  callbacks = @on_server_removed_callbacks && @on_server_removed_callbacks.delete(server)
  return unless callbacks

  callbacks.each do |block|
    begin
      logger.debug "Calling callback for server: #{server}"
      block.call(server)
    rescue StandardError => exc
      logger.error("Exception during a callback for server: #{server}", exc)
    end
  end
end
service_info_changed(path, value=nil) click to toggle source

Service information changed in doozer, so update internal registry

# File lib/ruby_skynet/doozer/service_registry.rb, line 95
# Applies a registry change to the local cache.
#
# Parameters
#   path  [String] 'name/version/region/hostname/port',
#         e.g. "TutorialService/1/Development/127.0.0.1/9000"
#   value [Hash|nil] the new registry value; nil means the entry was deleted
#
# A value whose 'Registered' entry is truthy adds the server. Any other
# value removes it without notification (the service de-registered itself).
# A missing value removes it with notification (the service stopped).
def service_info_changed(path, value=nil)
  logger.info("service_info_changed: #{path}", value)

  name, version, region, hostname, port = path.split('/')
  # Cache key: [String] 'name/version/region'
  key = "#{name}/#{version}/#{region}"

  # No value: the service has stopped and must be removed
  return remove_server(key, hostname, port, true) unless value
  return add_server(key, hostname, port) if value['Registered']

  # Value present but not registered: the service just de-registered
  remove_server(key, hostname, port, false)
end