class AsyncActiveJob::Runner
Attributes
logger[R]
queues[R]
reactor[R]
task_count[RW]
Public Class Methods
new(queues: nil)
click to toggle source
@param queues [Array,nil]
# File lib/async_active_job/runner.rb, line 14 def initialize(queues: nil) @reactor = nil @logger = Rails.logger @interrupted = false @task_count = 0 @queues = queues&.presence end
start(queues: nil)
click to toggle source
# File lib/async_active_job/runner.rb, line 8 def start(queues: nil) new(queues: queues).start end
Public Instance Methods
run_once()
click to toggle source
# File lib/async_active_job/runner.rb, line 38 def run_once task_limit = AsyncActiveJob.configuration.task_limit task_limit_sleep_duration = AsyncActiveJob.configuration.task_limit_sleep_duration if task_limit && task_count >= task_limit logger.debug { "Task limit #{task_limit} reached, sleeping for #{task_limit_sleep_duration} seconds" } reactor.sleep(task_limit_sleep_duration) return end async_active_job = AsyncActiveJob::Job.next_with_lock(queues) if async_active_job run_task do with_optional_timeout(AsyncActiveJob.configuration.max_run_timeout) do run_job(async_active_job) end end reactor.sleep(0) else no_job_sleep_duration = AsyncActiveJob.configuration.no_job_sleep_duration logger.debug { "No jobs, sleeping for #{no_job_sleep_duration} seconds" } reactor.sleep(no_job_sleep_duration) end end
start()
click to toggle source
# File lib/async_active_job/runner.rb, line 22 def start trap('TERM') { interrupt! } trap('INT') { interrupt! } ::Async::Reactor.run do @reactor = Async::Task.current.reactor loop do if interrupted? logger.info { 'Exiting...' } break end run_once end end end
Private Instance Methods
interrupt!()
click to toggle source
# File lib/async_active_job/runner.rb, line 91 def interrupt! @interrupted = true end
interrupted?()
click to toggle source
# File lib/async_active_job/runner.rb, line 95 def interrupted? @interrupted end
run_job(async_active_job)
click to toggle source
# File lib/async_active_job/runner.rb, line 74 def run_job(async_active_job) self.task_count += 1 job_name = "AsyncActiveJob::Job##{async_active_job.id} (Job ID: #{async_active_job.active_job_id})" logger.debug { "Performing #{job_name}" } ms = Benchmark.ms { AsyncActiveJob::Job.perform_job(async_active_job) } logger.debug { format("#{job_name} performed in %.2fms", ms) } self.task_count -= 1 end
run_task(&block)
click to toggle source
# File lib/async_active_job/runner.rb, line 83 def run_task(&block) Async::Task.new(reactor, &block).run end
schedule_task(&block)
click to toggle source
# File lib/async_active_job/runner.rb, line 87 def schedule_task(&block) reactor << Async::Task.new(reactor, &block).fiber end
with_optional_timeout(duration) { || ... }
click to toggle source
# File lib/async_active_job/runner.rb, line 68 def with_optional_timeout(duration, &block) return yield if duration.nil? reactor.with_timeout(duration, &block) end