class Pwrake::Shell
Constants
- BY_FIBER
- OPEN_LIST
Attributes
host[R]
id[R]
profile[R]
status[R]
Public Class Methods
current()
click to toggle source
# File lib/pwrake/branch/shell.rb, line 21
# Looks up the shell recorded in BY_FIBER for the fiber that is currently
# running. The lookup result is returned unchanged.
def self.current
  running_fiber = Fiber.current
  BY_FIBER[running_fiber]
end
new(chan,comm,task_q,opt={})
click to toggle source
# File lib/pwrake/branch/shell.rb, line 25
# Sets up a shell on top of a channel.
#
# chan::   channel object; its +id+ and +host+ become this shell's id/host
# comm::   communicator object; the shell registers itself in +comm.shells+
# task_q:: queue object stored for later use by the task fiber
# opt::    option hash; +:work_dir+ is used when present, otherwise Dir.pwd
def initialize(chan,comm,task_q,opt={})
  @chan, @comm, @task_q, @option = chan, comm, task_q, opt
  @id, @host = chan.id, chan.host
  @lock = DummyMutex.new
  @work_dir = opt[:work_dir] || Dir.pwd
  # Register this shell with its communicator.
  @comm.shells[self] = true
end
profiler()
click to toggle source
# File lib/pwrake/branch/shell.rb, line 17
# Class-level reader for the @@profiler class variable.
def self.profiler
  @@profiler
end
Public Instance Methods
backquote(*command)
click to toggle source
# File lib/pwrake/branch/shell.rb, line 75
# Runs a command through _execute and returns everything it yielded,
# joined with newlines.
#
# command:: one or more command fragments; they are joined with a space
def backquote(*command)
  cmdline = command.join(' ')
  @lock.synchronize do
    output = []
    _execute(cmdline) { |line| output.push(line) }
    output.join("\n")
  end
end
cd(dir="")
click to toggle source
# File lib/pwrake/branch/shell.rb, line 92
# Sends "cd <dir>" through _system. Returns the truthy _system result on
# success; otherwise calls die, which raises.
#
# dir:: directory to change to (interpolated into the command as-is)
def cd(dir="")
  _system("cd #{dir}") || die
end
create_fiber(master_w)
click to toggle source
# File lib/pwrake/branch/shell.rb, line 218
# Builds (but does not resume) the Fiber that runs tasks for this shell.
#
# The fiber registers itself in BY_FIBER, then repeatedly dequeues task
# strings of the form "<task_id>:<task_name>" from @task_q, executes the
# matching Rake task when it is needed, and reports "taskend:..." or
# "taskfail:..." lines to +master_w+. When the queue stops yielding
# tasks it reports "retire:<comm id>", unregisters the shell from @comm,
# and finally halts the channel.
#
# master_w:: writer object (responds to put_line) used to report results
def create_fiber(master_w)
  @master_w = master_w
  Log.warn "not opened: host=#{@host} id=#{@id}" unless @opened
  Fiber.new do
    BY_FIBER[Fiber.current] = self
    Log.debug "shell start id=#{@id} host=#{@host}"
    begin
      while task_str = @task_q.deq
        #Log.debug "task_str=#{task_str}"
        unless /^(\d+):(.*)$/ =~ task_str
          raise RuntimeError, "invalid task_str: #{task_str}"
        end
        @task_id = $1.to_i
        @task_name = $2
        task = Rake.application[@task_name]
        @ncore = task.wrapper.n_used_cores
        begin
          Rake.application.current_flow[Fiber.current] = task.property.subflow
          task.execute(task.arguments) if task.needed?
          result = "taskend:#{@id}:#{task.name}"
        rescue Exception=>e
          Rake.application.display_error_message(e)
          Log.error e
          result = "taskfail:#{@id}:#{task.name}"
          # Leave the task loop once the remote side has gone away; the
          # ensure clause below still reports this task's result first.
          break if @exited
        ensure
          Rake.application.current_flow[Fiber.current] = nil
          master_w.put_line result
        end
      end
      Log.debug "shell id=#{@id} fiber end"
      master_w.put_line "retire:#{@comm.id}"
      @comm.shells.delete(self)
      # Last shell of this communicator: run this shell's own exit,
      # then drop the communicator.
      if @comm.shells.empty?
        exit
        @comm.dropout
      end
      @chan.halt
    rescue => e
      m = Log.bt(e)
      #$stderr.puts m
      Log.error(m)
    end
  end
