class DRbQS::Server::Message

Public Class Methods

new(message, logger = DRbQS::Misc::LoggerDummy.new) click to toggle source
# File lib/drbqs/server/message.rb, line 8
# Sets up the message handler with its message store, logger and node list.
#
# @param message [Object] store the methods of this class call +take+ and
#   +write+ on (presumably a Rinda tuple space - confirm with the caller)
# @param logger [Object] receives +info+ / +error+ calls; a
#   DRbQS::Misc::LoggerDummy instance is created when omitted
def initialize(message, logger = DRbQS::Misc::LoggerDummy.new)
  # Each handler starts with a freshly created node list of its own.
  @node_list = DRbQS::Server::NodeList.new
  @message = message
  @logger = logger
end

Public Instance Methods

check_connection() click to toggle source
# File lib/drbqs/server/message.rb, line 77
# Runs one connection check: asks the node list to drop nodes through
# +delete_not_alive+, logs what was dropped, writes an +[id, :alive_p]+ tuple
# for every node still in the list, and finally calls +set_check_connection+
# on the node list.
#
# @return [Object] whatever +delete_not_alive+ returned (it is logged as
#   "IDs of deleted nodes")
def check_connection
  removed_ids = @node_list.delete_not_alive
  # Logging is skipped when nothing was removed or when no logger is set.
  if removed_ids.size > 0 && @logger
    @logger.info("IDs of deleted nodes") { removed_ids }
  end
  @node_list.each { |node_id, _id_str| @message.write([node_id, :alive_p]) }
  @node_list.set_check_connection
  removed_ids
end
each_node_history(&block) click to toggle source
# File lib/drbqs/server/message.rb, line 124
# Iterates over the node list's history, handing every entry to the given
# block.
#
# @yield each entry of +@node_list.history+ (entry shape is not visible here)
# @return [Object] whatever +@node_list.history.each+ returns
def each_node_history(&block)
  @node_list.history.each(&block)
end
get_all_nodes() click to toggle source
# File lib/drbqs/server/message.rb, line 152
# Returns a +dup+ of +@node_list.list+, so the caller does not receive the
# very object the node list holds.
#
# @return [Object] duplicate of the node list's +list+
def get_all_nodes
  @node_list.list.dup
end
get_message() click to toggle source

One of the following messages is returned.

  • :exit_server, nil
  • :request_status, nil
  • :request_history, nil
  • :exit_after_task, node_id
  • :wake_node, node_id
  • :sleep_node, node_id
  • :node_error, node_id
  • :initialize, Array
  • :finalize, Array

@return [Array] Message array

# File lib/drbqs/server/message.rb, line 27
# Takes one +[:server, Symbol, anything]+ tuple from the message store and
# hands its second and third elements to +manage_message+.
#
# @return [Array, nil] the result of +manage_message+, or nil when the take
#   raises Rinda::RequestExpiredError (i.e. no matching tuple was available)
def get_message
  # The 0 passed to take is presumably a zero-second timeout - confirm against Rinda.
  tuple = @message.take([:server, Symbol, nil], 0)
  manage_message(*tuple[1..2])
rescue Rinda::RequestExpiredError
  nil
end
node_exist?(node_id) click to toggle source
# File lib/drbqs/server/message.rb, line 160
# Delegates to +@node_list.exist?+ for the given node id.
#
# @param node_id [Object] id to look up in the node list
# @return [Object] result of +@node_list.exist?(node_id)+
def node_exist?(node_id)
  @node_list.exist?(node_id)
end
node_not_exist?() click to toggle source
# File lib/drbqs/server/message.rb, line 156
# Reports whether the node list is empty. Note that this takes no node id: it
# is not the per-node negation of +node_exist?+.
#
# @return [Object] result of +@node_list.empty?+
def node_not_exist?
  @node_list.empty?
end
send_exit() click to toggle source

Send all nodes a message to exit.

# File lib/drbqs/server/message.rb, line 102
# Sends the +:exit+ signal to every node in the node list by way of
# +send_signal_to_all_nodes+.
def send_exit
  send_signal_to_all_nodes(:exit)
end
send_exit_after_task(node_id) click to toggle source
# File lib/drbqs/server/message.rb, line 119
# Registers the node with the node list through +add_to_preparation_to_exit+
# and then sends it the +:exit_after_task+ signal.
#
# @param node_id [Object] id of the node that should exit
def send_exit_after_task(node_id)
  # The registration happens unconditionally; send_signal itself only writes
  # when node_exist?(node_id) holds.
  @node_list.add_to_preparation_to_exit(node_id)
  send_signal(node_id, :exit_after_task)
