class Dynflow::Executors::Sidekiq::Core

Constants

TELEMETRY_UPDATE_INTERVAL

Attributes

logger[R]

Public Class Methods

new(world, *_args) click to toggle source
Calls superclass method Dynflow::Executors::Abstract::Core::new
# File lib/dynflow/executors/sidekiq/core.rb, line 28
def initialize(world, *_args)
  @world = world
  @logger = world.logger
  wait_for_orchestrator_lock
  super
  schedule_update_telemetry
  begin_startup!
end

Public Instance Methods

begin_startup!() click to toggle source
# File lib/dynflow/executors/sidekiq/core.rb, line 81
def begin_startup!
  WorkerJobs::DrainMarker.perform_async(@world.id)
  @recovery = true
end
execution_status(execution_plan_id = nil) click to toggle source

TODO: needs thoughs on how to implement it

# File lib/dynflow/executors/sidekiq/core.rb, line 49
def execution_status(execution_plan_id = nil)
  {}
end
feed_pool(work_items) click to toggle source
# File lib/dynflow/executors/sidekiq/core.rb, line 53
def feed_pool(work_items)
  work_items.each do |new_work|
    WorkerJobs::PerformWork.set(queue: suggest_queue(new_work)).perform_async(new_work)
  end
end
heartbeat() click to toggle source
# File lib/dynflow/executors/sidekiq/core.rb, line 37
def heartbeat
  super
  reacquire_orchestrator_lock
end
start_termination(*args) click to toggle source
# File lib/dynflow/executors/sidekiq/core.rb, line 42
def start_termination(*args)
  super
  release_orchestrator_lock
  finish_termination
end
startup_complete() click to toggle source
# File lib/dynflow/executors/sidekiq/core.rb, line 86
def startup_complete
  logger.info('Performing validity checks')
  @world.perform_validity_checks
  logger.info('Finished performing validity checks')
  if @world.delayed_executor && !@world.delayed_executor.started?
    @world.delayed_executor.start
  end
  @recovery = false
end
update_telemetry() click to toggle source
# File lib/dynflow/executors/sidekiq/core.rb, line 59
def update_telemetry
  sidekiq_queues = ::Sidekiq::Stats.new.queues
  @queues_options.keys.each do |queue|
    queue_size = sidekiq_queues[queue.to_s]
    if queue_size
      Dynflow::Telemetry.with_instance { |t| t.set_gauge(:dynflow_queue_size, queue_size, telemetry_options(queue)) }
    end
  end
  schedule_update_telemetry
end
work_finished(work, delayed_events = nil) click to toggle source
# File lib/dynflow/executors/sidekiq/core.rb, line 70
def work_finished(work, delayed_events = nil)
  # If the work item is sent in reply to a request from the current orchestrator, proceed
  if work.sender_orchestrator_id == @world.id
    super
  else
    # If we're in recovery, we can drop the work as the execution plan will be resumed during validity checks performed when leaving recovery
    # If we're not in recovery and receive an event from another orchestrator, it means it survived the queue draining.
    handle_unknown_work_item(work) unless @recovery
  end
end

Private Instance Methods

fallback_queue() click to toggle source
# File lib/dynflow/executors/sidekiq/core.rb, line 98
def fallback_queue
  :default
end
handle_unknown_work_item(work) click to toggle source

We take a look if an execution lock is already being held by an orchestrator (it should be the current one). If no lock is held we try to resume the execution plan if possible

# File lib/dynflow/executors/sidekiq/core.rb, line 112
def handle_unknown_work_item(work)
  # We are past recovery now, if we receive an event here, the execution plan will be most likely paused
  # We can either try to rescue it or turn it over to stopped
  execution_lock = @world.coordinator.find_locks(class: Coordinator::ExecutionLock.name,
                                                 id: "execution-plan:#{work.execution_plan_id}").first
  if execution_lock.nil?
    plan = @world.persistence.load_execution_plan(work.execution_plan_id)
    should_resume = !plan.error? || plan.prepare_for_rescue == :running
    @world.execute(plan.id) if should_resume
  end
end
schedule_update_telemetry() click to toggle source
# File lib/dynflow/executors/sidekiq/core.rb, line 102
def schedule_update_telemetry
  @world.clock.ping(reference, TELEMETRY_UPDATE_INTERVAL, [:update_telemetry])
end
telemetry_options(queue) click to toggle source
# File lib/dynflow/executors/sidekiq/core.rb, line 106
def telemetry_options(queue)
  { queue: queue.to_s, world: @world.id }
end