class Sidekiq::Pool::CLI

Constants

DEFAULT_FORK_WAIT
START_CTX

Public Class Methods

new() click to toggle source
Calls superclass method
# File lib/sidekiq/pool/cli.rb, line 13
def initialize
  @pool = []
  @done = false
  @system_booted = false
  super
end

Public Instance Methods

parse_config_file(filename) click to toggle source
# File lib/sidekiq/pool/cli.rb, line 37
def parse_config_file(filename)
  config = YAML.load(ERB.new(File.read(filename)).result)
  unless config.key?(:workers)
    raise ArgumentError, 'Invalid configuration file - "workers" key must be present'
  end
  unless config[:workers].is_a?(Array)
    raise ArgumentError, 'Invalid configuration file - "workers" key must be a list'
  end
  unless config[:workers].size > 0
    raise ArgumentError, 'Invalid configuration file - Atleast one worker must be present'
  end
  config
end
run() click to toggle source
# File lib/sidekiq/pool/cli.rb, line 26
def run
  @master_pid = $$
  write_pid

  trap_signals
  update_process_name
  start_new_pool

  wait_for_signals
end
Also aliased as: run_child
run_child()
Alias for: run
write_pid() click to toggle source
Calls superclass method
# File lib/sidekiq/pool/cli.rb, line 22
def write_pid
  super if @master_pid == ::Process.pid
end

Private Instance Methods

add_reload_marker() click to toggle source
# File lib/sidekiq/pool/cli.rb, line 289
def add_reload_marker
  return unless options[:pidfile]
  File.write([options[:pidfile], '.reload'].join, '')
end
alive?(pid) click to toggle source
# File lib/sidekiq/pool/cli.rb, line 323
def alive?(pid)
  ::Process.getpgid(pid)
  true
rescue Errno::ESRCH
  false
end
boot_system() click to toggle source
Calls superclass method
# File lib/sidekiq/pool/cli.rb, line 55
def boot_system
  if @system_booted
    logger.info "#{::Process.pid} - environment already started"
  else
    super
    @system_booted = true
  end
end
check_pool() click to toggle source
# File lib/sidekiq/pool/cli.rb, line 309
def check_pool
  ::Process.waitpid2(-1, ::Process::WNOHANG)
  @pool.each do |child|
    next if alive?(child[:pid])
    handle_dead_child(child)
  end
end
cmdline_busy?(pid) click to toggle source
# File lib/sidekiq/pool/cli.rb, line 330
def cmdline_busy?(pid)
  return unless alive?(pid)
  path = "/proc/#{pid}/cmdline"
  return unless File.exists?(path)
  cmdline = File.read(path)
  return if cmdline.empty?
  !cmdline.scan(/busy\]/).empty?
end
drop_reload_marker() click to toggle source
# File lib/sidekiq/pool/cli.rb, line 294
def drop_reload_marker
  reload_marker = [options[:pidfile], '.reload'].join
  File.unlink(reload_marker) if File.exist?(reload_marker)
end
fork?() click to toggle source
# File lib/sidekiq/pool/cli.rb, line 359
def fork?
  $$ != @master_pid
end
fork_child(command, index, wait_for_busy = true) click to toggle source
# File lib/sidekiq/pool/cli.rb, line 198
def fork_child(command, index, wait_for_busy = true)
  logger.info "Adding child with args: (#{command}) in #{working_directory}, waiting for busy: #{wait_for_busy}"
  if working_directory && !Dir.exist?(working_directory)
    logger.info "Working directory: #{working_directory} does not exist unable to fork"
    return
  end

  pid = fork do
    Dir.chdir(working_directory) if working_directory
    opts = parse_options(command.split)
    options.merge!(opts)

    @self_write.close
    $0 = "sidekiq #{Sidekiq::VERSION} worker #{index} starting"
    options[:index] = index

    # reset child identity
    @@process_nonce = nil
    @@identity = nil
    options[:identity] = identity
    options[:tag] = "worker #{index}"

    run_after_fork_hooks
    run_child
  end

  wait_until_child_loaded ||= (@wait_until_child_loaded || 30)

  until cmdline_busy?(pid)
    break if (wait_until_child_loaded -= 1).zero?
    logger.info "Waiting for child #{pid} to be busy break in #{wait_until_child_loaded}"
    sleep 1
  end if wait_for_busy

  @pool << { pid: pid, index: index, command: command }
