class Procrastinate::ProcessManager
Dispatches and handles tasks and task completion. Only low-level Unix process manipulation happens here, no strategy. The only methods you should call from the outside are setup, step, wakeup and teardown.
Constants
- ChildProcess
A <completion handler, result> tuple that stores the handler to call when a child exits and the object that will handle child-master communication if desired.
Attributes
- children
A hash of <pid, callback> that contains callbacks for all the child processes we spawn. Once the process is complete, the callback is called in the procrastinate thread.
- control_pipe
This pipe is used to wait for events in the master process.
Public Class Methods
    # File lib/procrastinate/process_manager.rb, line 19
    def initialize
      # This controls process manager wakeup
      @control_pipe = IO.pipe

      # All presently running children
      @children = {}
    end
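The control pipe follows what is commonly called the self-pipe pattern: one side writes a byte to request a wakeup, and the waiting side sleeps in IO.select until the read end becomes readable. The following is a minimal stand-alone sketch of that idea, not code from Procrastinate; the method name wake_and_drain is hypothetical.

```ruby
# Hypothetical helper, for illustration only: shows how a pipe can be used
# to wake up code that is sleeping in IO.select.
def wake_and_drain
  read_end, write_end = IO.pipe

  # Request a wakeup by writing a single byte to the write end.
  write_end.write('.')

  # The waiting side blocks here until the read end is readable
  # (or until the one second timeout expires).
  ready, = IO.select([read_end], [], [], 1.0)

  # Drain the pipe so that the next select would block again.
  ready ? read_end.read_nonblock(1024) : nil
ensure
  read_end&.close
  write_end&.close
end
```

The content of the byte does not matter; only the fact that the pipe became readable carries information.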
Public Instance Methods
Gets executed in child process to clean up file handles and pipes that the master holds.
    # File lib/procrastinate/process_manager.rb, line 198
    def cleanup
      # Children don't need the parent's signal handler
      unregister_signals

      # The child doesn't need the control pipe for now.
      control_pipe.each { |io| io.close }
    end
Spawns a process to work on task. If a block is given, it is called when the task completes. This method should only be called from a strategy inside the dispatcher's thread; calling it from anywhere else will expose threading issues.
@example
create_process(wi) { puts "Task is complete" }
@param task [Procrastinate::Task::Callable] task to be run inside the
forked process
@param completion_handler [Proc] completion handler that is called when
the process exits
@return [void]
    # File lib/procrastinate/process_manager.rb, line 166
    def create_process(task, &completion_handler)
      # Tasks that are interested in getting messages from their children must
      # provide a result object that handles incoming 'result' messages.
      result = task.result
      child_process = ChildProcess.new(completion_handler, result)

      pid = fork do
        cleanup

        if result
          endpoint = lambda { |obj| child_process.send_message(obj) }
          task.run(endpoint)
        else
          task.run(nil)
        end

        exit! # this seems to be needed to avoid rspecs cleanup tasks
      end

      # This should never fire: New children are spawned only after we lose
      # track of the old ones because they have been successfully processed.
      fail "PID REUSE!" if children.has_key?(pid)

      # The spawning is done in the same thread as the reaping is done. This is
      # why no race condition to the following line exists. (or in other code,
      # for that matter.)
      children[pid] = child_process.tap { |s| s.start }
    end
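The fork-and-report pattern used by create_process can be tried in isolation. The sketch below is an illustration under stated assumptions, not the ChildProcess implementation: it assumes that results travel over a pipe as Marshal data, and the method name run_in_child is hypothetical.

```ruby
# Hypothetical helper, for illustration only: runs the given block in a
# forked child and returns [result, exit status] to the parent.
def run_in_child
  reader, writer = IO.pipe
  reader.binmode
  writer.binmode

  pid = fork do
    reader.close                          # the child only writes
    writer.write(Marshal.dump(yield))     # send the block's result
    writer.close
    exit!(0)                              # skip at_exit handlers inherited from the parent
  end

  writer.close                            # the parent only reads
  payload = reader.read                   # returns once the child closes its end
  reader.close

  _, status = Process.wait2(pid)          # reap the child, avoiding a zombie
  [Marshal.load(payload), status]
end
```

For example, `run_in_child { 6 * 7 }` returns 42 together with a successful Process::Status. Closing the unused pipe end on each side matters: the parent's read only sees end of file once every copy of the write end has been closed.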
    # File lib/procrastinate/process_manager.rb, line 124
    def finalize_children
      children.
        select { |pid, child| child.stopped? }.
        each { |pid, child| child.finalize }

      children.delete_if { |pid, child| child.removable? }
    end
Kills all running processes by sending them a signal (QUIT by default).
@param signal [String] signal to send to the forked processes.
    # File lib/procrastinate/process_manager.rb, line 221
    def kill_processes(signal='QUIT')
      children.each do |pid, process|
        Process.kill(signal, pid)
      end
    end
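Process.kill raises Errno::ESRCH when the target process no longer exists. The following sketch shows one way a caller might signal a list of pids while tolerating that case. It is an assumption about how one could harden such a loop, not behaviour of kill_processes itself, and the method name signal_children is hypothetical.

```ruby
# Hypothetical helper, for illustration only: sends `signal` to every pid
# and returns the pids that could actually be signalled.
def signal_children(pids, signal = 'QUIT')
  pids.select do |pid|
    begin
      Process.kill(signal, pid)
      true
    rescue Errno::ESRCH
      false # the process is already gone
    end
  end
end
```

Sending a signal does not reap the child. The parent still has to wait for it, as reap_childs does.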
Returns the number of child processes that are alive at this point. Note that even if a child process is marked dead internally, it counts towards this number, since its results may not have been dispatched yet.
    # File lib/procrastinate/process_manager.rb, line 65
    def process_count
      children.count
    end
Calls completion handlers for all the children that have now exited.
    # File lib/procrastinate/process_manager.rb, line 135
    def reap_childs
      loop do
        child_pid, status = Process.waitpid(-1, Process::WNOHANG)
        break unless child_pid

        # Trigger the completion callback
        if child=children[child_pid]
          child.sigchld_received
          # Maybe there are messages queued for this child. If nothing is queued
          # up, the thread will hang in the select in #wait_for_event unless
          # we wake it up.
          wakeup
        end
      end
    rescue Errno::ECHILD
      # Ignore: This means that no children remain.
    end
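The non-blocking reap loop can be exercised on its own. The sketch below is a stand-alone illustration, not part of the library. It uses Process.wait2 rather than Process.waitpid so that the exit status is returned along with the pid; the method name reap_exited is hypothetical.

```ruby
# Hypothetical helper, for illustration only: collects every child that has
# already exited, without blocking, and returns a hash of pid => status.
def reap_exited
  reaped = {}
  loop do
    # With WNOHANG, wait2 returns nil when children exist but none has
    # exited yet.
    pid, status = Process.wait2(-1, Process::WNOHANG)
    break unless pid
    reaped[pid] = status
  end
  reaped
rescue Errno::ECHILD
  # Raised when the process has no children left at all.
  reaped
end
```

A SIGCHLD can stand for several exited children, which is why the loop keeps calling wait2 until nothing more is available.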
Register signals that aid in child care. NB: Because we do this globally, holding more than one dispatcher in a process will not work yet.
    # File lib/procrastinate/process_manager.rb, line 74
    def register_signals
      trap('CHLD') { wakeup }
    end
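To see how a CHLD handler and a pipe cooperate, the following stand-alone sketch installs a handler, forks a short-lived child and waits for the wakeup. It is an illustration rather than library code; the method name wait_for_sigchld is hypothetical, and the previous handler is restored at the end because trap is process-global.

```ruby
# Hypothetical helper, for illustration only: returns true if a SIGCHLD
# woke up the select call before the timeout expired.
def wait_for_sigchld(timeout = 5.0)
  read_end, write_end = IO.pipe

  # trap returns the previously installed handler, so it can be restored.
  previous = trap('CHLD') { write_end.write_nonblock('.') }

  pid = fork { exit!(0) }

  # Sleeps until the handler writes to the pipe, or until the timeout.
  ready, = IO.select([read_end], [], [], timeout)

  Process.wait(pid) # reap the child
  !ready.nil?
ensure
  trap('CHLD', previous || 'DEFAULT')
  read_end&.close
  write_end&.close
end
```

The handler only writes a byte and leaves the real work to the main loop, which keeps the code that runs in signal context as small as possible.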
Sets up the resources the dispatcher needs. You must call this before the dispatcher can start its work.
    # File lib/procrastinate/process_manager.rb, line 30
    def setup
      register_signals
    end
Performs one step in the dispatcher's work. This will sleep and wait for work to be done, then wake up and reap processes that are still pending. This method will mostly sleep.
    # File lib/procrastinate/process_manager.rb, line 38
    def step
      # Sleep until either work arrives or we receive a SIGCHLD
      wait_for_event

      # Reap all processes that have terminated in the meantime.
      reap_childs
    end
Tears down the dispatcher. This frees resources that have been allocated and waits for all children to terminate.
    # File lib/procrastinate/process_manager.rb, line 48
    def teardown
      wait_for_all_childs
      unregister_signals
    end
Unregisters signals by resetting the CHLD handler to its default. The process should then behave as it did before register_signals was called.
    # File lib/procrastinate/process_manager.rb, line 80
    def unregister_signals
      trap('CHLD', 'DEFAULT')
    end
Waits for all children to complete.
    # File lib/procrastinate/process_manager.rb, line 208
    def wait_for_all_childs
      # TODO Maybe signal KILL to children after some time.
      until children.empty?
        wait_for_event
        reap_childs
        finalize_children
      end
    end
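The TODO above mentions sending KILL after some time. One possible shape for such an escalation is sketched below. This is a hypothetical illustration and not something the library implements; the method name wait_or_kill and the grace parameter are assumptions.

```ruby
# Hypothetical helper, for illustration only: waits up to `grace` seconds
# for the given pids to exit, then sends KILL to the stragglers. Returns
# the pids that had to be killed.
def wait_or_kill(pids, grace = 1.0)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + grace
  remaining = pids.dup
  killed = []

  until remaining.empty?
    # Process.wait with WNOHANG returns the pid if that child has exited
    # and nil otherwise, so exited children are dropped from the list.
    remaining.reject! { |pid| Process.wait(pid, Process::WNOHANG) }
    break if remaining.empty?

    if Process.clock_gettime(Process::CLOCK_MONOTONIC) > deadline
      remaining.each { |pid| Process.kill('KILL', pid) }
      remaining.each { |pid| Process.wait(pid) } # reap the killed children
      killed.concat(remaining)
      remaining.clear
    else
      sleep 0.01
    end
  end

  killed
end
```

A monotonic clock is used for the deadline so that changes to the wall clock do not shorten or lengthen the grace period.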
Called from the child management thread, will put that thread to sleep until someone requests it to become active again. See wakeup.
This method also depletes the child queue, reading end-of-processing messages from all children and dispatching them to the corresponding child objects.
    # File lib/procrastinate/process_manager.rb, line 90
    def wait_for_event
      cp_read_end = control_pipe.first

      loop do # until we have input in the cp_read_end (control_pipe)
        io_map = children.inject({}) { |map, (_, child)|
          map[child.master_pipe] = child; map }

        ready, _, _ = IO.select(io_map.keys + [cp_read_end], [], [], 0.1)
        next unless ready

        # Process all messages that were sent from our children to us.
        ready.each { |io|
          next if io == cp_read_end

          child = io_map[io]

          fail "Assert: All IOs correspond to a child" unless child
          child.read_message }

        # Send the tracking code for the child processes the final notifications
        # and remove them from the children hash. At this point we know that
        # no messages are waiting in the child queue.
        finalize_children

        if ready.include?(cp_read_end)
          # Consume the data (not important)
          cp_read_end.read_nonblock(1024)

          # And return to our caller. This is the event we've been waiting for.
          return
        end
      end
    end
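The core of wait_for_event is a single IO.select call that watches every child pipe plus the control pipe. The sketch below isolates one such polling pass. It is a simplified stand-in rather than the library code: the method name poll_once is hypothetical, and plain labels take the place of child objects.

```ruby
# Hypothetical helper, for illustration only. `sources` maps readable IO
# objects to labels and `control` is the read end of a control pipe.
# Returns the messages read in this pass and whether a wakeup was requested.
def poll_once(sources, control, timeout = 0.1)
  ready, = IO.select(sources.keys + [control], [], [], timeout)

  # IO.select returns nil when the timeout expires with nothing readable.
  return { messages: {}, woken: false } unless ready

  messages = {}
  ready.each do |io|
    next if io == control
    messages[sources.fetch(io)] = io.read_nonblock(1024)
  end

  woken = ready.include?(control)
  control.read_nonblock(1024) if woken # drain the wakeup bytes

  { messages: messages, woken: woken }
end
```

Keying the hash by IO object mirrors the io_map in wait_for_event: select hands back the IO objects that are ready, and the map leads from each of them to its owner.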
Wake up the dispatcher thread.
    # File lib/procrastinate/process_manager.rb, line 55
    def wakeup
      control_pipe.last.write '.'
      # rescue IOError
      # Ignore:
    end