class Tumugi::Executor::LocalExecutor

Public Class Methods

new(dag, worker_num: 1, run_all: false) click to toggle source
# File lib/tumugi/executor/local_executor.rb, line 10
def initialize(dag, worker_num: 1, run_all: false)
  @dag = dag
  @main_task = dag.tsort.last
  @worker_num = worker_num
  @run_all = run_all
  @mutex = Mutex.new
end

Public Instance Methods

execute() click to toggle source
# File lib/tumugi/executor/local_executor.rb, line 18
def execute
  pool = Concurrent::ThreadPoolExecutor.new(
    min_threads: @worker_num,
    max_threads: @worker_num
  )

  setup_task_queue(@dag)
  loop do
    task = dequeue_task
    break if task.nil?

    Concurrent::Future.execute(executor: pool) do
      if !task.runnable?(Time.now)
        logger.trace { "task_not_runnable: #{task.id}" }
        enqueue_task(task)
      else
        begin
          logger.info { "task_start: #{task.id}" }
          task.trigger!(:start)
          MuchTimeout.optional_timeout(task_timeout(task), Tumugi::TimeoutError) do
            task.run
          end
          task.trigger!(:complete)
          logger.info { "task_#{task.state}: #{task.id}, elapsed_time: #{task.elapsed_time}" }
          task.on_success
        rescue => e
          handle_error(task, e)
        end
      end
    end
  end

  pool.shutdown
  pool.wait_for_termination

  @dag.tsort.all? { |t| t.success? }
end

Private Instance Methods

dequeue_task() click to toggle source
# File lib/tumugi/executor/local_executor.rb, line 70
def dequeue_task
  loop do
    task = @mutex.synchronize {
      logger.trace { "task_queue_dump: #{@queue.map(&:id)}" } unless @queue.empty?
      @queue.shift
    }

    if task.nil?
      if @main_task.finished?
        break nil
      else
        sleep(0.1)
      end
    else
      logger.trace { "task_queue_dequeue: #{task.id}" }

      if task.requires_failed?
        task.trigger!(:requires_fail)
        logger.info { "task_#{task.state}: #{task.id} has failed requires task, elapsed_time: #{task.elapsed_time}" }
      elsif task.completed? && !@run_all
        task.trigger!(:skip)
        logger.info { "task_#{task.state}: #{task.id} is already completed, elapsed_time: #{task.elapsed_time}" }
      else
        break task
      end
    end
  end
end
enqueue_task(task) click to toggle source
# File lib/tumugi/executor/local_executor.rb, line 99
def enqueue_task(task)
  logger.trace { "task_queue_enqueue: #{task.id}" }
  @mutex.synchronize { @queue.push(task) }
end
handle_error(task, err) click to toggle source
# File lib/tumugi/executor/local_executor.rb, line 104
def handle_error(task, err)
  if task.retry
    task.trigger!(:pend)
    logger.info { "task_#{task.state}: #{task.id} failed, elapsed_time: #{task.elapsed_time}" }
    logger.error { "#{err.class}: '#{err.message}' - #{task.tries} tries and wait #{task.retry_interval} seconds until the next try." }
    enqueue_task(task)
    task.on_retry
  else
    task.trigger!(:fail)
    logger.info { "task_#{task.state}: #{task.id} failed, elapsed_time: #{task.elapsed_time}" }
    logger.error { "#{err.class}: '#{err.message}' - #{task.tries} tries and reached max retry count, so task #{task.id} failed." }
    task.on_failure
  end
  logger.error { err.backtrace.join("\n") }
end
logger() click to toggle source
# File lib/tumugi/executor/local_executor.rb, line 120
def logger
  @logger ||= Tumugi::ScopedLogger.new("tumugi-executor")
end
setup_task_queue(dag) click to toggle source
# File lib/tumugi/executor/local_executor.rb, line 64
def setup_task_queue(dag)
  @queue = []
  dag.tsort.each { |t| enqueue_task(t) }
  @queue
end
task_timeout(task) click to toggle source
# File lib/tumugi/executor/local_executor.rb, line 58
def task_timeout(task)
  timeout = task.timeout || Tumugi.config.timeout
  timeout = nil if !timeout.nil? && timeout == 0 # for backward compatibility
  timeout
end