end
handle_dead_child(child) click to toggle source
# File lib/sidekiq/pool/cli.rb, line 317
def handle_dead_child(child)
  logger.info "Child #{child[:pid]} (worker #{child[:index]}) died"
  @pool.delete(child)
  fork_child(child[:command], child[:index], false)
end
handle_master_signal(sig) click to toggle source
# File lib/sidekiq/pool/cli.rb, line 242
def handle_master_signal(sig)
  case sig
  when 'INT', 'TERM'
    stop_children
    logger.info 'Bye!'
    exit(0)
  when 'CHLD'
    check_pool
  when 'USR1', 'TSTP'
    @done = true
    update_process_name
    signal_to_pool(sig)
  when 'USR2'
    logger.info "Sending #{sig} signal to the pool"
    signal_to_pool(sig)
  when 'HUP'
    reload
  end
end
parse_options(argv) click to toggle source
# File lib/sidekiq/pool/cli.rb, line 87
def parse_options(argv)
  opts = {}

  @parser = OptionParser.new do |o|
    o.on '-c', '--concurrency INT', "processor threads to use" do |arg|
      opts[:concurrency] = Integer(arg)
    end

    o.on '-d', '--daemon', "Daemonize process" do |arg|
      opts[:daemon] = arg
    end

    o.on '-e', '--environment ENV', "Application environment" do |arg|
      opts[:environment] = arg
    end

    o.on '-g', '--tag TAG', "Process tag for procline" do |arg|
      opts[:tag] = arg
    end

    o.on "-q", "--queue QUEUE[,WEIGHT]", "Queues to process with optional weights" do |arg|
      queue, weight = arg.split(",")
      parse_queue opts, queue, weight
    end

    o.on '-r', '--require [PATH|DIR]', "Location of Rails application with workers or file to require" do |arg|
      opts[:require] = arg
    end

    o.on '-t', '--timeout NUM', "Shutdown timeout" do |arg|
      opts[:timeout] = Integer(arg)
    end

    o.on "-v", "--verbose", "Print more verbose output" do |arg|
      opts[:verbose] = arg
    end

    o.on '-w', '--fork-wait NUM', "seconds to wait between child forks, default #{DEFAULT_FORK_WAIT}" do |arg|
      @fork_wait = Integer(arg)
    end

    o.on '-C', '--config PATH', "path to YAML config file" do |arg|
      opts[:config_file] = arg
    end

    o.on '-L', '--logfile PATH', "path to writable logfile" do |arg|
      opts[:logfile] = arg
    end

    o.on '-P', '--pidfile PATH', "path to pidfile" do |arg|
      opts[:pidfile] = arg
    end

    o.on '-p', '--pool-config PATH', "path to pool config file" do |arg|
      @pool_config = arg
    end

    o.on '--working-directory PATH', "path to working directory" do |arg|
      unless Dir.exist?(arg)
        puts "Provided directory #{arg} does not exist"
        die(1)
      end
      @working_directory = arg
    end

    o.on '--wait-until-child-loaded NUM', "Seconds to wait until forked child is busy" do |arg|
      @wait_until_child_loaded = Integer(arg)
    end

    o.on '--suspend-before-graceful-reload', "Send USR1 singal to old pool when doing graceful reload" do |arg|
      @suspend_before_graceful_reload = arg
    end

    o.on '-V', '--version', "Print version and exit" do |arg|
      puts "Sidekiq #{Sidekiq::VERSION}"
      die(0)
    end
  end

  @parser.banner = 'sidekiq-pool [options]'
  @parser.on_tail '-h', '--help', 'Show help' do
    logger.info @parser
    die 1
  end
  @parser.parse!(argv)

  %w[config/sidekiq.yml config/sidekiq.yml.erb].each do |filename|
    opts[:config_file] ||= filename if File.exist?(filename)
  end

  %w[config/sidekiq-pool.yml config/sidekiq-pool.yml.erb].each do |filename|
    @pool_config ||= filename if File.exist?(filename)
  end

  opts
