module Einhorn::Command
Constants
- MAX_SPINUP_INTERVAL
Unbounded exponential backoff is not a thing: we run into problems if, e.g., each of our hundred workers simultaneously fails to boot for the same ephemeral reason. Instead, cap the backoff at some reasonable maximum so we don't wait until the heat death of the universe to spin up new capacity.
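For a concrete sense of the cap, here is a minimal sketch of the capped backoff computed by replenish_gradually below. The cap and base values are illustrative assumptions, not the constant's actual value:

  # Illustrative values only; the real cap is MAX_SPINUP_INTERVAL.
  cap    = 300.0   # assumed maximum interval, in seconds
  base   = 1.0     # plays the role of Einhorn::State.config[:seconds]
  deaths = 20      # consecutive unacked worker deaths
  interval = [base * (1.5 ** deaths), cap].min
  # => 300.0; uncapped this would already be ~3325s and still growing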
Public Class Methods
  # File lib/einhorn/command.rb, line 23
  def self.cleanup(pid)
    unless spec = Einhorn::State.children[pid]
      Einhorn.log_error("Could not find any config for exited child #{pid.inspect}! This probably indicates a bug in Einhorn.")
      return
    end

    Einhorn::State.children.delete(pid)

    # Unacked worker
    if spec[:type] == :worker && !spec[:acked]
      Einhorn::State.consecutive_deaths_before_ack += 1
      extra = ' before it was ACKed'
    else
      extra = nil
    end

    case type = spec[:type]
    when :worker
      Einhorn.log_info("===> Exited worker #{pid.inspect}#{extra}", :upgrade)
    when :state_passer
      Einhorn.log_debug("===> Exited state passing process #{pid.inspect}", :upgrade)
    else
      Einhorn.log_error("===> Exited process #{pid.inspect} has unrecognized type #{type.inspect}: #{spec.inspect}", :upgrade)
    end
  end
  # File lib/einhorn/command.rb, line 470
  def self.cull
    acked = Einhorn::WorkerPool.ack_count
    unsignaled = Einhorn::WorkerPool.unsignaled_count
    target = Einhorn::WorkerPool.ack_target

    if Einhorn::State.upgrading && acked >= target
      Einhorn::State.upgrading = false
      Einhorn.log_info("Upgraded successfully to version #{Einhorn::State.version} (Einhorn #{Einhorn::VERSION}).", :upgrade)
      Einhorn.send_tagged_message(:upgrade, "Upgrade done", true)
    end

    old_workers = Einhorn::WorkerPool.old_workers
    Einhorn.log_debug("#{acked} acked, #{unsignaled} unsignaled, #{target} target, #{old_workers.length} old workers")
    if !Einhorn::State.upgrading && old_workers.length > 0
      Einhorn.log_info("Killing off #{old_workers.length} old workers.", :upgrade)
      signal_all("USR2", old_workers)
    elsif Einhorn::State.upgrading && Einhorn::State.smooth_upgrade
      # In a smooth upgrade, kill off old workers one by one when we have
      # sufficiently many new workers.
      excess = (old_workers.length + acked) - target
      if excess > 0
        Einhorn.log_info("Smooth upgrade: killing off #{excess} old workers.", :upgrade)
        signal_all("USR2", old_workers.take(excess))
      else
        Einhorn.log_debug("Not killing old workers, as excess is #{excess}.")
      end
    end

    if unsignaled > target
      excess = Einhorn::WorkerPool.unsignaled_modern_workers_with_priority[0...(unsignaled - target)]
      Einhorn.log_info("Have too many workers at the current version, so killing off #{excess.length} of them.")
      signal_all("USR2", excess)
    end

    # Ensure all signaled workers that have outlived signal_timeout get killed.
    kill_expired_signaled_workers if Einhorn::State.signal_timeout
  end
  # File lib/einhorn/command.rb, line 174
  def self.decrement
    if Einhorn::State.config[:number] <= 1
      output = "Can't decrease number of workers (already at #{Einhorn::State.config[:number]}). Run kill #{$$} if you really want to kill einhorn."
      $stderr.puts(output)
      return output
    end

    Einhorn::Event.break_loop
    old = Einhorn::State.config[:number]
    new = (Einhorn::State.config[:number] -= 1)
    output = "Decrementing number of workers from #{old} -> #{new}"
    $stderr.puts(output)
    output
  end
  # File lib/einhorn/command.rb, line 207
  def self.dumpable_state
    global_state = Einhorn::State.dumpable_state
    descriptor_state = Einhorn::Event.persistent_descriptors.map do |descriptor|
      descriptor.to_state
    end

    {
      :state => global_state,
      :persistent_descriptors => descriptor_state,
    }
  end
@param options [Hash]
@option options [Boolean] :smooth (false) Whether to perform a smooth or fleet upgrade. In a smooth upgrade, bring up new workers and cull old workers one by one, as soon as each has a replacement; in a fleet upgrade, bring up all the new workers and don't cull any old workers until they're all up.
  # File lib/einhorn/command.rb, line 427
  def self.full_upgrade(options={})
    options = {:smooth => false}.merge(options)
    Einhorn::State.smooth_upgrade = options.fetch(:smooth)
    reload_for_upgrade
  end
  # File lib/einhorn/command.rb, line 437
  def self.full_upgrade_fleet
    full_upgrade(:smooth => false)
  end
  # File lib/einhorn/command.rb, line 434
  def self.full_upgrade_smooth
    full_upgrade(:smooth => true)
  end
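As a usage sketch, the two wrappers above simply pin the :smooth option; all three calls below assume they are evaluated inside a running Einhorn master:

  Einhorn::Command.full_upgrade_smooth   # same as full_upgrade(:smooth => true)
  Einhorn::Command.full_upgrade_fleet    # same as full_upgrade(:smooth => false)
  Einhorn::Command.full_upgrade          # :smooth defaults to false (fleet)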
  # File lib/einhorn/command.rb, line 165
  def self.increment
    Einhorn::Event.break_loop
    old = Einhorn::State.config[:number]
    new = (Einhorn::State.config[:number] += 1)
    output = "Incrementing number of workers from #{old} -> #{new}"
    $stderr.puts(output)
    output
  end
  # File lib/einhorn/command.rb, line 508
  def self.kill_expired_signaled_workers
    now = Time.now
    children = Einhorn::State.children.select do |_, c|
      # Only interested in USR2-signaled workers
      next unless c[:signaled] && c[:signaled].length > 0
      next unless c[:signaled].include?('USR2')
      # Ignore processes that have received KILL since it can't be trapped.
      next if c[:signaled].include?('KILL')
      # Filter out those children that have not reached signal_timeout yet.
      next unless c[:last_signaled_at]
      expires_at = c[:last_signaled_at] + Einhorn::State.signal_timeout
      next unless now >= expires_at
      true
    end

    Einhorn.log_info("#{children.size} expired signaled workers found.") if children.size > 0
    children.each do |pid, child|
      Einhorn.log_info("Child #{pid.inspect} was signaled #{(child[:last_signaled_at] - now).abs.to_i}s ago. Sending SIGKILL as it is still active after #{Einhorn::State.signal_timeout}s timeout.", :upgrade)
      begin
        Process.kill('KILL', pid)
      rescue Errno::ESRCH
        Einhorn.log_debug("Attempted to SIGKILL child #{pid.inspect} but the process does not exist.")
      end
      child[:signaled].add('KILL')
      child[:last_signaled_at] = Time.now
    end
  end
  # File lib/einhorn/command.rb, line 628
  def self.louder(log=true)
    Einhorn::State.verbosity -= 1 if Einhorn::State.verbosity > 0
    output = "Verbosity set to #{Einhorn::State.verbosity}"
    Einhorn.log_info(output) if log
    output
  end
  # File lib/einhorn/command.rb, line 271
  def self.next_index
    all_indexes = Set.new(Einhorn::State.children.map { |k, st| st[:index] })
    0.upto(all_indexes.length) do |i|
      return i unless all_indexes.include?(i)
    end
  end
  # File lib/einhorn/command.rb, line 344
  def self.prepare_child_environment(index)
    # This is run from the child
    ENV['EINHORN_MASTER_PID'] = Process.ppid.to_s
    ENV['EINHORN_SOCK_PATH'] = Einhorn::Command::Interface.socket_path
    if Einhorn::State.command_socket_as_fd
      socket = UNIXSocket.open(Einhorn::Command::Interface.socket_path)
      Einhorn::TransientState.socket_handles << socket
      ENV['EINHORN_SOCK_FD'] = socket.fileno.to_s
    end

    ENV['EINHORN_FD_COUNT'] = Einhorn::State.bind_fds.length.to_s
    Einhorn::State.bind_fds.each_with_index { |fd, i| ENV["EINHORN_FD_#{i}"] = fd.to_s }

    ENV['EINHORN_CHILD_INDEX'] = index.to_s

    # EINHORN_FDS is deprecated. It was originally an attempt to
    # match Upstart's nominal internal support for space-separated
    # FD lists, but nobody uses that in practice, and it makes
    # finding individual FDs more difficult
    ENV['EINHORN_FDS'] = Einhorn::State.bind_fds.map(&:to_s).join(' ')
  end
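For context, here is a minimal worker-side sketch of consuming this environment. It is not part of Einhorn itself, and it assumes the process was spawned by Einhorn and that the inherited FDs are sockets:

  require 'socket'

  # Rebuild Socket objects from the EINHORN_FD_* variables set above.
  count = Integer(ENV.fetch('EINHORN_FD_COUNT', '0'))
  sockets = (0...count).map do |i|
    Socket.for_fd(Integer(ENV.fetch("EINHORN_FD_#{i}")))
  end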
  # File lib/einhorn/command.rb, line 396
  def self.prepare_child_process
    Process.setpgrp
    Einhorn.renice_self
  end
  # File lib/einhorn/command.rb, line 621
  def self.quieter(log=true)
    Einhorn::State.verbosity += 1 if Einhorn::State.verbosity < 2
    output = "Verbosity set to #{Einhorn::State.verbosity}"
    Einhorn.log_info(output) if log
    output
  end
  # File lib/einhorn/command.rb, line 10
  def self.reap
    begin
      while true
        Einhorn.log_debug('Going to reap a child process')
        pid = Process.wait(-1, Process::WNOHANG)
        return unless pid
        cleanup(pid)
        Einhorn::Event.break_loop
      end
    rescue Errno::ECHILD
    end
  end
  # File lib/einhorn/command.rb, line 86
  def self.register_ack(pid)
    unless spec = Einhorn::State.children[pid]
      Einhorn.log_error("Could not find state for PID #{pid.inspect}; ignoring ACK.")
      return
    end

    if spec[:acked]
      Einhorn.log_error("Pid #{pid.inspect} already ACKed; ignoring new ACK.")
      return
    end

    if Einhorn::State.consecutive_deaths_before_ack > 0
      extra = ", breaking the streak of #{Einhorn::State.consecutive_deaths_before_ack} consecutive unacked workers dying"
    else
      extra = nil
    end
    Einhorn::State.consecutive_deaths_before_ack = 0

    spec[:acked] = true
    Einhorn.log_info("Up to #{Einhorn::WorkerPool.ack_count} / #{Einhorn::WorkerPool.ack_target} #{Einhorn::State.ack_mode[:type]} ACKs#{extra}")
    # Could call cull here directly instead, I believe.
    Einhorn::Event.break_loop
  end
  # File lib/einhorn/command.rb, line 59
  def self.register_manual_ack(pid)
    ack_mode = Einhorn::State.ack_mode
    unless ack_mode[:type] == :manual
      Einhorn.log_error("Received a manual ACK for #{pid.inspect}, but ack_mode is #{ack_mode.inspect}. Ignoring ACK.")
      return
    end

    Einhorn.log_info("Received a manual ACK from #{pid.inspect}")
    register_ack(pid)
  end
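On the worker side, the manual ACK this method registers is typically produced with the helper shipped in the einhorn gem; a minimal sketch, assuming the master is running with ack_mode :manual:

  require 'einhorn/worker'

  # Once the worker is actually ready to serve traffic:
  Einhorn::Worker.ack!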
  # File lib/einhorn/command.rb, line 49
  def self.register_ping(pid, request_id)
    unless spec = Einhorn::State.children[pid]
      Einhorn.log_error("Could not find state for PID #{pid.inspect}; ignoring ping.")
      return
    end

    spec[:pinged_at] = Time.now
    spec[:pinged_request_id] = request_id
  end
  # File lib/einhorn/command.rb, line 69
  def self.register_timer_ack(time, pid)
    ack_mode = Einhorn::State.ack_mode
    unless ack_mode[:type] == :timer
      Einhorn.log_error("Received a timer ACK for #{pid.inspect}, but ack_mode is #{ack_mode.inspect}. Ignoring ACK.")
      return
    end

    unless Einhorn::State.children[pid]
      # TODO: Maybe cancel pending ACK timers upon death?
      Einhorn.log_debug("Worker #{pid.inspect} died before its timer ACK happened.")
      return
    end

    Einhorn.log_info("Worker #{pid.inspect} has been up for #{time}s, so we are considering it alive.")
    register_ack(pid)
  end
  # File lib/einhorn/command.rb, line 219
  def self.reload
    unless Einhorn::State.respawn
      Einhorn.log_info("Not reloading einhorn because we're exiting")
      return
    end

    Einhorn.log_info("Reloading einhorn master (#{Einhorn::TransientState.script_name})...", :reload)

    # In case there's anything lurking
    $stdout.flush

    # Spawn a child to pass the state through the pipe
    read, write = Einhorn::Compat.pipe

    fork do
      Einhorn::TransientState.whatami = :state_passer
      Einhorn::State.children[Process.pid] = {type: :state_passer}
      Einhorn::State.generation += 1
      read.close

      begin
        write.write(YAML.dump(dumpable_state))
      rescue Errno::EPIPE => e
        e.message << " (state worker could not write state, which likely means the parent process has died)"
        raise e
      end
      write.close

      exit(0)
    end
    write.close

    unless Einhorn.can_safely_reload?
      Einhorn.log_error("Can not initiate einhorn master reload safely, aborting", :reload)
      Einhorn::State.reloading_for_upgrade = false
      read.close
      return
    end

    begin
      Einhorn.initialize_reload_environment
      respawn_commandline = Einhorn.upgrade_commandline(['--with-state-fd', read.fileno.to_s])
      respawn_commandline << { :close_others => false }
      Einhorn.log_info("About to re-exec einhorn master as #{respawn_commandline.inspect}", :reload)
      Einhorn::Compat.exec(*respawn_commandline)
    rescue SystemCallError => e
      Einhorn.log_error("Could not reload! Attempting to continue. Error was: #{e}", :reload)
      Einhorn::State.reloading_for_upgrade = false
      read.close
    end
  end
  # File lib/einhorn/command.rb, line 441
  def self.reload_for_upgrade
    Einhorn::State.reloading_for_upgrade = true
    reload
  end
  # File lib/einhorn/command.rb, line 545
  def self.replenish
    return unless Einhorn::State.respawn

    if !Einhorn::State.last_spinup
      replenish_immediately
    else
      replenish_gradually
    end
  end
  # File lib/einhorn/command.rb, line 571
  def self.replenish_gradually(max_unacked=nil)
    return if Einhorn::TransientState.has_outstanding_spinup_timer
    return unless Einhorn::WorkerPool.missing_worker_count > 0

    max_unacked ||= Einhorn::State.config[:max_unacked]
    # default to spinning up at most NCPU workers at once
    unless max_unacked
      begin
        @processor_count ||= Einhorn::Compat.processor_count
      rescue => err
        Einhorn.log_error(err.inspect)
        @processor_count = 1
      end
      max_unacked = @processor_count
    end

    if max_unacked <= 0
      raise ArgumentError.new("max_unacked must be positive")
    end

    # Exponentially backoff automated spinup if we're just having
    # things die before ACKing
    spinup_interval = Einhorn::State.config[:seconds] * (1.5 ** Einhorn::State.consecutive_deaths_before_ack)
    spinup_interval = [spinup_interval, MAX_SPINUP_INTERVAL].min
    seconds_ago = (Time.now - Einhorn::State.last_spinup).to_f

    if seconds_ago > spinup_interval
      if trigger_spinup?(max_unacked)
        msg = "Last spinup was #{seconds_ago}s ago, and spinup_interval is #{spinup_interval}s, so spinning up a new process."

        if Einhorn::State.consecutive_deaths_before_ack > 0
          Einhorn.log_info("#{msg} (there have been #{Einhorn::State.consecutive_deaths_before_ack} consecutive unacked worker deaths)", :upgrade)
        else
          Einhorn.log_debug(msg)
        end

        spinup
      end
    else
      Einhorn.log_debug("Last spinup was #{seconds_ago}s ago, and spinup_interval is #{spinup_interval}s, so not spinning up a new process.")
    end

    Einhorn::TransientState.has_outstanding_spinup_timer = true
    Einhorn::Event::Timer.open(spinup_interval) do
      Einhorn::TransientState.has_outstanding_spinup_timer = false
      replenish
    end
  end
  # File lib/einhorn/command.rb, line 555
  def self.replenish_immediately
    missing = Einhorn::WorkerPool.missing_worker_count
    if missing <= 0
      Einhorn.log_error("Missing is currently #{missing.inspect}, but should always be > 0 when replenish_immediately is called. This probably indicates a bug in Einhorn.")
      return
    end
    Einhorn.log_info("Launching #{missing} new workers")
    missing.times { spinup }
  end
Reseed common ruby random number generators.
OpenSSL::Random uses the PID to reseed after fork, which means that if a long-lived master process over its lifetime spawns two workers with the same PID, those workers will start with the same OpenSSL seed.
Ruby >= 1.9 has a guard against this in SecureRandom, but any direct users of OpenSSL::Random will still be affected.
Ruby 1.8 didn't even reseed the default random number generator used by Kernel#rand in certain releases.
bugs.ruby-lang.org/issues/4579
  # File lib/einhorn/command.rb, line 380
  def self.reseed_random
    # reseed Kernel#rand
    srand

    # reseed OpenSSL::Random if it's loaded
    if defined?(OpenSSL::Random)
      if defined?(Random)
        seed = Random.new_seed
      else
        # Ruby 1.8
        seed = rand
      end
      OpenSSL::Random.seed(seed.to_s)
    end
  end
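To make the affected case concrete, here is a hypothetical worker snippet that bypasses SecureRandom and draws bytes directly from OpenSSL's PRNG; this is precisely the kind of user the reseed above protects:

  require 'openssl'

  # Without the reseed, two workers that ever received the same recycled
  # PID over a long master lifetime could derive identical bytes here.
  token = OpenSSL::Random.random_bytes(16).unpack1('H*')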
  # File lib/einhorn/command.rb, line 189
  def self.set_workers(new)
    if new == Einhorn::State.config[:number]
      return ""
    end

    Einhorn::Event.break_loop
    old = Einhorn::State.config[:number]
    Einhorn::State.config[:number] = new
    output = "Altering worker count, #{old} -> #{new}. Will "
    if old < new
      output << "spin up additional workers."
    else
      output << "gracefully terminate workers."
    end
    $stderr.puts(output)
    output
  end
  # File lib/einhorn/command.rb, line 401
  def self.setup_parent_watch(expected_ppid)
    if Einhorn::State.kill_children_on_exit
      begin
        # NB: Having the USR2 signal handler set to terminate (the default) at
        # this point is required. If it's set to a ruby handler, there are
        # race conditions that could cause the worker to leak.
        Einhorn::Prctl.set_pdeathsig("USR2")
        if Process.ppid != expected_ppid
          Einhorn.log_error("Parent process died before we set pdeathsig; cowardly refusing to exec child process.")
          exit(1)
        end
      rescue NotImplementedError
        # Unsupported OS; silently continue.
      end
    end
  end
  # File lib/einhorn/command.rb, line 110
  def self.signal_all(signal, children=nil, record=true)
    children ||= Einhorn::WorkerPool.workers
    signaled = {}

    Einhorn.log_info("Sending #{signal} to #{children.inspect}", :upgrade)

    children.each do |child|
      unless spec = Einhorn::State.children[child]
        Einhorn.log_error("Trying to send #{signal} to dead child #{child.inspect}. The fact we tried this probably indicates a bug in Einhorn.", :upgrade)
        next
      end

      if record
        if spec[:signaled].include?(signal)
          Einhorn.log_error("Re-sending #{signal} to already-signaled child #{child.inspect}. It may be slow to spin down, or it may be swallowing #{signal}s.", :upgrade)
        end
        spec[:signaled].add(signal)
        spec[:last_signaled_at] = Time.now
      end

      begin
        Process.kill(signal, child)
      rescue Errno::ESRCH
        Einhorn.log_debug("Attempted to #{signal} child #{child.inspect} but the process does not exist", :upgrade)
      else
        signaled[child] = spec
      end
    end

    if Einhorn::State.signal_timeout && record
      Einhorn::Event::Timer.open(Einhorn::State.signal_timeout) do
        children.each do |child|
          spec = Einhorn::State.children[child]
          next unless spec # Process is already dead and removed by cleanup
          signaled_spec = signaled[child]
          next unless signaled_spec # We got ESRCH when trying to signal

          if spec[:spinup_time] != signaled_spec[:spinup_time]
            Einhorn.log_info("Different spinup time recorded for #{child} after #{Einhorn::State.signal_timeout}s. This probably indicates a PID rollover.", :upgrade)
            next
          end

          Einhorn.log_info("Child #{child.inspect} is still active after #{Einhorn::State.signal_timeout}s. Sending SIGKILL.")
          begin
            Process.kill('KILL', child)
          rescue Errno::ESRCH
          end
          spec[:signaled].add('KILL')
        end
      end

      Einhorn.log_info("Successfully sent #{signal}s to #{signaled.length} processes: #{signaled.keys}")
    end
  end
  # File lib/einhorn/command.rb, line 278
  def self.spinup(cmd=nil)
    cmd ||= Einhorn::State.cmd
    index = next_index
    expected_ppid = Process.pid

    if Einhorn::State.preloaded
      pid = fork do
        Einhorn::TransientState.whatami = :worker
        prepare_child_process

        Einhorn.log_info('About to tear down Einhorn state and run einhorn_main')
        Einhorn::Command::Interface.uninit
        Einhorn::Event.close_all_for_worker
        Einhorn.set_argv(cmd, true)

        reseed_random

        setup_parent_watch(expected_ppid)

        prepare_child_environment(index)
        einhorn_main
      end
    else
      pid = fork do
        Einhorn::TransientState.whatami = :worker
        prepare_child_process

        Einhorn.log_info("About to exec #{cmd.inspect}")
        Einhorn::Command::Interface.uninit
        # Here's the only case where cloexec would help. Since we
        # have to track and manually close FDs for other cases, we
        # may as well just reuse close_all rather than also set
        # cloexec on everything.
        #
        # Note that Ruby 1.9's close_others option is useful here.
        Einhorn::Event.close_all_for_worker

        setup_parent_watch(expected_ppid)

        prepare_child_environment(index)
        Einhorn::Compat.exec(cmd[0], cmd[1..-1], :close_others => false)
      end
    end

    Einhorn.log_info("===> Launched #{pid} (index: #{index})", :upgrade)
    Einhorn::State.last_spinup = Time.now
    Einhorn::State.children[pid] = {
      :type => :worker,
      :version => Einhorn::State.version,
      :acked => false,
      :signaled => Set.new,
      :last_signaled_at => nil,
      :index => index,
      :spinup_time => Einhorn::State.last_spinup,
    }

    # Set up whatever's needed for ACKing
    ack_mode = Einhorn::State.ack_mode
    case type = ack_mode[:type]
    when :timer
      Einhorn::Event::ACKTimer.open(ack_mode[:timeout], pid)
    when :manual
    else
      Einhorn.log_error("Unrecognized ACK mode #{type.inspect}")
    end
  end
  # File lib/einhorn/command.rb, line 540
  def self.stop_respawning
    Einhorn::State.respawn = false
    Einhorn::Event.break_loop
  end
  # File lib/einhorn/command.rb, line 635
  def self.trigger_spinup?(max_unacked)
    unacked = Einhorn::WorkerPool.unacked_unsignaled_modern_workers.length
    if unacked >= max_unacked
      Einhorn.log_info("There are #{unacked} unacked new workers, and max_unacked is #{max_unacked}, so not spinning up a new process.")
      return false
    elsif Einhorn::State.config[:max_upgrade_additional]
      capacity_exceeded = (Einhorn::State.config[:number] + Einhorn::State.config[:max_upgrade_additional]) - Einhorn::WorkerPool.workers_with_state.length
      if capacity_exceeded < 0
        Einhorn.log_info("Over worker capacity by #{capacity_exceeded.abs} during upgrade, #{Einhorn::WorkerPool.modern_workers.length} new workers of #{Einhorn::WorkerPool.workers_with_state.length} total. Waiting for old workers to exit before spinning up a process.")
        return false
      end
    end

    true
  end
  # File lib/einhorn/command.rb, line 446
  def self.upgrade_workers
    if Einhorn::State.upgrading
      Einhorn.log_info("Currently upgrading (#{Einhorn::WorkerPool.ack_count} / #{Einhorn::WorkerPool.ack_target} ACKs; bumping version and starting over)...", :upgrade)
    else
      Einhorn::State.upgrading = true
      u_type = Einhorn::State.smooth_upgrade ? 'smooth' : 'fleet'
      Einhorn.log_info("Starting #{u_type} upgrade from version #{Einhorn::State.version}...", :upgrade)
    end

    # Reset this, since we've just upgraded to a new universe (I'm
    # not positive this is the right behavior, but it's not
    # obviously wrong.)
    Einhorn::State.consecutive_deaths_before_ack = 0
    Einhorn::State.last_upgraded = Time.now

    Einhorn::State.version += 1
    if Einhorn::State.smooth_upgrade
      replenish_gradually
    else
      replenish_immediately
    end
  end