end
send_finalization() click to toggle source

Send all nodes a message to finalize and exit.

# File lib/drbqs/server/message.rb, line 115
# Sends the +:finalize+ signal to every node in the node list by way of
# +send_signal_to_all_nodes+.
def send_finalization
  send_signal_to_all_nodes(:finalize)
end
send_history(history_string) click to toggle source
# File lib/drbqs/server/message.rb, line 144
# Publishes the history string as a +[:history, history_string]+ tuple. A
# +[:history, anything]+ tuple that is already in the store is taken out
# first, so the new tuple replaces it.
#
# @param history_string [Object] payload placed in the tuple
# @return [Object] whatever +@message.write+ returns
def send_history(history_string)
  stale_pattern = [:history, nil]
  fresh_tuple = [:history, history_string]
  begin
    @message.take(stale_pattern, 0)
  rescue Rinda::RequestExpiredError
    # No earlier history tuple could be taken; there is nothing to discard.
  end
  @message.write(fresh_tuple)
end
send_only_response(sender_id, request_time) click to toggle source
# File lib/drbqs/server/message.rb, line 136
# Publishes a +[:response, sender_id, request_time]+ tuple. A
# +[:response, sender_id, anything]+ tuple that is already in the store is
# taken out first, so the new tuple replaces it.
#
# @param sender_id [Object] second tuple element; also used to match the old tuple
# @param request_time [Object] third tuple element of the new tuple
# @return [Object] whatever +@message.write+ returns
def send_only_response(sender_id, request_time)
  stale_pattern = [:response, sender_id, nil]
  fresh_tuple = [:response, sender_id, request_time]
  begin
    @message.take(stale_pattern, 0)
  rescue Rinda::RequestExpiredError
    # No earlier response for this sender could be taken; nothing to discard.
  end
  @message.write(fresh_tuple)
end
send_sleep(node_id) click to toggle source
# File lib/drbqs/server/message.rb, line 106
# Sends the +:sleep+ signal to one node through +send_signal+.
#
# @param node_id [Object] id of the target node
def send_sleep(node_id)
  send_signal(node_id, :sleep)
end
send_status(server_status_string) click to toggle source
# File lib/drbqs/server/message.rb, line 128
# Publishes the status string as a +[:status, server_status_string]+ tuple. A
# +[:status, anything]+ tuple that is already in the store is taken out
# first, so the new tuple replaces it.
#
# @param server_status_string [Object] payload placed in the tuple
# @return [Object] whatever +@message.write+ returns
def send_status(server_status_string)
  stale_pattern = [:status, nil]
  fresh_tuple = [:status, server_status_string]
  begin
    @message.take(stale_pattern, 0)
  rescue Rinda::RequestExpiredError
    # No earlier status tuple could be taken; there is nothing to discard.
  end
  @message.write(fresh_tuple)
end
send_wake(node_id) click to toggle source
# File lib/drbqs/server/message.rb, line 110
# Sends the +:wake+ signal to one node through +send_signal+.
#
# @param node_id [Object] id of the target node
def send_wake(node_id)
  send_signal(node_id, :wake)
end
set_finalization_tasks(task_ary) click to toggle source
# File lib/drbqs/server/message.rb, line 183
# Stores the given tasks under the +:finalize+ label through
# +set_special_task+.
#
# @param task_ary [Array] tasks to store; the array is splatted, and
#   +set_special_task+ calls +simple_drb_args+ on every element
def set_finalization_tasks(task_ary)
  set_special_task(:finalize, *task_ary)
end
set_initialization_tasks(task_ary) click to toggle source

If initialization tasks have already been set, the new tasks are appended to the existing ones and the combined list replaces the stored one.

# File lib/drbqs/server/message.rb, line 179
# Stores the given tasks under the +:initialize+ label through
# +set_special_task+.
#
# @param task_ary [Array] tasks to store; the array is splatted, and
#   +set_special_task+ calls +simple_drb_args+ on every element
def set_initialization_tasks(task_ary)
  set_special_task(:initialize, *task_ary)
end
shutdown_unused_nodes(calculating_nodes) click to toggle source
# File lib/drbqs/server/message.rb, line 187
# Tells every node that is neither already preparing to exit nor listed in
# +calculating_nodes+ to exit after its current task.
#
# @param calculating_nodes [#include?] ids of nodes that must be left alone
# @return [Array] ids of the nodes +send_exit_after_task+ was called for
def shutdown_unused_nodes(calculating_nodes)
  stopped_ids = []
  @node_list.each do |node_id, _id_str|
    # Leave alone nodes that are already on their way out or are still listed as calculating.
    next if @node_list.prepare_to_exit?(node_id) || calculating_nodes.include?(node_id)

    send_exit_after_task(node_id)
    stopped_ids << node_id
  end
  stopped_ids
