class RubySkynet::Zookeeper::Registry

Registry

Store information in Zookeepr and subscribe to future changes

Notifies registered subscribers when information has changed

All paths specified are relative to the root. As such the root key is never returned, nor is it required when a key is supplied as input. For example, with a root of /foo/bar, any paths passed in will leave out the root: host/name

Attributes

root[R]

Public Class Methods

new(params, &block) click to toggle source

Create a Registry instance to manage a information within Zookeeper

:root [String]

Root key to load and then monitor for changes
It is not recommended to set the root to "/" as it will generate
significant traffic since it will also monitor ZooKeeper Admin changes
Mandatory

:ephemeral [Boolean]

All set operations of non-nil values will result in ephemeral nodes.

:on_connect [Proc]

Block to call after the connection to Zookeeper has been established
and every time the connection is re-established

:registry [Hash]

:servers [Array of String]
  Array of URL's of ZooKeeper servers to connect to with port numbers
  ['server1:2181', 'server2:2181']

:connect_timeout [Float]
  Time in seconds to timeout when trying to connect to the server

Optional Block

The block will be called for every key found in the registry on startup

Example:

require 'ruby_skynet/zookeeper'
registry = RubySkynet::Zookeeper::Registry.new(root: '/registry') do |key, value, version|
  puts "Found #{key} => '#{value}' V#{version}"
end
# File lib/ruby_skynet/zookeeper/registry.rb, line 62
def initialize(params, &block)
  params = params.dup
  @root = params.delete(:root)
  raise "Missing mandatory parameter :root" unless @root

  # Add leading '/' to root if missing
  @root = "/#{@root}" unless @root.start_with?('/')

  # Strip trailing '/' if supplied
  @root = @root[0..-2] if @root.end_with?("/")
  @root_with_trail = "#{@root}/"
  @root = '/' if @root == ''

  registry_config = (params.delete(:registry) || {}).dup

  #   server1:2181,server2:2181,server3:2181
  @servers = (registry_config.delete(:servers) || ['127.0.0.1:2181']).join(',')
  @connect_timeout = (registry_config.delete(:connect_timeout) || 10).to_f

  # Generate warning log entries for any unknown configuration options
  registry_config.each_pair {|k,v| logger.warn "Ignoring unknown configuration option: zookeeper.#{k}"}

  # Allow the serializer and deserializer implementations to be replaced
  @serializer   = params.delete(:serializer)   || RubySkynet::Zookeeper::Json::Serializer
  @deserializer = params.delete(:deserializer) || RubySkynet::Zookeeper::Json::Deserializer

  @ephemeral = params.delete(:ephemeral)
  @ephemeral = false if @ephemeral.nil?

  @on_connect = params.delete(:on_connect)

  # Generate warning log entries for any unknown configuration options
  params.each_pair {|k,v| logger.warn "Ignoring unknown configuration option: #{k}"}

  # Hash with Array values containing the list of children for each node, if any
  @children = ThreadSafe::Hash.new

  # Block is used in init
  @block = block

  self.init
end

Public Instance Methods

[](key) click to toggle source

Retrieve the latest value from a specific path from the registry Returns nil when the key is not present in the registry

# File lib/ruby_skynet/zookeeper/registry.rb, line 107
def [](key)
  result = @zookeeper.get(:path => full_key(key))
  case result[:rc]
  when ::Zookeeper::ZOK
    @deserializer.deserialize(result[:data])
  when ::Zookeeper::ZNONODE
    # Return nil if node not present
  else
    check_rc(result)
  end
end
[]=(key,value) click to toggle source

Replace the latest value at a specific key Supplying a nil value will result in the key being deleted in ZooKeeper

# File lib/ruby_skynet/zookeeper/registry.rb, line 121
def []=(key,value)
  if value.nil?
    delete(key)
    return value
  end
  v = @serializer.serialize(value)
  k = full_key(key)
  result = @zookeeper.set(:path => k, :data => v)
  if result[:rc] == ::Zookeeper::ZNONODE
    create_path(k, v)
  else
    check_rc(result)
  end
  value
end
close() click to toggle source

Cleanup on process termination

# File lib/ruby_skynet/zookeeper/registry.rb, line 191
def close
  @zookeeper.close if @zookeeper
  @zookeeper = nil
end
delete(key, remove_empty_parents=true) click to toggle source

Delete the value at a specific key and any parent nodes if they don’t have any children or values

Params

remove_empty_parents
  If set to true it will also delete any parent nodes that have no
  children or value

Returns nil