end
die()
click to toggle source
# File lib/pwrake/branch/shell.rb, line 96
# Raises a RuntimeError naming this shell's host, id and the last
# command recorded in @cmd.
def die
  message = "Failed at #{@host}, id=#{@id}, cmd='#{@cmd}'"
  raise RuntimeError, message
end
exit()
click to toggle source
# File lib/pwrake/branch/shell.rb, line 55
# Sends "exit" over the channel and waits for the reply.
#
# Returns nil when the shell is not open, true when the reply is exactly
# "exit" (the shell is then removed from OPEN_LIST), and false for any
# other reply or when an IOError / Errno::EPIPE is raised.
def exit
  unless @opened
    Log.debug "already exited: host=#{@host} id=#{@id}"
    return
  end
  @opened = false
  _puts("exit")
  reply = _gets
  acknowledged = (reply == "exit")
  OPEN_LIST.delete(__id__) if acknowledged
  Log.debug("Shell#exit: recieve #{reply.inspect}")
  acknowledged
rescue IOError,Errno::EPIPE => e
  Log.debug("Shell#exit: #{Log.bt(e)}")
  false
end
finish_task_q()
click to toggle source
# File lib/pwrake/branch/shell.rb, line 268
# Closes the task queue and reports every task still waiting in it as
# failed, then halts the channel.
#
# For each leftover "<task_id>:<task_name>" entry a
# "taskfail:<shell id>:<task_name>" line is written to @master_w and a
# warning is logged.
#
# Raises RuntimeError when a queued entry does not have the
# "<digits>:<name>" form.
def finish_task_q
  @task_q.finish
  #Log.debug "finish_task_q: @task_q=#{@task_q.inspect}"
  while task_str = @task_q.deq_nonblock
    unless /^(\d+):(.*)$/ =~ task_str
      raise RuntimeError, "invalid task_str: #{task_str}"
    end
    task_name = $2
    # Build the message once so the warning logs what was actually sent
    # (the warning previously referenced an undefined local).
    result = "taskfail:#{@id}:#{task_name}"
    @master_w.put_line result
    Log.warn "unexecuted task: #{result}"
  end
  @chan.halt
end
open()
click to toggle source
# File lib/pwrake/branch/shell.rb, line 39
# Sends "open" over the channel and waits for the reply.
#
# Returns nil (after a warning) when the shell is already marked opened,
# true when the reply is exactly "open" (the shell is then stored in
# OPEN_LIST under its __id__), and false after logging an error for any
# other reply.
def open
  if @opened
    Log.warn "already opened: host=#{@host} id=#{@id}"
    return
  end
  @opened = true
  _puts("open")
  reply = _gets
  unless reply == "open"
    Log.error("Shell#open failed: recieve #{reply.inspect}")
    return false
  end
  OPEN_LIST[__id__] = self
  true
end
system(*command)
click to toggle source
# File lib/pwrake/branch/shell.rb, line 84
# Runs a command through _execute, printing each yielded line followed by
# a newline. Returns true when @status equals 0 afterwards.
#
# command:: one or more command fragments; they are joined with a space
def system(*command)
  cmdline = command.join(' ')
  @lock.synchronize do
    _execute(cmdline) { |line| print line+"\n" }
  end
  @status == 0
end
Private Instance Methods
_backquote(cmd)
click to toggle source
# File lib/pwrake/branch/shell.rb, line 131
# Sends +cmd+ over the channel, collects every line yielded by
# io_read_loop and returns them joined with newlines. The value returned
# by io_read_loop is stored in @status.
def _backquote(cmd)
  @cmd = cmd
  collected = []
  @lock.synchronize do
    _puts(cmd)
    @status = io_read_loop { |line| collected.push(line) }
  end
  collected.join("\n")
