class Libis::Workflow::Task

# noinspection RubyTooManyMethodsInspection

Attributes

name[RW]
parent[RW]
processing_item[RW]
workitem[RW]

Public Class Methods

default_values() click to toggle source
# File lib/libis/workflow/task.rb, line 373
# Collects the default value of every declared parameter.
#
# @return [Hash] parameter name => the +:default+ entry of its definition
def self.default_values
  parameter_defs.map { |entry| [entry.first, entry.last[:default]] }.to_h
end
new(parent, cfg = {}) click to toggle source
# File lib/libis/workflow/task.rb, line 30
# Creates a task attached to +parent+ and applies the given configuration.
#
# @param parent object stored through the +parent+ writer
# @param cfg [Hash] configuration passed on to #configure
def initialize(parent, cfg = {})
  # Both stop flags start out cleared.
  @subitems_stopper = @subtasks_stopper = false
  self.parent = parent
  configure(cfg)
end
task_classes() click to toggle source
# File lib/libis/workflow/task.rb, line 24
# Lists every loaded class that descends from the receiver, leaving out
# Libis::Workflow::TaskRunner.
#
# @return [Array<Class>] matching classes found through ObjectSpace
def self.task_classes
  # noinspection RubyArgCount
  ObjectSpace.each_object(::Class).select do |candidate|
    candidate < self && candidate != Libis::Workflow::TaskRunner
  end
end

Public Instance Methods

<<(_task) click to toggle source
# File lib/libis/workflow/task.rb, line 37
# Always rejects the attempt to add a subtask to this task.
#
# @param _task ignored
# @raise [Libis::WorkflowError] on every call
def <<(_task)
  raise(
    Libis::WorkflowError,
    "Processing task '#{namepath}' is not allowed to have subtasks."
  )
end
apply_options(opts) click to toggle source
# File lib/libis/workflow/task.rb, line 124
# Applies externally supplied option values to this task's parameters.
#
# Options are looked up under the class name first and then under the task
# name (or, failing that, the '/'-joined name path); the latter win on
# conflict. Only parameters listed in #default_values are considered, and
# entries that are missing or nil are skipped. Each value is parsed by the
# parameter's definition before it is stored.
#
# @param opts [Hash] option hashes keyed by class name / task name / name path
# @return the object returned by #default_values
def apply_options(opts)
  selected = {}
  selected.merge!(opts[self.class.to_s] || {})
  selected.merge!(opts[name] || opts[names.join('/')] || {})

  default_values.each do |key, _|
    raw = selected[key.to_s]
    # A missing key and an explicit nil are treated the same: nothing to apply.
    next if raw.nil?

    definition = get_parameter_definition(key.to_sym)
    parameter(key.to_sym, definition.parse(raw))
  end
end
logger() click to toggle source
# File lib/libis/workflow/task.rb, line 152
# Returns the logger of the parent when there is one, otherwise the logger
# of the current run.
def logger
  source = parent || get_run
  source.logger
end
message(severity, msg, *args) click to toggle source
Calls superclass method
# File lib/libis/workflow/task.rb, line 140
# Logs a message after setting the application (task name path) and the
# subject (work item) for it, then hands over to the superclass method.
#
# The subject is taken from the current work item, unless the first extra
# argument is a work item; in that case that argument is consumed and used
# instead. Its label is the first of namepath, name and to_s that yields a
# value; errors raised while looking these up are ignored.
#
# @param severity severity passed on to the superclass method
# @param msg message (format) passed on to the superclass method
# @param args optional leading work item followed by message arguments
def message(severity, msg, *args)
  task_label = begin
    namepath
  rescue StandardError
    nil
  end
  set_application(task_label)

  item = begin
    workitem
  rescue StandardError
    nil
  end
  item = args.shift if !args.empty? && args.first.is_a?(::Libis::Workflow::Base::WorkItem)

  subject = begin
    item.namepath
  rescue StandardError
    nil
  end
  subject ||= begin
    item.name
  rescue StandardError
    nil
  end
  subject ||= begin
    item.to_s
  rescue StandardError
    nil
  end
  set_subject(subject)

  super(severity, msg, *args)
end
namepath() click to toggle source
# File lib/libis/workflow/task.rb, line 120
# Returns the entries of #names joined with '/'.
#
# @return [String]
def namepath
  segments = names
  segments.join('/')