# File lib/ruby_skynet/zookeeper/registry.rb, line 146
def delete(key, remove_empty_parents=true)
  result = @zookeeper.delete(:path => full_key(key))
  return if result[:rc] == ::Zookeeper::ZNONODE
  check_rc(result)

  if remove_empty_parents
    paths = key.split('/')
    paths.pop
    while paths.size > 0
      parent_path = full_key(paths.join('/'))
      result = @zookeeper.get(:path => parent_path)
      break if (result[:rc] == ::Zookeeper::ZNONODE) || (result[:data] != nil)

      delete(parent_path)
      paths.pop
    end
  end
  nil
end
each_pair(relative_path = '', &block) click to toggle source

Iterate over every key, value pair in the registry Optional relative path can be supplied Returns the number of nodes iterated over

Example:

registry.each_pair {|k,v| puts "#{k} => #{v}"}
# File lib/ruby_skynet/zookeeper/registry.rb, line 172
def each_pair(relative_path = '', &block)
  get_recursive(full_key(relative_path), watch=false, &block)
end
keys() click to toggle source

Returns [Array<String>] all keys in the registry

# File lib/ruby_skynet/zookeeper/registry.rb, line 177
def keys
  keys = []
  each_pair {|k,v| keys << k}
  keys
end
on_create(key='*', &block) click to toggle source

When an entry is created the block will be called

Parameters
  key
    The relative key to watch for changes
  block
    The block to be called

Parameters passed to the block:
  key
    The key that was created
    Supplying a key of '*' means all paths
    Default: '*'

  value
    New value from the registry

  version
    The version number of this node

Example:

registry.on_update do |key, value, revision|
  puts "#{key} was created with #{value}"
end

Note: They key must either be the exact path or ‘*’ for all keys

# File lib/ruby_skynet/zookeeper/registry.rb, line 221
def on_create(key='*', &block)
  ((@create_subscribers ||= ThreadSafe::Hash.new)[key] ||= ThreadSafe::Array.new) << block
end
on_delete(key='*', &block) click to toggle source

When an entry is deleted the block will be called

Parameters
  key
    The relative key to watch for changes
  block
    The block to be called

Parameters passed to the block:
  key
    The key that was deleted from the registry
    Supplying a key of '*' means all paths
    Default: '*'

Example:

registry.on_delete do |key, revision|
  puts "#{key} was deleted"
end

Note: They key must either be the exact path or ‘*’ for all keys

# File lib/ruby_skynet/zookeeper/registry.rb, line 273
def on_delete(key='*', &block)
  ((@delete_subscribers ||= ThreadSafe::Hash.new)[key] ||= ThreadSafe::Array.new) << block
end
on_update(key='*', &block) click to toggle source

When an entry is updated the block will be called

Parameters
  key
    The relative key to watch for changes
  block
    The block to be called

Parameters passed to the block:
  key
    The key that was updated in the registry
    Supplying a key of '*' means all paths
    Default: '*'

  value
    New value from the registry

  version
    The version number of this node

Example:

registry.on_update do |key, value, version|
  puts "#{key} was updated to #{value}"
end

Note: They key must either be the exact path or ‘*’ for all keys

# File lib/ruby_skynet/zookeeper/registry.rb, line 250
def on_update(key='*', &block)
  ((@update_subscribers ||= ThreadSafe::Hash.new)[key] ||= ThreadSafe::Array.new) << block
end
to_h() click to toggle source

Returns a copy of the registry as a Hash

# File lib/ruby_skynet/zookeeper/registry.rb, line 184
def to_h
  h = {}
  each_pair {|k,v| h[k] = v}
  h
end

Protected Instance Methods

check_rc(result) click to toggle source

Checks the return code from ZooKeeper and raises an exception if it is non-zero

# File lib/ruby_skynet/zookeeper/registry.rb, line 479
def check_rc(result)
  if result[:rc] != ::Zookeeper::ZOK
    logger.error "Zookeeper failure", result
    ::Zookeeper::Exceptions.raise_on_error(result[:rc])
  end
end
create_path(full_path, value=nil) click to toggle source

Create the supplied path and set the supplied value

Navigates through tree and creates all required parents with no values
as needed to create child node with its value

Note: Value must already be serialized

# File lib/ruby_skynet/zookeeper/registry.rb, line 306
def create_path(full_path, value=nil)
  paths = full_path.split('/')
  # Don't create the child node yet
  paths.pop
  paths.shift
  path = ''
  paths.each do |p|
    path << "/#{p}"
    # Ignore errors since it may already exist
    @zookeeper.create(:path => path)
  end
  if value
    @zookeeper.create(:path => full_path, :data => value, :ephemeral => @ephemeral)
  else
    @zookeeper.create(:path => full_path)
  end
