class Pwrake::Executor
Constants
- ENV
Public Class Methods
new(selector,dir_class,id,option)
click to toggle source
# File lib/pwrake/worker/executor.rb, line 42
# Builds an executor: stores its collaborators, opens the directory object,
# writes "<id>:open" to the output writer, and prepares the fiber that turns
# queued lines into started processes.
#
# selector  - handed to the NBIO reader/writer that wrap the command pipe.
# dir_class - class instantiated without arguments and opened as @dir.
# id        - identifier prefixed to the lines written to @out.
# option    - option container; other methods read option[:gnu_time].
def initialize(selector,dir_class,id,option)
  @selector = selector
  @id = id
  @option = option
  @out = Writer.instance
  @log = LogExecutor.instance
  @queue = FiberQueue.new(@log)
  @rd_list = []

  @dir = dir_class.new
  @dir.open
  @dir.open_messages.each do |msg|
    @log.info(msg)
  end
  @out.puts "#{@id}:open"

  pipe_rd, pipe_wr = IO.pipe
  @command_pipe_r = NBIO::Reader.new(@selector, pipe_rd)
  @command_pipe_w = NBIO::Writer.new(@selector, pipe_wr)

  # Each resume handles at most one command, then yields back to the caller.
  @start_process_fiber = Fiber.new do
    while (part = @queue.deq)
      cmd = part
      # A trailing backslash means the command continues on the next line.
      while /\\$/ =~ part
        part = @queue.deq
        break unless part
        cmd += part
      end
      break if @stopped
      cmd.chomp!
      start_process(cmd) unless cmd.empty?
      Fiber.yield
    end
  end
end
Public Instance Methods
callback(rd,mode)
click to toggle source
# File lib/pwrake/worker/executor.rb, line 166
# Relays every line read from +rd+ to @out as "<id>:<mode>:<line>".
#
# Once +rd+ reports EOF it is removed from @rd_list. When that list becomes
# empty the process is treated as finished: @thread and @pid are cleared, the
# status is logged, an optional "<id>:t:<line>" record is read from @sh_gtm
# (only when @option[:gnu_time] is set), "<id>:z:<exit_status>" is written,
# and the parent-side pipes are closed.
#
# rd   - reader object responding to gets and eof?.
# mode - tag placed between the id and the line (callers pass "o" or "e").
#
# Any StandardError is logged with its backtrace and followed by stop.
def callback(rd,mode)
  while (text = rd.gets)
    text.chomp!
    @out.puts "#{@id}:#{mode}:#{text}"
  end
  return unless rd.eof?

  @rd_list.delete(rd)
  return unless @rd_list.empty?

  # process_end
  @thread = @pid = nil
  @log.info inspect_status
  if @option[:gnu_time]
    timing = @sh_gtm.gets
    @out.puts "#{@id}:t:#{timing.chomp}" if timing
    @sh_gtm.close
  end
  @out.puts "#{@id}:z:#{exit_status}"
  @sh_in.close
  @sh_out.close
  @sh_err.close
rescue => exc
  @log.error(([exc.to_s]+exc.backtrace).join("\n"))
  stop
end
close()
click to toggle source
# File lib/pwrake/worker/executor.rb, line 81
# Waits up to 15 seconds for the current @thread (if any), pauses briefly,
# then replaces @thread with a new thread that logs the directory's close
# messages and closes the directory.
#
# Any StandardError raised here is logged with its backtrace, not re-raised.
def close
  pending = @thread
  if pending
    pending.join(15)
    sleep 0.1
  end
  @thread = Thread.new do
    @dir.close_messages.each do |msg|
      @log.info(msg)
    end
    @dir.close
  end
rescue => exc
  @log.error(([exc.to_s]+exc.backtrace).join("\n"))
end
execute(cmd)
click to toggle source
# File lib/pwrake/worker/executor.rb, line 102
# Enqueues +cmd+ and resumes the process-starting fiber.
# Does nothing (returns nil) once the executor has been stopped.
#
# cmd - command line to enqueue.
def execute(cmd)
  unless @stopped
    @queue.enq(cmd)
    @start_process_fiber.resume
  end
end
exit_status()
click to toggle source
# File lib/pwrake/worker/executor.rb, line 211
# Encodes @status as a short string:
# "core_dumped", "killed:<signal>", "stopped:<signal>", the numeric exit
# status, or "unknown:<hex>" when none of the predicates match.
def exit_status
  st = @status
  if st.signaled?
    return "core_dumped" if st.coredump?
    return "killed:#{st.termsig}"
  end
  return "stopped:#{st.stopsig}" if st.stopped?
  return "#{st.exitstatus}" if st.exited?
  "unknown:%#x" % st.to_i
end
inspect_status()
click to toggle source
# File lib/pwrake/worker/executor.rb, line 193
# Describes @status as a human-readable sentence for the log: core dump,
# killed by signal, stopped by signal, normal exit with its status, or an
# "unknown status" line with the raw value in hex.
def inspect_status
  st = @status
  if st.signaled?
    return "pid=#{st.pid} dumped core." if st.coredump?
    return "pid=#{st.pid} was killed by signal #{st.termsig}"
  end
  return "pid=#{st.pid} was stopped by signal #{st.stopsig}" if st.stopped?
  return "pid=#{st.pid} exited normally. status=#{st.exitstatus}" if st.exited?
  "unknown status %#x" % st.to_i
