class Libis::Workflow::Task
noinspection RubyTooManyMethodsInspection
Attributes
name[RW]
parent[RW]
processing_item[RW]
workitem[RW]
Public Class Methods
default_values()
click to toggle source
# File lib/libis/workflow/task.rb, line 373 def self.default_values parameter_defs.each_with_object({}) do |parameter, hash| hash[parameter.first] = parameter.last[:default] end end
new(parent, cfg = {})
click to toggle source
# File lib/libis/workflow/task.rb, line 30 def initialize(parent, cfg = {}) @subitems_stopper = false @subtasks_stopper = false self.parent = parent configure cfg end
task_classes()
click to toggle source
# File lib/libis/workflow/task.rb, line 24 def self.task_classes # noinspection RubyArgCount ObjectSpace.each_object(::Class) .select {|klass| klass < self && klass != Libis::Workflow::TaskRunner} end
Public Instance Methods
<<(_task)
click to toggle source
# File lib/libis/workflow/task.rb, line 37 def <<(_task) raise Libis::WorkflowError, "Processing task '#{namepath}' is not allowed to have subtasks." end
apply_options(opts)
click to toggle source
# File lib/libis/workflow/task.rb, line 124 def apply_options(opts) o = {} o.merge!(opts[self.class.to_s] || {}) o.merge!(opts[name] || opts[names.join('/')] || {}) if o && o.is_a?(Hash) default_values.each do |name, _| next unless o.key?(name.to_s) next if o[name.to_s].nil? paramdef = get_parameter_definition name.to_sym value = paramdef.parse(o[name.to_s]) parameter(name.to_sym, value) end end end
logger()
click to toggle source
# File lib/libis/workflow/task.rb, line 152 def logger (parent || get_run).logger end
message(severity, msg, *args)
click to toggle source
Calls superclass method
# File lib/libis/workflow/task.rb, line 140 def message(severity, msg, *args) taskname = namepath rescue nil set_application(taskname) item = workitem rescue nil item = args.shift if args.size > 0 and args[0].is_a?(::Libis::Workflow::Base::WorkItem) subject = item.namepath rescue nil subject ||= item.name rescue nil subject ||= item.to_s rescue nil set_subject(subject) super(severity, msg, *args) end
namepath()
click to toggle source
# File lib/libis/workflow/task.rb, line 120 def namepath names.join('/') end
names()
click to toggle source
# File lib/libis/workflow/task.rb, line 116 def names (parent.names rescue []).push(name).compact end
root_task()
click to toggle source
# File lib/libis/workflow/task.rb, line 41 def root_task parent&.root_task || self end
run(item)
click to toggle source
@param [Libis::Workflow::Base::WorkItem] item
# File lib/libis/workflow/task.rb, line 46 def run(item) check_item_type ::Libis::Workflow::Base::WorkItem, item self.workitem = item # case action # when :retry # if !parameter(:run_always) && item.check_status(:DONE, namepath) # debug 'Retry: skipping task %s because it has finished successfully.', item, namepath # return item # end # when :failed # return item unless parameter(:run_always) # else # # type code here # end return item if action == :failed && !parameter(:run_always) (parameter(:retry_count) + 1).times do i = run_item(item) item = i if i.is_a?(Libis::Workflow::Base::WorkItem) # noinspection RubyScope case item.status(namepath) when :DONE # self.action = :run return item when :ASYNC_WAIT self.action = :retry when :ASYNC_HALT break when :FAILED break else return item end self.action = :retry sleep(parameter(:retry_interval)) end item.get_run.action = :failed return item rescue WorkflowError => e error e.message, item set_status item, :FAILED rescue WorkflowAbort => e set_status item, :FAILED raise e if parent rescue WorkflowAbortForget => e set_status item, :FAILED raise e rescue Exception => e set_status item, :FAILED fatal_error "Aborting ingest because of error: %s @ %s\n%s", item, e.message, e.backtrace.first, e.backtrace.map{|t| ' -- ' + t}.join("\n") raise Libis::WorkflowAbort, "#{e.message} @ #{e.backtrace.first}" ensure item.save! end
Protected Instance Methods
action()
click to toggle source
# File lib/libis/workflow/task.rb, line 310 def action get_run.action end
action=(action)
click to toggle source
# File lib/libis/workflow/task.rb, line 306 def action=(action) get_run.action = action end
capture_cmd(cmd, *opts)
click to toggle source
# File lib/libis/workflow/task.rb, line 294 def capture_cmd(cmd, *opts) out = StringIO.new err = StringIO.new $stdout = out $stderr = err status = system cmd, *opts return [status, out.string, err.string] ensure $stdout = STDOUT $stderr = STDERR end
check_item_type(klass, item = nil)
click to toggle source
# File lib/libis/workflow/task.rb, line 347 def check_item_type(klass, item = nil) item ||= workitem unless item.is_a? klass.to_s.constantize raise WorkflowError, "Workitem is of wrong type : #{item.class} - expected #{klass}" end end
check_processing_subitems()
click to toggle source
# File lib/libis/workflow/task.rb, line 330 def check_processing_subitems if @subitems_stopper @subitems_stopper = false return false end true end
configure(cfg)
click to toggle source
# File lib/libis/workflow/task.rb, line 158 def configure(cfg) self.name = cfg['name'] || (cfg['class'] || self.class).to_s.split('::').last (cfg['options'] || {}).merge( cfg.reject {|k, _| %w[options name class].include? k} ).symbolize_keys.each do |k, v| parameter(k, v) end end
get_root_item(item = nil)
click to toggle source
# File lib/libis/workflow/task.rb, line 318 def get_root_item(item = nil) (item || workitem).get_root end
get_run(item = nil)
click to toggle source
# File lib/libis/workflow/task.rb, line 314 def get_run(item = nil) get_root_item(item).get_run end
get_work_dir(item = nil)
click to toggle source
# File lib/libis/workflow/task.rb, line 322 def get_work_dir(item = nil) get_root_item(item).work_dir end
global_status(item)
click to toggle source
# File lib/libis/workflow/task.rb, line 252 def global_status(item) results = Hash.new(0) self.tasks.each { |subtask| results[item.status(subtask.namepath)] += 1 } [:FAILED, :ASYNC_WAIT, :ASYNC_HALT].each { |status| return status if results[status] > 0 } return :STARTED if results[:STARTED] > 0 :DONE end
item_type?(klass, item = nil)
click to toggle source
# File lib/libis/workflow/task.rb, line 354 def item_type?(klass, item = nil) item ||= workitem item.is_a? klass.to_s.constantize end
post_process(_)
click to toggle source
# File lib/libis/workflow/task.rb, line 195 def post_process(_) # optional implementation end
pre_process(_)
click to toggle source
# File lib/libis/workflow/task.rb, line 190 def pre_process(_) true # optional implementation end
run_item(item)
click to toggle source
# File lib/libis/workflow/task.rb, line 167 def run_item(item) @item_skipper = false return item if item.status(namepath) == :DONE && !parameter(:run_always) pre_process(item) if @item_skipper run_subitems(item) if parameter(:recursive) else set_status item, :STARTED self.processing_item = item process item item = processing_item run_subitems(item) if parameter(:recursive) set_status item, :DONE if item.check_status(:STARTED, namepath) end post_process item item end
run_subitems(parent_item)
click to toggle source
# File lib/libis/workflow/task.rb, line 199 def run_subitems(parent_item) return unless check_processing_subitems items = subitems(parent_item) return if items.empty? status_count = Hash.new(0) parent_item.status_progress(namepath, 0, items.count) items.each_with_index do |item, i| debug 'Processing subitem (%d/%d): %s', parent_item, i + 1, items.size, item.to_s new_item = item begin new_item = run_item(item) rescue Libis::WorkflowError => e set_status(item, :FAILED) error 'Error processing subitem (%d/%d): %s @ %s', item, i + 1, items.size, e.message, e.backtrace.first break if parameter(:abort_recursion_on_failure) rescue Libis::WorkflowAbort => e fatal_error "Fatal error processing subitem (%d/%d): %s\n%s", item, i + 1, items.size, e.message, e.backtrace[..9].map{|t| ' -- ' + t}.join("\n") set_status(item, :FAILED) break rescue Libis::WorkflowAbortForget => e fatal_error "Fatal error processing subitem (%d/%d): %s\n%s", item, i + 1, items.size, e.message, e.backtrace[..9].map{|t| ' -- ' + t}.join("\n") set_status(item, :FAILED) raise e rescue Exception => e fatal_error "Fatal error processing subitem (%d/%d): %s @ %s\n%s", item, i + 1, items.size, e.message, e.backtrace[..9].map{|t| ' -- ' + t}.join("\n") set_status(item, :FAILED) raise Libis::WorkflowAbort, "#{e.message} @ #{e.backtrace.first}" else item = new_item if new_item.is_a?(Libis::Workflow::Base::WorkItem) parent_item.status_progress(namepath, i + 1) ensure # noinspection RubyScope item_status = item.status(namepath) # noinspection RubyScope status_count[item_status] += 1 break if parameter(:abort_recursion_on_failure) && item_status != :DONE end end substatus_check(status_count, parent_item, 'item') end
set_status(item, state)
click to toggle source
# File lib/libis/workflow/task.rb, line 342 def set_status(item, state) item.set_status(namepath, state)# unless parameter(:run_always) state end
skip_processing_item()
click to toggle source
# File lib/libis/workflow/task.rb, line 338 def skip_processing_item @item_skipper = true end
stop_processing_subitems()
click to toggle source
# File lib/libis/workflow/task.rb, line 326 def stop_processing_subitems @subitems_stopper = true if parameter(:recursive) end
substatus_check(status_count, item, task_or_item)
click to toggle source
# File lib/libis/workflow/task.rb, line 260 def substatus_check(status_count, item, task_or_item) item_status = :DONE if (not_started = status_count[:NOT_STARTED]) > 0 debug "%d sub#{task_or_item}(s) skipped", item, not_started end if (started = status_count[:STARTED]) > 0 error "%d sub#{task_or_item}(s) started but not done", item, started item_status = :FAILED end if (waiting = status_count[:ASYNC_WAIT]) > 0 info "waiting for %d sub#{task_or_item}(s) in async process", item, waiting item_status = :ASYNC_WAIT end if (halted = status_count[:ASYNC_HALT]) > 0 warn "%d sub#{task_or_item}(s) halted in async process", item, halted item_status = :ASYNC_HALT end if (failed = status_count[:FAILED]) > 0 error "%d sub#{task_or_item}(s) failed", item, failed item_status = :FAILED end if (done = status_count[:DONE]) > 0 debug "%d sub#{task_or_item}(s) passed", item, done end set_status(item, item_status) end
Private Instance Methods
default_values()
click to toggle source
# File lib/libis/workflow/task.rb, line 369 def default_values self.class.default_values end
subitems(item = nil)
click to toggle source
# File lib/libis/workflow/task.rb, line 365 def subitems(item = nil) (item || workitem).get_item_list end
subtasks()
click to toggle source
# File lib/libis/workflow/task.rb, line 361 def subtasks tasks end