class DRbQS::Server::Queue

Attributes

calculating[R]
history[R]

Public Class Methods

new(queue, result, logger = DRbQS::Misc::LoggerDummy.new) click to toggle source
# File lib/drbqs/server/queue.rb, line 9
# Set up an empty queue with no cached or calculating tasks.
#
# @param queue  destination for queued task tuples (must respond to #write)
# @param result source of accept signals and results (must respond to #take)
# @param logger object receiving info/debug/error messages
def initialize(queue, result, logger = DRbQS::Misc::LoggerDummy.new)
  @queue, @result, @logger = queue, result, logger
  # ID given to the most recently added task; the first task gets 1.
  @task_id = 0
  # task ID => task object, kept until the task's hook has been executed.
  @cache = {}
  # node ID => array of task IDs; a missing node starts with its own empty array.
  @calculating = Hash.new { |table, node_id| table[node_id] = [] }
  @history = DRbQS::Server::TaskHistory.new
end

Public Instance Methods

add(task) click to toggle source

@return [Fixnum] task ID (for debug)

# File lib/drbqs/server/queue.rb, line 25
# Register a new task: give it the next task ID, cache it, write it to the
# queue and record the :add event together with the task's note.
#
# @param task object responding to #note (and to #drb_args, used by queue_task)
# @return the ID assigned to the task
def add(task)
  new_id = (@task_id += 1)
  @logger.info("New task: #{new_id}")
  @cache[new_id] = task
  queue_task(new_id)
  @history.set(new_id, :add, task.note)
  new_id
end
all_logs() click to toggle source
# File lib/drbqs/server/queue.rb, line 141
# Return whatever the task history reports as its log strings.
def all_logs
  @history.log_strings
end
calculating_nodes() click to toggle source
# File lib/drbqs/server/queue.rb, line 145
# List the IDs of nodes that currently have at least one calculating task.
#
# @return [Array] node IDs in ascending order (a new array on each call)
def calculating_nodes
  busy = @calculating.reject { |_node_id, task_ids| task_ids.empty? }
  busy.keys.sort
end
calculating_task_message() click to toggle source

Returns a hash whose keys are node ID numbers and whose values are arrays of pairs, each pair consisting of a task ID number and its message.

# File lib/drbqs/server/queue.rb, line 105
# Build a table of the tasks each node is calculating.
#
# The message of a task is the third element of its first history event
# (presumably the note recorded when the task was added -- confirm against
# TaskHistory).
#
# @return [Hash] node ID => array of [task ID, message] pairs
def calculating_task_message
  @calculating.each_with_object({}) do |(node_id, task_ids), table|
    table[node_id] = task_ids.map do |task_id|
      first_event = @history.events(task_id)[0]
      [task_id, first_event[2]]
    end
  end
end
calculating_task_number() click to toggle source
# File lib/drbqs/server/queue.rb, line 115
# Count the tasks currently being calculated, summed over all nodes.
#
# @return [Integer]
def calculating_task_number
  total = 0
  @calculating.each_value { |task_ids| total += task_ids.size }
  total
end
empty?() click to toggle source

If the queue is empty, that is, there are no tasks waiting to be calculated next, this method returns true; otherwise it returns false. The method can return true even if some tasks are still being calculated.

# File lib/drbqs/server/queue.rb, line 131
# True when no task is waiting to be picked up, i.e. the stocked task count
# is zero. Tasks that are still being calculated do not count as stocked.
def empty?
  stocked_task_number.zero?
end
exec_task_hook(main_server, task_id, result) click to toggle source
# File lib/drbqs/server/queue.rb, line 73
# Remove a task from the cache and run its hook with the given result.
#
# A :hook event is recorded in the history only when the task's exec_hook
# returns a truthy value.
#
# @param main_server passed through to the task's exec_hook
# @param task_id     ID of the task whose result arrived
# @param result      passed through to the task's exec_hook
# @return [Boolean] true if the task was cached, false (after logging an
#   error) if it was not
def exec_task_hook(main_server, task_id, result)
  task = @cache.delete(task_id)
  unless task
    @logger.error("Task #{task_id} is not cached.")
    return false
  end

  @history.set(task_id, :hook) if task.exec_hook(main_server, result)
  true
end
finished?() click to toggle source

Returns true if there are no tasks either waiting in the queue or being calculated; otherwise returns false.

# File lib/drbqs/server/queue.rb, line 137
# True when the task cache is empty. A task stays in the cache from add until
# exec_task_hook removes it.
def finished?
  @cache.empty?