end
names() click to toggle source
# File lib/libis/workflow/task.rb, line 116
# Returns the names leading to this task: the parent's names followed by
# this task's own name, with nil entries removed.
#
# Any error raised while asking the parent for its names (including a
# missing parent) results in an empty starting list.
#
# @return [Array]
def names
  lineage = begin
    parent.names
  rescue StandardError
    []
  end
  lineage << name
  lineage.compact
end
root_task() click to toggle source
# File lib/libis/workflow/task.rb, line 41
# Returns the root task as reported by the parent, or this task itself when
# there is no parent (or the parent reports none).
def root_task
  return self if parent.nil?

  parent.root_task || self
end
run(item) click to toggle source

@param [Libis::Workflow::Base::WorkItem] item the work item to run this task on

# File lib/libis/workflow/task.rb, line 46
# Runs this task on +item+, retrying up to +retry_count+ extra times.
#
# The item is type-checked and stored as the current work item. Nothing is
# done when the run's action is :failed and +run_always+ is not set.
# Otherwise #run_item is called repeatedly; after each attempt the item's
# status for this task decides what happens next (see the case statement).
# When the loop ends without returning, the run's action is set to :failed.
#
# The item is saved (+save!+) on every exit path, including errors.
#
# NOTE(review): the normal paths return the item, but the WorkflowError
# handler ends with #set_status (which returns the state) and the
# WorkflowAbort handler yields nil when there is no parent — confirm that
# callers do not rely on always getting an item back.
#
# @param item [Libis::Workflow::Base::WorkItem]
# @raise [WorkflowAbort] re-raised when this task has a parent
# @raise [WorkflowAbortForget] always re-raised
# @raise [Libis::WorkflowAbort] raised in place of any other exception
def run(item)
  check_item_type ::Libis::Workflow::Base::WorkItem, item
  self.workitem = item

  # case action
  # when :retry
  #   if !parameter(:run_always) && item.check_status(:DONE, namepath)
  #     debug 'Retry: skipping task %s because it has finished successfully.', item, namepath
  #     return item
  #   end
  # when :failed
  #   return item unless parameter(:run_always)
  # else
  #   # type code here
  # end

  # A run that is already failing only executes tasks flagged run_always.
  return item if action == :failed && !parameter(:run_always)

  (parameter(:retry_count) + 1).times do

    # run_item may hand back a replacement item; anything else is ignored.
    i = run_item(item)
    item = i if i.is_a?(Libis::Workflow::Base::WorkItem)

    # noinspection RubyScope
    case item.status(namepath)
    when :DONE
      # self.action = :run
      return item
    when :ASYNC_WAIT
      # Only this status falls through to the sleep-and-retry below.
      self.action = :retry
    when :ASYNC_HALT
      break
    when :FAILED
      break
    else
      return item
    end

    self.action = :retry

    sleep(parameter(:retry_interval))

  end

  # Reached after a break or once all attempts are used up.
  item.get_run.action = :failed

  return item

rescue WorkflowError => e
  # Logged and marked as failed; the error is not propagated.
  error e.message, item
  set_status item, :FAILED

rescue WorkflowAbort => e
  set_status item, :FAILED
  # Without a parent the abort stops here.
  raise e if parent

rescue WorkflowAbortForget => e
  set_status item, :FAILED
  raise e

rescue Exception => e
  # Everything else is logged with its backtrace and converted into an abort.
  set_status item, :FAILED
  fatal_error "Aborting ingest because of error: %s @ %s\n%s", item, e.message, e.backtrace.first, e.backtrace.map{|t| ' -- ' + t}.join("\n")
  raise Libis::WorkflowAbort, "#{e.message} @ #{e.backtrace.first}"

ensure
  item.save!

end

Protected Instance Methods

action() click to toggle source
# File lib/libis/workflow/task.rb, line 310
# Returns the action of the current run.
def action
  current_run = get_run
  current_run.action
end
action=(action) click to toggle source
# File lib/libis/workflow/task.rb, line 306
# Stores +action+ as the action of the current run.
#
# @param action the new action value
def action=(action)
  current_run = get_run
  current_run.action = action
