module WorkflowRESTHelpers
Attributes
workflow_resources[RW]
Public Class Methods
workflow_resources()
click to toggle source
# File lib/rbbt/rest/workflow/locate.rb, line 4
#
# Returns the module-wide list kept in @@workflow_resources.
# On first use the list is initialized with Rbbt.share.views.find(:lib).
def self.workflow_resources
  return @@workflow_resources if defined?(@@workflow_resources) && @@workflow_resources

  @@workflow_resources = [Rbbt.share.views.find(:lib)]
end
Public Instance Methods
abort_job(workflow, job)
click to toggle source
# File lib/rbbt/rest/workflow/jobs.rb, line 313
#
# Aborts +job+ and ends the request with a 200 response naming the job path.
# The +workflow+ argument is accepted but not used here.
def abort_job(workflow, job)
  job.abort
  message = "Aborted #{ job.path }"
  halt 200, message
end
clean_job(workflow, job)
click to toggle source
# File lib/rbbt/rest/workflow/jobs.rb, line 319
#
# Cleans +job+ and then answers according to the request:
# * @format == :jobname      -> 200 with the job name
# * @ajax or @format == :json -> bare 200
# * otherwise                -> redirect to "/<workflow>/<task name>"
def clean_job(workflow, job)
  job.clean

  return halt(200, job.name) if @format == :jobname
  return halt(200) if @ajax || @format == :json

  task_page = File.join("/", workflow.to_s, job.task_name.to_s)
  redirect to(task_page)
end
complete_input_set(workflow, task, inputs)
click to toggle source
# File lib/rbbt/rest/workflow/jobs.rb, line 26
#
# Tells whether +inputs+ provides a non-nil value for exactly the set of
# names the task declares as inputs plus 'jobname'.
# Names are compared as strings, ignoring order.
def complete_input_set(workflow, task, inputs)
  provided = inputs.each_with_object([]) do |(name, value), names|
    names << name.to_s unless value.nil?
  end.sort

  declared = workflow.task_info(task.to_sym)[:inputs].map { |name| name.to_s }
  expected = (declared + ['jobname']).uniq.sort

  provided == expected
end
consume_task_parameters(workflow, task, params = nil)
click to toggle source
# File lib/rbbt/rest/workflow/jobs.rb, line 6
#
# Collects, via consume_parameter, the values of every input declared by the
# task, together with any companion "<input>__param_file" entry.
#
# String values are stripped, and the literal "EMPTY_ARRAY" becomes [].
# Inputs whose value is nil are left out of the result.
#
# Returns a Hash keyed by input name, or by the "<input>__param_file" string.
def consume_task_parameters(workflow, task, params = nil)
  input_names = workflow.task_info(task.to_sym)[:inputs]

  input_names.each_with_object({}) do |name, consumed|
    value = consume_parameter(name, params)
    value = value.strip if String === value
    value = [] if value == "EMPTY_ARRAY"
    consumed[name] = value unless value.nil?

    # Param files
    file_value = consume_parameter(name.to_s + '__param_file', params)
    consumed[name.to_s + '__param_file'] = file_value unless file_value.nil?
  end
end
execution_type(workflow, task)
click to toggle source
# File lib/rbbt/rest/workflow/jobs.rb, line 53
#
# Decides how a task is to be executed. In order of precedence:
# 1. :slurm when ENV["RBBT_REST_USE_SLURM"] is 'true'
# 2. the value of cache_type, when it is set
# 3. for :exec exports, :sync if @format is :html, otherwise :exec
# 4. :asynchronous for everything else
#
# type_of_export is always evaluated first, so it raises for tasks without
# a known export even when a later rule would have applied.
def execution_type(workflow, task)
  export_kind = type_of_export(workflow, task)

  if ENV["RBBT_REST_USE_SLURM"] == 'true'
    :slurm
  elsif cache_type
    cache_type
  elsif export_kind == :exec
    @format == :html ? :sync : export_kind
  else
    :asynchronous
  end
