class Hasta::ExecutionContext

Executes each local EMR job in isolation

Attributes

sub_process[R]

Public Class Methods

new(ruby_files = [], env = {}) click to toggle source
# File lib/hasta/execution_context.rb, line 52
# Builds the execution context by creating the Subprocess wrapper that
# #execute later starts.
#
# ruby_files - forwarded unchanged to Subprocess.new (default: []).
#              NOTE(review): presumably Ruby files the child process should
#              load - confirm against Subprocess.
# env        - forwarded unchanged to Subprocess.new (default: {}).
#              NOTE(review): presumably environment variables for the child
#              process - confirm against Subprocess.
def initialize(ruby_files = [], env = {})
  @sub_process = Subprocess.new(ruby_files, env)
end

Public Instance Methods

execute(source_file, data_source, data_sink) click to toggle source
# File lib/hasta/execution_context.rb, line 56
# Runs source_file through the sub-process, wiring up its three streams.
#
# While the sub-process is started, three threads run side by side:
#   * one writes every line of data_source to the child's stdin,
#   * one appends every line of the child's stdout to data_sink,
#   * one sends every line of the child's stderr to Hasta.logger.error.
# All three are joined before the block handed to #start returns, and only
# then is data_sink closed.
#
# source_file - passed to sub_process.start.
# data_source - object responding to #each_line; supplies the child's input.
# data_sink   - object responding to #<< and #close; receives the output.
#
# Returns whatever data_sink.close returns.
def execute(source_file, data_source, data_sink)
  sub_process.start(source_file) do |child|
    writer = stream_input(data_source, child.stdin)
    stdout_reader = stream_output(child.stdout) { |line| data_sink << line }
    stderr_reader = stream_output(child.stderr) { |line| Hasta.logger.error line }

    # Wait for all three streams to finish before leaving the block.
    [writer, stdout_reader, stderr_reader].each(&:join)
  end

  data_sink.close
end

Private Instance Methods

stream_input(data_source, io) click to toggle source
# File lib/hasta/execution_context.rb, line 72
# Copies data_source into io, one line at a time, on a background thread.
#
# Each line is written with io.puts (for IO-like objects this appends a
# newline only when the line does not already end with one). The write side
# of io is closed when the copy ends - whether it finished normally or
# raised - so the consumer always sees end-of-input instead of waiting
# forever on a half-written stream. An error raised during the copy still
# propagates to whoever joins the thread.
#
# data_source - object responding to #each_line.
# io          - object responding to #puts and #close_write.
#
# Returns the Thread performing the copy.
def stream_input(data_source, io)
  Thread.new do
    # Explicit begin/ensure keeps this valid on Rubies that do not allow
    # ensure directly inside a do/end block.
    begin
      data_source.each_line { |line| io.puts line }
    ensure
      io.close_write
    end
  end
end
stream_output(io) { |rstrip| ... } click to toggle source
# File lib/hasta/execution_context.rb, line 82
# Reads io to the end on a background thread and yields each line of what
# was read, with trailing whitespace stripped, to the given block.
#
# The whole stream is read into memory first (io.read) and only then split
# into lines, so the block is not called until io has reached end-of-file.
#
# io - object responding to #read.
#
# Yields each line (String) with trailing whitespace removed.
#
# Returns the Thread doing the reading.
def stream_output(io)
  Thread.new do
    buffered = StringIO.new(io.read)
    buffered.each_line { |raw| yield raw.rstrip }
  end
end