class DRbQS::Worker
DRbQS::Worker can be used to send tasks to child processes. Note that DRbQS::Worker
is not used by the DRbQS::Node class and is therefore not part of the main part of DRbQS.
Constants
- READ_BYTE_SIZE
Attributes
process[R]
Public Class Methods
new(opts = {})
click to toggle source
# File lib/drbqs/worker/worker.rb, line 10
# Create the process set and the empty bookkeeping tables used to track tasks.
#
# @param [Hash] opts
# @option opts [Object] :class Passed as-is to DRbQS::Worker::ProcessSet.new.
# @option opts [Enumerable] :key Keys; one child process is created per key.
def initialize(opts = {})
  @process = DRbQS::Worker::ProcessSet.new(opts[:class])
  process_keys = opts[:key]
  process_keys.each { |key| @process.create_process(key) } if process_keys
  # Per-process state (:sleep flag, :group list), created lazily on first access.
  @state = Hash.new { |table, key| table[key] = {} }
  # task_id => { :task => task, :calculate => true once sent }
  @task_pool = {}
  # group => ids of the tasks still pending in that group
  @task_group = Hash.new { |table, key| table[key] = [] }
  @task_num = 0
end
Public Instance Methods
add_task(task, broadcast = nil)
click to toggle source
# File lib/drbqs/worker/worker.rb, line 67
# Register a task, or send it immediately to every process.
#
# @param [Object] task Object responding to +group+ (and to whatever send_task needs).
# @param [Object] broadcast When truthy the task is sent to all processes
#   with a nil task ID and is not stored in the task pool.
# @return [Integer, Object] The new task ID when the task is stored; otherwise
#   the value returned by iterating over all processes.
def add_task(task, broadcast = nil)
  unless broadcast
    @task_num += 1
    new_id = @task_num
    @task_pool[new_id] = { :task => task }
    @task_group[task.group] << new_id
    return new_id
  end
  @process.all_processes.each { |proc_key| send_task(proc_key, nil, task) }
end
calculating?()
click to toggle source
# File lib/drbqs/worker/worker.rb, line 23
# @return [Boolean] true while at least one task remains in the task pool.
def calculating?
  @task_pool.any?
end
finish(interval_time = 1)
click to toggle source
Send an exit signal to all child processes and wait for them to complete, using interval_time as the sleep interval.
@param [Numeric] interval_time An argument of Kernel#sleep.
# File lib/drbqs/worker/worker.rb, line 131
# Ask the process set to prepare for exit, then wait on it.
#
# @param [Numeric] interval_time Forwarded to the process set's waitall.
# @return [Object] Whatever the process set's waitall returns.
def finish(interval_time = 1)
  processes = @process
  processes.prepare_to_exit
  processes.waitall(interval_time)
end
group(grp, *keys)
click to toggle source
# File lib/drbqs/worker/worker.rb, line 39
# Append a group to the group list of each given process key.
#
# @param [Object] grp Group to add.
# @param [Array] keys Process keys whose state is updated.
# @return [Array] The given keys.
def group(grp, *keys)
  keys.each do |key|
    entry = @state[key]
    entry[:group] = [] unless entry[:group]
    entry[:group] << grp
  end
end
on_error(&block)
click to toggle source
# File lib/drbqs/worker/worker.rb, line 57
# Register an error callback by forwarding the block to the process set.
#
# @return [Object] Whatever the process set's on_error returns.
def on_error(&handler)
  @process.on_error(&handler)
end
on_result(&block)
click to toggle source
# File lib/drbqs/worker/worker.rb, line 45
# Register the result callback on the process set.
#
# For every result the finished task is removed from the task pool and from
# its group list, and the task's hook is executed with this worker and the
# result. The optional block is then called with the process key and the raw
# result array.
#
# @yield [proc_key, ary] Called after the bookkeeping, if a block is given.
# @yieldparam [Object] proc_key Key of the process that produced the result.
# @yieldparam [Array] ary Pair of task ID and result.
# @return [Object] Whatever the process set's on_result returns.
def on_result(&block)
  @process.on_result do |proc_key, ary|
    task_id, result = ary
    # Results whose ID is not in the pool (e.g. broadcast tasks sent with a
    # nil ID) skip the bookkeeping and go straight to the block.
    task_data = @task_pool.delete(task_id)
    if task_data
      task = task_data[:task]
      @task_group[task.group].delete(task_id)
      task.exec_hook(self, result)
    end
    # The block is optional: without this guard a call to on_result with no
    # block raised NoMethodError on nil as soon as the first result arrived.
    block.call(proc_key, ary) if block
  end
