class OFlow::Actors::ShellOne
Attributes
cmd[R]
dir[R]
timeout[R]
Public Class Methods
new(task, options)
click to toggle source
Calls superclass method
OFlow::Actor::new
# File lib/oflow/actors/shellone.rb, line 14 def initialize(task, options) super @dir = options[:dir] @dir = '.' if @dir.nil? @dir = File.expand_path(@dir.strip) @cmd = options[:cmd] @timeout = options.fetch(:timeout, 1.0).to_f @timeout = 0.001 if 0.001 > @timeout end
Public Instance Methods
perform(op, box)
click to toggle source
# File lib/oflow/actors/shellone.rb, line 25 def perform(op, box) input = Oj.dump(box.contents, mode: :compat, indent: 0) i, o, e, _ = Open3.popen3(@cmd, chdir: @dir) i.write(input) i.close giveup = Time.now + @timeout ra = [e, o] out = '' err = '' ec = false # stderr closed flag oc = false # stdout closed flag while true rem = giveup - Time.now raise Exception.new("Timed out waiting for output.") if 0.0 > rem rs, _, es = select(ra, nil, ra, rem) unless es.nil? es.each do |io| ec |= io == e oc |= io == o end end break if ec && oc unless rs.nil? rs.each do |io| if io == e && !ec if io.closed? || io.eof? ec = true next end err += io.read_nonblock(1000) elsif io == o && !oc if io.closed? || io.eof? oc = true next end out += io.read_nonblock(1000) end end end break if ec && oc end if 0 < err.length raise Exception.new(err) end output = Oj.load(out, mode: :compat) o.close e.close task.ship(nil, Box.new(output, box.tracker)) end