end
issue_job(workflow, task, jobname = nil, params = {})
click to toggle source
# File lib/rbbt/rest/workflow/jobs.rb, line 217
#
# Builds the job for +task+ from the request +params+ and runs it according
# to execution_type(workflow, task):
#
# * :exec                    -> executes inline and renders with show_exec_result
# * :stream                  -> hands the job to stream_job
# * :synchronous / :sync     -> runs and joins, then redirects to the job URL
#                               (or returns the job name for @format == :jobname)
# * :asynchronous / :async   -> forks the job, then redirects (or returns the name)
# * :slurm                   -> orchestrates the job through the HPC batch system
#
# An `update` value of "clean" or "recursive_clean" delegates to clean_job /
# recursive_clean_job; `update == :reload` aborts and cleans the job first.
#
# Raises RuntimeError for an unsupported execution type.
def issue_job(workflow, task, jobname = nil, params = {})
  exec_type = execution_type(workflow, task)

  inputs = prepare_job_inputs(workflow, task, params)
  job = workflow.job(task, jobname, inputs)

  job.clean if job.aborted?

  clean_job(workflow, job) if update.to_s == "clean"
  recursive_clean_job(workflow, job) if update.to_s == "recursive_clean"

  # Abort and clean the job when a reload was requested.
  reset_on_reload = lambda do
    if update == :reload
      job.abort
      job.clean
    end
  end

  # URL of the job result, carrying the requested format when there is one.
  result_url = lambda do
    url = job.respond_to?(:url) ? job.url : File.join("/", workflow.to_s, task, job.name)
    url += "?_format=#{@format}" if @format
    url
  end

  case exec_type.to_sym
  when :exec
    show_exec_result job.exec(:stream), workflow, task
  when :stream
    reset_on_reload.call
    job_url = File.join("/", workflow.to_s, task, job.name)
    stream_job(job, job_url)
  when :synchronous, :sync
    reset_on_reload.call
    begin
      if not job.started?
        job.run
        job.join
      end

      if @format == :jobname
        job.name
      else
        redirect to(result_url.call)
      end
    rescue Exception # broad rescue preserved from the original behavior
      Log.exception $!
      halt 500, $!.message
    end
  when :asynchronous, :async
    reset_on_reload.call
    begin
      # $rest_cache_semaphore is defined in rbbt-util etc/app.d/semaphores.rb
      job.fork($rest_cache_semaphore) unless job.started?

      job.soft_grace
      if @format == :jobname
        content_type :text
        job.name
      else
        redirect to(result_url.call)
      end
    rescue Exception # broad rescue preserved from the original behavior
      Log.exception $!
    end
  when :slurm
    require 'rbbt/hpc' # loaded lazily, only when the batch system is used
    batch_system = ENV["BATCH_SYSTEM"] || "SLURM"
    scheduler = HPC.batch_system batch_system
    scheduler.orchestrate_job(job, {})
    redirect to(result_url.call)
  else
    raise "Unsupported execution_type: #{ exec_type }"
  end
end
locate_workflow_template(template, workflow = nil, task = nil)
click to toggle source
# File lib/rbbt/rest/workflow/locate.rb, line 12
#
# Finds the haml file for +template+, trying the most specific location first:
#   "<workflow>/<task>/<template>", then "<workflow>/<template>", then "<template>".
#
# When a workflow is given, its own www views directory is searched before
# the resources returned by workflow_resources.
#
# Returns the first path found by locate_server_file.
# Raises TemplateMissing when no candidate is found.
def locate_workflow_template(template, workflow = nil, task = nil)
  # Work on a copy: unshifting into the array returned by workflow_resources
  # would make a shared list grow on every call.
  resources = workflow_resources.dup
  resources.unshift workflow.libdir.www.views if workflow

  paths = [template]
  paths.unshift [workflow.to_s, template.to_s] * "/" if workflow
  paths.unshift [workflow.to_s, task.to_s, template.to_s] * "/" if workflow and task

  path = nil
  paths.each do |location|
    path = locate_server_file(location, resources, 'haml')
    break if path
  end

  raise TemplateMissing, "Template not found: [#{ template }, #{workflow}, #{ task }]" if path.nil?

  path