end
reexec() click to toggle source
# File lib/sidekiq/pool/cli.rb, line 277
def reexec
  fork do
    cmd = [START_CTX[0]].concat(START_CTX[:argv])
    logger.info("Starting new master process #{cmd}")
    exec(*cmd)
  end
end
reload() click to toggle source
# File lib/sidekiq/pool/cli.rb, line 262
def reload
  logger.info 'Gracefully reloading pool'

  # USR1 tells Sidekiq it will be shutting down in near future.
  signal_to_pool('USR1') if @suspend_before_graceful_reload

  add_reload_marker
  reexec

  stop_children
  logger.info 'Graceful reload completed'
ensure
  exit(0)
end
run_after_fork_hooks() click to toggle source
# File lib/sidekiq/pool/cli.rb, line 285
def run_after_fork_hooks
  Sidekiq::Pool.after_fork_hooks.each(&:call)
end
signal_to_child(sig, pid) click to toggle source
# File lib/sidekiq/pool/cli.rb, line 303
def signal_to_child(sig, pid)
  ::Process.kill(sig, pid)
rescue Errno::ESRCH
  @pool.delete(pid)
end
signal_to_pool(sig, given_pool = @pool) click to toggle source
# File lib/sidekiq/pool/cli.rb, line 299
def signal_to_pool(sig, given_pool = @pool)
  given_pool.each { |child| signal_to_child(sig, child[:pid]) }
end
start_new_pool() click to toggle source
# File lib/sidekiq/pool/cli.rb, line 68
def start_new_pool
  logger.info 'Starting new pool'
  @settings = parse_config_file(@pool_config)
  Dir.chdir(working_directory) if working_directory

  boot_system

  @types = @settings[:workers]
  index = -1
  @types.each do |type|
    type[:amount].times do
      index += 1
      sleep @fork_wait || DEFAULT_FORK_WAIT
      fork_child(type[:command], index)
    end
  end
  drop_reload_marker
end
stop_children(given_pool = @pool) click to toggle source
# File lib/sidekiq/pool/cli.rb, line 339
def stop_children(given_pool = @pool)
  @done = true
  logger.info 'Stopping children'
  update_process_name

  time = Time.now
  loop do
    wait_time = (Time.now - time).to_i
    if wait_time > options[:timeout] + 2
      logger.warn("Children didn't stop in #{wait_time}s, killing")
      signal_to_pool('KILL', given_pool)
    else
      signal_to_pool('TERM', given_pool)
    end
    sleep(1)
    ::Process.waitpid2(-1, ::Process::WNOHANG)
    break if given_pool.none? { |child| alive?(child[:pid]) }
  end
end
stopping?() click to toggle source
# File lib/sidekiq/pool/cli.rb, line 363
def stopping?
  @done
end
trap_signals() click to toggle source
# File lib/sidekiq/pool/cli.rb, line 184
def trap_signals
  @self_read, @self_write = IO.pipe

  %w(INT TERM USR1 TSTP USR2 CHLD HUP).each do |sig|
    begin
      trap sig do
        @self_write.puts(sig) unless fork?
      end
    rescue ArgumentError
      puts "Signal #{sig} not supported"
    end
  end
end
update_process_name() click to toggle source
# File lib/sidekiq/pool/cli.rb, line 367
def update_process_name
  parts = [
    'sidekiq-pool',
    Sidekiq::Pool::VERSION,
    options[:tag]
  ]

  parts << 'stopping' if stopping?

  $0 = parts.compact.join(' ')
end
wait_for_signals() click to toggle source
# File lib/sidekiq/pool/cli.rb, line 235
def wait_for_signals
  while readable_io = IO.select([@self_read])
    signal = readable_io.first[0].gets.strip
    handle_master_signal(signal)
  end
end
working_directory() click to toggle source
# File lib/sidekiq/pool/cli.rb, line 64
def working_directory
  @working_directory || @settings[:working_directory]
end