end
capture_cmd(cmd, *opts) click to toggle source
# File lib/libis/workflow/task.rb, line 294
# Runs an external command and captures what it writes to stdout and stderr.
#
# The command is started through Open3.capture3 so that the output of the
# child process itself is collected. Re-assigning $stdout/$stderr to StringIO
# objects only affects Ruby-level writes in the current process; a child
# started with Kernel#system writes to the inherited file descriptors, so
# nothing would be captured that way.
#
# @param cmd [String] the command to run
# @param opts additional command arguments and/or spawn options, passed on unchanged
# @return [Array] +[status, stdout, stderr]+ where status is +true+ for a zero
#   exit status, +false+ for any other termination and +nil+ when the command
#   could not be started (stderr then holds the error message)
def capture_cmd(cmd, *opts)
  # Loaded here because the file's top-level requires are not visible from this method.
  require 'open3'

  out, err, process_status = Open3.capture3(cmd, *opts)
  [process_status.success? ? true : false, out, err]
rescue SystemCallError => e
  # Mirrors Kernel#system, which reports a command that cannot be executed as nil.
  [nil, '', e.message]
end
check_item_type(klass, item = nil) click to toggle source
# File lib/libis/workflow/task.rb, line 347
# Verifies that an item is an instance of the given class.
#
# @param klass class (or class name) the item must be a kind of
# @param item item to check; the current work item when omitted
# @return [nil] when the item has the expected type
# @raise [WorkflowError] when the item is of another type
def check_item_type(klass, item = nil)
  item ||= workitem
  return if item.is_a?(klass.to_s.constantize)

  raise WorkflowError, "Workitem is of wrong type : #{item.class} - expected #{klass}"
end
check_processing_subitems() click to toggle source
# File lib/libis/workflow/task.rb, line 330
# Tells whether subitems should be processed, consuming a pending stop request.
#
# @return [Boolean] false exactly once after the stop flag was set (the flag
#   is cleared in the process), true otherwise
def check_processing_subitems
  return true unless @subitems_stopper

  @subitems_stopper = false
  false
end
configure(cfg) click to toggle source
# File lib/libis/workflow/task.rb, line 158
# Names the task and sets its parameters from a configuration hash.
#
# The name is cfg['name'] or, failing that, the last '::' segment of
# cfg['class'] (or of the task's own class). Every entry of cfg['options'],
# overridden by every other top-level entry except 'options', 'name' and
# 'class', is stored as a parameter under its symbolized key.
#
# @param cfg [Hash] configuration with string keys
# @return [Hash] the symbolized parameter hash that was applied
def configure(cfg)
  class_label = cfg['class'] || self.class
  self.name = cfg['name'] || class_label.to_s.split('::').last

  reserved = %w[options name class]
  inline = cfg.reject { |key, _| reserved.include?(key) }
  settings = (cfg['options'] || {}).merge(inline).symbolize_keys
  settings.each { |key, value| parameter(key, value) }
end
get_root_item(item = nil) click to toggle source
# File lib/libis/workflow/task.rb, line 318
# Returns the root of the given item, or of the current work item when no
# item is given.
#
# @param item item to start from (optional)
def get_root_item(item = nil)
  start = item || workitem
  start.get_root
end
get_run(item = nil) click to toggle source
# File lib/libis/workflow/task.rb, line 314
# Returns the run that the root item (see #get_root_item) reports.
#
# @param item item to start from (optional)
def get_run(item = nil)
  root = get_root_item(item)
  root.get_run
end
get_work_dir(item = nil) click to toggle source
# File lib/libis/workflow/task.rb, line 322
# Returns the work_dir of the root item (see #get_root_item).
#
# @param item item to start from (optional)
def get_work_dir(item = nil)
  root = get_root_item(item)
  root.work_dir
end
global_status(item) click to toggle source
# File lib/libis/workflow/task.rb, line 252
# Summarizes the statuses an item holds for each of this task's subtasks.
#
# The first status found, in this order, wins: :FAILED, :ASYNC_WAIT,
# :ASYNC_HALT, :STARTED. When none of them occurs the result is :DONE.
#
# @param item item whose per-subtask statuses are inspected
# @return [Symbol]
def global_status(item)
  observed = tasks.map { |subtask| item.status(subtask.namepath) }
  %i[FAILED ASYNC_WAIT ASYNC_HALT STARTED].find { |status| observed.include?(status) } || :DONE