end
prepare_job_inputs(workflow, task, params)
click to toggle source
# File lib/rbbt/rest/workflow/jobs.rb, line 62
#
# Builds the inputs Hash for a job from the request +params+.
#
# Each input declared by the task goes through prepare_input together with
# its declared type and the :stream flag from its input options.
# Inputs for which prepare_input returns nil are omitted.
def prepare_job_inputs(workflow, task, params)
  input_names    = workflow.task_info(task)[:inputs]
  types_by_input = workflow.task_info(task)[:input_types]
  opts_by_input  = workflow.task_info(task)[:input_options]

  input_names.each_with_object({}) do |name, prepared|
    streamed = opts_by_input.include?(name) && opts_by_input[name][:stream]
    value = prepare_input(params, name, types_by_input[name], streamed)
    prepared[name] = value unless value.nil?
  end
end
recursive_clean_job(workflow, job)
click to toggle source
# File lib/rbbt/rest/workflow/jobs.rb, line 301
#
# Recursively cleans +job+ and then answers according to the request:
# * @format == :jobname       -> 200 with the job name
# * ajax or @format == :json  -> bare 200
# * otherwise                 -> redirect to "/<workflow>/<task name>"
#
# The :json case mirrors clean_job, so JSON clients get a plain 200 instead
# of being redirected to an HTML page.
def recursive_clean_job(workflow, job)
  job.recursive_clean

  if @format == :jobname
    halt 200, job.name
  elsif ajax || @format == :json
    halt 200
  else
    redirect to(File.join("/", workflow.to_s, job.task_name.to_s))
  end
end
show_exec_result(result, workflow, task)
click to toggle source
# File lib/rbbt/rest/workflow/jobs.rb, line 85
#
# Sends the +result+ of an inline (exec) task in the format named by @format:
# :html, :json, :tsv, :literal / :raw, :binary or :jobname.
#
# For the textual and binary formats, Strings and objects responding to
# :gets are sent as they are; anything else is converted with to_s.
# With :literal / :raw, a task whose result_type is :array is joined with
# newlines.
#
# Raises RuntimeError for any other format.
def show_exec_result(result, workflow, task)
  # Strings and stream-like objects pass through untouched. The :binary
  # branch used to call to_s after this check, turning streams into their
  # inspect string.
  as_body = lambda do |value|
    (String === value || value.respond_to?(:gets)) ? value : value.to_s
  end

  case @format.to_sym
  when :html
    show_result_html result, workflow, task, nil
  when :json
    content_type "application/json"
    halt 200, result.to_json
  when :tsv
    content_type "text/tab-separated-values"
    halt 200, as_body.call(result)
  when :literal, :raw
    content_type "text/plain"
    case workflow.task_info(task)[:result_type]
    when :array
      halt 200, result * "\n"
    else
      halt 200, as_body.call(result)
    end
  when :binary
    content_type "application/octet-stream"
    halt 200, as_body.call(result)
  when :jobname
    halt 200, nil
  else
    raise "Unsupported format: #{ @format }"
  end
