class Trident::Pool

Attributes

handler[R]
name[R]
options[R]
orphans[R]
orphans_dir[R]
size[R]
workers[R]

Public Class Methods

new(name, handler, options={}) click to toggle source
# File lib/trident/pool.rb, line 8
# Builds a pool named +name+ whose workers run +handler+.
#
# @param name [String] pool name; used in log tags and in the default pids dir.
# @param handler object the workers run (this class calls #name, #load,
#   #start and #signal_for on it).
# @param options [Hash, nil] pool configuration. The 'size' key (default 2)
#   and the 'pids_dir' key (default <cwd>/trident-pools/<name>/pids) are
#   consumed here; the remaining entries are kept in #options and handed to
#   handler.start by each worker. The caller's hash is never modified.
def initialize(name, handler, options={})
  # Normalize before touching the hash: previously a nil +options+ crashed on
  # #delete before the `|| {}` fallback was reached. Work on a copy so a hash
  # shared between several pools keeps its 'size'/'pids_dir' entries.
  options = (options || {}).dup
  @name = name
  @handler = handler
  @size = options.delete('size') || 2
  @orphans_dir = options.delete('pids_dir') || File.join(Dir.pwd, 'trident-pools', name, 'pids')
  @options = options
  @workers = Set.new
  @orphans = load_orphans(orphans_dir)
end

Public Instance Methods

above_threshold?() click to toggle source

@return [Boolean] true iff total_workers_count > size. false otherwise

# File lib/trident/pool.rb, line 64
# Whether the pool tracks more processes (own workers plus orphans) than its
# configured size.
#
# @return [Boolean] true iff total_workers_count > size, false otherwise.
def above_threshold?
  total_workers_count > size
end
at_threshold?() click to toggle source

@return [Boolean] true iff total_workers_count == size. false otherwise

# File lib/trident/pool.rb, line 70
# Whether the number of tracked processes (own workers plus orphans) matches
# the configured size exactly.
#
# @return [Boolean] true iff total_workers_count == size, false otherwise.
def at_threshold?
  total_workers_count == size
end
has_workers?() click to toggle source

@return [Boolean] true iff workers.size > 0. false otherwise

# File lib/trident/pool.rb, line 76
# Whether this pool currently tracks at least one worker of its own
# (orphans are not counted).
#
# @return [Boolean] true iff workers.size > 0, false otherwise.
def has_workers?
  !workers.empty?
end
load_orphans(path_to_orphans_dir) click to toggle source
# File lib/trident/pool.rb, line 18
# Loads the pids of workers left behind in +path_to_orphans_dir+.
#
# The directory (and any missing parents) is created if it does not exist.
# Every non-directory entry in it is read as a single integer pid and wrapped
# in a Worker bound to this pool.
#
# @param path_to_orphans_dir [String] directory holding one pid file per worker.
# @return [Set<Worker>] one Worker per pid file found (empty for a new dir).
# @raise [ArgumentError] if a pid file does not contain an integer.
def load_orphans(path_to_orphans_dir)
  # mkdir_p is a no-op when the directory already exists, so no existence
  # check is needed (File.exists? was removed in Ruby 3.2).
  FileUtils.mkdir_p(path_to_orphans_dir)

  Dir.foreach(path_to_orphans_dir).each_with_object(Set.new) do |file, orphans|
    path = File.join(path_to_orphans_dir, file)
    # Skips sub-directories as well as the '.' and '..' entries.
    next if File.directory?(path)

    # File.read, unlike IO.read, never treats the path as a "|command".
    orphans << Worker.new(Integer(File.read(path)), self)
  end
end
start() click to toggle source
# File lib/trident/pool.rb, line 37
# Starts the pool: reconciles the worker count with +size+ through
# maintain_worker_count (excess workers are stopped with the
# 'stop_gracefully' action), logging before and after.
def start
  tag = "<pool-#{name}>"
  logger.info "#{tag} Starting pool"
  maintain_worker_count('stop_gracefully')
  logger.info "#{tag} Pool started with #{workers.size} workers"
end
stop(action='stop_gracefully') click to toggle source
# File lib/trident/pool.rb, line 43
# Stops the pool by shrinking its target size to zero and reconciling.
#
# @param action [String] action used to kill the pool's own workers
#   (default 'stop_gracefully').
def stop(action='stop_gracefully')
  tag = "<pool-#{name}>"
  logger.info "#{tag} Stopping pool"
  # With a target of zero, maintain_worker_count kills every worker this pool
  # spawned; orphans are only waited on, never signalled.
  @size = 0
  maintain_worker_count(action)
  logger.info "#{tag} Pool stopped"