end
full_key(relative_key) click to toggle source

Returns the full key given a relative key

# File lib/ruby_skynet/zookeeper/registry.rb, line 281
def full_key(relative_key)
  relative_key = strip_slash(relative_key)
  relative_key == '' ? @root : File.join(@root,relative_key)
end
get_recursive(full_path, watch=false, create_path=true, &block) click to toggle source

Recursively fetches all the values in the registry and optionally registers and starts watching all nodes in ZooKeeper Returns the number of nodes iterated over

Optionally supply a block of code to be called when each node is being subscribed to. It calls the block supplying the value of that node along with its relative path

Example:

get_recursive(full_key(relative_path), true)
# File lib/ruby_skynet/zookeeper/registry.rb, line 429
def get_recursive(full_path, watch=false, create_path=true, &block)
  watch_proc = watcher if watch

  # Get value for this node
  result = @zookeeper.get(:path => full_path, :watcher => watch_proc)

  # Create the path if it does not exist
  if create_path && (result[:rc] == ::Zookeeper::ZNONODE)
    create_path(full_path)
    result = @zookeeper.get(:path => full_path, :watcher => watch_proc)
  end

  # Cannot find this node
  return 0 if result[:rc] == ::Zookeeper::ZNONODE

  check_rc(result)
  value = @deserializer.deserialize(result[:data])
  stat = result[:stat]

  # ZooKeeper assigns a nil value to all parent nodes when no value is supplied
  # Call block if this is a leaf node, or if it is a parent node with a value
  if block && ((stat.num_children == 0) || value != nil)
    block.call(relative_key(full_path), value, stat.version, stat.num_children)
  end

  # Iterate over children if any
  node_count = 1
  # Ephemeral nodes cannot have children
  if !(stat.ephemeral_owner && (stat.ephemeral_owner != 0)) && (watch || (stat.num_children > 0))
    # Also watch this node for child changes
    result = @zookeeper.get_children(:path => full_path, :watcher => watch_proc)

    # This node could have been deleted already
    if result[:rc] == ::Zookeeper::ZOK
      children = result[:children]

      # Save the current list of children so that we can figure out what
      # a child changed event actually means
      @children[full_path] = children if watch

      # Also watch children nodes
      children.each do |child|
        node_count += get_recursive(File.join(full_path,child), watch, &block)
      end
    end
  end
  node_count
end
init() click to toggle source

Create ZooKeeper connection and start watching the registry for any changes

# File lib/ruby_skynet/zookeeper/registry.rb, line 538
def init
  logger.benchmark_info "Connected to Zookeeper" do
    @zookeeper.close if @zookeeper
    # Create Zookeeper connection
    @zookeeper = ::Zookeeper.new(@servers, @connect_timeout, watcher)

    # Start watching registry for any changes
    get_recursive(@root, watch=true, create_path=true, &@block)

    # Call on_connect callback if supplied
    @on_connect.call(self) if @on_connect
  end
end
node_created(key, value, version) click to toggle source

The key was created in the registry

# File lib/ruby_skynet/zookeeper/registry.rb, line 487
def node_created(key, value, version)
  logger.debug "Created: #{key}", value

  return unless @create_subscribers

  # Subscribers to specific paths
  if subscribers = @create_subscribers[key]
    subscribers.each{|subscriber| subscriber.call(key, value, version)}
  end

  # Any subscribers for all events?
  if all_subscribers = @create_subscribers['*']
    all_subscribers.each{|subscriber| subscriber.call(key, value, version)}
  end
end
node_deleted(key) click to toggle source

An existing key was removed from the registry

# File lib/ruby_skynet/zookeeper/registry.rb, line 521
def node_deleted(key)
  logger.debug { "Deleted: #{key}" }

  return unless @delete_subscribers

  # Subscribers to specific paths
  if subscribers = @delete_subscribers[key]
    subscribers.each{|subscriber| subscriber.call(key)}
  end

  # Any subscribers for all events?
  if all_subscribers = @delete_subscribers['*']
    all_subscribers.each{|subscriber| subscriber.call(key)}
  end
end
node_updated(key, value, version) click to toggle source

The key was updated in the registry

# File lib/ruby_skynet/zookeeper/registry.rb, line 504
def node_updated(key, value, version)
  logger.debug "Updated: #{key}", value

  return unless @update_subscribers

  # Subscribers to specific paths
  if subscribers = @update_subscribers[key]
    subscribers.each{|subscriber| subscriber.call(key, value, version)}
  end

  # Any subscribers for all events?
  if all_subscribers = @update_subscribers['*']
    all_subscribers.each{|subscriber| subscriber.call(key, value, version)}
  end
