class Pwrake::Executor

Constants

ENV

Public Class Methods

new(selector,dir_class,id,option) click to toggle source
# File lib/pwrake/worker/executor.rb, line 42
# Builds an executor: stores its collaborators, opens the directory object,
# announces itself on the output writer and prepares the fiber that turns
# queued command lines into processes.
#
# selector  - handed to the NBIO reader/writer wrappers created here
# dir_class - class that is instantiated and then sent #open and
#             #open_messages
# id        - identifier prefixed to the lines this executor writes to @out
# option    - options object (other methods of this class read :gnu_time
#             from it)
def initialize(selector,dir_class,id,option)
  @selector = selector
  @id = id
  @option = option
  @out = Writer.instance
  @log = LogExecutor.instance
  @queue = FiberQueue.new(@log)
  @rd_list = []

  @dir = dir_class.new
  @dir.open
  @dir.open_messages.each { |message| @log.info(message) }
  @out.puts "#{@id}:open"

  pipe_r, pipe_w = IO.pipe
  @command_pipe_r = NBIO::Reader.new(@selector, pipe_r)
  @command_pipe_w = NBIO::Writer.new(@selector, pipe_w)

  @start_process_fiber = Fiber.new do
    while (line = @queue.deq)
      cmd = line
      # A line ending in a backslash continues on the next queued line;
      # keep appending until a line without one arrives or the queue
      # yields nothing.
      while /\\$/ =~ line
        line = @queue.deq
        break unless line
        cmd = cmd + line
      end
      break if @stopped
      cmd.chomp!
      start_process(cmd) unless cmd.empty?
      # Hand control back to whoever resumed this fiber (see #execute).
      Fiber.yield
    end
  end
end

Public Instance Methods

callback(rd,mode) click to toggle source
# File lib/pwrake/worker/executor.rb, line 166
# Forwards output read from +rd+ and, once every reader is exhausted,
# reports the end of the process.
#
# Each line obtained from rd.gets is written to @out as
# "<id>:<mode>:<line>". When +rd+ is at EOF it is removed from @rd_list;
# when that list becomes empty the process is treated as finished: @thread
# and @pid are cleared, the status is logged, an optional "<id>:t:..." line
# is emitted when @option[:gnu_time] is set, "<id>:z:<exit_status>" is
# written and the shell-side pipes are closed.
#
# rd   - reader responding to #gets and #eof?
# mode - tag placed between the id and the line
#
# Any StandardError is logged with its backtrace and the executor is stopped.
def callback(rd,mode)
  while (line = rd.gets)
    line.chomp!
    @out.puts "#{@id}:#{mode}:#{line}"
  end
  return unless rd.eof?

  @rd_list.delete(rd)
  return unless @rd_list.empty?

  # Every reader has been drained: process_end.
  @thread = @pid = nil
  @log.info inspect_status
  if @option[:gnu_time]
    timing = @sh_gtm.gets
    @out.puts "#{@id}:t:#{timing.chomp}" if timing
    @sh_gtm.close
  end
  @out.puts "#{@id}:z:#{exit_status}"
  [@sh_in, @sh_out, @sh_err].each(&:close)
  nil
rescue => exc
  report = [exc.to_s] + exc.backtrace
  @log.error(report.join("\n"))
  stop
end
close() click to toggle source
# File lib/pwrake/worker/executor.rb, line 81
# Closes the directory object on a new thread.
#
# If a thread is already recorded in @thread it is joined first (15 second
# limit) followed by a 0.1 second sleep. A new thread is then started that
# logs every close message of @dir and closes it; that thread replaces
# @thread so that #join can wait on it.
#
# Any StandardError is logged together with its backtrace.
def close
  running = @thread
  if running
    running.join(15)
    sleep(0.1)
  end
  closer = Thread.new do
    @dir.close_messages.each { |message| @log.info(message) }
    @dir.close
  end
  @thread = closer
