class AWS::Flow::ForkingExecutor
@api private
Attributes
executors[RW]
is_shutdown[RW]
max_workers[RW]
pids[RW]
Public Class Methods
new(options = {})
click to toggle source
# File lib/aws/decider/executor.rb, line 51 def initialize(options = {}) unless @log = options[:logger] @log = Utilities::LogFactory.make_logger(self) end @semaphore = Mutex.new @max_workers = options[:max_workers] || 1 @pids = [] @is_shutdown = false ForkingExecutor.executors ||= [] ForkingExecutor.executors << self end
Public Instance Methods
block_on_max_workers()
click to toggle source
@api private
# File lib/aws/decider/executor.rb, line 118 def block_on_max_workers @log.debug "block_on_max_workers workers=#{@pids.size}, max_workers=#{@max_workers}" if @pids.size >= @max_workers @log.info "Reached maximum number of workers (#{@max_workers}), waiting for some to finish" begin remove_completed_pids(true) end while @pids.size >= @max_workers end @log.debug "Available workers: #{@max_workers - @pids.size} out of #{@max_workers}" end
execute(&block)
click to toggle source
# File lib/aws/decider/executor.rb, line 63 def execute(&block) @log.debug "Currently running pids: #{@pids}" raise RejectedExecutionException if @is_shutdown block_on_max_workers @log.debug "Creating a new child process: parent=#{Process.pid}" child_pid = fork do begin @log.debug "Inside the new child process: parent=#{Process.ppid}, child_pid=#{Process.pid}" # TODO: which signals to ignore? # ignore signals in the child %w{ TERM INT HUP SIGUSR2 }.each { |signal| Signal.trap(signal, 'SIG_IGN') } @log.debug "Executing block from child process: parent=#{Process.ppid}, child_pid=#{Process.pid}" block.call @log.debug "Exiting from child process: parent=#{Process.ppid}, child_pid=#{Process.pid}" Process.exit!(0) rescue => e @log.error "child_pid=#{Process.pid} failed while executing the task: #{e}. Exiting: parent=#{Process.ppid}, child_pid=#{Process.pid}" Process.exit!(1) end end @log.debug "Created a new child process: parent=#{Process.pid}, child_pid=#{child_pid}" @pids << child_pid end
shutdown(timeout_seconds)
click to toggle source
# File lib/aws/decider/executor.rb, line 87 def shutdown(timeout_seconds) @log.debug "Shutdown requested. Currently running pids: #{@pids}" @is_shutdown = true remove_completed_pids unless @pids.empty? # If the timeout_seconds value is set to Float::INFINITY, it will wait # indefinitely till all workers finish their work. This allows us to # handle graceful shutdown of workers. if timeout_seconds == Float::INFINITY @log.info "Exit requested, waiting indefinitely till all child processes finish" remove_completed_pids true while !@pids.empty? else @log.info "Exit requested, waiting up to #{timeout_seconds} seconds for child processes to finish" # check every second for child processes to finish timeout_seconds.times do sleep 1 remove_completed_pids break if @pids.empty? end end # forcibly kill all remaining children unless @pids.empty? @log.warn "Child processes #{@pids} still running, sending KILL signal: #{@pids.join(',')}" @pids.each { |pid| Process.kill('KILL', pid) } end end end
Private Instance Methods
remove_completed_pids(block=false)
click to toggle source
Removes all child processes from @pids list that have finished. Block for at least one child to finish if block argument is set to ‘true`. @api private
# File lib/aws/decider/executor.rb, line 135 def remove_completed_pids(block=false) @log.debug "Removing completed child processes" # waitpid2 throws an Errno::ECHILD if there are no child processes, # so we don't even call it if there aren't any pids to wait on. if @pids.empty? @log.debug "No child processes. Returning." return end @log.debug "Current child processes: #{@pids}" # Non-blocking wait only returns a non-null pid if the child process has exited. # This is the only part where we block if block=true. pid, status = Process.waitpid2(-1, block ? 0 : Process::WNOHANG) loop do # If nothing to reap, then exit break unless pid # We have something to reap @log.debug "Reaping child process=#{pid}" if status.success? @log.debug "Child process #{pid} exited successfully" else @log.error "Child process #{pid} exited with non-zero status code: #{status}" end # Reap @pids.delete(pid) # waitpid2 throws an Errno::ECHILD if there are no child processes, # so we don't even call it if there aren't any pids to wait on. break if @pids.empty? @log.debug "Current child processes: #{@pids}" # Contract is to block only once if block=true. Since we have potentially already # blocked once above, we only need to do a non blocking call to waitpid2 to see if # any other process is available to reap. pid, status = Process.waitpid2(-1, Process::WNOHANG) end end