end
total_workers_count() click to toggle source

@return [Integer] total number of workers including orphaned workers.

# File lib/trident/pool.rb, line 82
# Number of processes this pool accounts for.
#
# @return [Integer] own workers plus orphaned workers.
def total_workers_count
  own = workers.size
  own + orphans.size
end
update() click to toggle source
# File lib/trident/pool.rb, line 56
# Reconciles the worker count with +size+ again; any excess workers are
# stopped with the 'stop_gracefully' action. Logs before and after.
def update
  tag = "<pool-#{name}>"
  logger.info "#{tag} Updating pool"
  maintain_worker_count('stop_gracefully')
  logger.info "#{tag} Pool up to date"
end
wait() click to toggle source
# File lib/trident/pool.rb, line 50
# Blocks until every tracked worker process has been reaped.
def wait
  tag = "<pool-#{name}>"
  logger.info "#{tag} Waiting for pool"
  # Blocking mode: Process.wait is called without WNOHANG for each worker.
  cleanup_dead_workers(true)
  logger.info "#{tag} Wait complete"
end

Private Instance Methods

cleanup_dead_workers(blocking=true) click to toggle source
# File lib/trident/pool.rb, line 137
# Reaps exited worker processes and stops tracking them.
#
# @param blocking [Boolean] when true, waits on every tracked worker until it
#   exits; when false, passes Process::WNOHANG so only workers that have
#   already exited are reaped.
def cleanup_dead_workers(blocking=true)
  flags = blocking ? 0 : Process::WNOHANG
  # Iterate over a copy because the loop removes entries from +workers+.
  snapshot = workers.dup
  snapshot.each do |worker|
    begin
      reaped_pid = Process.wait(worker.pid, flags)
      workers.delete(worker) if reaped_pid
    rescue Errno::EINTR
      logger.warn("<pool-#{name}> Interrupted cleaning up workers, retrying")
      retry
    rescue Errno::ECHILD
      # ECHILD: no child with this pid (e.g. it was already waited on), so
      # there is nothing left to reap -- stop tracking it.
      logger.warn("<pool-#{name}> Error cleaning up workers, ignoring")
      workers.delete(worker)
    end
  end
end
cleanup_orphaned_workers() click to toggle source

Removes orphaned workers that are either no longer running or that we lack permission to signal (which tells us they were never part of this pool).

# File lib/trident/pool.rb, line 119
# Drops orphaned workers whose pid no longer refers to a process we can signal.
#
# Each orphan is probed with signal 0, which checks that the process exists
# and may be signalled without actually delivering a signal:
# * ESRCH (no such process) or EPERM (a process we may not signal, so it was
#   never part of this pool) -- the orphan is removed from +orphans+ and
#   #destroy is called on it;
# * any other error is logged and the orphan is kept, so the next call
#   checks it again.
def cleanup_orphaned_workers
  # Iterate over a copy because matching orphans are deleted during the loop.
  orphans.clone.each do |worker|
    begin
      Process.kill(0, worker.pid)
    rescue Errno::EPERM, Errno::ESRCH => e
      logger.info("<pool-#{name}> Cleaning up orphaned worker #{worker.pid} because #{e.class.name}:#{e.message}")
      orphans.delete(worker)
      worker.destroy
    rescue => e
      # Catch unexpected errors so one bad orphan doesn't abort the whole sweep.
      logger.error("<pool-#{name}> failed cleaning up worker #{worker.pid} because #{e.class.name}:#{e.message}")
    end
  end
end
kill_worker(worker, action) click to toggle source
# File lib/trident/pool.rb, line 190
# Sends the signal mapped to +action+ to one worker and stops tracking it.
#
# @param worker [Worker] the worker to signal.
# @param action [String] action name, resolved to a signal by handler.signal_for.
# @raise [RuntimeError] if the handler maps +action+ to no signal.
def kill_worker(worker, action)
  tag = "<pool-#{name}>"
  signal = handler.signal_for(action)
  raise "#{tag} No signal for action: #{action}" unless signal

  logger.info "#{tag} Sending signal to worker: #{worker.pid}/#{signal}/#{action}"
  Process.kill(signal, worker.pid)
  # Tracking ends as soon as the signal is sent; the process is not waited on here.
  workers.delete(worker)
  logger.info "#{tag} Killed worker #{worker.pid}, worker count now at #{workers.size}"
