class Dynflow::Director

Director is responsible for telling what to do next when:

* new execution starts
* an event accurs
* some work is finished

It's public methods (except terminate) return work items that the executor should understand

Constants

Event
UnprocessableEvent

Attributes

logger[R]

Public Class Methods

new(world) click to toggle source
# File lib/dynflow/director.rb, line 166
def initialize(world)
  @world = world
  @logger = world.logger
  @execution_plan_managers = {}
  @rescued_steps = {}
  @planning_plans = Set.new
end

Public Instance Methods

current_execution_plan_ids() click to toggle source
# File lib/dynflow/director.rb, line 174
def current_execution_plan_ids
  @execution_plan_managers.keys
end
halt(event) click to toggle source
# File lib/dynflow/director.rb, line 249
def halt(event)
  halt_execution(event.execution_plan_id)
end
handle_event(event) click to toggle source
# File lib/dynflow/director.rb, line 191
def handle_event(event)
  Type! event, Event
  execution_plan_manager = @execution_plan_managers[event.execution_plan_id]
  if execution_plan_manager
    execution_plan_manager.event(event)
  elsif event.optional
    event.result.reject "no manager for #{event.inspect}"
    []
  else
    raise Dynflow::Error, "no manager for #{event.inspect}"
  end
rescue Dynflow::Error => e
  event.result.reject e.message
  raise e
end
handle_planning(execution_plan_uuid) click to toggle source
# File lib/dynflow/director.rb, line 178
def handle_planning(execution_plan_uuid)
  return [] if @planning_plans.include? execution_plan_uuid

  @planning_plans << execution_plan_uuid
  [PlanningWorkItem.new(execution_plan_uuid, :default, @world.id)]
end
start_execution(execution_plan_id, finished) click to toggle source
# File lib/dynflow/director.rb, line 185
def start_execution(execution_plan_id, finished)
  manager = track_execution_plan(execution_plan_id, finished)
  return [] unless manager
  unless_done(manager, manager.start)
end
terminate() click to toggle source
# File lib/dynflow/director.rb, line 233
def terminate
  unless @execution_plan_managers.empty?
    logger.error "... cleaning #{@execution_plan_managers.size} execution plans ..."
    begin
      @execution_plan_managers.values.each do |manager|
        manager.terminate
      end
    rescue Errors::PersistenceError
      logger.error "could not to clean the data properly"
    end
    @execution_plan_managers.values.each do |manager|
      finish_manager(manager)
    end
  end
end
work_failed(work) click to toggle source

called when there was an unhandled exception during the execution of the work (such as persistence issue) - in this case we just clean up the runtime from the execution plan and let it go (common cause for this is the execution plan being removed from database by external user)

# File lib/dynflow/director.rb, line 224
def work_failed(work)
  if (manager = @execution_plan_managers[work.execution_plan_id])
    manager.terminate
    # Don't try to store when the execution plan went missing
    plan_missing = @world.persistence.find_execution_plans(:filters => { uuid: work.execution_plan_id }).empty?
    finish_manager(manager, store: !plan_missing)
  end
end
work_finished(work) click to toggle source
# File lib/dynflow/director.rb, line 207
def work_finished(work)
  case work
  when PlanningWorkItem
    @planning_plans.delete(work.execution_plan_id)
    @world.persistence.delete_delayed_plans(:execution_plan_uuid => work.execution_plan_id)
    []
  else
    manager = @execution_plan_managers[work.execution_plan_id]
    return [] unless manager # skip case when getting event from execution plan that is not running anymore
    unless_done(manager, manager.what_is_next(work))
  end
end

Private Instance Methods

finish_manager(manager, store: true) click to toggle source
# File lib/dynflow/director.rb, line 284
def finish_manager(manager, store: true)
  update_execution_plan_state(manager) if store
  return []
ensure
  @execution_plan_managers.delete(manager.execution_plan.id)
  set_future(manager)