rescue => exc
  report = [exc.to_s] + exc.backtrace
  @log.error(report.join("\n"))
end
execute(cmd) click to toggle source
# File lib/pwrake/worker/executor.rb, line 102
# Queues +cmd+ and resumes the command fiber so it can pick the line up.
# Does nothing once the executor has been stopped.
#
# cmd - command line to enqueue
def execute(cmd)
  unless @stopped
    @queue.enq(cmd)
    @start_process_fiber.resume
  end
end
exit_status() click to toggle source
# File lib/pwrake/worker/executor.rb, line 211
# Renders @status as the short token reported on the "z" line:
# "core_dumped", "killed:<signal>", "stopped:<signal>", the exit status as
# a string, or "unknown:<hex>" when none of the predicates apply.
def exit_status
  st = @status
  if st.signaled?
    st.coredump? ? "core_dumped" : "killed:#{st.termsig}"
  elsif st.stopped?
    "stopped:#{st.stopsig}"
  elsif st.exited?
    st.exitstatus.to_s
  else
    format("unknown:%#x", st.to_i)
  end
end
inspect_status() click to toggle source
# File lib/pwrake/worker/executor.rb, line 193
# Describes @status as a human-readable sentence for the log: core dump,
# killed by signal, stopped by signal, normal exit with its status, or an
# "unknown status" line carrying the raw value in hex.
def inspect_status
  st = @status
  if st.signaled?
    if st.coredump?
      "pid=#{st.pid} dumped core."
    else
      "pid=#{st.pid} was killed by signal #{st.termsig}"
    end
  elsif st.stopped?
    "pid=#{st.pid} was stopped by signal #{st.stopsig}"
  elsif st.exited?
    "pid=#{st.pid} exited normally. status=#{st.exitstatus}"
  else
    format("unknown status %#x", st.to_i)
  end
end
join() click to toggle source
# File lib/pwrake/worker/executor.rb, line 94
# Waits (15 second limit) for the thread recorded in @thread, if there is
# one. Any StandardError is logged together with its backtrace.
def join
  @thread.join(15) if @thread
rescue => exc
  report = [exc.to_s] + exc.backtrace
  @log.error(report.join("\n"))
end
kill(sig) click to toggle source
# File lib/pwrake/worker/executor.rb, line 229
# Stops the executor and signals the process group of the running command.
#
# sig - signal name or number, passed straight to Process.kill
#
# The signal goes to -@pid, i.e. the process group created for the command
# (start_process spawns with pgroup:true). The child is reaped by the
# waiter thread before #callback clears @pid, so the group may already be
# gone when this runs; in that case Process.kill raises Errno::ESRCH, which
# is logged here instead of propagating to the caller.
def kill(sig)
  stop
  pid = @pid
  return unless pid

  begin
    Process.kill(sig, -pid)
  rescue Errno::ESRCH
    # Nothing left to signal: the process group has already exited.
    @log.warn "Executor(id=#{@id})#kill pid=#{pid} sig=#{sig}: no such process group"
    return
  end
  @log.warn "Executor(id=#{@id})#kill pid=#{pid} sig=#{sig}"
