class GoodJob::Scheduler
Schedulers are generic thread pools that are responsible for periodically checking for available tasks, executing tasks within a thread, and efficiently scaling active threads.
Every scheduler has a single {JobPerformer} that will execute tasks. The scheduler is responsible for calling its performer efficiently across threads managed by an instance of Concurrent::ThreadPoolExecutor
. If a performer does not have work, the thread will go to sleep. The scheduler maintains an instance of Concurrent::TimerTask
, which wakes sleeping threads and causes them to check whether the performer has new work.
Constants
- DEFAULT_EXECUTOR_OPTIONS
Defaults for instance of Concurrent::ThreadPoolExecutor The thread pool executor is where work is performed.
Attributes
Public Class Methods
Creates GoodJob::Scheduler(s) and Performers from a GoodJob::Configuration
instance. @param configuration [GoodJob::Configuration] @param warm_cache_on_initialize [Boolean] @return [GoodJob::Scheduler, GoodJob::MultiScheduler]
# File lib/good_job/scheduler.rb 41 def self.from_configuration(configuration, warm_cache_on_initialize: false) 42 schedulers = configuration.queue_string.split(';').map do |queue_string_and_max_threads| 43 queue_string, max_threads = queue_string_and_max_threads.split(':') 44 max_threads = (max_threads || configuration.max_threads).to_i 45 46 job_performer = GoodJob::JobPerformer.new(queue_string) 47 GoodJob::Scheduler.new( 48 job_performer, 49 max_threads: max_threads, 50 max_cache: configuration.max_cache, 51 warm_cache_on_initialize: warm_cache_on_initialize 52 ) 53 end 54 55 if schedulers.size > 1 56 GoodJob::MultiScheduler.new(schedulers) 57 else 58 schedulers.first 59 end 60 end
@param performer [GoodJob::JobPerformer] @param max_threads [Numeric, nil] number of seconds between polls for jobs @param max_cache [Numeric, nil] maximum number of scheduled jobs to cache in memory @param warm_cache_on_initialize [Boolean] whether to warm the cache immediately, or manually by calling warm_cache
# File lib/good_job/scheduler.rb 66 def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: false) 67 raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next) 68 69 self.class.instances << self 70 71 @performer = performer 72 73 @max_cache = max_cache || 0 74 @executor_options = DEFAULT_EXECUTOR_OPTIONS.dup 75 if max_threads.present? 76 @executor_options[:max_threads] = max_threads 77 @executor_options[:max_queue] = max_threads 78 end 79 @executor_options[:name] = "GoodJob::Scheduler(queues=#{@performer.name} max_threads=#{@executor_options[:max_threads]})" 80 81 create_executor 82 warm_cache if warm_cache_on_initialize 83 end
Public Instance Methods
Wakes a thread to allow the performer to execute a task. @param state [Hash, nil] Contextual information for the performer. See {JobPerformer#next?}. @return [Boolean, nil] Whether work was started.
* +nil+ if the scheduler is unable to take new work, for example if the thread pool is shut down or at capacity. * +true+ if the performer started executing work. * +false+ if the performer decides not to attempt to execute a task based on the +state+ that is passed to it.
# File lib/good_job/scheduler.rb 138 def create_thread(state = nil) 139 return nil unless executor.running? 140 141 if state 142 return false unless performer.next?(state) 143 144 if state[:scheduled_at] 145 scheduled_at = if state[:scheduled_at].is_a? String 146 Time.zone.parse state[:scheduled_at] 147 else 148 state[:scheduled_at] 149 end 150 delay = [(scheduled_at - Time.current).to_f, 0].max 151 end 152 end 153 154 delay ||= 0 155 run_now = delay <= 0.01 156 if run_now 157 return nil unless executor.ready_worker_count.positive? 158 elsif @max_cache.positive? 159 return nil unless remaining_cache_count.positive? 160 end 161 162 create_task(delay) 163 164 run_now ? true : nil 165 end
Restart the Scheduler
. When shutdown, start; or shutdown and start. @param timeout [nil, Numeric] Seconds to wait for actively executing jobs to finish; shares same values as {#shutdown}. @return [void]
# File lib/good_job/scheduler.rb 123 def restart(timeout: -1) 124 instrument("scheduler_restart_pools") do 125 shutdown(timeout: timeout) if running? 126 create_executor 127 warm_cache 128 end 129 end
Shut down the scheduler. This stops all threads in the thread pool. Use {#shutdown?} to determine whether threads have stopped. @param timeout [Numeric, nil] Seconds to wait for actively executing jobs to finish
* +nil+, the scheduler will trigger a shutdown but not wait for it to complete. * +-1+, the scheduler will wait until the shutdown is complete. * +0+, the scheduler will immediately shutdown and stop any active tasks. * A positive number will wait that many seconds before stopping any remaining active tasks.
@return [void]
# File lib/good_job/scheduler.rb 102 def shutdown(timeout: -1) 103 return if executor.nil? || executor.shutdown? 104 105 instrument("scheduler_shutdown_start", { timeout: timeout }) 106 instrument("scheduler_shutdown", { timeout: timeout }) do 107 if executor.running? 108 @timer_set.shutdown 109 executor.shutdown 110 end 111 112 if executor.shuttingdown? && timeout 113 executor_wait = timeout.negative? ? nil : timeout 114 executor.kill unless executor.wait_for_termination(executor_wait) 115 end 116 end 117 end
Information about the Scheduler
@return [Hash]
# File lib/good_job/scheduler.rb 180 def stats 181 { 182 name: performer.name, 183 max_threads: @executor_options[:max_threads], 184 active_threads: @executor_options[:max_threads] - executor.ready_worker_count, 185 available_threads: executor.ready_worker_count, 186 max_cache: @max_cache, 187 active_cache: cache_count, 188 available_cache: remaining_cache_count, 189 } 190 end
Invoked on completion of ThreadPoolExecutor
task @!visibility private @return [void]
# File lib/good_job/scheduler.rb 170 def task_observer(time, output, thread_error) 171 error = thread_error || (output.is_a?(GoodJob::ExecutionResult) ? output.unhandled_error : nil) 172 GoodJob.on_thread_error.call(error) if error && GoodJob.on_thread_error.respond_to?(:call) 173 174 instrument("finished_job_task", { result: output, error: thread_error, time: time }) 175 create_task if output 176 end
Preload existing runnable and future-scheduled jobs @return [void]
# File lib/good_job/scheduler.rb 194 def warm_cache 195 return if @max_cache.zero? 196 197 future = Concurrent::Future.new(args: [self, @performer], executor: executor) do |thr_scheduler, thr_performer| 198 Rails.application.executor.wrap do 199 thr_performer.next_at( 200 limit: @max_cache, 201 now_limit: @executor_options[:max_threads] 202 ).each do |scheduled_at| 203 thr_scheduler.create_thread({ scheduled_at: scheduled_at }) 204 end 205 end 206 end 207 208 observer = lambda do |_time, _output, thread_error| 209 GoodJob.on_thread_error.call(thread_error) if thread_error && GoodJob.on_thread_error.respond_to?(:call) 210 create_task # If cache-warming exhausts the threads, ensure there isn't an executable task remaining 211 end 212 future.add_observer(observer, :call) 213 214 future.execute 215 end
Private Instance Methods
# File lib/good_job/scheduler.rb 254 def cache_count 255 timer_set.length 256 end
@return [void]
# File lib/good_job/scheduler.rb 222 def create_executor 223 instrument("scheduler_create_pool", { performer_name: performer.name, max_threads: @executor_options[:max_threads] }) do 224 @timer_set = TimerSet.new 225 @executor = ThreadPoolExecutor.new(@executor_options) 226 end 227 end
@param delay [Integer] @return [void]
# File lib/good_job/scheduler.rb 231 def create_task(delay = 0) 232 future = Concurrent::ScheduledTask.new(delay, args: [performer], executor: executor, timer_set: timer_set) do |thr_performer| 233 Rails.application.executor.wrap do 234 thr_performer.next 235 end 236 end 237 future.add_observer(self, :task_observer) 238 future.execute 239 end
@param name [String] @param payload [Hash] @return [void]
# File lib/good_job/scheduler.rb 244 def instrument(name, payload = {}, &block) 245 payload = payload.reverse_merge({ 246 scheduler: self, 247 process_id: GoodJob::CurrentExecution.process_id, 248 thread_name: GoodJob::CurrentExecution.thread_name, 249 }) 250 251 ActiveSupport::Notifications.instrument("#{name}.good_job", payload, &block) 252 end
# File lib/good_job/scheduler.rb 258 def remaining_cache_count 259 @max_cache - cache_count 260 end