class DRbQS::ProcessDefinition
Constants
- PATH_CPUINFO
- TIME_INTERVAL_EXECUTE_NODE
- TIME_INTERVAL_WAIT_SERVER_FINISH
Attributes
register[R]
Public Class Methods
new(server, node, port, io = nil)
click to toggle source
@param [Symbol] server Symbol of server name @param [Array] node An array of Symbol which means node name @param [String] port Port number @param [IO,nil] io IO object to output
# File lib/drbqs/execute/process_define.rb, line 12 def initialize(server, node, port, io = nil) @server = server @node = node @port = port @register = DRbQS::ProcessDefinition::Register.new @io = io end
Public Instance Methods
execute_node()
click to toggle source
# File lib/drbqs/execute/process_define.rb, line 225 def execute_node uri = server_uri(@server) if uri && /^drbunix/ !~ uri each_node_to_execute do |name, data| execute_one_node(name, data, uri) # If there is no time interval then drb does not work properly. sleep(TIME_INTERVAL_EXECUTE_NODE) end end end
execute_server(server_args)
click to toggle source
# File lib/drbqs/execute/process_define.rb, line 133 def execute_server(server_args) if ary = get_server_setting(@server) name = ary[0].to_s data = ary[1] puts_progress "Execute server '#{name}' (#{data[:ssh] ? 'ssh' : 'local'})" setting = data[:setting] hostname = data[:args][0] type = data[:type] if data[:ssh] setting.value.connect name unless setting.set?(:connect) server_setting = setting.mode_setting else server_setting = setting server_setting.value.daemon FileName.create(local_log_directory, "server_execute.log", :position => :middle) end server_setting.set_server_argument(*server_args) if data[:unix_domain_socket] unless server_setting.value.unix server_setting.value.unix DRbQS::Temporary.socket_path end unless server_setting.value.execute_node server_setting.value.execute_node get_suitable_process_num end else server_setting.value.port server_port unless server_setting.set?(:sftp_host) server_setting.value.sftp_host hostname end end setting.parse! unless data[:ssh] server_setting.value.argument.each do |path| unless File.exist?(path) raise "File '#{path}' does not exist." end end end setting.exec end rescue Exception => err puts_progress "Fail to execute server '#{data[:name].to_s}'" mes = "Invalid server definition: #{err.to_s} (#{err.class.to_s})" begin mes = "#{setting.string_for_shell}; " << mes if setting.respond_to?(:string_for_shell) rescue end new_err = err.class.new(mes) new_err.set_backtrace(err.backtrace) raise new_err end
information()
click to toggle source
# File lib/drbqs/execute/process_define.rb, line 236 def information info = {} info[:server] = @register.__server__.map do |name, data| new_data = data.dup new_data.delete(:setting) [name, new_data] end info[:node] = @register.__node__.map do |name, data| new_data = data.dup new_data.delete(:setting) [name, new_data] end if ary = get_server_setting(@server) default_server = ary[0] else default_server = nil end default_nodes = each_node_to_execute.map do |node_name, node_data| node_name end info[:default] = { :server => default_server, :node => default_nodes, :port => server_port } info end
information_string()
click to toggle source
# File lib/drbqs/execute/process_define.rb, line 260 def information_string info = information str = "Server:\n" ary = (info[:server] + info[:node]).map do |name, data| name.size end string_name_size = ary.max info[:server].each do |name, data| if data[:unix_domain_socket] prop = "local(unix socket domain)" elsif data[:ssh] prop = "ssh" else prop = "local(ssh)" end str << (info[:default][:server] == name ? " * " : (data[:template] ? " - " : " ")) str << sprintf("%- #{string_name_size}s %s\n", name, prop) end str << "\nNode:\n" info[:node].each do |name, data| if data[:type] == :group prop = 'group: ' << data[:args].map(&:to_s).join(',') else prop = (data[:ssh] ? 'ssh' : 'local') end if info[:default][:node].include?(name) str << " * " elsif data[:type] == :group str << " # " elsif data[:template] str << " - " else str << " " end str << sprintf("%- #{string_name_size}s %s\n", name, prop) end str << "\nDefault port:\n #{info[:default][:port]}" str << "\n\nHelp:\n" str << " ssh: Process over SSH\n" str << " local: Process on localhost\n" str << " *: default, -: template, #: node group" end
load(path)
click to toggle source
# File lib/drbqs/execute/process_define.rb, line 31 def load(path) @register.__load__(path) end
local_log_directory()
click to toggle source
Log directory for processes on localhost. Processes over ssh does not use this directory.
# File lib/drbqs/execute/process_define.rb, line 27 def local_log_directory @logal_log_directory ||= FileName.create(default_value(:log) || 'drbqs_execute_log') end
test_consistency()
click to toggle source
# File lib/drbqs/execute/process_define.rb, line 318 def test_consistency # Test existence of default server if @server && !get_server_setting(@server) raise "Invalid default server: #{@server.inspect}" end # Test existences of default nodes if node_names = default_value(:node) all_node_find_p = true node_names.each do |node| unless get_node_data(node) all_node_find_p = false $stderr.puts "Node definition #{node.inspect} does not exist!" end end unless all_node_find_p raise "Invalid default node." end end end
usage()
click to toggle source
# File lib/drbqs/execute/process_define.rb, line 303 def usage if data = @register.__usage__ str = data[:message] ? "\nDescription:\n#{data[:message]}" : "" if (server_file = data[:server]) && File.exist?(server_file) Kernel.load(server_file) if server_help = DRbQS.option_help_message str << "\n\n" << server_help end end str else '' end end
wait_server_finish()
click to toggle source
# File lib/drbqs/execute/process_define.rb, line 340 def wait_server_finish if uri = server_uri(@server) puts_progress "Wait finish of server #{uri}" manage = DRbQS::Manage.new(:uri => uri) while manage.server_respond? sleep(TIME_INTERVAL_EXECUTE_NODE) end else puts_progress "We tried to wait finish, however, we can not determine server uri" end end
Private Instance Methods
default_value(key)
click to toggle source
# File lib/drbqs/execute/process_define.rb, line 93 def default_value(key) @register.__default__[key] end
each_node(names = nil) { |*data| ... }
click to toggle source
# File lib/drbqs/execute/process_define.rb, line 61 def each_node(names = nil, &block) if block_given? if names node_data = [] i = 0 while i < names.size name = names[i] if data = get_node_data(name) if data[:template] if :group == data[:type] data[:args].each do |n| names << n unless names.include?(n) end end else node_data << [name, data] end end i += 1 end node_data.each do |data| yield(*data) end else each_node(@register.__node__.map { |name, data| name }, &block) end else to_enum(:each_node, names) end end
each_node_to_execute(&block)
click to toggle source
# File lib/drbqs/execute/process_define.rb, line 218 def each_node_to_execute(&block) each_node(@node || @register.__default__[:node], &block) end
execute_one_node(name, data, uri)
click to toggle source
# File lib/drbqs/execute/process_define.rb, line 184 def execute_one_node(name, data, uri) puts_progress "Execute node '#{name}' (#{data[:ssh] ? 'ssh' : 'local'})" setting = data[:setting] node_setting = (data[:ssh] ? setting.mode_setting : setting) node_setting.value.argument.clear node_setting.value.connect uri if data[:ssh] unless setting.set?(:connect) setting.value.connect name.to_s end else node_log_dir = FileName.create(local_log_directory, "node_#{name}_log", :directory => :self) setting.clear :log_stdout setting.value.log_prefix File.join(node_log_dir, 'node') setting.value.daemon File.join(node_log_dir, 'execute.log') end setting.parse! # TODO: # If node is on localhost then program is terminated here, # because the node is executed as daemon process. setting.exec rescue Exception => err puts_progress "Fail to execute node '#{name.to_s}'" mes = "Invalid node definition: #{err.to_s} (#{err.class.to_s})" begin mes = "#{setting.string_for_shell}; " << mes if setting.respond_to?(:string_for_shell) rescue end new_err = err.class.new(mes) new_err.set_backtrace(err.backtrace) raise new_err end
get_node_data(name)
click to toggle source
# File lib/drbqs/execute/process_define.rb, line 52 def get_node_data(name) if ary = @register.__node__.assoc(name) ary[1] else nil end end
get_server_setting(name = nil)
click to toggle source
# File lib/drbqs/execute/process_define.rb, line 35 def get_server_setting(name = nil) if !name data = nil @register.__server__.each do |server_data| unless server_data[1][:template] data = server_data break end end return nil unless data elsif !(data = @register.__server__.assoc(name.intern)) return get_server_setting(nil) end data end
get_suitable_process_num()
click to toggle source
# File lib/drbqs/execute/process_define.rb, line 119 def get_suitable_process_num n = 0 if File.exist?(PATH_CPUINFO) n = File.read(PATH_CPUINFO).lines.count { |l| /^processor/ =~ l } end if n <= 0 n = 1 puts_progress "Can not determine suitable process number, that is, can not count 'processor' lines in /proc/cpuinfo" end puts_progress "Execute #{n} processes to deal with tasks" n end
puts_progress(str)
click to toggle source
# File lib/drbqs/execute/process_define.rb, line 20 def puts_progress(str) @io.puts str if @io end
server_port()
click to toggle source
# File lib/drbqs/execute/process_define.rb, line 98 def server_port @port || default_value(:port) || ROOT_DEFAULT_PORT end
server_uri(name)
click to toggle source
# File lib/drbqs/execute/process_define.rb, line 103 def server_uri(name) uri = nil if ary = get_server_setting(name) data = ary[1] if data[:unix_domain_socket] uri = DRbQS::Misc.uri_drbunix(data[:setting].value.unix.first) else uri = DRbQS::Misc.create_uri(:host => data[:args][0], :port => server_port) end end uri end