class Etcenv::Watcher

Constants

WATCH_TIMEOUT

Attributes

env[R]
verbose[R]

Public Class Methods

new(env, verbose: false) click to toggle source
# File lib/etcenv/watcher.rb, line 7
def initialize(env, verbose: false)
  @env = env
  @verbose = verbose
  @indices = {}
  @lock = Mutex.new
end

Public Instance Methods

auto_reload_loop() { |env| ... } click to toggle source
# File lib/etcenv/watcher.rb, line 32
def auto_reload_loop
  retries = 0
  loop do
    begin
      watch
      $stderr.puts "[watcher] reloading env #{env.root_key}" if verbose
      env.load
      yield env if block_given?
    rescue => e
      retries += 1
      interval = (2**retries) * 0.1

      $stderr.puts "[watcher][error] Failed to reload env #{env.root_key}: #{e.inspect}"
      $stderr.puts "\t#{e.backtrace.join("\n\t")}"
      $stderr.puts "[watcher][error] RETRYING reload #{env.root_key} in #{'%.2f' % interval} sec"
      sleep interval
    else
      retries = 0
    end
  end
end
etcd() click to toggle source
# File lib/etcenv/watcher.rb, line 16
def etcd
  env.etcd
end
watch() click to toggle source
# File lib/etcenv/watcher.rb, line 20
def watch
  ch = Queue.new
  threads = env.keys.map do |key|
    Thread.new(ch, key, env.cluster_index, &method(:watch_thread)).tap do |th|
      th.abort_on_exception = true
    end
  end
  report = ch.pop
  threads.each(&:kill)
  report
end

Private Instance Methods

try_watch(key, index) click to toggle source
# File lib/etcenv/watcher.rb, line 70
def try_watch(key, index)
  $stderr.puts "[watcher] waiting for change on #{key} (index: #{index.succ})" if verbose
  index = [@indices[key], index].compact.max
  index += 1 if index
  response = etcd.watch(key, recursive: true, index: index, timeout: WATCH_TIMEOUT)
  @lock.synchronize do
    # Record modified_index in watcher itself; Because the latest index may be hidden in normal response
    # e.g. unlisted keys, removed keys
    @indices[key] = response.node.modified_index
  end
  $stderr.puts "[watcher] dir #{key} has updated" if verbose
  ch << key
  return true
rescue Etcd::EventIndexCleared => e
  $stderr.puts "[watcher][warn] #{e.inspect} on key #{key.inspect}, trying to get X-Etcd-Index" if verbose
  @lock.synchronize do
    @indices[key] = etcd.get(key).etcd_index
  end
  $stderr.puts "[watcher][warn] Updated #{key.inspect} index to #{@indices[key]}" if verbose
  return nil
rescue Net::ReadTimeout
  $stderr.puts "[watcher] #{e.inspect} on key #{key.inspect}" if verbose
  return nil
end
watch_thread(ch, key, index) click to toggle source
# File lib/etcenv/watcher.rb, line 56
def watch_thread(ch, key, index)
  tries = 0
  loop do
    if try_watch(key, index)
      ch << key
      break
    end
    interval = (2 ** tries) * 0.1
    $stderr.puts "[watcher] RETRYING; #{key.inspect} watch will resume after #{'%.2f' % interval} sec"
    sleep interval
    tries += 1
  end
end