class Pwrake::Communicator
Attributes
channel[R]
handler[R]
host[R]
id[R]
ipaddr[R]
ncore[R]
reader[R]
shells[R]
writer[R]
Public Class Methods
new(set,id,host,ncore,selector,option)
click to toggle source
# File lib/pwrake/branch/communicator.rb, line 43 def initialize(set,id,host,ncore,selector,option) @set = set @id = id @host = host @ncore = @ncore_given = ncore @selector = selector @option = option @shells = {} @ipaddr = [] end
Public Instance Methods
common_line(s)
click to toggle source
# File lib/pwrake/branch/communicator.rb, line 121 def common_line(s) x = "Communicator#common_line(id=#{@id},host=#{@host})" case s when /^heartbeat$/ Log.debug "#{x}: #{s.inspect}" when /^exited$/ Log.debug "#{x}: #{s.inspect}" return false when /^log:(.*)$/ Log.info "#{x}: log>#{$1}" when String Log.warn "#{x}: out>#{s.inspect}" when Exception Log.warn "#{x}: err>#{s.class}: #{s.message}" dropout(s) return false else raise ConnectError, "#{x}: invalid for read: #{s.inspect}" end true end
connect(worker_code)
click to toggle source
# File lib/pwrake/branch/communicator.rb, line 82 def connect(worker_code) setup_pipe(worker_code) # send ncore and options opts = Marshal.dump(@option) s = [@ncore||0, opts.size].pack("V2") @iow.write(s) @iow.write(opts) sel = @set.selector @reader = NBIO::MultiReader.new(sel,@ior) @writer = NBIO::Writer.new(sel,@iow) @handler = NBIO::Handler.new(@reader,@writer,@host) # read ncore while s = @reader.get_line case s when /^ip:(.*)$/ a = $1 @ipaddr.push(a) Log.debug "ip=#{a} @#{@host}" when /^ncore:(.*)$/ a = $1 Log.debug "ncore=#{a} @#{@host}" if /^(\d+)$/ =~ a @ncore = $1.to_i return false else raise ConnectError, "invalid for ncore: #{a.inspect}" end else return false if !common_line(s) end end raise ConnectError, "lost connection to #{@host} during setup" rescue => e dropout(e) end
dropout(exc=nil)
click to toggle source
# File lib/pwrake/branch/communicator.rb, line 147 def dropout(exc=nil) # Finish worker begin finish_shells if @handler @handler.exit @handler = nil end rescue => e m = Log.bt(e) $stderr.puts(m) Log.error(m) end # Error output from worker if @ioe err_out = ["standard error from worker:"] while s = @ioe.gets err_out << s.chomp end if err_out.size > 1 m = err_out.join("\n ") $stderr.puts(m) Log.error(m) end end # Exception message if exc m = Log.bt(exc) $stderr.puts(m) Log.error(m) end ensure @set.delete(self) end
finish_shells()
click to toggle source
# File lib/pwrake/branch/communicator.rb, line 143 def finish_shells @shells.keys.each{|sh| sh.finish_task_q} end
inspect()
click to toggle source
# File lib/pwrake/branch/communicator.rb, line 54 def inspect "#<#{self.class} @id=#{@id},@host=#{@host},@ncore=#{@ncore}>" end
new_channel()
click to toggle source
# File lib/pwrake/branch/communicator.rb, line 58 def new_channel i,q = @reader.new_queue CommChannel.new(@host,i,q,@writer,[@ior,@iow,@ioe]) end
setup_pipe(worker_code)
click to toggle source
# File lib/pwrake/branch/communicator.rb, line 63 def setup_pipe(worker_code) rb_cmd = "ruby -e 'eval ARGF.read(#{worker_code.size})'" if %w[127.0.0.1 ::1].include?(IPSocket.getaddress(@host)) cmd = rb_cmd else cmd = "ssh -x -T #{@option[:ssh_option]} #{@host} \"#{rb_cmd}\"" end # @ior,w0 = IO.pipe @ioe,w1 = IO.pipe r2,@iow = IO.pipe @pid = Kernel.spawn(cmd,:pgroup=>true,:out=>w0,:err=>w1,:in=>r2) w0.close w1.close r2.close # send worker_code @iow.write(worker_code) end