class DRbQS::Node::TaskClient

Attributes

group[R]
node_number[R]

Public Class Methods

new(node_number, queue, result, group, logger = DRbQS::Misc::LoggerDummy.new) click to toggle source
# File lib/drbqs/node/task_client.rb, line 6
def initialize(node_number, queue, result, group, logger = DRbQS::Misc::LoggerDummy.new)
  @node_number = node_number
  @queue = queue
  @result = result
  @task_queue = Queue.new
  @result_queue = Queue.new
  @group = group || []
  @logger = logger
end

Public Instance Methods

add_new_task(num) click to toggle source
# File lib/drbqs/node/task_client.rb, line 61
def add_new_task(num)
  get_task_id = []
  num.times do |i|
    if ary = get_task
      task_id = ary[0]
      @logger.info("Send accept signal: node #{@node_number} caluclating #{task_id}")
      @result.write([:accept, task_id, @node_number])
      queue_task(ary)
      get_task_id << task_id
    else
      break
    end
  end
  get_task_id.empty? ? nil : get_task_id
end
dequeue_task() click to toggle source

@return [nil,Array] If @task_queue is empty then return nil.

Otherwise, an array [task_id, obj, method_name, args] is returned.
# File lib/drbqs/node/task_client.rb, line 36
def dequeue_task
  if @task_queue.empty?
    nil
  else
    @task_queue.deq
  end
end
dump_result_queue() click to toggle source
# File lib/drbqs/node/task_client.rb, line 93
def dump_result_queue
  results = []
  while !result_empty?
    task_id, res = dequeue_result
    results << res
  end
  if results.size > 0
    Marshal.dump(results)
  else
    nil
  end
end
get_task() click to toggle source
# File lib/drbqs/node/task_client.rb, line 52
def get_task
  @group.each do |grp|
    if task = get_task_by_group(grp)
      return task
    end
  end
  get_task_by_group(DRbQS::Task::DEFAULT_GROUP)
end
get_task_by_group(grp) click to toggle source
# File lib/drbqs/node/task_client.rb, line 44
def get_task_by_group(grp)
  begin
    @queue.take([grp, Fixnum, nil, Symbol, nil], 0)[1..-1]
  rescue Rinda::RequestExpiredError
    nil
  end
end
queue_result(task_id, result) click to toggle source
# File lib/drbqs/node/task_client.rb, line 89
def queue_result(task_id, result)
  @result_queue.enq([task_id, result])
end
queue_task(ary) click to toggle source

@param [Array] ary An array is [task_id, obj, method_name, args]

# File lib/drbqs/node/task_client.rb, line 30
def queue_task(ary)
  @task_queue.enq(ary)
end
result_empty?() click to toggle source
# File lib/drbqs/node/task_client.rb, line 20
def result_empty?
  @result_queue.empty?
end
send_result() click to toggle source

Return an array of task ID that is sent to the server.

# File lib/drbqs/node/task_client.rb, line 78
def send_result
  sent_task_id = []
  while !result_empty?
    task_id, result = dequeue_result
    @logger.info("Send result: #{task_id}") { result.inspect }
    @result.write([:result, task_id, @node_number, result])
    sent_task_id << task_id
  end
  sent_task_id.empty? ? nil : sent_task_id
end
task_empty?() click to toggle source
# File lib/drbqs/node/task_client.rb, line 16
def task_empty?
  @task_queue.empty?
end

Private Instance Methods

dequeue_result() click to toggle source
# File lib/drbqs/node/task_client.rb, line 24
def dequeue_result
  @result_queue.deq
end