class Dynflow::Director
Director
is responsible for telling what to do next when:
* new execution starts * an event occurs * some work is finished
Its public methods (except terminate) return work items that the executor should understand
Constants
- Event
- UnprocessableEvent
Attributes
logger[R]
Public Class Methods
new(world)
click to toggle source
# File lib/dynflow/director.rb, line 166
#
# Sets up a director for +world+ with empty bookkeeping tables.
#
# @param world the owning world; must respond to +logger+
def initialize(world)
  @world = world
  @logger = @world.logger
  # Execution plan uuids that already have a planning work item handed out.
  @planning_plans = Set.new
  # Execution plan id => ids of failed steps recorded by earlier rescue attempts.
  @rescued_steps = {}
  # Execution plan id => manager of the plan that is being executed.
  @execution_plan_managers = {}
end
Public Instance Methods
current_execution_plan_ids()
click to toggle source
# File lib/dynflow/director.rb, line 174
#
# Lists the ids of the execution plans that currently have a manager
# registered in this director.
#
# @return [Array] the keys of the tracked-manager table
def current_execution_plan_ids
  @execution_plan_managers.each_key.to_a
end
halt(event)
click to toggle source
# File lib/dynflow/director.rb, line 249
#
# Halts the execution plan the given event refers to.
#
# @param event an object responding to +execution_plan_id+
# @return whatever +halt_execution+ returns for that plan
def halt(event)
  plan_id = event.execution_plan_id
  halt_execution(plan_id)
end
handle_event(event)
click to toggle source
# File lib/dynflow/director.rb, line 191
#
# Routes +event+ to the manager of its execution plan.
#
# When no manager is tracked for the plan, an optional event has its result
# rejected and yields no work; a non-optional one raises.
#
# @param event [Event] the event to deliver
# @return the manager's answer to the event, or +[]+ for an unroutable
#   optional event
# @raise [Dynflow::Error] when no manager exists and the event is not
#   optional (the event's result is rejected with the message first)
def handle_event(event)
  Type! event, Event
  manager = @execution_plan_managers[event.execution_plan_id]
  return manager.event(event) if manager

  raise Dynflow::Error, "no manager for #{event.inspect}" unless event.optional

  event.result.reject "no manager for #{event.inspect}"
  []
rescue Dynflow::Error => error
  # Make sure whoever waits on the event learns about the failure as well.
  event.result.reject error.message
  raise error
end
handle_planning(execution_plan_uuid)
click to toggle source
# File lib/dynflow/director.rb, line 178
#
# Produces the planning work for an execution plan, at most once while the
# plan's uuid is remembered in +@planning_plans+.
#
# @param execution_plan_uuid uuid of the plan to be planned
# @return [Array] a single PlanningWorkItem, or +[]+ when planning of this
#   plan has already been handed out
def handle_planning(execution_plan_uuid)
  # Set#add? returns nil when the uuid was already present.
  return [] unless @planning_plans.add?(execution_plan_uuid)

  [PlanningWorkItem.new(execution_plan_uuid, :default, @world.id)]
end
start_execution(execution_plan_id, finished)
click to toggle source
# File lib/dynflow/director.rb, line 185
#
# Starts tracking an execution plan and asks its manager for the first work.
#
# @param execution_plan_id id of the plan to execute
# @param finished object handed to +track_execution_plan+ together with the id
# @return +[]+ when the plan could not be tracked, otherwise the result of
#   +unless_done+ for the manager's initial work
def start_execution(execution_plan_id, finished)
  tracked = track_execution_plan(execution_plan_id, finished)
  if tracked
    unless_done(tracked, tracked.start)
  else
    []
  end
end
terminate()
click to toggle source
# File lib/dynflow/director.rb, line 233
#
# Terminates every tracked manager and then finishes each of them.
# Does nothing when no execution plan is tracked.
#
# A PersistenceError raised while terminating the managers is logged and
# does not prevent the managers from being finished afterwards.
def terminate
  return if @execution_plan_managers.empty?

  logger.error "... cleaning #{@execution_plan_managers.size} execution plans ..."
  begin
    @execution_plan_managers.values.each(&:terminate)
  rescue Errors::PersistenceError
    logger.error "could not to clean the data properly"
  end
  # Iterate over a snapshot: finish_manager removes entries from the table.
  @execution_plan_managers.values.each { |manager| finish_manager(manager) }
