class DRbQS::Node::TaskClient
Attributes
group[R]
node_number[R]
Public Class Methods
new(node_number, queue, result, group, logger = DRbQS::Misc::LoggerDummy.new)
click to toggle source
# File lib/drbqs/node/task_client.rb, line 6
#
# Set up a task client for one node.
#
# @param node_number [Object] identifier of this node (kept in @node_number)
# @param queue [Object] object tasks are taken from (kept in @queue)
# @param result [Object] object signals and results are written to (kept in @result)
# @param group [Array, nil] group names of this node; nil is stored as an empty array
# @param logger [Object] logger; a DRbQS::Misc::LoggerDummy is created when omitted
def initialize(node_number, queue, result, group, logger = DRbQS::Misc::LoggerDummy.new)
  # Local buffers: tasks waiting to be processed and results waiting to be sent.
  @task_queue = Queue.new
  @result_queue = Queue.new

  # Identity of this node.
  @node_number = node_number
  @group = group ? group : []

  # Collaborators supplied by the caller.
  @queue = queue
  @result = result
  @logger = logger
end
Public Instance Methods
add_new_task(num)
click to toggle source
# File lib/drbqs/node/task_client.rb, line 61
#
# Fetch up to +num+ tasks, acknowledge each of them and queue it locally.
#
# For every task returned by +get_task+, an [:accept, task_id, node_number]
# tuple is written to @result and the task is handed to +queue_task+.
# Fetching stops at the first attempt that yields no task.
#
# @param num [Integer] maximum number of tasks to fetch
# @return [Array, nil] IDs of the accepted tasks in fetch order,
#   or nil if no task was obtained
def add_new_task(num)
  accepted_ids = []
  num.times do
    task = get_task
    break unless task

    # The first element of a task array is its ID.
    task_id = task[0]
    @logger.info("Send accept signal: node #{@node_number} calculating #{task_id}")
    @result.write([:accept, task_id, @node_number])
    queue_task(task)
    accepted_ids << task_id
  end
  accepted_ids.empty? ? nil : accepted_ids
end
dequeue_task()
click to toggle source
@return [nil,Array] If @task_queue is empty then return nil.
Otherwise, an array [task_id, obj, method_name, args] is returned.
# File lib/drbqs/node/task_client.rb, line 36
#
# Take the next locally queued task, if there is one.
#
# @return [nil, Array] nil when @task_queue is empty; otherwise the next
#   queued entry
def dequeue_task
  return nil if @task_queue.empty?

  @task_queue.deq
end
dump_result_queue()
click to toggle source
# File lib/drbqs/node/task_client.rb, line 93
#
# Drain the result queue and serialize the pending results.
#
# Only the result part of each queued entry is kept; the task IDs are
# discarded.
#
# @return [String, nil] Marshal dump of the array of results in dequeue
#   order, or nil when there was nothing to dump
def dump_result_queue
  results = []
  until result_empty?
    _task_id, res = dequeue_result
    results << res
  end
  results.empty? ? nil : Marshal.dump(results)
end
get_task()
click to toggle source
# File lib/drbqs/node/task_client.rb, line 52
#
# Look for a task, trying this node's own groups before the default group.
#
# Each entry of @group is passed to +get_task_by_group+ in order; the first
# task found is returned. If none of them yields a task,
# DRbQS::Task::DEFAULT_GROUP is tried.
#
# @return [Array, nil] whatever +get_task_by_group+ returned for the first
#   group that had a task, or its result for the default group
def get_task
  found = nil
  @group.each do |name|
    found = get_task_by_group(name)
    break if found
  end
  found || get_task_by_group(DRbQS::Task::DEFAULT_GROUP)
end
get_task_by_group(grp)
click to toggle source
# File lib/drbqs/node/task_client.rb, line 44
#
# Try to take one task belonging to +grp+ from @queue without waiting.
#
# The template passed to +take+ is [grp, Integer, nil, Symbol, nil] with a
# timeout of 0. Integer is used instead of the former Fixnum constant, which
# was deprecated in Ruby 2.4 and removed in Ruby 3.2; Integer is available on
# every Ruby version.
# NOTE(review): @queue is presumably a Rinda tuple space (nil template
# entries would then match anything) -- confirm against the caller.
#
# @param grp [Object] group name placed in the first template position
# @return [Array, nil] the taken tuple without its leading group element,
#   or nil when +take+ raises Rinda::RequestExpiredError
def get_task_by_group(grp)
  @queue.take([grp, Integer, nil, Symbol, nil], 0)[1..-1]
rescue Rinda::RequestExpiredError
  nil
end
queue_result(task_id, result)
click to toggle source
# File lib/drbqs/node/task_client.rb, line 89
#
# Store a finished task's result in @result_queue as a [task_id, result]
# pair.
#
# @param task_id [Object] ID of the task the result belongs to
# @param result [Object] the result value
def queue_result(task_id, result)
  pair = [task_id, result]
  @result_queue.enq(pair)
end
queue_task(ary)
click to toggle source
@param [Array] ary An array of the form [task_id, obj, method_name, args]
# File lib/drbqs/node/task_client.rb, line 30
#
# Append a task to the local task queue (@task_queue).
#
# @param ary [Array] the task entry to enqueue; it is stored as given
def queue_task(ary)
  @task_queue.enq(ary)
end
result_empty?()
click to toggle source
# File lib/drbqs/node/task_client.rb, line 20
#
# Tell whether any result is waiting in @result_queue.
#
# @return [Boolean] the value of @result_queue.empty?
def result_empty?
  @result_queue.empty?
end
send_result()
click to toggle source
Returns an array of the IDs of the tasks whose results were sent to the server, or nil if no result was sent.
# File lib/drbqs/node/task_client.rb, line 78
#
# Write every queued result to @result and report which tasks were covered.
#
# Each dequeued entry is logged and then written as a
# [:result, task_id, node_number, result] tuple.
#
# @return [Array, nil] IDs of the tasks whose results were written, in
#   dequeue order, or nil when the result queue was empty
def send_result
  delivered = []
  until result_empty?
    id, payload = dequeue_result
    # The inspected result is passed as a block, not built eagerly.
    @logger.info("Send result: #{id}") { payload.inspect }
    @result.write([:result, id, @node_number, payload])
    delivered << id
  end
  return nil if delivered.empty?

  delivered
end
task_empty?()
click to toggle source
# File lib/drbqs/node/task_client.rb, line 16
#
# Tell whether any task is waiting in @task_queue.
#
# @return [Boolean] the value of @task_queue.empty?
def task_empty?
  @task_queue.empty?
end
Private Instance Methods
dequeue_result()
click to toggle source
# File lib/drbqs/node/task_client.rb, line 24
#
# Remove and return the next entry of @result_queue.
#
# No emptiness check is made here; callers in this class test
# +result_empty?+ first.
#
# @return [Object] the entry returned by @result_queue.deq
def dequeue_result
  @result_queue.deq
end