end
item_type?(klass, item = nil) click to toggle source
# File lib/libis/workflow/task.rb, line 354
# Tells whether an item is an instance of the given class.
#
# @param klass class (or class name) to test against
# @param item item to test; the current work item when omitted
# @return [Boolean]
def item_type?(klass, item = nil)
  candidate = item || workitem
  expected = klass.to_s.constantize
  candidate.is_a?(expected)
end
post_process(_) click to toggle source
# File lib/libis/workflow/task.rb, line 195
# Hook called by #run_item after an item has been handled. The base
# implementation does nothing and ignores its argument.
def post_process(_)
  # optional implementation
end
pre_process(_) click to toggle source
# File lib/libis/workflow/task.rb, line 190
# Hook called by #run_item before an item is handled. The base
# implementation ignores its argument and returns true.
def pre_process(_)
  true
  # optional implementation
end
run_item(item) click to toggle source
# File lib/libis/workflow/task.rb, line 167
# Executes this task on a single item.
#
# Items already :DONE for this task are returned untouched unless
# +run_always+ is set. Otherwise #pre_process runs first; unless it asked to
# skip the item, the item is marked :STARTED, processed (the processing may
# replace it through +processing_item+), and finally marked :DONE if it is
# still :STARTED. Subitems are handled when the +recursive+ parameter is set,
# whether or not the item itself was skipped. #post_process always runs last.
#
# @param item the item to handle
# @return the item, or the item that replaced it during processing
def run_item(item)
  @item_skipper = false

  finished = item.status(namepath) == :DONE
  return item if finished && !parameter(:run_always)

  pre_process(item)

  # The skip decision is taken once, right after pre_process.
  skipped = @item_skipper
  unless skipped
    set_status(item, :STARTED)
    self.processing_item = item
    process(item)
    item = processing_item
  end

  run_subitems(item) if parameter(:recursive)
  set_status(item, :DONE) if !skipped && item.check_status(:STARTED, namepath)

  post_process(item)

  item
end
run_subitems(parent_item) click to toggle source
# File lib/libis/workflow/task.rb, line 199
# Runs this task on every subitem of +parent_item+ and records the combined
# outcome on the parent.
#
# Nothing happens when a stop was requested (see #check_processing_subitems)
# or when there are no subitems. Progress is reported on the parent item
# after every subitem that was handled without an exception. The status each
# subitem ends up with is counted and the counts are handed to
# #substatus_check.
#
# Error handling per subitem:
# * Libis::WorkflowError       - logged, subitem marked :FAILED, loop goes on
#   unless +abort_recursion_on_failure+ is set
# * Libis::WorkflowAbort       - logged, subitem marked :FAILED, loop stops
# * Libis::WorkflowAbortForget - logged, subitem marked :FAILED, re-raised
# * any other exception        - logged, subitem marked :FAILED, raised again
#   as Libis::WorkflowAbort
#
# @param parent_item item whose subitems are processed
# @return the result of #substatus_check, or nil when nothing was processed
def run_subitems(parent_item)
  return unless check_processing_subitems

  items = subitems(parent_item)
  return if items.empty?

  status_count = Hash.new(0)
  parent_item.status_progress(namepath, 0, items.count)
  items.each_with_index do |item, i|
    debug 'Processing subitem (%d/%d): %s', parent_item, i + 1, items.size, item.to_s
    new_item = item

    begin
      new_item = run_item(item)

    rescue Libis::WorkflowError => e
      set_status(item, :FAILED)
      error 'Error processing subitem (%d/%d): %s @ %s', item, i + 1, items.size, e.message, e.backtrace.first
      break if parameter(:abort_recursion_on_failure)

    rescue Libis::WorkflowAbort => e
      fatal_error "Fatal error processing subitem (%d/%d): %s\n%s", item, i + 1, items.size, e.message, e.backtrace[..9].map{|t| ' -- ' + t}.join("\n")
      set_status(item, :FAILED)
      break

    rescue Libis::WorkflowAbortForget => e
      fatal_error "Fatal error processing subitem (%d/%d): %s\n%s", item, i + 1, items.size, e.message, e.backtrace[..9].map{|t| ' -- ' + t}.join("\n")
      set_status(item, :FAILED)
      raise e

    rescue Exception => e
      # The format has five placeholders; the "@ %s" one takes the first
      # backtrace line, matching the WorkflowAbort message raised below.
      fatal_error "Fatal error processing subitem (%d/%d): %s @ %s\n%s", item, i + 1, items.size, e.message, e.backtrace.first, e.backtrace[..9].map{|t| ' -- ' + t}.join("\n")
      set_status(item, :FAILED)
      raise Libis::WorkflowAbort, "#{e.message} @ #{e.backtrace.first}"

    else
      # run_item may hand back a replacement item; anything else is ignored.
      item = new_item if new_item.is_a?(Libis::Workflow::Base::WorkItem)
      parent_item.status_progress(namepath, i + 1)

    ensure
      # noinspection RubyScope
      item_status = item.status(namepath)
      # noinspection RubyScope
      status_count[item_status] += 1
      # NOTE(review): a break inside ensure discards an exception that is
      # being raised, so with abort_recursion_on_failure set the aborts raised
      # above end the loop instead of propagating - confirm this is intended.
      break if parameter(:abort_recursion_on_failure) && item_status != :DONE

    end

  end

  substatus_check(status_count, parent_item, 'item')