end
show_result(job, workflow, task, params = nil)
click to toggle source
# File lib/rbbt/rest/workflow/jobs.rb, line 116
#
# Sends the result of a finished +job+ in the format named by @format:
# :html, :table, :entities, :map, :json, :tsv, :literal / :raw, :binary,
# :excel or :heatmap. When @fragment is set, the HTML result view is
# rendered regardless of the format.
#
# +params+ may carry a :filename used for file downloads; it defaults to the
# basename of the job path.
#
# Raises RuntimeError for any other format.
def show_result(job, workflow, task, params = nil)
  # params defaults to nil but is indexed below (:filename lookups).
  params ||= {}

  return show_result_html nil, workflow, task, job.name, job, params if @fragment

  case @format.to_sym
  when :html
    show_result_html :load, workflow, task, job.name, job, params
  when :table
    halt 200, tsv2html(job.path, :url => "/" + [workflow.to_s, task, job.name] * "/")
  when :entities
    tsv = tsv_process(load_tsv(job.path).first)
    list = tsv.column_values(tsv.fields.first).flatten
    if not AnnotatedArray === list and Annotated === list.first
      list.first.annotate list
      list.extend AnnotatedArray
    end
    type = list.annotation_types.last
    list_id = "TMP #{type} in #{ [workflow.to_s, task, job.name] * " - " }"
    Entity::List.save_list(type.to_s, list_id, list, user)
    redirect to(Entity::REST.entity_list_url(list_id, type))
  when :map
    tsv = tsv_process(load_tsv(job.path).first)
    type = tsv.keys.annotation_types.last
    column = tsv.fields.first
    map_id = "MAP #{type}-#{column} in #{ [workflow.to_s, task, job.name] * " - " }"
    Entity::Map.save_map(type.to_s, column, map_id, tsv, user)
    redirect to(Entity::REST.entity_map_url(map_id, type, column))
  when :json
    content_type "application/json"
    halt 200, job.load.to_json
  when :tsv
    content_type "text/tab-separated-values"
    if job.path
      send_file(job.path, :filename => (params[:filename] || File.basename(job.path)))
    else
      halt(200, job.load.to_s)
    end
  when :literal, :raw
    path = job.path
    mime = file_mimetype path
    content_type mime || "text/plain"
    if job.path
      if Open.remote? job.path
        Open.open(job.path, :nocache => true)
      elsif File.exist? job.path
        send_file(job.path, :filename => (params[:filename] || File.basename(job.path)))
      else
        halt 200, job.load
      end
    else
      halt(200, job.load.to_s)
    end
  when :binary
    content_type "application/octet-stream"
    job.path ? send_file(job.path) : halt(200, job.load.to_s)
  when :excel
    require 'rbbt/tsv/excel' # loaded lazily, only for excel exports
    excel_file = TmpFile.tmp_file
    result = job.load
    result.excel(excel_file, :name => @excel_use_name, :sort_by => @excel_sort_by, :sort_by_cast => @excel_sort_by_cast, :remove_links => true)
    send_file excel_file, :type => 'application/vnd.ms-excel', :filename => job.clean_name + '.xls'
  when :heatmap
    tsv = job.load
    content_type "text/html"
    png_file = TmpFile.tmp_file
    # Image size grows with the table and is capped at 10000 in each dimension.
    width = tsv.fields.length * 10 + 500
    height = tsv.size * 10 + 500
    width = 10000 if width > 10000
    height = 10000 if height > 10000
    tsv.R <<-EOF
