class Spark::Command::Pipe


Pipe

Public Instance Methods

before_run() click to toggle source
# File lib/spark/command/basic.rb, line 296
def before_run
  require 'open3'

  @in, @out, @threads = Open3.pipeline_rw(*@cmds)
end
lazy_run(iterator, *) click to toggle source
# File lib/spark/command/basic.rb, line 318
def lazy_run(iterator, *)
  create_writing_thread(iterator)

  Enumerator::Lazy.new([nil]) do |yielder, _|
    begin
      loop {
        yielder << @out.readline.rstrip
      }
    rescue EOFError
    end
  end
end
run(iterator, *) click to toggle source
# File lib/spark/command/basic.rb, line 302
def run(iterator, *)
  create_writing_thread(iterator)

  new_iterator = []

  # Read full input
  begin
    loop {
      new_iterator << @out.readline.rstrip
    }
  rescue EOFError
  end

  new_iterator
end

Private Instance Methods

create_writing_thread(iterator) click to toggle source
# File lib/spark/command/basic.rb, line 333
def create_writing_thread(iterator)
  @writing_thread = Thread.new do
    # Send complete iterator to the pipe
    iterator.each do |item|
      @in.puts(item.to_s.rstrip)
    end

    # Input must be closed for EOFError
    @in.close
  end
end