class ZkExec::Executor
Public Class Methods
new(options)
click to toggle source
# File lib/zkexec/executor.rb, line 9
# Connects to the ZooKeeper cluster and registers a change callback for every
# mirrored znode.
#
# Keys read from +options+:
#   :exec            - command run by #execute
#   :cluster         - connection string handed to ZK.new
#   :health          - health check command (optional)
#   :health_interval - seconds slept between health checks
#   :health_delay    - seconds slept before health checking starts
#   :mirrors         - list of [local_path, remote_znode] pairs (may be omitted)
#   :alert           - command run by #alert (optional)
#   :lock            - name of the exclusive restart lock (optional)
#
# Raises RuntimeError when the client is not connected once ZK.new returns, or
# (through #watch) when a mirrored znode does not exist.
def initialize(options)
  @cmd             = options[:exec]
  @cluster         = options[:cluster]
  @health          = options[:health]
  @health_interval = options[:health_interval]
  @health_delay    = options[:health_delay]
  # A missing :mirrors option means "nothing to mirror"; without the default
  # the #each calls below would fail with NoMethodError on nil.
  @mirrors         = options[:mirrors] || []
  @alert           = options[:alert]
  @lock_name       = options[:lock]

  log "connecting to #{@cluster}"
  @zk = ZK.new(@cluster, :thread => :per_callback)
  raise "timeout connecting to #{@cluster}" unless @zk.connected?
  log "connected"

  # re-establish watches
  @on_connected ||= @zk.on_connected do
    @mirrors.each { |(_local, remote)| watch(remote) }
  end

  # Only build a locker when a lock name was supplied.
  @restart_lock = @lock_name && @zk.exclusive_locker(@lock_name)
  @local_lock = Mutex.new

  @mirrors.each do |(local, remote)|
    log "registering callback on #{remote}"
    @zk.register(remote) do |event|
      if event.changed?
        log "#{remote} changed"
        copy(local, remote)
        kill_to_refork
      else
        # Any other event: just re-arm the watch.
        watch(remote)
      end
    end
    watch(remote)
  end
end
Public Instance Methods
execute()
click to toggle source
# File lib/zkexec/executor.rb, line 164
# Runs @cmd as a child process and waits for it, forking again for as long as
# a restart has been requested through @should_refork (set by kill_to_refork).
#
# Returns nil when the last child exited with status 0.
# Raises RuntimeError ("command failed") after calling #alert otherwise.
def execute
  @should_refork = true
  while @should_refork
    start_health_thread
    # Clear the flag *before* forking: a restart requested by kill_to_refork
    # while the child is starting must not be overwritten afterwards.
    @should_refork = false
    log "forking #{@cmd}"
    # Keep the pid in a local. kill_to_refork resets @child to nil, and
    # waiting on nil would raise instead of reaping the child.
    pid = @child = fork { exec @cmd }
    log "forked #{pid}"
    wait pid
  end

  return if $?.exitstatus == 0

  alert
  raise "command failed"
end
run()
click to toggle source
# File lib/zkexec/executor.rb, line 160
# Starts #execute on a new thread and returns that Thread without waiting.
def run
  Thread.new do
    execute
  end
end
Private Instance Methods
alert()
click to toggle source
# File lib/zkexec/executor.rb, line 135
# Runs the configured alert command in a child process without waiting for it
# to finish. Does nothing when no alert command is configured.
#
# Returns the child's pid, or nil when @alert is not set.
def alert
  return unless @alert

  pid = fork { exec(@alert) }
  # Nothing ever waits on this child, so hand it to a reaper thread;
  # otherwise every alert leaves a zombie process behind.
  Process.detach(pid)
  pid
end
copy(local, remote)
click to toggle source
# File lib/zkexec/executor.rb, line 50
# Fetches the data of the znode +remote+ through #watch and overwrites the
# file at +local+ with it.
#
# Raises RuntimeError when ZK reports the node as missing.
def copy(local, remote)
  contents = watch(remote)
  File.open(local, "w") do |file|
    file.print(contents)
  end
rescue ZK::Exceptions::NoNode => missing
  raise "node not found in #{missing.message}"
