class Toiler::CLI
Command line client interface
Attributes
supervisor[RW]
Public Instance Methods
run(args)
click to toggle source
# File lib/toiler/cli.rb, line 25 def run(args) @self_read, @self_write = IO.pipe trap_signals options = Utils::ArgumentParser.parse(args) Utils::EnvironmentLoader.load(options) daemonize write_pid load_concurrent start_supervisor handle_stop end
Private Instance Methods
daemonize()
click to toggle source
# File lib/toiler/cli.rb, line 134 def daemonize return unless Toiler.options[:daemon] fail 'Logfile required when daemonizing' unless Toiler.options[:logfile] files_to_reopen = [] ObjectSpace.each_object(File) do |file| files_to_reopen << file unless file.closed? end Process.daemon(true, true) reopen_files(files_to_reopen) reopen_std end
handle_signal(signal)
click to toggle source
# File lib/toiler/cli.rb, line 115 def handle_signal(signal) case signal when 'QUIT' print_stacktraces print_status when 'INT', 'TERM' fail WaitShutdown, 60 when 'ABRT' fail WaitShutdown, Toiler.options[:shutdown_timeout] * 60 end end
handle_stop()
click to toggle source
# File lib/toiler/cli.rb, line 41 def handle_stop while (readable_io = IO.select([@self_read])) handle_signal(readable_io.first[0].gets.strip) end rescue WaitShutdown => shutdown_error Toiler.logger.info "Received Interrupt, Waiting up to #{shutdown_error.wait} seconds for actors to finish..." success = supervisor.ask(:terminate!).wait(shutdown_error.wait) if success Toiler.logger.info 'Supervisor successfully terminated' else Toiler.logger.info 'Timeout waiting for Supervisor to terminate' end ensure exit 0 end
load_concurrent()
click to toggle source
# File lib/toiler/cli.rb, line 127 def load_concurrent require 'concurrent-edge' Concurrent.global_logger = lambda do |level, progname, msg = nil, &block| Toiler.logger.log(level, msg, progname, &block) end if Toiler.logger end
print_stacktraces()
click to toggle source
# File lib/toiler/cli.rb, line 81 def print_stacktraces return unless Toiler.logger Toiler.logger.info "-------------------" Toiler.logger.info "Received QUIT, dumping threads:" Thread.list.each do |t| id = t.object_id Toiler.logger.info "[thread:#{id}] #{t.backtrace.join("\n[thread:#{id}] ")}" end Toiler.logger.info '-------------------' end
print_status()
click to toggle source
# File lib/toiler/cli.rb, line 92 def print_status return unless Toiler.logger Toiler.logger.info "-------------------" Toiler.logger.info "Received QUIT, dumping status:" Toiler.queues.each do |queue| fetcher = Toiler.fetcher(queue).send(:core).send(:context) processor_pool = Toiler.processor_pool(queue).send(:core).send(:context) processors = processor_pool.instance_variable_get(:@workers).collect{|w| w.send(:core).send(:context)} busy_processors = processors.count{|pr| pr.executing?} message = "Status for [queue:#{queue}]:" message += "\n[fetcher:#{fetcher.name}] [executing:#{fetcher.executing?}] [waiting_messages:#{fetcher.waiting_messages}] [free_processors:#{fetcher.free_processors}]" message += "\n[processor_pool:#{processor_pool.name}] [workers:#{processors.count}] [busy:#{busy_processors}]" processors.each do |processor| thread = processor.thread thread_id = thread.nil? ? "nil" : thread.object_id message += "\n[processor:#{processor.name}] [executing:#{processor.executing?}] [thread:#{thread_id}]" message += " Stack:\n" + thread.backtrace.join("\n\t") unless thread.nil? end Toiler.logger.info message end Toiler.logger.info '-------------------' end
reopen_files(files_to_reopen)
click to toggle source
# File lib/toiler/cli.rb, line 149 def reopen_files(files_to_reopen) files_to_reopen.each do |file| begin file.reopen file.path, 'a+' file.sync = true rescue StandardError puts "Failed to reopen file #{file}" end end end
reopen_std()
click to toggle source
# File lib/toiler/cli.rb, line 160 def reopen_std [$stdout, $stderr].each do |io| File.open(Toiler.options[:logfile], 'ab') do |f| io.reopen(f) end io.sync = true end $stdin.reopen('/dev/null') end
shutdown_pools()
click to toggle source
# File lib/toiler/cli.rb, line 57 def shutdown_pools Concurrent.global_fast_executor.shutdown Concurrent.global_io_executor.shutdown return if Concurrent.global_io_executor.wait_for_termination(60) Concurrent.global_io_executor.kill end
start_supervisor()
click to toggle source
# File lib/toiler/cli.rb, line 64 def start_supervisor require 'toiler/actor/supervisor' @supervisor = Actor::Supervisor.spawn! :supervisor end
trap_signals()
click to toggle source
# File lib/toiler/cli.rb, line 69 def trap_signals %w(INT TERM QUIT USR1 USR2 TTIN ABRT).each do |sig| begin trap sig do @self_write.puts(sig) end rescue ArgumentError puts "System does not support signal #{sig}" end end end
write_pid()
click to toggle source
# File lib/toiler/cli.rb, line 170 def write_pid file = Toiler.options[:pidfile] File.write file, Process.pid if file end