class StreamWorkflowTask
Constants
- EOL
Public Class Methods
new(app)
click to toggle source
# File lib/rbbt/rest/workflow/stream_task.rb, line 5 def initialize(app) @app = app end
Public Instance Methods
_merge_chunks(sin, sout)
click to toggle source
# File lib/rbbt/rest/workflow/stream_task.rb, line 139 def _merge_chunks(sin, sout) begin while true chunk_size_str = "" stop = false while chunk_size_str.strip.empty? chunk_size_str = sin.gets raise "Empty chunk size" if chunk_size_str.nil? or chunk_size_str.strip.empty? chunk_size_str = "" if chunk_size_str.nil? end break if stop size = chunk_size_str.strip.to_i(16) break if size == 0 chunk = sin.read(size) bound = sin.read(2) raise "bound not right: #{ bound }" if bound != EOL raise "Size does not match: #{[chunk.length, size] * " != "}" if chunk.length != size sout.write chunk end rescue Aborted raise $! rescue StandardError Log.exception $! raise $! ensure #if sin.respond_to? :close_read # sin.close_read #else # sin.close unless sin.closed? #end #if sin.respond_to? :threads # sin.threads.each do |th| th.raise Aborted end #end end end
call(env)
click to toggle source
# File lib/rbbt/rest/workflow/stream_task.rb, line 215 def call(env) if do_stream(env) begin client = env["rack.hijack"] buffer = client.instance_variable_get('@buffer') tcp_io = client.call Log.low "Hijacking post data #{tcp_io}" content_type = env["CONTENT_TYPE"] encoding = env["HTTP_TRANSFER_ENCODING"] if env["rack.input"] tcp_merged_io = Misc.open_pipe do |sin| rinput = env["rack.input"] sin << rinput.instance_variable_get("@rbuf") while c = rinput.gets sin.puts c end end else if encoding == "chunked" Log.low "Merging chunks #{tcp_io}" tcp_merged_io = Misc.open_pipe do |sin| begin merge_chunks(tcp_io, sin, buffer); rescue StandardError ensure begin tcp_io.close_read; rescue end end end else tcp_merged_io = tcp_io end end #tcp_merged_io = Misc.log_stream(tcp_merged_io) inputs, stream_input, filename, stream, boundary = get_inputs(content_type, tcp_merged_io) workflow, task = parse_uri(env) job = run_job(workflow, task, inputs, stream_input, stream, boundary, filename) job_url = File.join("/", workflow.to_s, task, job.name) raise "Job aborted" if job.aborted? raise job.messages.last if job.error? out_stream = TSV.get_stream job begin Log.high "Write response #{Misc.fingerprint tcp_io} " tcp_io.write "HTTP/1.1 200\r\n" tcp_io.write "Connection: close\r\n" tcp_io.write "RBBT-STREAMING-JOB-URL: #{ job_url }\r\n" tcp_io.write "\r\n" Log.high "Comsuming response #{Misc.fingerprint tcp_io}" begin while l = out_stream.readpartial(2048) tcp_io.write l end rescue EOFError end Log.high "Comsumed response #{Misc.fingerprint tcp_io}" out_stream.join if out_stream.respond_to? :join rescue Exception Log.exception $! raise $! end if out_stream tcp_io.close_write unless tcp_io.closed? Log.high "Closed io #{tcp_io}" [-1, {}, []] rescue Exception Log.exception $! job.exception $! if job tcp_io.write "HTTP/1.1 500\r\n" tcp_io.write "Connection: close\r\n" tcp_io.write "\r\n" tcp_io.close_write raise $! end else Log.low "NOT Hijacking post data" @app.call(env) end end
copy_until_boundary(sin, sout, boundary)
click to toggle source
# File lib/rbbt/rest/workflow/stream_task.rb, line 58 def copy_until_boundary(sin, sout, boundary) last_line = nil while line = sin.gets break if line.include? boundary sout.write last_line last_line = line end sout.write last_line.strip unless last_line.nil? or last_line == EOL end
do_stream(env)
click to toggle source
# File lib/rbbt/rest/workflow/stream_task.rb, line 197 def do_stream(env) uri = @uri post = @request_method return false unless post == "POST" hijack = !!env["rack.hijack"] return false unless hijack content_type = env["CONTENT_TYPE"] return false unless content_type and content_type.include? "Rbbt_Param_Stream" encoding = env["HTTP_TRANSFER_ENCODING"] return false unless encoding.nil? or encoding == "chunked" true end
get_inputs(content_type, stream)
click to toggle source
# File lib/rbbt/rest/workflow/stream_task.rb, line 68 def get_inputs(content_type, stream) boundary = content_type.match(/boundary=([^\s;]*)/)[1] stream_input = content_type.match(/stream=([^\s;]*)/)[1] inputs, filename = read_normal_inputs(stream, boundary, stream_input) IndiferentHash.setup(inputs) [inputs, stream_input, filename, stream, boundary] end
merge_chunks(sin, sout, buffer)
click to toggle source
# File lib/rbbt/rest/workflow/stream_task.rb, line 177 def merge_chunks(sin, sout, buffer) if buffer.nil? _merge_chunks(sin, sout) else ssin = Misc.open_pipe do |s| begin s << buffer while c = sin.readpartial(Misc::BLOCK_SIZE) s << c end rescue Aborted, IOError rescue Exception ensure s.close end end _merge_chunks(ssin, sout) end end
parse_uri(env)
click to toggle source
# File lib/rbbt/rest/workflow/stream_task.rb, line 11 def parse_uri(env) uri = @uri _n, workflow, task = uri.split("/") workflow = begin Kernel.const_get(workflow) rescue raise "Could not accept task for workflow: #{ workflow }" end [workflow, task] end
read_normal_inputs(io, boundary, stream_input)
click to toggle source
# File lib/rbbt/rest/workflow/stream_task.rb, line 22 def read_normal_inputs(io, boundary, stream_input) inputs = {} input_name = nil variable_chunk = nil filename = nil while line = io.gets line.chomp! chunk_start = line == "--" + boundary if chunk_start if input_name inputs[input_name] = variable_chunk end content_start = false elsif content_start if variable_chunk.empty? variable_chunk << line else variable_chunk << "\n" << line end elsif line =~ /^Content.* name="([^\s;"]*)"/ input_name = $1 filename = line.match(/filename="([^"]+)"/)[1] if line =~ /filename/ elsif line.empty? variable_chunk = "" break if input_name == stream_input content_start = true end end [inputs, filename] end
run_job(workflow, task, inputs, stream_input, stream, boundary, filename = nil)
click to toggle source
# File lib/rbbt/rest/workflow/stream_task.rb, line 78 def run_job(workflow, task, inputs, stream_input, stream, boundary, filename = nil) name = inputs.delete "jobname" task_parameters = prepare_job_inputs(workflow, task, inputs) IndiferentHash.setup task_parameters Misc.add_stream_filename(stream, filename) if filename clean_stream = Misc.open_pipe do |sin| begin copy_until_boundary(stream, sin, boundary) rescue Log.exception $! end end ConcurrentStream.setup(clean_stream, :filename => filename) task_parameters[stream_input.to_sym] = clean_stream task = task.to_sym Log.low "Running streaming job #{[workflow, task] * "/" }: #{Misc.fingerprint task_parameters}" job = workflow.job(task, name, task_parameters) job.clean if job.aborted? execution_type = type_of_export(workflow, task) execution_type = "exec" if inputs["_cache_type"] == 'exec' begin case execution_type.to_s when "exec", nil job.exec(:stream) when "sync", "synchronous", "async", "asynchronous" if job.done? or job.started? done_consumer = Thread.new do Misc.consume_stream(clean_stream) end job.join unless job.done? else job.run(:stream) end else raise "Unknown execution_type: #{Misc.inspect execution_type}" end rescue Aborted, Interrupt job.abort stream.write "HTTP/1.1 500\r\n" stream.close_write rescue Exception job.exception $! stream.write "HTTP/1.1 500\r\n" stream.close_write end job end