end
kill_workers(count, action) click to toggle source
# File lib/trident/pool.rb, line 163
# Kills the +count+ most recently added workers with +action+.
#
# @param count [Integer] how many workers to kill; a value larger than the
#   current worker count kills every worker instead of raising.
# @param action [String] action name resolved to a signal by #kill_worker.
def kill_workers(count, action)
  logger.info "<pool-#{name}> Killing #{count} workers with #{action}"
  # Snapshot as an array first: kill_worker removes each worker from +workers+.
  # Array#last(n) clamps to the array length, whereas arr[-n, n] returned nil
  # (and crashed on #each) when n exceeded the number of workers.
  workers.to_a.last(count).each do |worker|
    kill_worker(worker, action)
  end
end
maintain_worker_count(kill_action) click to toggle source
# File lib/trident/pool.rb, line 88
# Reconciles the number of tracked processes with the configured +size+.
#
# First drops orphans that are gone or not ours, and reaps already-exited
# workers without blocking. Then:
# * at the threshold: nothing to do;
# * above it with workers of our own: kills the excess with +kill_action+,
#   but never more workers than we own (orphans are not signalled here);
# * above it with only orphans: logs and waits for them to go away;
# * below it: spawns the missing workers.
#
# @param kill_action [String] action passed to #kill_workers (callers in this
#   class use 'stop_gracefully' by default).
def maintain_worker_count(kill_action)
  cleanup_orphaned_workers
  cleanup_dead_workers(false)

  if at_threshold?
    logger.debug "<pool-#{name}> Worker count is correct."
  elsif above_threshold? && has_workers?
    overthreshold = total_workers_count - size
    # Only our own workers can be killed, so cap at how many we have.
    workers_to_kill = [overthreshold, workers.size].min

    # Report the total (workers + orphans): that is what was compared to size.
    logger.info("<pool-#{name}> Total workers #{total_workers_count} above threshold #{size} killing #{workers_to_kill}.")
    kill_workers(workers_to_kill, kill_action)
  elsif above_threshold?
    # Above the threshold with no workers of our own: nothing we can kill,
    # so just record that we are waiting on the orphans.
    logger.info("<pool-#{name}> Waiting on orphans before spawning workers.")
  else
    # Workers plus orphans fall short of size: spawn the difference.
    logger.info("<pool-#{name}> Orphans #{orphans.size}, Workers #{workers.size}")
    spawn_workers(size - total_workers_count)
  end
end
spawn_worker() click to toggle source
# File lib/trident/pool.rb, line 170
# Forks one worker process for +handler+ and starts tracking it.
#
# In the child: sets the process line, calls
# Trident::SignalHandler.reset_for_fork, then handler.load and
# handler.start(options); on the way out (even if the handler raises) it
# calls #destroy on a Worker built for its own pid.
# In the parent: calls #save on a Worker for the child's pid and adds it to
# +workers+.
#
# NOTE(review): the parent saves only after fork returns. If the child exits
# before that, its ensure-time destroy may run first and the parent's save
# could leave a stale entry behind -- confirm against Worker#save/#destroy.
def spawn_worker
  child_pid = fork do
    begin
      procline "pool-#{name}-worker", "starting handler #{handler.name}"
      Trident::SignalHandler.reset_for_fork
      handler.load
      handler.start(options)
    ensure
      # Inside the child, Process.pid is the child's own pid.
      Worker.new(Process.pid, self).destroy
    end
  end

  tracked = Worker.new(child_pid, self)
  tracked.save
  workers << tracked

  logger.info "<pool-#{name}> Spawned worker #{child_pid}, worker count now at #{workers.size}"
end
spawn_workers(count) click to toggle source
# File lib/trident/pool.rb, line 156
# Spawns +count+ workers one after another via spawn_worker.
#
# @param count [Integer] number of workers to fork; zero or a negative value
#   spawns none.
def spawn_workers(count)
  tag = "<pool-#{name}>"
  logger.info "#{tag} Spawning #{count} workers"
  count.times { spawn_worker }
end