end
set_status(item, state) click to toggle source
# File lib/libis/workflow/task.rb, line 342
# Records +state+ on the item as its status for this task.
#
# @param item item to update
# @param state the new status
# @return the given state
def set_status(item, state)
  state.tap { |value| item.set_status(namepath, value) } # unless parameter(:run_always)
end
skip_processing_item() click to toggle source
# File lib/libis/workflow/task.rb, line 338
# Requests that the current item is not processed: sets the flag that
# #run_item checks right after #pre_process (and resets at its start).
def skip_processing_item
  @item_skipper = true
end
stop_processing_subitems() click to toggle source
# File lib/libis/workflow/task.rb, line 326
# Requests that the next round of subitem processing is skipped. Has no
# effect unless the +recursive+ parameter is set.
#
# @return [true, nil] true when the stop flag was set, nil otherwise
def stop_processing_subitems
  return unless parameter(:recursive)

  @subitems_stopper = true
end
substatus_check(status_count, item, task_or_item) click to toggle source
# File lib/libis/workflow/task.rb, line 260
# Logs a summary of subtask/subitem outcomes and stores the resulting
# status on the item.
#
# The result starts as :DONE. Statuses are examined in the order of the table
# below; each status with a count above zero logs one line and, where it
# carries an outcome, overrides the result. Later rows therefore take
# precedence over earlier ones.
#
# @param status_count [Hash] number of subtasks/subitems per status
# @param item item the summary is logged for and whose status is set
# @param task_or_item [String] word used in the messages ('task' or 'item')
# @return the result of #set_status
def substatus_check(status_count, item, task_or_item)
  # status, log method, message, resulting status (nil = leaves the result alone)
  checks = [
    [:NOT_STARTED, :debug, "%d sub#{task_or_item}(s) skipped", nil],
    [:STARTED, :error, "%d sub#{task_or_item}(s) started but not done", :FAILED],
    [:ASYNC_WAIT, :info, "waiting for %d sub#{task_or_item}(s) in async process", :ASYNC_WAIT],
    [:ASYNC_HALT, :warn, "%d sub#{task_or_item}(s) halted in async process", :ASYNC_HALT],
    [:FAILED, :error, "%d sub#{task_or_item}(s) failed", :FAILED],
    [:DONE, :debug, "%d sub#{task_or_item}(s) passed", nil]
  ]

  item_status = :DONE
  checks.each do |status, log_method, text, outcome|
    count = status_count[status]
    next unless count > 0

    send(log_method, text, item, count)
    item_status = outcome if outcome
  end

  set_status(item, item_status)
end

Private Instance Methods

default_values() click to toggle source
# File lib/libis/workflow/task.rb, line 369
# Instance-level shortcut for the class method of the same name.
def default_values
  owner = self.class
  owner.default_values
end
subitems(item = nil) click to toggle source
# File lib/libis/workflow/task.rb, line 365
# Returns the item list of the given item, or of the current work item when
# no item is given.
#
# @param item item to list (optional)
def subitems(item = nil)
  source = item || workitem
  source.get_item_list
end
subtasks() click to toggle source
# File lib/libis/workflow/task.rb, line 361
# Alias-style reader: returns whatever +tasks+ returns.
def subtasks
  tasks
end