class DRbQS::Worker::ProcessSet

Constants

WAITALL_INTERVAL_TIME

Attributes

process[R]

Public Class Methods

new(process_class = nil) click to toggle source
# File lib/drbqs/worker/worker_process_set.rb, line 11
def initialize(process_class = nil)
  @process_class = process_class || DRbQS::Worker::SimpleForkedProcess
  @process = {}
  @result = Queue.new
  @on_error = nil
  @on_result = nil
end

Public Instance Methods

all_processes() click to toggle source
# File lib/drbqs/worker/worker_process_set.rb, line 88
def all_processes
  @process.keys
end
calculating?(key) click to toggle source

Return true if the process of key is calculating.

# File lib/drbqs/worker/worker_process_set.rb, line 72
def calculating?(key)
  @process[key] && !@process[key][:task].empty?
end
create_process(*keys) click to toggle source
# File lib/drbqs/worker/worker_process_set.rb, line 55
def create_process(*keys)
  keys.each do |key|
    get_process(key)
  end
end
exist?(key) click to toggle source

Return true if the process of key exists. @param [Symbol,nil] key Key of child process or nil.

# File lib/drbqs/worker/worker_process_set.rb, line 63
def exist?(key)
  @process[key]
end
has_process?() click to toggle source
# File lib/drbqs/worker/worker_process_set.rb, line 67
def has_process?
  !@process.empty?
end
kill_all_processes() click to toggle source
# File lib/drbqs/worker/worker_process_set.rb, line 200
def kill_all_processes
  @process.each do |key, h|
    Process.detach(h[:pid])
    Process.kill("KILL", h[:pid])
  end
  @process.clear
end
on_error(&block) click to toggle source
# File lib/drbqs/worker/worker_process_set.rb, line 19
def on_error(&block)
  @on_error = block
end
on_result(&block) click to toggle source
# File lib/drbqs/worker/worker_process_set.rb, line 23
def on_result(&block)
  @on_result = block
end
prepare_to_exit(key = nil) click to toggle source
# File lib/drbqs/worker/worker_process_set.rb, line 119
def prepare_to_exit(key = nil)
  if key
    if h = send_object(key, :prepare_to_exit)
      h[:exit] = true
    end
  else
    @process.each do |key, h|
      prepare_to_exit(key)
    end
  end
end
respond_signal() click to toggle source

Read IOs and respond signals from chiled processes. If there is no data from child processes then the method returns false. Otherwise, true. Types of signals are :result, :node_error, :finish_preparing_to_exit.

- :result
  Execute callback set by DRbQS::Worker::ProcessSet#on_result.
- :node_error
  Execute callback set by DRbQS::Worker::ProcessSet#on_error.
- :finish_preparing_to_exit
  Send :exit signale to the process and delete from list of child processes.
# File lib/drbqs/worker/worker_process_set.rb, line 151
def respond_signal
  num = 0
  to_be_deleted = []
  @process.each do |key, h|
    if !h[:task].empty? || h[:exit]
      data = ''
      begin
        loop do
          data << h[:in].read_nonblock(READ_BYTE_SIZE)
        end
      rescue IO::WaitReadable
      rescue
        $stderr.puts "Stored data: " + data.inspect
        raise
      end
      if !data.empty?
        num += 1
        h[:unpacker].feed_each(data) do |ary|
          response_type, response = ary
          case response_type
          when :result
            task_id, result = response
            h[:task].delete(task_id)
            if @on_result
              @on_result.call(key, [task_id, result])
            else
              $stderr.puts "The instance of DRbQS::Worker::ProcessSet can not deal with results from child processes."
            end
          when :node_error
            if @on_error
              @on_error.call(key, response)
            else
              $stderr.puts "The instance of DRbQS::Worker::ProcessSet can not deal with error from child processes."
            end
          when :finish_preparing_to_exit
            delete_process(key)
            to_be_deleted << key
          end
        end
      end
    end
  end
  to_be_deleted.each do |key|
    @process.delete(key)
  end
  to_be_deleted.clear
  num > 0
end
send_task(key, dumped_task_ary) click to toggle source

@param [Array] dumped_task_ary is [task_id, obj, method_name, args].

# File lib/drbqs/worker/worker_process_set.rb, line 109
def send_task(key, dumped_task_ary)
  if h = send_object(key, dumped_task_ary)
    if dumped_task_ary[0]
      h[:task] << dumped_task_ary[0]
    end
  else
    raise "Process #{key.inspect} does not exist."
  end
end
waitall(interval_time = nil) click to toggle source
# File lib/drbqs/worker/worker_process_set.rb, line 210
def waitall(interval_time = nil)
  unless @process.all? { |key, h| h[:exit] }
    return nil
  end
  t = interval_time || WAITALL_INTERVAL_TIME
  until @process.empty?
    respond_signal
    Kernel.sleep(t)
  end
  until Process.waitall == []
    Kernel.sleep(t)
  end
  true
end
waiting?(key) click to toggle source

Return true if the process key does not calculate any tasks.

# File lib/drbqs/worker/worker_process_set.rb, line 77
def waiting?(key)
  !calculating?(key)
end
waiting_processes() click to toggle source

Return keys of processes not calculating a task.

# File lib/drbqs/worker/worker_process_set.rb, line 82
def waiting_processes
  @process.keys.select do |key|
    @process[key][:task].empty?
  end
end

Private Instance Methods

delete_process(key) click to toggle source
# File lib/drbqs/worker/worker_process_set.rb, line 131
def delete_process(key)
  if h = get_process(key)
    Process.detach(h[:pid])
    output_to_io(h[:out], :exit)
  else
    nil
  end
end
get_process(key) click to toggle source
# File lib/drbqs/worker/worker_process_set.rb, line 46
def get_process(key)
  if @process[key]
    @process[key]
  else
    process_create(key)
  end
end
output_to_io(io, obj) click to toggle source
# File lib/drbqs/worker/worker_process_set.rb, line 92
def output_to_io(io, obj)
  io.print Serialize.dump(obj)
  io.flush
end
process_create(key) click to toggle source
# File lib/drbqs/worker/worker_process_set.rb, line 27
def process_create(key)
  io_r, io_w = IO.pipe('BINARY')
  io_r2, io_w2 = IO.pipe('BINARY')
  parent_pid = Process.pid
  pid = fork do
    $PROGRAM_NAME = "[worker:#{key.to_s}] for drbqs-node (PID #{parent_pid})"
    io_w.close
    io_r2.close
    worker = @process_class.new(io_r, io_w2)
    worker.start
  end
  @process[key] = {
    :pid => pid, :out => io_w, :in => io_r2,
    :unpacker => DRbQS::Worker::Serialize::Unpacker.new,
    :task => []
  }
end
send_object(key, obj) click to toggle source
# File lib/drbqs/worker/worker_process_set.rb, line 98
def send_object(key, obj)
  if h = get_process(key)
    output_to_io(h[:out], obj)
    h
  else
    nil
  end
end