end
join()
click to toggle source
# File lib/pwrake/worker/executor.rb, line 94
# Waits up to 15 seconds for @thread, when one is set.
# Returns whatever Thread#join returns, or nil when there is no thread.
# Any StandardError is logged with its backtrace instead of propagating.
def join
  worker = @thread
  worker.join(15) if worker
rescue => exc
  @log.error(([exc.to_s]+exc.backtrace).join("\n"))
end
kill(sig)
click to toggle source
# File lib/pwrake/worker/executor.rb, line 229
# Stops the executor and, when a child pid is recorded, sends +sig+ to the
# child's process group.
#
# sig - signal name or number, passed straight to Process.kill.
#
# The negated pid addresses the whole process group (start_process spawns
# with pgroup:true). @pid is only cleared by callback after the output pipes
# hit EOF, so the group can already be gone when this runs; Process.kill then
# raises Errno::ESRCH, which is logged here instead of reaching the caller.
def kill(sig)
  stop
  pid = @pid
  return unless pid

  begin
    Process.kill(sig, -pid)
  rescue Errno::ESRCH
    @log.warn "Executor(id=#{@id})#kill pid=#{pid} sig=#{sig}: process group already gone"
    return
  end
  @log.warn "Executor(id=#{@id})#kill pid=#{pid} sig=#{sig}"
end
start_process(command)
click to toggle source
# File lib/pwrake/worker/executor.rb, line 108
# Spawns +command+ in its own process group with stdin/stdout/stderr
# connected to fresh pipes, starts a thread that reaps it, and starts the
# fibers that relay its stdout ("o") and stderr ("e") through callback.
#
# command - command line string; nil means the queue was empty.
#
# Returns early when a process is already running (@thread set) or when
# +command+ is nil. With @option[:gnu_time] the command is wrapped by
# wrap_gnu_time and fd 3 of the child is connected to an extra pipe
# (@sh_gtm / @spawn_gtm).
#
# If creating the pipes, building the command, or spawning raises, every
# pipe end opened so far is closed before the error is re-raised, so a
# failed spawn does not leak file descriptors.
def start_process(command)
  return if @thread      # running
  return if !command     # empty queue

  @dir.check_mountpoint
  opened = []
  begin
    @spawn_in, @sh_in = IO.pipe
    opened.push(@spawn_in, @sh_in)
    @sh_out, @spawn_out = IO.pipe
    opened.push(@sh_out, @spawn_out)
    @sh_err, @spawn_err = IO.pipe
    opened.push(@sh_err, @spawn_err)

    child_ends = [@spawn_in, @spawn_out, @spawn_err]
    spawn_opts = {in: @spawn_in, out: @spawn_out, err: @spawn_err}
    cmdline = command
    if @option[:gnu_time]
      @sh_gtm, @spawn_gtm = IO.pipe
      opened.push(@sh_gtm, @spawn_gtm)
      child_ends << @spawn_gtm
      spawn_opts[3] = @spawn_gtm
      cmdline = wrap_gnu_time(command)
    end
    spawn_opts[:chdir] = @dir.current
    spawn_opts[:pgroup] = true

    @pid = Kernel.spawn(ENV, cmdline, spawn_opts)
  rescue StandardError
    opened.each { |io| io.close unless io.closed? }
    raise
  end

  pid = @pid
  @thread = Thread.new do
    @pid2, @status = Process.waitpid2(pid)
    # The parent's copies of the child-side ends are closed only after the
    # child has been reaped -- presumably so that the readers see EOF only
    # once @status is available to callback.
    child_ends.each(&:close)
  end

  @log.info "pid=#{@pid} started. command=#{command.inspect}"
  @rd_out = NBIO::Reader.new(@selector,@sh_out)
  @rd_err = NBIO::Reader.new(@selector,@sh_err)
  @rd_list = [@rd_out,@rd_err]
  Fiber.new{callback(@rd_err,"e")}.resume
  Fiber.new{callback(@rd_out,"o")}.resume
end
stop()
click to toggle source
# File lib/pwrake/worker/executor.rb, line 76
# Marks the executor as stopped and finishes the command queue.
# Returns the result of @queue.finish.
def stop
  @stopped = true
  @queue.finish
end
wrap_gnu_time(cmd)
click to toggle source
# File lib/pwrake/worker/executor.rb, line 157
# Builds the /usr/bin/time command line that runs +cmd+ and writes the
# timing record to /dev/fd/3.
#
# cmd - command line string.
#
# When +cmd+ contains any of the characters matched below it is first wrapped
# as "<shell> -c '<cmd>'" (shell taken from ENV['SHELL'], defaulting to "sh"),
# with embedded single quotes rewritten so they survive the quoting.
#
# Returns the full command line as a String.
def wrap_gnu_time(cmd)
  target = cmd
  if /\[|\]|<|>|\(|\)|\&|\||\\|\$|;|`|'|"|\n/ =~ cmd
    quoted = cmd.gsub(/'/,"'\"'\"'")
    shell = ENV['SHELL']||"sh"
    target = shell+" -c '#{quoted}'"
  end
  fields = "%e,%S,%U,%M,%t,%K,%D,%p,%X,%Z,%F,%R,%W,%c,%w,%I,%O,%r,%s,%k"
  "/usr/bin/time -o /dev/fd/3 -f '#{fields}' #{target}"
end