class OFlow::Actors::ShellRepeat

Attributes

cmd[R]
dir[R]
out[R]
timeout[R]

Public Class Methods

new(task, options) click to toggle source
Calls superclass method OFlow::Actor::new
# File lib/oflow/actors/shellrepeat.rb, line 16
def initialize(task, options)
  super
  @dir = options[:dir]
  @dir = '.' if @dir.nil?
  @dir = File.expand_path(@dir.strip)
  
  @cmd = options[:cmd]
  @timeout = options.fetch(:timeout, 1.0).to_f
  @timeout = 0.001 if 0.001 > @timeout
  @in = nil
  @out = nil
  @err = nil
  @pid = nil
  @outThread = nil
  @ctxs = {}
  @ctxCnt = 0
  @killLock = Mutex.new
end

Public Instance Methods

busy?() click to toggle source
# File lib/oflow/actors/shellrepeat.rb, line 73
def busy?()
  !@ctxs.empty?
end
clearCtx(ctx) click to toggle source
# File lib/oflow/actors/shellrepeat.rb, line 85
def clearCtx(ctx)
  @ctxs.delete(ctx)
end
getCtx(ctx) click to toggle source
# File lib/oflow/actors/shellrepeat.rb, line 77
def getCtx(ctx)
  @ctxs[ctx]
end
hasCtx?(ctx) click to toggle source
# File lib/oflow/actors/shellrepeat.rb, line 81
def hasCtx?(ctx)
  @ctxs.has_key?(ctx)
end
kill() click to toggle source
# File lib/oflow/actors/shellrepeat.rb, line 89
def kill()
  status = nil
  @killLock.synchronize do
    # kill but don't wait for an exit. Leave it orphaned so a new app can be
    # started.
    status = Process.kill("HUP", @pid) unless @pid.nil?
    @in.close() unless @in.nil?
    @out.close() unless @out.nil?
    @err.close() unless @err.nil?
    Thread.kill(@outThread) unless @outThread.nil?
    @in = nil
    @out = nil
    @err = nil
    @pid = nil
    @outThread = nil
  end
  status
end
perform(op, box) click to toggle source
# File lib/oflow/actors/shellrepeat.rb, line 35
def perform(op, box)
  if :kill == op
    status = kill()
    task.ship(:killed, Box.new(status, box.tracker))
    return
  end
  if @pid.nil?
    @in, @out, @err, wt = Open3.popen3(@cmd, chdir: @dir)
    @pid = wt[:pid]
    @outThread = Thread.start(self) do |me|
      Thread.current[:name] = me.task.full_name() + "-out"
      Oj.load(me.out, mode: :compat) do |o|
        begin
          k = o["ctx"]
          raise Exception.new("missing context in #{cmd} reply") if k.nil?
          raise Exception.new("context not found in #{cmd} reply for #{k}") unless me.hasCtx?(k)
          ctx = me.clearCtx(k)
          me.task.ship(nil, Box.new(o["out"], ctx))
        rescue Exception => e
          me.task.handle_error(e)
        end
      end
      @outThread = nil
      kill()
    end
  end
  if @in.closed?
    kill()
    return
  end
  @ctxCnt += 1
  @ctxs[@ctxCnt] = box.tracker
  wrap = { "ctx" => @ctxCnt, "in" => box.contents }
  input = Oj.dump(wrap, mode: :compat, indent: 0)
  @in.write(input + "\n")
  @in.flush
end