class Procrastinate::ProcessManager

Dispatches tasks and handles their completion. This class contains only low-level Unix process manipulation, no strategy. The only methods you should call from the outside are setup, step, wakeup and teardown.

Constants

ChildProcess

A <completion handler, result> tuple that stores the handler to call when a child exits and the object that will handle child-master communication if desired.

Attributes

children[R]

A hash of <pid, ChildProcess> that tracks all the child processes we spawn. Once a process is complete, its completion handler is called in the procrastinate thread.

control_pipe[R]

This pipe is used to wait for events in the master process.

Public Class Methods

new()
# File lib/procrastinate/process_manager.rb, line 19
def initialize
  # This controls process manager wakeup
  @control_pipe = IO.pipe
  
  # All presently running children
  @children = {}
end

Public Instance Methods

cleanup()

Executed in the child process to close file handles and pipes inherited from the master.

# File lib/procrastinate/process_manager.rb, line 198
def cleanup
  # Children don't need the parent's signal handler
  unregister_signals
  
  # The child doesn't need the control pipe for now.
  control_pipe.each { |io| io.close }
end
create_process(task, &completion_handler)

Spawns a process to work on task. If a block is given, it is called when the task completes. This method should only be called from a strategy inside the dispatcher's thread; calling it from any other thread is not thread-safe.

@example

create_process(wi) { puts "Task is complete" }

@param task [Procrastinate::Task::Callable] task to be run inside the forked process

@param completion_handler [Proc] completion handler that is called when the process exits

@return [void]

# File lib/procrastinate/process_manager.rb, line 166
def create_process(task, &completion_handler)
  # Tasks that are interested in getting messages from their children must
  # provide a result object that handles incoming 'result' messages.
  result = task.result
  child_process = ChildProcess.new(completion_handler, result)
  
  pid = fork do
    cleanup
          
    if result
      endpoint = lambda { |obj| child_process.send_message(obj) }
      task.run(endpoint)
    else
      task.run(nil)
    end

    exit! # this seems to be needed to avoid rspec's cleanup tasks
  end
      
  # This should never fire: New children are spawned only after we lose
  # track of the old ones because they have been successfully processed.
  fail "PID REUSE!" if children.has_key?(pid)
  
  # The spawning is done in the same thread as the reaping is done. This is
  # why no race condition to the following line exists. (or in other code,
  # for that matter.)
  children[pid] = child_process.tap { |s| s.start }
end
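
The internals of ChildProcess are not shown on this page, so the following is only a minimal sketch of the general pattern that create_process builds on: fork a child, give it an endpoint for sending objects back, and read those objects in the parent. The helper name run_in_child and the use of Marshal over a plain pipe are assumptions made for illustration, not Procrastinate's actual implementation.

```ruby
# Hypothetical helper, not part of Procrastinate: runs the given block in a
# forked child and hands it an endpoint for sending objects to the parent.
# Returns the received messages and the child's exit status.
def run_in_child
  reader, writer = IO.pipe
  reader.binmode
  writer.binmode

  pid = fork do
    reader.close                         # the child only writes
    endpoint = lambda { |obj| Marshal.dump(obj, writer) }
    yield endpoint
    writer.close
    exit!(0)                             # skip at_exit handlers inherited from the parent
  end

  writer.close                           # the parent only reads
  messages = []
  messages << Marshal.load(reader) until reader.eof?
  reader.close

  _, status = Process.wait2(pid)
  [messages, status]
end

messages, status = run_in_child do |endpoint|
  endpoint.call('partial result')
  endpoint.call([1, 2, 3])
end
puts "received #{messages.inspect}, child succeeded: #{status.success?}"
```

Unlike create_process, this sketch blocks until the child is done; the real method returns immediately and leaves reading and reaping to step.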
finalize_children()
# File lib/procrastinate/process_manager.rb, line 124
def finalize_children
  children.
    select { |pid, child| child.stopped? }.
    each { |pid, child| child.finalize }

  children.delete_if { |pid, child| 
    child.removable? }
end
kill_processes(signal='QUIT')

Kills all running processes by sending them a signal (QUIT by default).

@param signal [String] signal to send to the forked processes.

# File lib/procrastinate/process_manager.rb, line 221
def kill_processes(signal='QUIT')
  children.each do |pid, process|
    Process.kill(signal, pid)
  end
end
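
As a rough illustration of what sending the signal involves, the sketch below forks a child that would otherwise sleep forever, signals it, and reaps it. It also shows that signalling a pid that has already been reaped raises Errno::ESRCH, something a caller may want to guard against. Both helper names are hypothetical and exist only for this example.

```ruby
# Hypothetical helper: forks a sleeping child, sends it the signal and waits
# for it to terminate. Returns [forked pid, reaped pid, Process::Status].
def terminate_child(signal = 'QUIT')
  pid = fork { sleep }
  Process.kill(signal, pid)
  reaped_pid, status = Process.wait2(pid)
  [pid, reaped_pid, status]
end

# Hypothetical helper: like Process.kill, but returns false instead of
# raising when the process no longer exists.
def safe_kill(signal, pid)
  Process.kill(signal, pid)
  true
rescue Errno::ESRCH
  false
end

pid, reaped_pid, status = terminate_child
puts "child #{reaped_pid} terminated, success: #{status.success?.inspect}"
puts "second kill delivered: #{safe_kill('QUIT', pid)}"
```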
process_count()

Returns the number of child processes that are alive at this point. Note that even if a child process is marked dead internally, it counts towards this number, since its results may not have been dispatched yet.

# File lib/procrastinate/process_manager.rb, line 65
def process_count
  children.count
end
reap_childs()

Calls completion handlers for all the children that have now exited.

# File lib/procrastinate/process_manager.rb, line 135
def reap_childs
  loop do
    child_pid, status = Process.waitpid(-1, Process::WNOHANG)
    break unless child_pid

    # Trigger the completion callback
    if child=children[child_pid]
      child.sigchld_received 
      # Maybe there are messages queued for this child. If nothing is queued
      # up, the thread will hang in the select in #wait_for_event unless
      # we wake it up.
      wakeup
    end
  end
rescue Errno::ECHILD
  # Ignore: This means that no children remain.
end
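
The non-blocking reap loop can be tried in isolation. The sketch below is a simplified stand-in for reap_childs, under the assumption that we only want to collect exit statuses. It uses Process.wait2 rather than Process.waitpid because wait2 returns the status alongside the pid. The name reap_exited is hypothetical.

```ruby
# Collects every child that has already exited, without blocking.
# Returns a hash of pid => Process::Status.
def reap_exited
  reaped = {}
  loop do
    pid, status = Process.wait2(-1, Process::WNOHANG)
    break unless pid                     # children exist, but none has exited yet
    reaped[pid] = status
  end
  reaped
rescue Errno::ECHILD
  reaped                                 # no children left at all
end

pids = 3.times.map { |i| fork { exit!(i) } }
collected = {}
until collected.size == pids.size
  collected.merge!(reap_exited)
  sleep 0.01
end
collected.each { |pid, status| puts "#{pid} exited with #{status.exitstatus}" }
```

Looping until wait2 returns nil matters because several children may exit before the parent gets around to reaping, and the kernel may coalesce their SIGCHLD signals into one.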
register_signals()

Registers the signal handlers used to manage child processes. NB: Because the handlers are installed process-wide, holding more than one dispatcher in a process will not work yet.

# File lib/procrastinate/process_manager.rb, line 74
def register_signals
  trap('CHLD') { wakeup }
end
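
The combination of a CHLD trap and a pipe is commonly known as the self-pipe trick. The following is a minimal standalone sketch of it, assuming a single short-lived child; the helper name wait_for_child_exit is hypothetical. It uses write_nonblock in the handler so the handler cannot block on a full pipe, which is a choice made for this example and differs from the plain write used by wakeup.

```ruby
# Hypothetical helper: forks a short-lived child and sleeps until SIGCHLD
# arrives. Returns true if the handler woke us up before the timeout.
def wait_for_child_exit(timeout = 5)
  reader, writer = IO.pipe

  # Keep the handler tiny: it only pokes the pipe.
  previous = Signal.trap('CHLD') do
    writer.write_nonblock('.', exception: false)
  end

  pid = fork { exit!(0) }

  # Sleeps until the handler has written to the pipe or the timeout expires.
  ready, = IO.select([reader], nil, nil, timeout)
  reader.read_nonblock(1024) if ready    # drain the wakeup byte
  Process.wait(pid)
  !ready.nil?
ensure
  # Put back whatever handler was installed before, then release the pipe.
  Signal.trap('CHLD', previous || 'DEFAULT')
  [reader, writer].each { |io| io.close if io }
end

puts "woken by SIGCHLD: #{wait_for_child_exit}"
```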
setup()

Sets up the resources the dispatcher needs. You must call this before the dispatcher can start its work.

# File lib/procrastinate/process_manager.rb, line 30
def setup
  register_signals
end
step()

Performs one step in the dispatcher's work. This will sleep and wait for work to be done, then wake up and reap processes that have terminated in the meantime. This method will mostly sleep.

# File lib/procrastinate/process_manager.rb, line 38
def step
  # Sleep until either work arrives or we receive a SIGCHLD
  wait_for_event
  # Reap all processes that have terminated in the meantime.
  reap_childs
end
teardown()

Tears down the dispatcher. This frees resources that have been allocated and waits for all children to terminate.

# File lib/procrastinate/process_manager.rb, line 48
def teardown
  wait_for_all_childs
  unregister_signals
end
unregister_signals()

Unregisters the signal handlers by resetting CHLD to its default handler, so the process behaves as it did before register_signals.

# File lib/procrastinate/process_manager.rb, line 80
def unregister_signals
  trap('CHLD', 'DEFAULT')
end
wait_for_all_childs()

Waits for all children to complete.

# File lib/procrastinate/process_manager.rb, line 208
def wait_for_all_childs
  # TODO Maybe signal KILL to children after some time.
  until children.empty?
    wait_for_event
    reap_childs
    finalize_children
  end
end
wait_for_event()

Called from the child management thread; puts that thread to sleep until someone requests it to become active again. See wakeup.

This method also drains the child queue: it reads the messages (including end-of-processing messages) sent by all children and dispatches each one to the corresponding ChildProcess.

# File lib/procrastinate/process_manager.rb, line 90
def wait_for_event
  cp_read_end = control_pipe.first
  
  loop do # until we have input in the cp_read_end (control_pipe)
    io_map = children.inject({}) { |map, (_, child)|
      map[child.master_pipe] = child; map }

    ready, _, _ = IO.select(io_map.keys + [cp_read_end], [], [], 0.1)
    next unless ready

    # Process all messages that were sent from our children to us.
    ready.each { |io| 
      next if io == cp_read_end

      child = io_map[io]

      fail "Assert: All IOs correspond to a child" unless child
      child.read_message 
    }
         
    # Send the final notifications to the tracking code for the child
    # processes and remove them from the children hash. At this point we
    # know that no messages are waiting in the child queue.
    finalize_children
    
    if ready.include?(cp_read_end)
      # Consume the data (not important)
      cp_read_end.read_nonblock(1024)
      # And return to our caller. This is the event we've been waiting for.
      return
    end
  end
end
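
The core of wait_for_event is a single IO.select over the control pipe plus one pipe per child, with a lookup table that maps each ready IO back to its owner. A stripped-down sketch of that multiplexing step follows. The labels and the helper name ready_labels are hypothetical and stand in for the ChildProcess objects used by the real method.

```ruby
# Waits until any of the given pipes is readable and returns the labels of
# the ready ones, or [] on timeout. `sources` maps a label to a read end.
def ready_labels(sources, timeout = 0.1)
  io_map = sources.each_with_object({}) { |(label, io), map| map[io] = label }
  ready, = IO.select(io_map.keys, nil, nil, timeout)
  return [] unless ready
  ready.map { |io| io_map[io] }
end

control_r, control_w = IO.pipe
child_r, child_w = IO.pipe
sources = { control: control_r, child: child_r }

child_w.write('message')                 # a child has something to say
p ready_labels(sources)

control_w.write('.')                     # someone requested a wakeup
p ready_labels(sources)
```

The short timeout mirrors the 0.1 second timeout in wait_for_event, which lets the loop rebuild its map of child pipes periodically even when nothing is readable.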
wakeup()

Wake up the dispatcher thread.

# File lib/procrastinate/process_manager.rb, line 55
def wakeup
  control_pipe.last.write '.'
# rescue IOError
  # Ignore:
end