class DRbQS::Server::Queue
Attributes
calculating[R]
history[R]
Public Class Methods
new(queue, result, logger = DRbQS::Misc::LoggerDummy.new)
click to toggle source
# File lib/drbqs/server/queue.rb, line 9
# Build the server-side bookkeeping for tasks.
#
# @param queue  object that tasks are written to (only #write is called on it here)
# @param result object that accept signals and results are taken from (only #take is called on it here)
# @param logger object receiving #info / #debug / #error messages
def initialize(queue, result, logger = DRbQS::Misc::LoggerDummy.new)
  @queue = queue
  @result = result
  @logger = logger
  # Counter used to hand out task IDs; the first task gets ID 1.
  @task_id = 0
  # Task ID => task object, kept until the task's hook has been executed.
  @cache = {}
  # Node ID => array of task IDs that the node has accepted.
  @calculating = Hash.new { |table, node_id| table[node_id] = [] }
  @history = DRbQS::Server::TaskHistory.new
end
Public Instance Methods
add(task)
click to toggle source
@return [Integer] task ID (for debug)
# File lib/drbqs/server/queue.rb, line 25
# Register a new task: assign the next task ID, cache the task, queue it
# and record an :add event (with the task's note) in the history.
#
# @param task object responding to #note (and to whatever #queue_task needs)
# @return the ID assigned to the task
def add(task)
  id = (@task_id += 1)
  @logger.info("New task: #{id}")
  @cache[id] = task
  queue_task(id)
  @history.set(id, :add, task.note)
  id
end
all_logs()
click to toggle source
# File lib/drbqs/server/queue.rb, line 141
# Return whatever the task history reports from #log_strings.
def all_logs
  history = @history
  history.log_strings
end
calculating_nodes()
click to toggle source
# File lib/drbqs/server/queue.rb, line 145
# List the IDs of nodes that currently hold at least one task.
#
# @return [Array] sorted node IDs whose task list is not empty
def calculating_nodes
  busy = @calculating.reject { |_node_id, tasks| tasks.empty? }
  busy.keys.sort
end
calculating_task_message()
click to toggle source
Return a hash whose keys are node ID numbers and whose values are arrays of pairs, each pair consisting of a task ID number and its message.
# File lib/drbqs/server/queue.rb, line 105
# Build a report of the tasks each node is working on.
#
# For every task ID the reported message is the third element of the first
# event stored in the history for that task (presumably the note recorded
# when the task was added -- confirm against TaskHistory).
#
# @return [Hash] node ID => array of [task ID, message] pairs
def calculating_task_message
  @calculating.each_with_object({}) do |(node_id, task_ids), report|
    report[node_id] = task_ids.map do |task_id|
      first_event = @history.events(task_id)[0]
      [task_id, first_event[2]]
    end
  end
end
calculating_task_number()
click to toggle source
# File lib/drbqs/server/queue.rb, line 115
# Count the tasks held by all nodes together.
#
# @return [Integer] sum of the sizes of every node's task list
def calculating_task_number
  total = 0
  @calculating.each_value { |task_ids| total += task_ids.size }
  total
end
empty?()
click to toggle source
If the queue is empty, that is, there are no tasks waiting to be calculated next, this method returns true; otherwise it returns false. The method can return true even if some tasks are still being calculated.
# File lib/drbqs/server/queue.rb, line 131
# True when #stocked_task_number reports zero, i.e. no cached task is
# waiting outside the lists of tasks held by nodes.
def empty?
  stocked_task_number.zero?
end
exec_task_hook(main_server, task_id, result)
click to toggle source
# File lib/drbqs/server/queue.rb, line 73
# Remove a task from the cache and run its hook with the given result.
#
# A :hook event is recorded in the history only when the task's #exec_hook
# returns a truthy value.
#
# @param main_server passed through to the task's #exec_hook
# @param task_id     ID of the task whose result arrived
# @param result      passed through to the task's #exec_hook
# @return [Boolean] true if the task was cached, false (after logging an
#   error) if it was not
def exec_task_hook(main_server, task_id, result)
  task = @cache.delete(task_id)
  unless task
    @logger.error("Task #{task_id} is not cached.")
    return false
  end
  @history.set(task_id, :hook) if task.exec_hook(main_server, result)
  true
end
finished?()
click to toggle source
If there are no tasks left, neither waiting in the queue nor being calculated, return true. Otherwise, false.
# File lib/drbqs/server/queue.rb, line 137
# True when the task cache holds no entries.
def finished?
  @cache.empty?
end
finished_task_number()
click to toggle source
# File lib/drbqs/server/queue.rb, line 123
# Return the number of finished tasks as reported by the task history.
def finished_task_number
  history = @history
  history.finished_task_number
