class ZkRecipes::Cache

Constants

AS_NOTIFICATION
USE_DEFAULT

Public Class Methods

new(logger: nil, host: nil, timeout: nil, zk_opts: {}) { |self| ... } click to toggle source
# File lib/zk_recipes/cache.rb, line 10
def initialize(logger: nil, host: nil, timeout: nil, zk_opts: {})
  @cache = Concurrent::Map.new
  @latch = Concurrent::CountDownLatch.new
  @logger = logger
  @pending_updates = Concurrent::Hash.new # Concurrent::Map does not implement #reject!
  @registerable = true
  @registered_values = Concurrent::Map.new
  @session_id = nil
  @watches = Concurrent::Map.new
  @zk = nil

  if block_given?
    @owned_zk = true
    @warm_cache_timeout = timeout || 30
    yield(self)

    expiration = Time.now + @warm_cache_timeout
    connect(host, zk_opts)

    wait_for_warm_cache(expiration - Time.now)
  elsif host || timeout || !zk_opts.empty?
    raise ArgumentError, "host, zk_opts, and timeout are only allowed with a block"
  else
    @owned_zk = false
  end
end

Public Instance Methods

[](path)
Alias for: fetch
close!() click to toggle source
# File lib/zk_recipes/cache.rb, line 99
def close!
  @watches.each_value(&:unsubscribe)
  @zk.close! if @owned_zk
  @watches.clear
  @pending_updates.clear
end
fetch(path) click to toggle source
# File lib/zk_recipes/cache.rb, line 119
def fetch(path)
  @cache.fetch(path).value
rescue KeyError
  raise PathError, "no registered path for #{path.inspect}"
end
Also aliased as: []
fetch_valid(path) click to toggle source
# File lib/zk_recipes/cache.rb, line 126
def fetch_valid(path)
  cached = @cache.fetch(path)
  cached.value if cached.valid?
rescue KeyError
  raise PathError, "no registered path=#{path.inspect}"
end
register(path, default_value, &block) click to toggle source
# File lib/zk_recipes/cache.rb, line 37
def register(path, default_value, &block)
  raise Error, "register only allowed before setup_callbacks called" unless @registerable

  debug { "added path=#{path} default_value=#{default_value.inspect}" }
  @cache[path] = CachedPath.new(default_value)
  @registered_values[path] = RegisteredPath.new(default_value, block)
  ActiveSupport::Notifications.instrument(AS_NOTIFICATION, path: path, value: default_value)
end
reopen() click to toggle source

reopen the client after the process forks This is not the opposite of `#close!`.

# File lib/zk_recipes/cache.rb, line 108
def reopen
  @latch = Concurrent::CountDownLatch.new
  @session_id = nil
  @pending_updates.clear
  if @owned_zk
    expiration = Time.now + @warm_cache_timeout
    @zk.reopen
    wait_for_warm_cache(expiration - Time.now)
  end
end
setup_callbacks(zk) click to toggle source
# File lib/zk_recipes/cache.rb, line 46
def setup_callbacks(zk)
  raise Error, "setup_callbacks can only be called once" unless @registerable
  @zk = zk
  @registerable = false

  if @zk.connected? || @zk.connecting?
    raise Error, "the ZK::Client is already connected, the cached values must be set before connecting"
  end

  @registered_values.each do |path, _value|
    @watches[path] = @zk.register(path) do |event|
      if event.node_event?
        debug { "node event path=#{event.path} #{event.event_name} #{event.state_name}" }
        unless update_cache(event.path)
          @pending_updates[path] = nil
          @zk.defer { process_pending_updates }
        end
      else
        warn { "session event #{event.event_name} #{event.state_name}" }
      end
    end
  end

  @watches["on_connected"] = @zk.on_connected do
    if @session_id == @zk.session_id
      process_pending_updates
      next
    end

    debug("on_connected new session")
    @pending_updates.clear
    @registered_values.each do |path, _value|
      @pending_updates[path] = nil unless update_cache(path)
    end
    @session_id = @zk.session_id
    @latch.count_down
  end

  @zk.on_exception do |e|
    error { "on_exception exception=#{e.inspect} backtrace=#{e.backtrace.inspect}" }
  end
end
wait_for_warm_cache(timeout = 30) click to toggle source
# File lib/zk_recipes/cache.rb, line 89
def wait_for_warm_cache(timeout = 30)
  debug { "waiting for cache to warm timeout=#{timeout.inspect}" }
  if @latch.wait(timeout)
    true
  else
    warn { "didn't warm cache before timeout connected=#{@zk.connected?} timeout=#{timeout.inspect}" }
    false
  end
end

Private Instance Methods

connect(host, zk_opts) click to toggle source
# File lib/zk_recipes/cache.rb, line 135
def connect(host, zk_opts)
  raise Error, "already connected" if @zk

  debug { "connecting host=#{host.inspect}" }
  ZK.new(host, **zk_opts) do |zk|
    setup_callbacks(zk)
  end
end
process_pending_updates() click to toggle source
# File lib/zk_recipes/cache.rb, line 195
def process_pending_updates
  return if @pending_updates.empty?
  debug { "processing pending updates=#{@pending_updates.size}" }
  @pending_updates.reject! do |missed_path, _|
    update_cache(missed_path)
  end
end
update_cache(path) click to toggle source

only called from ZK thread

# File lib/zk_recipes/cache.rb, line 145
def update_cache(path)
  stat = @zk.stat(path, watch: true)

  instrument_params = { path: path }

  unless stat.exists?
    value = @registered_values.fetch(path).default_value
    @cache[path] = CachedPath.new(value, stat: stat)
    debug { "no node, setting watch path=#{path}" }
    instrument_params[:value] = value
    ActiveSupport::Notifications.instrument(AS_NOTIFICATION, instrument_params)
    return true
  end

  raw_value, stat = @zk.get(path, watch: true)

  instrument_params[:latency_seconds] = Time.now - stat.mtime_t
  instrument_params[:version] = stat.version
  instrument_params[:data_length] = stat.data_length

  valid = true
  value = begin
    registered_value = @registered_values.fetch(path)
    instrument_params[:value] = registered_value.deserialize(raw_value)
  rescue => e
    error { "deserialization error path=#{path} stat=#{stat.inspect} exception=#{e.inspect} #{e.backtrace.inspect}" }
    instrument_params[:error] = e
    instrument_params[:raw_value] = raw_value
    valid = false
    registered_value.default_value
  end

  if value == USE_DEFAULT
    valid = false
    value = registered_value.default_value
  end

  @cache[path] = CachedPath.new(value, stat: stat, valid: valid)

  ActiveSupport::Notifications.instrument(AS_NOTIFICATION, instrument_params)
  debug { "update_cache path=#{path} raw_value=#{raw_value.inspect} value=#{value.inspect} stat=#{stat.inspect}" }
  true
rescue ::ZK::Exceptions::ZKError => e
  warn { "update_cache path=#{path} exception=#{e.inspect}, retrying" }
  retry
rescue ::ZK::Exceptions::KeeperException, ::Zookeeper::Exceptions::ZookeeperException => e
  error { "update_cache path=#{path} exception=#{e.inspect}" }
  false
end