end

Private Instance Methods

manage_message(mes, arg) click to toggle source
# File lib/drbqs/server/message.rb, line 36
# Handles one message taken from the store: logs it, performs the node-list
# or message-store side effect belonging to its kind, and builds the value
# handed back to the caller.
#
# @param mes [Symbol] message kind
# @param arg [Object] message payload; indexed with +[0]+ / +[1]+ for
#   +:request_response+ and +:node_error+
# @return [Array, nil] +[mes]+, +[mes, arg]+ or (for +:node_error+)
#   +[mes, arg[0]]+; nil for an unrecognised kind
def manage_message(mes, arg)
  @logger.info("Get message") { [mes, arg] }
  # The value of the matching branch is the method's return value.
  case mes
  when :connect
    id_tuple = [arg, @node_list.get_new_id(arg)]
    @logger.info("New node") { id_tuple }
    @message.write(id_tuple)
    [mes]
  when :alive
    @node_list.set_alive(arg)
    [mes]
  when :exit_server
    @logger.info("Get exit message from #{arg.to_s}")
    [mes]
  when :exit_after_task
    @logger.info("Get exit message for node #{arg.to_s} after current task")
    [mes, arg]
  when :request_status
    @logger.info("Get status request from #{arg.to_s}")
    [mes]
  when :request_response
    @logger.info("Get respond request from #{arg[0].to_s} at #{arg[1].to_s}")
    [mes, arg]
  when :request_history
    @logger.info("Get history request from #{arg.to_s}")
    [mes]
  when :sleep_node
    @logger.info("Get sleep node message for node #{arg.to_s}")
    [mes, arg]
  when :wake_node
    @logger.info("Get wake node message for node #{arg.to_s}")
    [mes, arg]
  when :node_error
    # The node is removed from the list; only its id is passed on to the caller.
    @node_list.delete(arg[0], :error)
    @logger.info("Node Error (#{arg[0]})") { arg[1] }
    [mes, arg[0]]
  when :new_data
    [mes, arg]
  else
    @logger.error("Invalid message from #{arg.to_s}")
    nil
  end
end
send_signal(node_id, signal) click to toggle source
# File lib/drbqs/server/message.rb, line 87
# Writes a +[node_id, signal]+ tuple to the message store, but only when
# +node_exist?+ accepts the node id.
#
# @param node_id [Object] id of the target node
# @param signal [Symbol] signal to deliver
# @return [Object, nil] whatever +@message.write+ returns, or nil when the
#   node is unknown and nothing is written
def send_signal(node_id, signal)
  return unless node_exist?(node_id)

  @message.write([node_id, signal])
end
send_signal_to_all_nodes(signal) click to toggle source
# File lib/drbqs/server/message.rb, line 94
# Calls +send_signal+ with the given signal for every node the node list
# yields.
#
# @param signal [Symbol] signal to deliver to each node
# @return [Object] whatever +@node_list.each+ returns
def send_signal_to_all_nodes(signal)
  @node_list.each { |node_id, _id_str| send_signal(node_id, signal) }
end
set_special_task(label, *tasks) click to toggle source
# File lib/drbqs/server/message.rb, line 164
# Appends tasks to the task list stored under +label+ in the message store.
#
# The currently stored +[label, Array]+ tuple (if any) is taken out, the
# +simple_drb_args+ of every given task is appended to its array, and the
# combined array is written back as +[label, array]+.
#
# @param label [Symbol] tuple label, e.g. +:initialize+ or +:finalize+
# @param tasks [Array<#simple_drb_args>] tasks to add
# @return [Object] whatever +@message.write+ returns
# @raise [StandardError] whatever a task's +simple_drb_args+ raises; in that
#   case the stored tuple is left untouched
def set_special_task(label, *tasks)
  # Serialize before touching the store: if a task raises here, the tuple
  # that is already stored has not been taken yet and therefore is not lost.
  new_entries = tasks.map { |task| task.simple_drb_args }
  stored_entries =
    begin
      @message.take([label, Array], 0)[1]
    rescue Rinda::RequestExpiredError
      # Nothing stored under this label yet; start from an empty list.
      []
    end
  @message.write([label, stored_entries.concat(new_entries)])
end