class Pwrake::Shell

Constants

BY_FIBER
OPEN_LIST

Attributes

host[R]
id[R]
profile[R]
status[R]

Public Class Methods

current() click to toggle source
# File lib/pwrake/branch/shell.rb, line 21
def self.current
  BY_FIBER[Fiber.current]
end
new(chan,comm,task_q,opt={}) click to toggle source
# File lib/pwrake/branch/shell.rb, line 25
def initialize(chan,comm,task_q,opt={})
  @chan = chan
  @id   = chan.id
  @host = chan.host
  @comm = comm
  @task_q = task_q
  @lock = DummyMutex.new
  @option = opt
  @work_dir = @option[:work_dir] || Dir.pwd
  @comm.shells[self] = true
end
profiler() click to toggle source
# File lib/pwrake/branch/shell.rb, line 17
def self.profiler
  @@profiler
end

Public Instance Methods

backquote(*command) click to toggle source
# File lib/pwrake/branch/shell.rb, line 75
def backquote(*command)
  command = command.join(' ')
  @lock.synchronize do
    a = []
    _execute(command){|x| a << x}
    a.join("\n")
  end
end
cd(dir="") click to toggle source
# File lib/pwrake/branch/shell.rb, line 92
def cd(dir="")
  _system("cd #{dir}") or die
end
create_fiber(master_w) click to toggle source
# File lib/pwrake/branch/shell.rb, line 218
def create_fiber(master_w)
  @master_w = master_w
  if !@opened
    Log.warn "not opened: host=#{@host} id=#{@id}"
  end
  Fiber.new do
    BY_FIBER[Fiber.current] = self
    Log.debug "shell start id=#{@id} host=#{@host}"
    begin
      while task_str = @task_q.deq
        #Log.debug "task_str=#{task_str}"
        if /^(\d+):(.*)$/ =~ task_str
          task_id, task_name = $1.to_i, $2
        else
          raise RuntimeError, "invalid task_str: #{task_str}"
        end
        @task_id = task_id
        @task_name = task_name
        task = Rake.application[task_name]
        @ncore = task.wrapper.n_used_cores
        begin
          Rake.application.current_flow[Fiber.current] = task.property.subflow
          task.execute(task.arguments) if task.needed?
          result = "taskend:#{@id}:#{task.name}"
        rescue Exception=>e
          Rake.application.display_error_message(e)
          Log.error e
          result = "taskfail:#{@id}:#{task.name}"
          break if @exited
        ensure
          Rake.application.current_flow[Fiber.current] = nil
          master_w.put_line result
        end
      end
      Log.debug "shell id=#{@id} fiber end"
      master_w.put_line "retire:#{@comm.id}"
      @comm.shells.delete(self)
      exit
      if @comm.shells.empty?
        @comm.dropout
      end
      @chan.halt
    rescue => e
      m = Log.bt(e)
      #$stderr.puts m
      Log.error(m)
    end
  end
end
die() click to toggle source
# File lib/pwrake/branch/shell.rb, line 96
def die
  raise "Failed at #{@host}, id=#{@id}, cmd='#{@cmd}'"
end
exit() click to toggle source
# File lib/pwrake/branch/shell.rb, line 55
def exit
  if !@opened
    Log.debug "already exited: host=#{@host} id=#{@id}"
    return
  end
  @opened = false
  _puts("exit")
  if (s = _gets) == "exit"
    OPEN_LIST.delete(__id__)
    Log.debug("Shell#exit: recieve #{s.inspect}")
    true
  else
    Log.debug("Shell#exit: recieve #{s.inspect}")
    false
  end
rescue IOError,Errno::EPIPE => e
  Log.debug("Shell#exit: #{Log.bt(e)}")
  false
end
finish_task_q() click to toggle source
# File lib/pwrake/branch/shell.rb, line 268
def finish_task_q
  @task_q.finish
  #Log.debug "finish_task_q: @task_q=#{@task_q.inspect}"
  while task_str = @task_q.deq_nonblock
    if /^(\d+):(.*)$/ =~ task_str
      task_id, task_name = $1.to_i, $2
    else
      raise RuntimeError, "invalid task_str: #{task_str}"
    end
    @master_w.put_line "taskfail:#{@id}:#{task_name}"
    Log.warn "unexecuted task: #{result}"
  end
  @chan.halt