end
halt_execution(execution_plan_id) click to toggle source
# File lib/dynflow/director.rb, line 255
def halt_execution(execution_plan_id)
  manager = @execution_plan_managers[execution_plan_id]
  @logger.warn "Halting execution plan #{execution_plan_id}"
  return halt_inactive(execution_plan_id) unless manager

  manager.halt
  finish_manager manager
end
halt_inactive(execution_plan_id) click to toggle source
# File lib/dynflow/director.rb, line 264
def halt_inactive(execution_plan_id)
  plan = @world.persistence.load_execution_plan(execution_plan_id)
  plan.update_state(:stopped)
rescue => e
  @logger.error e
end
rescue!(manager) click to toggle source
# File lib/dynflow/director.rb, line 306
def rescue!(manager)
  # TODO: after moving to concurrent-ruby actors, there should be better place
  # to put this logic of making sure we don't run rescues in endless loop
  @rescued_steps[manager.execution_plan.id] ||= Set.new
  @rescued_steps[manager.execution_plan.id].merge(manager.execution_plan.failed_steps.map(&:id))
  new_state = manager.execution_plan.prepare_for_rescue
  if new_state == :running
    return manager.restart
  else
    manager.execution_plan.update_state(new_state)
    return false
  end
end
rescue?(manager) click to toggle source
# File lib/dynflow/director.rb, line 292
def rescue?(manager)
  if @world.terminating? || !(@world.auto_rescue && manager.execution_plan.error?)
    false
  elsif !@rescued_steps.key?(manager.execution_plan.id)
    # we have not rescued this plan yet
    true
  else
    # we have rescued this plan already, but a different step has failed now
    # we do this check to prevent endless loop, if we always failed on the same steps
    failed_step_ids = manager.execution_plan.failed_steps.map(&:id).to_set
    (failed_step_ids - @rescued_steps[manager.execution_plan.id]).any?
  end
end
set_future(manager) click to toggle source
# File lib/dynflow/director.rb, line 362
def set_future(manager)
  @rescued_steps.delete(manager.execution_plan.id)
  manager.future.fulfill manager.execution_plan
end
track_execution_plan(execution_plan_id, finished) click to toggle source
# File lib/dynflow/director.rb, line 320
def track_execution_plan(execution_plan_id, finished)
  execution_plan = @world.persistence.load_execution_plan(execution_plan_id)

  if @execution_plan_managers[execution_plan_id]
    raise Dynflow::Error,
      "cannot execute execution_plan_id:#{execution_plan_id} it's already running"
  end

  if execution_plan.state == :stopped
    raise Dynflow::Error,
      "cannot execute execution_plan_id:#{execution_plan_id} it's stopped"
  end

  lock_class = Coordinator::ExecutionInhibitionLock
  filters = { class: lock_class.to_s, owner_id: lock_class.lock_id(execution_plan_id) }
  if @world.coordinator.find_records(filters).any?
    halt_execution(execution_plan_id)
    raise Dynflow::Error,
      "cannot execute execution_plan_id:#{execution_plan_id} it's execution is inhibited"
  end

  @execution_plan_managers[execution_plan_id] =
    ExecutionPlanManager.new(@world, execution_plan, finished)
rescue Dynflow::Error => e
  finished.reject e
  nil
end
try_to_rescue(manager) click to toggle source
# File lib/dynflow/director.rb, line 280
def try_to_rescue(manager)
  rescue!(manager) if rescue?(manager)
end
unless_done(manager, work_items) click to toggle source
# File lib/dynflow/director.rb, line 271
def unless_done(manager, work_items)
  return [] unless manager
  if manager.done?
    try_to_rescue(manager) || finish_manager(manager)
  else
    return work_items
  end
end
update_execution_plan_state(manager) click to toggle source
# File lib/dynflow/director.rb, line 348
def update_execution_plan_state(manager)
  execution_plan = manager.execution_plan
  case execution_plan.state
  when :running
    if execution_plan.error?
      execution_plan.update_state(:paused)
    elsif manager.done?
      execution_plan.update_state(:stopped)
    end
    # If the state is marked as running without errors but manager is not done,
    # we let the invalidation procedure to handle re-execution on other executor
  end
end