end
work_failed(work)
click to toggle source
Called when there was an unhandled exception during the execution of the work (such as a persistence issue). In this case we just clean up the runtime from the execution plan and let it go (a common cause for this is the execution plan being removed from the database by an external user).
# File lib/dynflow/director.rb, line 224
#
# Cleans up after a work item whose execution failed: the plan's manager, if
# one is tracked, is terminated and finished.
#
# @param work an object responding to +execution_plan_id+
# @return +nil+ when no manager is tracked for the plan, otherwise the
#   result of +finish_manager+
def work_failed(work)
  plan_id = work.execution_plan_id
  manager = @execution_plan_managers[plan_id]
  return unless manager

  manager.terminate
  # Don't try to store when the execution plan went missing
  stored_plans = @world.persistence.find_execution_plans(filters: { uuid: plan_id })
  finish_manager(manager, store: !stored_plans.empty?)
end
work_finished(work)
click to toggle source
# File lib/dynflow/director.rb, line 207
#
# Reacts to a finished work item.
#
# For a PlanningWorkItem the plan's uuid is forgotten and its delayed plans
# are deleted; no further work results. For any other item the plan's
# manager decides what comes next.
#
# @param work the finished work item; must respond to +execution_plan_id+
# @return [Array] +[]+ for planning work or when the plan is no longer
#   tracked, otherwise the result of +unless_done+
def work_finished(work)
  plan_id = work.execution_plan_id
  case work
  when PlanningWorkItem
    @planning_plans.delete(plan_id)
    @world.persistence.delete_delayed_plans(execution_plan_uuid: plan_id)
    []
  else
    manager = @execution_plan_managers[plan_id]
    # No manager: the work belongs to an execution plan that is not running
    # anymore, so there is nothing left to schedule.
    manager ? unless_done(manager, manager.what_is_next(work)) : []
  end
end
Private Instance Methods
finish_manager(manager, store: true)
click to toggle source
# File lib/dynflow/director.rb, line 284
#
# Stops tracking +manager+ and resolves its future.
#
# @param manager the manager to finish
# @param store [Boolean] whether the execution plan state should be updated
#   before the manager is dropped
# @return [Array] always +[]+
def finish_manager(manager, store: true)
  begin
    update_execution_plan_state(manager) if store
    []
  ensure
    # Runs even when the state update raises, so the manager never stays
    # registered and its future is always resolved.
    @execution_plan_managers.delete(manager.execution_plan.id)
    set_future(manager)
  end
end
halt_execution(execution_plan_id)
click to toggle source
# File lib/dynflow/director.rb, line 255
#
# Halts an execution plan. A tracked plan has its manager halted and
# finished; an untracked one is handled by +halt_inactive+.
#
# @param execution_plan_id id of the plan to halt
# @return the result of +finish_manager+ or of +halt_inactive+
def halt_execution(execution_plan_id)
  active_manager = @execution_plan_managers[execution_plan_id]
  @logger.warn "Halting execution plan #{execution_plan_id}"
  if active_manager
    active_manager.halt
    finish_manager(active_manager)
  else
    halt_inactive(execution_plan_id)
  end
end
halt_inactive(execution_plan_id)
click to toggle source
# File lib/dynflow/director.rb, line 264
#
# Loads an execution plan from persistence and moves it to the +:stopped+
# state. Any StandardError raised on the way is logged instead of being
# propagated.
#
# @param execution_plan_id id of the plan to stop
# @return the result of +update_state+, or of the logger call on failure
def halt_inactive(execution_plan_id)
  @world.persistence.load_execution_plan(execution_plan_id).update_state(:stopped)
rescue StandardError => error
  @logger.error error
end
rescue!(manager)
click to toggle source
# File lib/dynflow/director.rb, line 306
#
# Performs a rescue attempt for the manager's execution plan.
#
# The ids of the currently failed steps are recorded in +@rescued_steps+
# first, then the plan is asked to prepare for rescue.
#
# @param manager manager of the plan to rescue
# @return the result of +manager.restart+ when the plan goes back to
#   +:running+; otherwise +false+ after the plan state has been updated
def rescue!(manager)
  # TODO: after moving to concurrent-ruby actors, there should be better place
  # to put this logic of making sure we don't run rescues in endless loop
  plan = manager.execution_plan
  rescued_ids = (@rescued_steps[plan.id] ||= Set.new)
  rescued_ids.merge(plan.failed_steps.map(&:id))

  next_state = plan.prepare_for_rescue
  return manager.restart if next_state == :running

  plan.update_state(next_state)
  false