end
get_accept_signal()
click to toggle source
# File lib/drbqs/server/queue.rb, line 34
# Drain all pending accept signals from the result space.
#
# Each taken tuple has the form [:accept, task_id, node_id]. For every
# signal the task ID is appended to the node's list in @calculating and a
# :calculate event is recorded in the history. The loop ends when #take
# raises Rinda::RequestExpiredError (take is called with 0 as its second
# argument, presumably a zero timeout so that it never blocks).
#
# @return [Integer] number of accept signals processed by this call
def get_accept_signal
  count = 0
  begin
    loop do
      # Integer instead of Fixnum: Fixnum was merged into Integer in Ruby 2.4
      # and the constant was removed in Ruby 3.2, where it raises NameError.
      _sym, task_id, node_id = @result.take([:accept, Integer, Integer], 0)
      count += 1
      @calculating[node_id] << task_id
      @history.set(task_id, :calculate, node_id)
      @logger.info("Accept: task #{task_id} by node #{node_id}.")
    end
  rescue Rinda::RequestExpiredError
    @logger.debug("Accept: #{count} signals.")
  end
  count
end
get_result(main_server)
click to toggle source
# File lib/drbqs/server/queue.rb, line 85
# Drain all pending results from the result space.
#
# Before each attempt to take a result, pending accept signals are processed
# with #get_accept_signal. Each taken tuple has the form
# [:result, task_id, node_id, result]. For every result a :result event is
# recorded, the task ID is removed from the node's list of calculating tasks
# and the task's hook is executed. The loop ends when #take raises
# Rinda::RequestExpiredError.
#
# @param main_server passed through to #exec_task_hook
# @return [Integer] number of results processed by this call
def get_result(main_server)
  count = 0
  begin
    loop do
      get_accept_signal
      # Integer instead of Fixnum: Fixnum was merged into Integer in Ruby 2.4
      # and the constant was removed in Ruby 3.2, where it raises NameError.
      # The trailing nil leaves the result element of the pattern unconstrained.
      _sym, task_id, node_id, result = @result.take([:result, Integer, Integer, nil], 0)
      count += 1
      @history.set(task_id, :result, node_id)
      @logger.info("Get: result of #{task_id} from node #{node_id}.")
      delete_task_id(node_id, task_id)
      exec_task_hook(main_server, task_id, result)
    end
  rescue Rinda::RequestExpiredError
    @logger.debug("Get: #{count} results.")
  end
  count
end
requeue_for_deleted_node_id(deleted)
click to toggle source
# File lib/drbqs/server/queue.rb, line 50
# Put the tasks held by removed nodes back into the queue.
#
# For each given node ID, every task ID in that node's list is queued again,
# a :requeue event is recorded, and finally the node's entry is removed from
# @calculating.
#
# @param deleted enumerable of node IDs that are no longer available
# @return the value of deleted.each (the given collection for an Array)
def requeue_for_deleted_node_id(deleted)
  deleted.each do |node_id|
    task_ids = @calculating[node_id]
    next unless task_ids

    task_ids.each do |task_id|
      queue_task(task_id)
      @history.set(task_id, :requeue)
      @logger.info("Requeue: task #{task_id}.")
    end
    @calculating.delete(node_id)
  end
end
stocked_task_number()
click to toggle source
# File lib/drbqs/server/queue.rb, line 119
# Number of cached tasks that are not currently held by any node:
# the cache size minus #calculating_task_number.
def stocked_task_number
  cached = @cache.size
  cached - calculating_task_number
end
Private Instance Methods
delete_task_id(node_id, task_id)
click to toggle source
# File lib/drbqs/server/queue.rb, line 63
# Remove a task ID from a node's list of calculating tasks.
#
# Logs an error when the task ID was not in that node's list, and another
# error when, after the removal, some node's list still contains the task ID.
#
# @param node_id ID of the node that returned the result
# @param task_id ID of the task to remove
def delete_task_id(node_id, task_id)
  removed = @calculating[node_id].delete(task_id)
  @logger.error("Task #{task_id} does not exist in list of calculating tasks.") unless removed

  duplicate = @calculating.find { |_other_node, task_ids| task_ids.include?(task_id) }
  @logger.error("Node #{duplicate[0]} is calculating task #{task_id}, too.") if duplicate
end
queue_task(task_id)
click to toggle source
# File lib/drbqs/server/queue.rb, line 19
# Write the cached task with the given ID to the queue.
#
# The tuple written is whatever the task's #drb_args returns for the ID.
#
# @param task_id ID of a task present in the cache
# @return the value returned by @queue.write
def queue_task(task_id)
  task = @cache[task_id]
  @queue.write(task.drb_args(task_id))
end