end
relative_key(full_key) click to toggle source

Returns the full key given a relative key

# File lib/ruby_skynet/zookeeper/registry.rb, line 287
def relative_key(full_key)
  key = full_key.sub(@root_with_trail, '')
  key == '' ? '/' : key
end
strip_slash(path) click to toggle source

Strip leading and trailing ‘/’

# File lib/ruby_skynet/zookeeper/registry.rb, line 296
def strip_slash(path)
  path = path[1..-1] if path.start_with?('/')
  path = path[0..-2] if path.end_with?('/')
  path
end
watcher() click to toggle source

returns the watcher proc for this registry instance

# File lib/ruby_skynet/zookeeper/registry.rb, line 325
def watcher
  # Subscription block to call for watch events
  @watch_proc ||= Proc.new do |event_hash|
    begin
      path = event_hash[:path]
      logger.trace "Event Received", event_hash
      case event_hash[:type]
      when ::Zookeeper::ZOO_CHANGED_EVENT
        begin
          logger.debug "Node '#{path}' Changed", event_hash

          # Fetch current value and re-subscribe
          result = @zookeeper.get(:path => path, :watcher => @watch_proc)
          check_rc(result)
          value = @deserializer.deserialize(result[:data])
          stat = result[:stat]

          # Invoke on_update callbacks
          node_updated(relative_key(path), value, stat.version)
        rescue ::Zookeeper::Exceptions::NoNode
          # Ignore errors where the value changes and then deleted
        end

      when ::Zookeeper::ZOO_DELETED_EVENT
        # A node has been deleted
        # TODO How to ignore child deleted when it is a directory, not a leaf
        logger.debug "Node '#{path}' Deleted", event_hash
        @children.delete(path)
        node_deleted(relative_key(path))

      when ::Zookeeper::ZOO_CHILD_EVENT
        # The list of nodes has changed - Does not say if it was added or removed
        logger.debug "Node '#{path}' Child changed", event_hash
        result = @zookeeper.get_children(:path => path, :watcher => @watch_proc)

        # This node could have been deleted already
        if result[:rc] == ::Zookeeper::ZOK
          current_children = result[:children]
          previous_children = @children[path]

          # Save children so that we can later identify new children
          @children[path] = current_children

          # New Child Nodes
          new_nodes = previous_children ? (current_children - previous_children) : current_children
          new_nodes.each do |child|
            get_recursive(File.join(path,child), true) do |key, value, version|
              node_created(key, value, version)
            end
          end
          # Ignore Deleted Child Nodes since they will be handled by the Deleted Node event
        end

      when ::Zookeeper::ZOO_CREATED_EVENT
        # Node created events are only created for paths that were deleted
        # and then created again
        # No op - This is covered by node_child created event
        logger.debug "Node '#{path}' Created - No op", event_hash

      when ::Zookeeper::ZOO_SESSION_EVENT
        logger.debug "Session Event: #{@zookeeper.state_by_value(event_hash[:state]) if @zookeeper}", event_hash

        # Replace zookeeper connection since it is stale. Only react to global request
        # since this event will be received for every node being watched.
        #   Do not close the current connection since this background watcher thread is running
        #   as part of the current zookeeper connection
        #     event_hash => {:req_id=>-1, :type=>-1, :state=>-112, :path=>"", :context=>nil}
        if @zookeeper && !@zookeeper.closed? && (event_hash[:req_id] == -1) && (event_hash[:state] == ::Zookeeper::ZOO_EXPIRED_SESSION_STATE)
          Thread.new do
            begin
              self.init
            rescue ::Zookeeper::Exceptions::ZookeeperException => exc
              logger.warn "Failed to reconnect to Zookeeper. Assuming shutdown in progress", exc
              # These can occur during a shutdown scenario, hopefully not during
              # an actual network or connection loss to the Zookeeper Server
            end
          end
        end

      when ::Zookeeper::ZOO_NOTWATCHING_EVENT
        logger.debug "Ignoring ZOO_NOTWATCHING_EVENT", event_hash

      else
        # TODO Need to re-load registry when re-connected
        logger.warn "Ignoring unknown event", event_hash
      end
    rescue ::Zookeeper::Exceptions::ZookeeperException => exc
      logger.warn "Watching thread failed due to Zookeeper failure", exc
    rescue Exception => exc
      logger.error "Watching thread failed due to unhandled exception", exc
    end
  end
end