end
sleep(*keys)
click to toggle source
# File lib/drbqs/worker/worker.rb, line 27
# Mark the given processes as sleeping; step skips processes with this flag.
# Note that this method shadows Kernel#sleep inside the class.
#
# @param [Array] keys Process keys to mark.
# @return [Array] The given keys.
def sleep(*keys)
  keys.each { |key| @state[key].store(:sleep, true) }
end
step()
click to toggle source
This method sends a stored task to each process that is not calculating a task and responds to signals from child processes.
# File lib/drbqs/worker/worker.rb, line 82
# Hand at most one pending task to each waiting, non-sleeping process, then
# let the process set respond to signals.
#
# For each process its own groups are searched first, in order, followed by
# DRbQS::Task::DEFAULT_GROUP; the first task not yet marked :calculate is sent.
#
# @return [Object] Whatever the process set's respond_signal returns.
def step
  @process.waiting_processes.each do |proc_key|
    next if @state[proc_key][:sleep]
    groups = (@state[proc_key][:group] || []) + [DRbQS::Task::DEFAULT_GROUP]
    pending_id = nil
    groups.each do |gr|
      pending_id = @task_group[gr].find { |task_id| !@task_pool[task_id][:calculate] }
      break if pending_id
    end
    next unless pending_id
    send_task(proc_key, pending_id, @task_pool[pending_id][:task])
    # Mark as in flight so that no other process receives the same task.
    @task_pool[pending_id][:calculate] = true
  end
  @process.respond_signal
end
wait(task_id, interval_time)
click to toggle source
Wait for task task_id to finish, sleeping interval_time between steps.
@param [Fixnum] task_id @param [Numeric] interval_time An argument of Kernel#sleep.
# File lib/drbqs/worker/worker.rb, line 107
# Run step repeatedly until the given task is no longer in the task pool.
#
# @param [Integer] task_id ID of the task to wait for.
# @param [Numeric] interval_time Passed to Kernel.sleep between steps.
# @return [true, nil] true once the task has left the pool; nil when the
#   task was not in the pool to begin with.
def wait(task_id, interval_time)
  return unless @task_pool[task_id]
  loop do
    step
    return true unless @task_pool[task_id]
    # Kernel.sleep is called explicitly because the class defines its own #sleep.
    Kernel.sleep(interval_time)
  end
end
waitall(interval_time)
click to toggle source
Wait for all tasks to finish, sleeping interval_time between steps.
@param [Numeric] interval_time An argument of Kernel#sleep.
# File lib/drbqs/worker/worker.rb, line 121
# Keep stepping until calculating? reports that no task is left.
#
# @param [Numeric] interval_time Passed to Kernel.sleep after every step.
# @return [nil]
def waitall(interval_time)
  pause = interval_time
  while calculating?
    step
    # Kernel.sleep is called explicitly because the class defines its own #sleep.
    Kernel.sleep(pause)
  end
end
wakeup(*keys)
click to toggle source
# File lib/drbqs/worker/worker.rb, line 33
# Clear the sleep flag of the given processes so that step serves them again.
#
# @param [Array] keys Process keys to wake up.
# @return [Array] The given keys.
def wakeup(*keys)
  keys.each { |key| @state[key].store(:sleep, false) }
end
Private Instance Methods
send_task(proc_key, task_id, task)
click to toggle source
# File lib/drbqs/worker/worker.rb, line 61
# Build the payload (the task ID followed by the task's simple_drb_args)
# and pass it to the process set for the given process.
#
# @param [Object] proc_key Key of the target process.
# @param [Integer, nil] task_id ID stored in the pool, or nil for broadcast tasks.
# @param [Object] task Object responding to +simple_drb_args+.
# @return [Object] Whatever the process set's send_task returns.
def send_task(proc_key, task_id, task)
  payload = [task_id].concat(task.simple_drb_args)
  @process.send_task(proc_key, payload)
end