rbbt.pheatmap(file='#{png_file}', data, width=#{width}, height=#{height})
    EOF
    send_file png_file, :type => 'image/png', :filename => job.name + ".heatmap.png"
  else
    raise "Unsupported format: #{ @format }"
  end
end
show_result_html(result, workflow, task, jobname = nil, job = nil, params = nil)
click to toggle source
# File lib/rbbt/rest/workflow/jobs.rb, line 78
#
# Renders the 'job_result' template for a task result.
#
# The template locals are :result, :type and :description (the last two
# taken from the task info), :jobname and :job. Entries in +params+ are
# merged on top and win on key clashes.
def show_result_html(result, workflow, task, jobname = nil, job = nil, params = nil)
  extra_locals = params || {}

  type        = workflow.task_info(task)[:result_type]
  description = workflow.task_info(task)[:result_description]

  base_locals = {
    :result      => result,
    :type        => type,
    :description => description,
    :jobname     => jobname,
    :job         => job
  }

  workflow_render('job_result', workflow, task, base_locals.merge(extra_locals))
end
stream_job(job, job_url = nil)
click to toggle source
# File lib/rbbt/rest/workflow/jobs.rb, line 191
#
# Streams the output of +job+ as the response body.
#
# The job is cleaned first if it reports a recoverable error, and forked in
# :stream mode if it is neither started nor done. The stream obtained from
# TSV.get_stream is consumed into a pipe whose read end is sent with a 200.
# When +job_url+ is given it is advertised in the RBBT-STREAMING-JOB-URL
# header.
#
# Raises RuntimeError if the job is in an error state.
def stream_job(job, job_url = nil)
  job.clean if job.recoverable_error?

  if !job.started? && !job.done?
    job.fork(:stream)
    job.soft_grace
  end

  if job.error?
    raise "Error in #{job.path}: " + job.messages.last
  end

  job_stream = TSV.get_stream job

  reader, writer = Misc.pipe
  Misc.consume_stream(job_stream, true, writer)

  headers "RBBT-STREAMING-JOB-URL" => to(job_url) if job_url

  # The block aborts the job if it has not finished when it runs.
  # NOTE(review): presumably invoked when the stream is closed or aborted;
  # confirm against ConcurrentStream.
  ConcurrentStream.setup(reader, :pair => job_stream, :autojoin => true) do
    job.abort unless job.done?
  end

  halt 200, reader
end
type_of_export(workflow, task)
click to toggle source
# File lib/rbbt/rest/workflow/jobs.rb, line 39
#
# Reports how +task+ is exported by +workflow+:
# * :exec         if it is among the exec exports
# * :synchronous  if it is among the synchronous exports
# * :asynchronous if it is among the asynchronous or the stream exports
#
# Raises RuntimeError when the task is not exported at all.
def type_of_export(workflow, task)
  task = task.to_sym

  return :exec if workflow.exec_exports.include?(task)
  return :synchronous if workflow.synchronous_exports.include?(task)

  if workflow.asynchronous_exports.include?(task) || workflow.stream_exports.include?(task)
    return :asynchronous
  end

  raise "Access denied: no known export type for #{ workflow }##{ task }."
end
workflow_partial(template, workflow = nil, task = nil, params = {})
click to toggle source
# File lib/rbbt/rest/workflow/render.rb, line 45
#
# Renders +template+ as a partial in the context of a workflow and task.
#
# A workflow or task that is not passed explicitly is taken from +params+
# through consume_parameter. The remaining params, plus :workflow and :task
# when present, become the template locals.
def workflow_partial(template, workflow = nil, task = nil, params = {})
  workflow = consume_parameter(:workflow, params) if workflow.nil?
  # The guard must test +task+: testing +workflow+ here dropped an explicitly
  # given task whenever no workflow was found, and skipped the lookup
  # whenever one was.
  task = consume_parameter(:task, params) if task.nil?

  template_file = locate_workflow_template(template, workflow, task)

  locals = params.dup
  locals[:workflow] = workflow if workflow
  locals[:task]     = task if task

  render_partial(template_file, locals)
end
workflow_render(template, workflow = nil, task = nil, params = {})
click to toggle source
# File lib/rbbt/rest/workflow/render.rb, line 6
#
# Renders +template+ as a full page in the context of a workflow and task,
# and of a job when +params+ carries one under :job.
#
# A workflow or task that is not passed explicitly is taken from +params+
# through consume_parameter. The remaining params, plus :workflow, :task and
# :job when present, become the template locals.
#
# When `layout` is set, the workflow's own "layout.haml" is used if it
# exists, and the template found by locate_template("layout") otherwise.
#
# With a job, the cache type comes from execution_type and the cache file is
# placed under the job's '.html' file, keyed by $app_name and a digest of the
# params. Without a job, the cache type is :async.
# In both cases params[:cache] == FalseClass turns caching off.
def workflow_render(template, workflow = nil, task = nil, params = {})
  workflow = consume_parameter(:workflow, params) if workflow.nil?
  # The guard must test +task+; it used to test +workflow+ (copy-paste slip).
  task = consume_parameter(:task, params) if task.nil?
  job = consume_parameter(:job, params)

  template_file = locate_workflow_template(template, workflow, task)

  locals = params.dup
  locals[:workflow] = workflow if workflow
  locals[:task]     = task if task

  layout_file = nil
  if layout
    # Guard against a missing workflow before asking it for its libdir.
    layout_file = workflow.libdir.www.views[workflow.to_s]["layout.haml"] if workflow && workflow.libdir
    layout_file = locate_template("layout") unless layout_file and layout_file.exists?
  end

  if job
    locals[:job] = job
    @step = job
    @title = [[workflow.to_s, task] * "#", job.clean_name] * " "

    cache_type = execution_type(workflow, task)
    server_key = $app_name
    html_dir = job.file('.html')

    # Params starting with "_" and the :result entry are left out of the
    # cache key.
    other_params = params.dup
    other_params.delete_if { |k, _v| k[0] == "_" }
    other_params.delete :result
    other_params[:template_file] = template_file
    cache_file = html_dir[server_key + "_" << Misc.obj2digest(other_params)]

    cache_type = false if params[:cache] == FalseClass
    render(template_file, locals, layout_file, [task, workflow, job.name] * "-", :cache_type => cache_type, :cache_file => cache_file)
  else
    cache_type = :async
    cache_type = false if params[:cache] == FalseClass
    # Pass the computed value: a hard-coded :async here ignored the
    # params[:cache] override above.
    render(template_file, locals, layout_file, [workflow.to_s, task, template_file.to_s].compact * "-", :cache_type => cache_type)
  end
end