end
open() click to toggle source
# File lib/pwrake/branch/shell.rb, line 39
def open
  if @opened
    Log.warn "already opened: host=#{@host} id=#{@id}"
    return
  end
  @opened = true
  _puts("open")
  if (s = _gets) == "open"
    OPEN_LIST[__id__] = self
    true
  else
    Log.error("Shell#open failed: recieve #{s.inspect}")
    false
  end
end
system(*command) click to toggle source
# File lib/pwrake/branch/shell.rb, line 84
def system(*command)
  command = command.join(' ')
  @lock.synchronize do
    _execute(command){|x| print x+"\n"}
  end
  @status == 0
end

Private Instance Methods

_backquote(cmd) click to toggle source
# File lib/pwrake/branch/shell.rb, line 131
def _backquote(cmd)
  @cmd = cmd
  a = []
  @lock.synchronize do
    _puts(cmd)
    @status = io_read_loop{|x| a << x}
  end
  a.join("\n")
end
_execute(cmd,quote=nil,&block) click to toggle source
# File lib/pwrake/branch/shell.rb, line 141
def _execute(cmd,quote=nil,&block)
  @cmd = cmd
  if !@opened
    raise "non opened"
  end
  @status = nil
  start_time = Time.now
  start_clock = Pwrake.clock
  begin
    _puts(cmd)
    @status = io_read_loop(&block)
  ensure
    end_time = Time.now
    end_clock = Pwrake.clock
    @status = @@profiler.profile(@task_id, @task_name, cmd,
                 start_time, end_time, end_clock-start_clock,
                 host, @ncore, @status, @gnu_time_status)
  end
end
_gets() click to toggle source
# File lib/pwrake/branch/shell.rb, line 111
def _gets
  s = @chan.get_line
  Log.debug "Shell#_gets(host=#{@host},id=#{@id}): #{s.inspect}"
  case s
  when Exception
    @chan.halt
    Log.error Log.bt(s)
  end
  s
end
_puts(s) click to toggle source
# File lib/pwrake/branch/shell.rb, line 106
def _puts(s)
  #Log.debug "Shell#_puts(host=#{@host},id=#{@id}): #{s.inspect}"
  @chan.put_line(s)
end
_system(cmd) click to toggle source
# File lib/pwrake/branch/shell.rb, line 122
def _system(cmd)
  @cmd = cmd
  @lock.synchronize do
    _puts(cmd)
    status = io_read_loop{}
    Integer(status||1) == 0
  end
end
io_read_loop() { |x| ... } click to toggle source
# File lib/pwrake/branch/shell.rb, line 161
def io_read_loop
  @gnu_time_status = nil
  while s = _gets
    case s
    when /^(\w+):(.*)$/
      x = [$1,$2]
      case x[0]
      when "o"
        yield x[1]
        next
      when "e"
        $stderr.print x[1]+"\n"
        next
      when "t"
        @gnu_time_status = x[1].split(',')
        next
      when "z"
        # see Executor#status_to_str
        status = x[1]
        case status
        when /^\d+$/
          status = status.to_i
        end
        return status
      when "err"
        # see Executor#status_to_str
        status = x[1]
        case status
        when /^\d+$/
          status = status.to_i
        end
        return status
      end
    when "exit"
      msg = "Shell#io_read_loop: exit"
      $stderr.puts(msg)
      Log.error(msg)
      @exited = true
      @chan.halt
      return "exit"
    when IOError
      @exited = true
      @chan.halt
      return "ioerror"
    when NBIO::TimeoutError
      @exited = true
      @chan.halt
      return "timeout"
    end
    msg = "Shell#io_read_loop: Invalid result: #{s.inspect}"
    $stderr.puts(msg)
    Log.error(msg)
  end
end