class Hasta::ExecutionContext
Executes each local EMR job in isolation
Attributes
sub_process[R]
Public Class Methods
new(ruby_files = [], env = {})
click to toggle source
# File lib/hasta/execution_context.rb, line 52 def initialize(ruby_files = [], env = {}) @sub_process = Subprocess.new(ruby_files, env) end
Public Instance Methods
execute(source_file, data_source, data_sink)
click to toggle source
# File lib/hasta/execution_context.rb, line 56 def execute(source_file, data_source, data_sink) sub_process.start(source_file) do |sub_process| [ stream_input(data_source, sub_process.stdin), stream_output(sub_process.stdout) { |line| data_sink << line }, stream_output(sub_process.stderr) { |line| Hasta.logger.error line }, ].each(&:join) end data_sink.close end
Private Instance Methods
stream_input(data_source, io)
click to toggle source
# File lib/hasta/execution_context.rb, line 72 def stream_input(data_source, io) Thread.new do data_source.each_line do |line| io.puts line end io.close_write end end
stream_output(io) { |rstrip| ... }
click to toggle source
# File lib/hasta/execution_context.rb, line 82 def stream_output(io) Thread.new do StringIO.new(io.read).each_line do |line| yield line.rstrip end end end