end
rescue?(manager)
click to toggle source
# File lib/dynflow/director.rb, line 292
#
# Decides whether a rescue attempt should be made for the manager's plan.
#
# @param manager manager of the plan in question
# @return [Boolean] +false+ when the world is terminating, auto rescue is
#   off or the plan has no error; +true+ when the plan was never rescued
#   before or a step that was not part of an earlier rescue has failed
def rescue?(manager)
  return false if @world.terminating?
  return false unless @world.auto_rescue

  plan = manager.execution_plan
  return false unless plan.error?

  # we have not rescued this plan yet
  return true unless @rescued_steps.key?(plan.id)

  # we have rescued this plan already, but a different step has failed now
  # we do this check to prevent endless loop, if we always failed on the same steps
  newly_failed_ids = plan.failed_steps.map(&:id).to_set - @rescued_steps[plan.id]
  newly_failed_ids.any?
end
set_future(manager)
click to toggle source
# File lib/dynflow/director.rb, line 362
#
# Forgets the rescue bookkeeping of the manager's plan and fulfills the
# manager's future with that plan.
#
# @param manager manager responding to +execution_plan+ and +future+
# @return the result of fulfilling the future
def set_future(manager)
  plan = manager.execution_plan
  @rescued_steps.delete(plan.id)
  manager.future.fulfill(plan)
end
track_execution_plan(execution_plan_id, finished)
click to toggle source
# File lib/dynflow/director.rb, line 320
#
# Loads an execution plan and registers a new manager for it.
#
# The plan is refused when it is already tracked, when it is stopped, or
# when an execution inhibition lock record exists for it (in that case the
# plan is halted as well). A refusal rejects +finished+ with the error.
#
# @param execution_plan_id id of the plan to track
# @param finished object passed to the new manager; rejected on refusal
# @return the new ExecutionPlanManager, or +nil+ when the plan was refused
def track_execution_plan(execution_plan_id, finished)
  plan = @world.persistence.load_execution_plan(execution_plan_id)

  already_tracked = @execution_plan_managers[execution_plan_id]
  raise Dynflow::Error, "cannot execute execution_plan_id:#{execution_plan_id} it's already running" if already_tracked
  raise Dynflow::Error, "cannot execute execution_plan_id:#{execution_plan_id} it's stopped" if plan.state == :stopped

  inhibition = Coordinator::ExecutionInhibitionLock
  lock_query = { class: inhibition.to_s, owner_id: inhibition.lock_id(execution_plan_id) }
  if @world.coordinator.find_records(lock_query).any?
    halt_execution(execution_plan_id)
    raise Dynflow::Error, "cannot execute execution_plan_id:#{execution_plan_id} it's execution is inhibited"
  end

  @execution_plan_managers[execution_plan_id] = ExecutionPlanManager.new(@world, plan, finished)
rescue Dynflow::Error => error
  finished.reject error
  nil
end
try_to_rescue(manager)
click to toggle source
# File lib/dynflow/director.rb, line 280
#
# Runs a rescue attempt for the manager's plan when +rescue?+ allows it.
#
# @param manager manager of the plan to rescue
# @return +nil+ when no rescue is attempted, otherwise the result of +rescue!+
def try_to_rescue(manager)
  return unless rescue?(manager)

  rescue!(manager)
end
unless_done(manager, work_items)
click to toggle source
# File lib/dynflow/director.rb, line 271
#
# Passes +work_items+ through while the manager still has work to do; once
# the manager is done, tries a rescue and otherwise finishes the manager.
#
# @param manager the plan's manager, may be +nil+
# @param work_items the work proposed by the manager
# @return +[]+ without a manager, +work_items+ while the manager is not
#   done, otherwise the rescue result or, when that is falsy, the result of
#   +finish_manager+
def unless_done(manager, work_items)
  return [] unless manager
  return work_items unless manager.done?

  try_to_rescue(manager) || finish_manager(manager)
end
update_execution_plan_state(manager)
click to toggle source
# File lib/dynflow/director.rb, line 348
#
# Moves a +:running+ execution plan to its resting state: +:paused+ when the
# plan has an error, +:stopped+ when the manager is done. Plans in any other
# state are left untouched.
#
# @param manager manager responding to +execution_plan+ and +done?+
# @return the result of +update_state+, or +nil+ when nothing was updated
def update_execution_plan_state(manager)
  plan = manager.execution_plan
  return unless plan.state == :running

  # If the state is marked as running without errors but manager is not done,
  # we let the invalidation procedure to handle re-execution on other executor
  resting_state =
    if plan.error?
      :paused
    elsif manager.done?
      :stopped
    end
  plan.update_state(resting_state) if resting_state
end