end
start_process(command) click to toggle source
# File lib/pwrake/worker/executor.rb, line 108
# Spawns +command+ in its own process group with stdin/stdout/stderr
# connected to fresh pipes, starts a waiter thread that reaps it, and
# registers readers for its stdout and stderr.
#
# command - command line string; nil means nothing was dequeued
#
# Returns nil without doing anything when a process is already running
# (@thread is set) or when +command+ is nil. When @option[:gnu_time] is set
# the command is wrapped by wrap_gnu_time and an extra pipe is attached to
# the child's file descriptor 3.
#
# Exceptions raised while spawning propagate to the caller; every pipe end
# created for the attempt is closed first so a failed spawn does not leave
# descriptors open.
def start_process(command)
  return if @thread      # running
  return if !command     # empty queue
  @dir.check_mountpoint
  @spawn_in, @sh_in = IO.pipe
  @sh_out, @spawn_out = IO.pipe
  @sh_err, @spawn_err = IO.pipe
  child_ends = [@spawn_in, @spawn_out, @spawn_err]
  parent_ends = [@sh_in, @sh_out, @sh_err]
  cmdline = command
  extra_fds = {}
  if @option[:gnu_time]
    @sh_gtm, @spawn_gtm = IO.pipe
    child_ends << @spawn_gtm
    parent_ends << @sh_gtm
    extra_fds[3] = @spawn_gtm
    cmdline = wrap_gnu_time(command)
  end

  begin
    spawn_opts = {
      in: @spawn_in,
      out: @spawn_out,
      err: @spawn_err,
      chdir: @dir.current,
      pgroup: true,
    }.merge(extra_fds)
    # NOTE(review): ENV is resolved by constant lookup; the class lists a
    # constant named ENV, so this is presumably that constant rather than
    # the process environment -- confirm.
    @pid = Kernel.spawn(ENV, cmdline, spawn_opts)
  rescue
    (child_ends + parent_ends).each(&:close)
    raise
  end

  # The waiter works on captured locals rather than instance variables:
  # #callback clears @thread as soon as stdout and stderr reach EOF, so a
  # following start_process may reassign @spawn_* / @pid while this thread
  # is still closing its own pipe ends.
  pid = @pid
  @thread = Thread.new do
    @pid2,@status = Process.waitpid2(pid)
    # The parent keeps the child-side ends open until the child is reaped,
    # so the readers cannot reach EOF before @status has been assigned.
    child_ends.each(&:close)
  end
  @log.info "pid=#{@pid} started. command=#{command.inspect}"

  @rd_out = NBIO::Reader.new(@selector,@sh_out)
  @rd_err = NBIO::Reader.new(@selector,@sh_err)
  @rd_list = [@rd_out,@rd_err]

  Fiber.new{callback(@rd_err,"e")}.resume
  Fiber.new{callback(@rd_out,"o")}.resume
end
stop() click to toggle source
# File lib/pwrake/worker/executor.rb, line 76
# Marks the executor as stopped and finishes the command queue.
#
# The flag is set before the queue is finished: both #execute and the
# command fiber created in #initialize check @stopped, so they see the flag
# by the time the queue reports completion.
# NOTE(review): the exact effect of @queue.finish on a pending deq is
# defined by FiberQueue, which is not visible here -- confirm.
def stop
  @stopped = true
  @queue.finish
end
wrap_gnu_time(cmd) click to toggle source
# File lib/pwrake/worker/executor.rb, line 157
# Builds the command line that runs +cmd+ under /usr/bin/time, with the
# timing record written to file descriptor 3 (-o /dev/fd/3).
#
# cmd - command line string
#
# When +cmd+ contains any of the characters matched below (brackets,
# redirections, parentheses, &, |, backslash, $, ;, quotes, backquote or a
# newline) it is wrapped as "<shell> -c '<cmd>'" with embedded single quotes
# escaped, where <shell> is ENV['SHELL'] or "sh". Otherwise it is appended
# as is. The argument itself is not modified.
#
# Returns the complete command line as a String.
def wrap_gnu_time(cmd)
  target = cmd
  if /\[|\]|<|>|\(|\)|\&|\||\\|\$|;|`|'|"|\n/ =~ cmd
    # Close the surrounding single-quoted string, emit a double-quoted
    # single quote, and reopen it.
    escaped = cmd.gsub(/'/,"'\"'\"'")
    shell = ENV['SHELL'] || "sh"
    target = shell + " -c '#{escaped}'"
  end
  time_format = "%e,%S,%U,%M,%t,%K,%D,%p,%X,%Z,%F,%R,%W,%c,%w,%I,%O,%r,%s,%k"
  "/usr/bin/time -o /dev/fd/3 -f '#{time_format}' #{target}"
end