end
finished_task_number() click to toggle source
# File lib/drbqs/server/queue.rb, line 123
# Return the number of finished tasks as reported by the task history.
def finished_task_number
  @history.finished_task_number
end
get_accept_signal() click to toggle source
# File lib/drbqs/server/queue.rb, line 34
# Take every pending [:accept, task_id, node_id] tuple from the result space
# and register the task as being calculated by that node.
#
# The template matches IDs with Integer rather than Fixnum: Fixnum was
# deprecated in Ruby 2.4 and removed in Ruby 3.2, where referencing it raises
# NameError.
#
# @return [Integer] number of accept signals processed by this call
def get_accept_signal
  count = 0
  begin
    loop do
      # Timeout 0: the take raises Rinda::RequestExpiredError as soon as no
      # matching tuple is left, which ends the loop.
      _sym, task_id, node_id = @result.take([:accept, Integer, Integer], 0)
      count += 1
      @calculating[node_id] << task_id
      @history.set(task_id, :calculate, node_id)
      @logger.info("Accept: task #{task_id} by node #{node_id}.")
    end
  rescue Rinda::RequestExpiredError
    @logger.debug("Accept: #{count} signals.")
  end
  count
end
get_result(main_server) click to toggle source
# File lib/drbqs/server/queue.rb, line 85
# Take every pending [:result, task_id, node_id, result] tuple from the result
# space; for each one record the event, remove the task from the node's
# calculating list and execute the task's hook.
#
# The template matches IDs with Integer rather than Fixnum: Fixnum was
# deprecated in Ruby 2.4 and removed in Ruby 3.2, where referencing it raises
# NameError.
#
# @param main_server passed through to exec_task_hook
# @return [Integer] number of results processed by this call
def get_result(main_server)
  count = 0
  begin
    loop do
      # Register pending accept signals first; delete_task_id logs an error
      # when a task is not listed under the node that returned it.
      get_accept_signal
      # Timeout 0: the take raises Rinda::RequestExpiredError as soon as no
      # matching tuple is left, which ends the loop.
      _sym, task_id, node_id, result = @result.take([:result, Integer, Integer, nil], 0)
      count += 1
      @history.set(task_id, :result, node_id)
      @logger.info("Get: result of #{task_id} from node #{node_id}.")
      delete_task_id(node_id, task_id)
      exec_task_hook(main_server, task_id, result)
    end
  rescue Rinda::RequestExpiredError
    @logger.debug("Get: #{count} results.")
  end
  count
end
requeue_for_deleted_node_id(deleted) click to toggle source
# File lib/drbqs/server/queue.rb, line 50
# Put the tasks of removed nodes back into the queue.
#
# For every given node ID, each task listed as calculating on that node is
# written to the queue again, recorded as :requeue and logged; afterwards the
# node's entry is dropped from the calculating table.
#
# @param deleted collection of node IDs (must respond to #each)
def requeue_for_deleted_node_id(deleted)
  deleted.each do |node_id|
    task_ids = @calculating[node_id]
    next unless task_ids

    task_ids.each do |task_id|
      queue_task(task_id)
      @history.set(task_id, :requeue)
      @logger.info("Requeue: task #{task_id}.")
    end
    @calculating.delete(node_id)
  end
end
stocked_task_number() click to toggle source
# File lib/drbqs/server/queue.rb, line 119
# Number of cached tasks that are not currently being calculated.
#
# @return [Integer] cache size minus the calculating task count
def stocked_task_number
  cached = @cache.size
  cached - calculating_task_number
end

Private Instance Methods

delete_task_id(node_id, task_id) click to toggle source
# File lib/drbqs/server/queue.rb, line 63
# Remove a task ID from the calculating list of the given node.
#
# Logs an error when the task was not listed under that node, and another
# error when, after the removal, some node still lists the same task.
#
# @param node_id ID of the node that returned the task
# @param task_id ID of the returned task
def delete_task_id(node_id, task_id)
  removed = @calculating[node_id].delete(task_id)
  @logger.error("Task #{task_id} does not exist in list of calculating tasks.") unless removed

  duplicate = @calculating.find { |_other_id, task_ids| task_ids.include?(task_id) }
  @logger.error("Node #{duplicate[0]} is calculating task #{task_id}, too.") if duplicate
end
queue_task(task_id) click to toggle source
# File lib/drbqs/server/queue.rb, line 19
# Write the cached task with the given ID to the queue, in the form produced
# by the task's drb_args.
#
# @param task_id ID of a task present in the cache
def queue_task(task_id)
  task = @cache[task_id]
  tuple = task.drb_args(task_id)
  @queue.write(tuple)
end