end
_execute(cmd,quote=nil,&block)
click to toggle source
# File lib/pwrake/branch/shell.rb, line 141
# Sends +cmd+ over the channel, streams its output to +block+ through
# io_read_loop, and records the run with the class-level profiler.
#
# cmd::   command line to send
# quote:: accepted but not used in this method
# block:: receives each output line yielded by io_read_loop
#
# Raises RuntimeError ("non opened") when the shell is not open.
# The profiler call sits in an ensure clause, so it also runs when sending
# or reading raises; @status ends up holding the profiler's return value,
# while the method's own value is the io_read_loop result.
def _execute(cmd,quote=nil,&block)
  @cmd = cmd
  raise "non opened" unless @opened
  @status = nil
  wall_start = Time.now
  clock_start = Pwrake.clock
  begin
    _puts(cmd)
    @status = io_read_loop(&block)
  ensure
    wall_end = Time.now
    elapsed = Pwrake.clock - clock_start
    @status = @@profiler.profile(@task_id, @task_name, cmd,
                                 wall_start, wall_end, elapsed,
                                 host, @ncore, @status, @gnu_time_status)
  end
end
_gets()
click to toggle source
# File lib/pwrake/branch/shell.rb, line 111
# Reads one item from the channel and returns it unchanged.
# Every item is logged at debug level. When the item is an Exception the
# channel is halted and the backtrace is logged as an error before the
# exception object is returned to the caller.
def _gets
  received = @chan.get_line
  Log.debug "Shell#_gets(host=#{@host},id=#{@id}): #{received.inspect}"
  if received.is_a?(Exception)
    @chan.halt
    Log.error Log.bt(received)
  end
  received
end
_puts(s)
click to toggle source
# File lib/pwrake/branch/shell.rb, line 106
# Writes one line to the channel and returns whatever put_line returns.
#
# s:: line to send
def _puts(s)
  #Log.debug "Shell#_puts(host=#{@host},id=#{@id}): #{s.inspect}"
  @chan.put_line s
end
_system(cmd)
click to toggle source
# File lib/pwrake/branch/shell.rb, line 122
# Sends +cmd+ over the channel, discards its output and reports whether
# it finished with status 0.
#
# cmd:: command line to send
#
# Returns true only when the status returned by io_read_loop converts to
# the integer 0. A nil status counts as failure. A status that is not
# numeric (io_read_loop can return strings such as "exit", "ioerror" or
# "timeout") also counts as failure, instead of raising ArgumentError
# from the integer conversion.
def _system(cmd)
  @cmd = cmd
  @lock.synchronize do
    _puts(cmd)
    status = io_read_loop{}
    begin
      Integer(status||1) == 0
    rescue ArgumentError, TypeError
      # Non-numeric status: the command did not complete normally.
      false
    end
  end
end
io_read_loop() { |x| ... }
click to toggle source
# File lib/pwrake/branch/shell.rb, line 161
# Reads items from the channel until a terminating one arrives and
# returns the resulting status.
#
# Lines of the form "<tag>:<payload>" are dispatched on the tag:
#   "o"         - payload is yielded to the block
#   "e"         - payload is printed to $stderr with a trailing newline
#   "t"         - payload is split on "," and stored in @gnu_time_status
#   "z" / "err" - payload is the status; returned as an Integer when it
#                 consists only of digits, otherwise as the String
# The bare line "exit", an IOError and an NBIO::TimeoutError each set
# @exited, halt the channel and return "exit", "ioerror" or "timeout".
# Anything else is reported as an invalid result and reading continues.
# Returns nil when _gets yields a falsy value before any status arrives.
def io_read_loop
  @gnu_time_status = nil
  while s = _gets
    case s
    when /^(\w+):(.*)$/
      tag, payload = $1, $2
      case tag
      when "o"
        yield payload
        next
      when "e"
        $stderr.print payload+"\n"
        next
      when "t"
        @gnu_time_status = payload.split(',')
        next
      when "z", "err"
        # see Executor#status_to_str
        return (payload =~ /^\d+$/ ? payload.to_i : payload)
      end
      # Unknown tag: fall through to the invalid-result report below.
    when "exit"
      msg = "Shell#io_read_loop: exit"
      $stderr.puts(msg)
      Log.error(msg)
      @exited = true
      @chan.halt
      return "exit"
    when IOError
      @exited = true
      @chan.halt
      return "ioerror"
    when NBIO::TimeoutError
      @exited = true
      @chan.halt
      return "timeout"
    end
    msg = "Shell#io_read_loop: Invalid result: #{s.inspect}"
    $stderr.puts(msg)
    Log.error(msg)
  end
end