class DRbQS::Worker::ProcessSet
Constants
- WAITALL_INTERVAL_TIME
Attributes
process[R]
Public Class Methods
new(process_class = nil)
click to toggle source
# File lib/drbqs/worker/worker_process_set.rb, line 11 def initialize(process_class = nil) @process_class = process_class || DRbQS::Worker::SimpleForkedProcess @process = {} @result = Queue.new @on_error = nil @on_result = nil end
Public Instance Methods
all_processes()
click to toggle source
# File lib/drbqs/worker/worker_process_set.rb, line 88 def all_processes @process.keys end
calculating?(key)
click to toggle source
Return true if the process of key
is calculating.
# File lib/drbqs/worker/worker_process_set.rb, line 72 def calculating?(key) @process[key] && !@process[key][:task].empty? end
create_process(*keys)
click to toggle source
# File lib/drbqs/worker/worker_process_set.rb, line 55 def create_process(*keys) keys.each do |key| get_process(key) end end
exist?(key)
click to toggle source
Return true if the process of key
exists. @param [Symbol,nil] key Key of child process or nil.
# File lib/drbqs/worker/worker_process_set.rb, line 63 def exist?(key) @process[key] end
has_process?()
click to toggle source
# File lib/drbqs/worker/worker_process_set.rb, line 67 def has_process? !@process.empty? end
kill_all_processes()
click to toggle source
# File lib/drbqs/worker/worker_process_set.rb, line 200 def kill_all_processes @process.each do |key, h| Process.detach(h[:pid]) Process.kill("KILL", h[:pid]) end @process.clear end
on_error(&block)
click to toggle source
# File lib/drbqs/worker/worker_process_set.rb, line 19 def on_error(&block) @on_error = block end
on_result(&block)
click to toggle source
# File lib/drbqs/worker/worker_process_set.rb, line 23 def on_result(&block) @on_result = block end
prepare_to_exit(key = nil)
click to toggle source
# File lib/drbqs/worker/worker_process_set.rb, line 119 def prepare_to_exit(key = nil) if key if h = send_object(key, :prepare_to_exit) h[:exit] = true end else @process.each do |key, h| prepare_to_exit(key) end end end
respond_signal()
click to toggle source
Read IOs and respond signals from chiled processes. If there is no data from child processes then the method returns false. Otherwise, true. Types of signals are :result, :node_error, :finish_preparing_to_exit.
- :result Execute callback set by DRbQS::Worker::ProcessSet#on_result. - :node_error Execute callback set by DRbQS::Worker::ProcessSet#on_error. - :finish_preparing_to_exit Send :exit signale to the process and delete from list of child processes.
# File lib/drbqs/worker/worker_process_set.rb, line 151 def respond_signal num = 0 to_be_deleted = [] @process.each do |key, h| if !h[:task].empty? || h[:exit] data = '' begin loop do data << h[:in].read_nonblock(READ_BYTE_SIZE) end rescue IO::WaitReadable rescue $stderr.puts "Stored data: " + data.inspect raise end if !data.empty? num += 1 h[:unpacker].feed_each(data) do |ary| response_type, response = ary case response_type when :result task_id, result = response h[:task].delete(task_id) if @on_result @on_result.call(key, [task_id, result]) else $stderr.puts "The instance of DRbQS::Worker::ProcessSet can not deal with results from child processes." end when :node_error if @on_error @on_error.call(key, response) else $stderr.puts "The instance of DRbQS::Worker::ProcessSet can not deal with error from child processes." end when :finish_preparing_to_exit delete_process(key) to_be_deleted << key end end end end end to_be_deleted.each do |key| @process.delete(key) end to_be_deleted.clear num > 0 end
send_task(key, dumped_task_ary)
click to toggle source
@param [Array] dumped_task_ary is [task_id, obj, method_name, args].
# File lib/drbqs/worker/worker_process_set.rb, line 109 def send_task(key, dumped_task_ary) if h = send_object(key, dumped_task_ary) if dumped_task_ary[0] h[:task] << dumped_task_ary[0] end else raise "Process #{key.inspect} does not exist." end end
waitall(interval_time = nil)
click to toggle source
# File lib/drbqs/worker/worker_process_set.rb, line 210 def waitall(interval_time = nil) unless @process.all? { |key, h| h[:exit] } return nil end t = interval_time || WAITALL_INTERVAL_TIME until @process.empty? respond_signal Kernel.sleep(t) end until Process.waitall == [] Kernel.sleep(t) end true end
waiting?(key)
click to toggle source
Return true if the process key
does not calculate any tasks.
# File lib/drbqs/worker/worker_process_set.rb, line 77 def waiting?(key) !calculating?(key) end
waiting_processes()
click to toggle source
Return keys of processes not calculating a task.
# File lib/drbqs/worker/worker_process_set.rb, line 82 def waiting_processes @process.keys.select do |key| @process[key][:task].empty? end end
Private Instance Methods
delete_process(key)
click to toggle source
# File lib/drbqs/worker/worker_process_set.rb, line 131 def delete_process(key) if h = get_process(key) Process.detach(h[:pid]) output_to_io(h[:out], :exit) else nil end end
get_process(key)
click to toggle source
# File lib/drbqs/worker/worker_process_set.rb, line 46 def get_process(key) if @process[key] @process[key] else process_create(key) end end
output_to_io(io, obj)
click to toggle source
# File lib/drbqs/worker/worker_process_set.rb, line 92 def output_to_io(io, obj) io.print Serialize.dump(obj) io.flush end
process_create(key)
click to toggle source
# File lib/drbqs/worker/worker_process_set.rb, line 27 def process_create(key) io_r, io_w = IO.pipe('BINARY') io_r2, io_w2 = IO.pipe('BINARY') parent_pid = Process.pid pid = fork do $PROGRAM_NAME = "[worker:#{key.to_s}] for drbqs-node (PID #{parent_pid})" io_w.close io_r2.close worker = @process_class.new(io_r, io_w2) worker.start end @process[key] = { :pid => pid, :out => io_w, :in => io_r2, :unpacker => DRbQS::Worker::Serialize::Unpacker.new, :task => [] } end
send_object(key, obj)
click to toggle source
# File lib/drbqs/worker/worker_process_set.rb, line 98 def send_object(key, obj) if h = get_process(key) output_to_io(h[:out], obj) h else nil end end