end
kill_to_refork()
click to toggle source
# File lib/zkexec/executor.rb, line 93
# Restarts the running child. Under with_restart_lock it stops the health
# thread, flags #execute to fork again, terminates the current child and then
# blocks until the health check command succeeds.
# Does nothing when no child is recorded in @child.
def kill_to_refork
  return unless @child

  with_restart_lock do
    stop_health_thread
    @should_refork = true
    child = @child
    @child = nil
    terminate_child(child)
    await_healthy
  end
end

# Kills the background health check thread, if there is one.
def stop_health_thread
  return unless @health_checks

  Thread.kill(@health_checks)
  @health_checks = nil
end

# Sends TERM to +pid+, waits up to 30 seconds for it to disappear and
# escalates to KILL when it is still around after that.
def terminate_child(pid)
  log "killing #{pid}"
  if signal_child("TERM", pid)
    started = Time.now
    sleep 1 while pid_exists?(pid) && Time.now - started < 30
    if pid_exists?(pid)
      log "force killing #{pid}"
      signal_child("KILL", pid)
    end
  end
  log "#{pid} terminated"
end

# Sends +signal+ to +pid+. Returns true when delivered, and false (instead of
# raising Errno::ESRCH) when the process no longer exists. A child that
# exited on its own must not abort the restart.
def signal_child(signal, pid)
  Process.kill(signal, pid)
  true
rescue Errno::ESRCH
  false
end

# Blocks until the health check command succeeds, alerting on every failed
# attempt once @health_delay seconds have passed.
def await_healthy
  # Without a health command there is nothing to wait for, and system(nil)
  # would raise TypeError.
  return unless @health

  # This intentionally infinite loops on failure, so that we don't propagate bad config
  started = Time.now
  loop do
    log "waiting for health check success"
    if system(@health)
      log "health checks succeeding"
      return
    end
    if Time.now - started > @health_delay
      log "health checks failing"
      alert
    end
    sleep @health_interval
  end
end
pid_exists?(pid)
click to toggle source
# File lib/zkexec/executor.rb, line 83
# Reports whether a process with the given pid currently exists.
#
# Returns true when Process.getpgid can look the pid up, and false when the
# lookup raises Errno::ESRCH or when +pid+ is nil.
def pid_exists?(pid)
  # nil means "no process recorded"; getpgid(nil) would raise TypeError.
  return false if pid.nil?

  Process.getpgid(pid)
  true
rescue Errno::ESRCH
  false
end
start_health_thread()
click to toggle source
# File lib/zkexec/executor.rb, line 141
# Starts the background health check thread unless one is already recorded in
# @health_checks or no health command is configured.
#
# The thread sleeps @health_delay seconds, then repeatedly runs @health in a
# child process, calls #alert when it exits non-zero, and sleeps
# @health_interval seconds between runs.
def start_health_thread
  @health_checks ||= @health && Thread.new do
    sleep @health_delay
    loop do
      log "health checking via: #{@health}"
      checker = fork { exec(@health) }
      wait checker
      passed = $?.exitstatus == 0
      log(passed ? "successful health check" : "failed health check, alerting")
      alert unless passed
      sleep @health_interval
    end
  end
end
watch(remote)
click to toggle source
# File lib/zkexec/executor.rb, line 58
# Reads the znode at +remote+, asking ZK to set a watch on it.
#
# Returns the node's data.
# Raises RuntimeError when the node does not exist.
def watch(remote)
  # get returns the data together with a stat object; only the data is used.
  data, _stat = @zk.get(remote, :watch => true)
  data
rescue ZK::Exceptions::NoNode => e
  raise "node not found in #{e.message}"
end
with_restart_lock() { || ... }
click to toggle source
# File lib/zkexec/executor.rb, line 66
# Yields while holding the restart lock when one is configured; otherwise
# yields directly. Returns the block's value.
#
# The lock is released even when the block raises or returns early.
def with_restart_lock
  return yield unless @restart_lock

  log "waiting on lock: #{@lock_name}"
  # Acquire before entering the protected region: if locking itself fails we
  # must not release (or report releasing) a lock that was never held.
  @restart_lock.lock(:wait => true)
  begin
    log "acquired lock: #{@lock_name}"
    yield
  ensure
    @restart_lock.unlock
    log "released lock: #{@lock_name}"
  end
end