class AsyncActiveJob::Runner

Attributes

logger[R]
queues[R]
reactor[R]
task_count[RW]

Public Class Methods

new(queues: nil) click to toggle source

@param queues [Array,nil]

# File lib/async_active_job/runner.rb, line 14
def initialize(queues: nil)
  @reactor = nil
  @logger = Rails.logger
  @interrupted = false
  @task_count = 0
  @queues = queues&.presence
end
start(queues: nil) click to toggle source
# File lib/async_active_job/runner.rb, line 8
def start(queues: nil)
  new(queues: queues).start
end

Public Instance Methods

run_once() click to toggle source
# File lib/async_active_job/runner.rb, line 38
def run_once
  task_limit = AsyncActiveJob.configuration.task_limit
  task_limit_sleep_duration = AsyncActiveJob.configuration.task_limit_sleep_duration
  if task_limit && task_count >= task_limit

    logger.debug { "Task limit #{task_limit} reached, sleeping for #{task_limit_sleep_duration} seconds" }
    reactor.sleep(task_limit_sleep_duration)
    return
  end

  async_active_job = AsyncActiveJob::Job.next_with_lock(queues)
  if async_active_job
    run_task do
      with_optional_timeout(AsyncActiveJob.configuration.max_run_timeout) do
        run_job(async_active_job)
      end
    end
    reactor.sleep(0)
  else
    no_job_sleep_duration = AsyncActiveJob.configuration.no_job_sleep_duration
    logger.debug { "No jobs, sleeping for #{no_job_sleep_duration} seconds" }
    reactor.sleep(no_job_sleep_duration)
  end
end
start() click to toggle source
# File lib/async_active_job/runner.rb, line 22
def start
  trap('TERM') { interrupt! }
  trap('INT') { interrupt! }

  ::Async::Reactor.run do
    @reactor = Async::Task.current.reactor
    loop do
      if interrupted?
        logger.info { 'Exiting...' }
        break
      end
      run_once
    end
  end
end

Private Instance Methods

interrupt!() click to toggle source
# File lib/async_active_job/runner.rb, line 91
def interrupt!
  @interrupted = true
end
interrupted?() click to toggle source
# File lib/async_active_job/runner.rb, line 95
def interrupted?
  @interrupted
end
run_job(async_active_job) click to toggle source
# File lib/async_active_job/runner.rb, line 74
def run_job(async_active_job)
  self.task_count += 1
  job_name = "AsyncActiveJob::Job##{async_active_job.id} (Job ID: #{async_active_job.active_job_id})"
  logger.debug { "Performing #{job_name}" }
  ms = Benchmark.ms { AsyncActiveJob::Job.perform_job(async_active_job) }
  logger.debug { format("#{job_name} performed in %.2fms", ms) }
  self.task_count -= 1
end
run_task(&block) click to toggle source
# File lib/async_active_job/runner.rb, line 83
def run_task(&block)
  Async::Task.new(reactor, &block).run
end
schedule_task(&block) click to toggle source
# File lib/async_active_job/runner.rb, line 87
def schedule_task(&block)
  reactor << Async::Task.new(reactor, &block).fiber
end
with_optional_timeout(duration) { || ... } click to toggle source
# File lib/async_active_job/runner.rb, line 68
def with_optional_timeout(duration, &block)
  return yield if duration.nil?

  reactor.with_timeout(duration, &block)
end