class GoodJob::Scheduler

Schedulers are generic thread pools that are responsible for periodically checking for available tasks, executing tasks within a thread, and efficiently scaling active threads.

Every scheduler has a single {JobPerformer} that will execute tasks. The scheduler is responsible for calling its performer efficiently across threads managed by an instance of Concurrent::ThreadPoolExecutor. If a performer does not have work, the thread will go to sleep. The scheduler maintains an instance of Concurrent::TimerTask, which wakes sleeping threads and causes them to check whether the performer has new work.

Constants

DEFAULT_EXECUTOR_OPTIONS

Defaults for instance of Concurrent::ThreadPoolExecutor The thread pool executor is where work is performed.

Attributes

executor[R]
performer[R]
timer_set[R]

Public Class Methods

from_configuration(configuration, warm_cache_on_initialize: false) click to toggle source

Creates GoodJob::Scheduler(s) and Performers from a GoodJob::Configuration instance. @param configuration [GoodJob::Configuration] @param warm_cache_on_initialize [Boolean] @return [GoodJob::Scheduler, GoodJob::MultiScheduler]

   # File lib/good_job/scheduler.rb
41 def self.from_configuration(configuration, warm_cache_on_initialize: false)
42   schedulers = configuration.queue_string.split(';').map do |queue_string_and_max_threads|
43     queue_string, max_threads = queue_string_and_max_threads.split(':')
44     max_threads = (max_threads || configuration.max_threads).to_i
45 
46     job_performer = GoodJob::JobPerformer.new(queue_string)
47     GoodJob::Scheduler.new(
48       job_performer,
49       max_threads: max_threads,
50       max_cache: configuration.max_cache,
51       warm_cache_on_initialize: warm_cache_on_initialize
52     )
53   end
54 
55   if schedulers.size > 1
56     GoodJob::MultiScheduler.new(schedulers)
57   else
58     schedulers.first
59   end
60 end
new(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: false) click to toggle source

@param performer [GoodJob::JobPerformer] @param max_threads [Numeric, nil] number of seconds between polls for jobs @param max_cache [Numeric, nil] maximum number of scheduled jobs to cache in memory @param warm_cache_on_initialize [Boolean] whether to warm the cache immediately, or manually by calling warm_cache

   # File lib/good_job/scheduler.rb
66 def initialize(performer, max_threads: nil, max_cache: nil, warm_cache_on_initialize: false)
67   raise ArgumentError, "Performer argument must implement #next" unless performer.respond_to?(:next)
68 
69   self.class.instances << self
70 
71   @performer = performer
72 
73   @max_cache = max_cache || 0
74   @executor_options = DEFAULT_EXECUTOR_OPTIONS.dup
75   if max_threads.present?
76     @executor_options[:max_threads] = max_threads
77     @executor_options[:max_queue] = max_threads
78   end
79   @executor_options[:name] = "GoodJob::Scheduler(queues=#{@performer.name} max_threads=#{@executor_options[:max_threads]})"
80 
81   create_executor
82   warm_cache if warm_cache_on_initialize
83 end

Public Instance Methods

create_thread(state = nil) click to toggle source

Wakes a thread to allow the performer to execute a task. @param state [Hash, nil] Contextual information for the performer. See {JobPerformer#next?}. @return [Boolean, nil] Whether work was started.

* +nil+ if the scheduler is unable to take new work, for example if the thread pool is shut down or at capacity.
* +true+ if the performer started executing work.
* +false+ if the performer decides not to attempt to execute a task based on the +state+ that is passed to it.
    # File lib/good_job/scheduler.rb
138 def create_thread(state = nil)
139   return nil unless executor.running?
140 
141   if state
142     return false unless performer.next?(state)
143 
144     if state[:scheduled_at]
145       scheduled_at = if state[:scheduled_at].is_a? String
146                        Time.zone.parse state[:scheduled_at]
147                      else
148                        state[:scheduled_at]
149                      end
150       delay = [(scheduled_at - Time.current).to_f, 0].max
151     end
152   end
153 
154   delay ||= 0
155   run_now = delay <= 0.01
156   if run_now
157     return nil unless executor.ready_worker_count.positive?
158   elsif @max_cache.positive?
159     return nil unless remaining_cache_count.positive?
160   end
161 
162   create_task(delay)
163 
164   run_now ? true : nil
165 end
restart(timeout: -1) click to toggle source

Restart the Scheduler. When shutdown, start; or shutdown and start. @param timeout [nil, Numeric] Seconds to wait for actively executing jobs to finish; shares same values as {#shutdown}. @return [void]

    # File lib/good_job/scheduler.rb
123 def restart(timeout: -1)
124   instrument("scheduler_restart_pools") do
125     shutdown(timeout: timeout) if running?
126     create_executor
127     warm_cache
128   end
129 end
shutdown(timeout: -1) click to toggle source

Shut down the scheduler. This stops all threads in the thread pool. Use {#shutdown?} to determine whether threads have stopped. @param timeout [Numeric, nil] Seconds to wait for actively executing jobs to finish

* +nil+, the scheduler will trigger a shutdown but not wait for it to complete.
* +-1+, the scheduler will wait until the shutdown is complete.
* +0+, the scheduler will immediately shutdown and stop any active tasks.
* A positive number will wait that many seconds before stopping any remaining active tasks.

@return [void]

    # File lib/good_job/scheduler.rb
102 def shutdown(timeout: -1)
103   return if executor.nil? || executor.shutdown?
104 
105   instrument("scheduler_shutdown_start", { timeout: timeout })
106   instrument("scheduler_shutdown", { timeout: timeout }) do
107     if executor.running?
108       @timer_set.shutdown
109       executor.shutdown
110     end
111 
112     if executor.shuttingdown? && timeout
113       executor_wait = timeout.negative? ? nil : timeout
114       executor.kill unless executor.wait_for_termination(executor_wait)
115     end
116   end
117 end
stats() click to toggle source

Information about the Scheduler @return [Hash]

    # File lib/good_job/scheduler.rb
180 def stats
181   {
182     name: performer.name,
183     max_threads: @executor_options[:max_threads],
184     active_threads: @executor_options[:max_threads] - executor.ready_worker_count,
185     available_threads: executor.ready_worker_count,
186     max_cache: @max_cache,
187     active_cache: cache_count,
188     available_cache: remaining_cache_count,
189   }
190 end
task_observer(time, output, thread_error) click to toggle source

Invoked on completion of ThreadPoolExecutor task @!visibility private @return [void]

    # File lib/good_job/scheduler.rb
170 def task_observer(time, output, thread_error)
171   error = thread_error || (output.is_a?(GoodJob::ExecutionResult) ? output.unhandled_error : nil)
172   GoodJob.on_thread_error.call(error) if error && GoodJob.on_thread_error.respond_to?(:call)
173 
174   instrument("finished_job_task", { result: output, error: thread_error, time: time })
175   create_task if output
176 end
warm_cache() click to toggle source

Preload existing runnable and future-scheduled jobs @return [void]

    # File lib/good_job/scheduler.rb
194 def warm_cache
195   return if @max_cache.zero?
196 
197   future = Concurrent::Future.new(args: [self, @performer], executor: executor) do |thr_scheduler, thr_performer|
198     Rails.application.executor.wrap do
199       thr_performer.next_at(
200         limit: @max_cache,
201         now_limit: @executor_options[:max_threads]
202       ).each do |scheduled_at|
203         thr_scheduler.create_thread({ scheduled_at: scheduled_at })
204       end
205     end
206   end
207 
208   observer = lambda do |_time, _output, thread_error|
209     GoodJob.on_thread_error.call(thread_error) if thread_error && GoodJob.on_thread_error.respond_to?(:call)
210     create_task # If cache-warming exhausts the threads, ensure there isn't an executable task remaining
211   end
212   future.add_observer(observer, :call)
213 
214   future.execute
215 end

Private Instance Methods

cache_count() click to toggle source
    # File lib/good_job/scheduler.rb
254 def cache_count
255   timer_set.length
256 end
create_executor() click to toggle source

@return [void]

    # File lib/good_job/scheduler.rb
222 def create_executor
223   instrument("scheduler_create_pool", { performer_name: performer.name, max_threads: @executor_options[:max_threads] }) do
224     @timer_set = TimerSet.new
225     @executor = ThreadPoolExecutor.new(@executor_options)
226   end
227 end
create_task(delay = 0) click to toggle source

@param delay [Integer] @return [void]

    # File lib/good_job/scheduler.rb
231 def create_task(delay = 0)
232   future = Concurrent::ScheduledTask.new(delay, args: [performer], executor: executor, timer_set: timer_set) do |thr_performer|
233     Rails.application.executor.wrap do
234       thr_performer.next
235     end
236   end
237   future.add_observer(self, :task_observer)
238   future.execute
239 end
instrument(name, payload = {}, &block) click to toggle source

@param name [String] @param payload [Hash] @return [void]

    # File lib/good_job/scheduler.rb
244 def instrument(name, payload = {}, &block)
245   payload = payload.reverse_merge({
246                                     scheduler: self,
247                                     process_id: GoodJob::CurrentExecution.process_id,
248                                     thread_name: GoodJob::CurrentExecution.thread_name,
249                                   })
250 
251   ActiveSupport::Notifications.instrument("#{name}.good_job", payload, &block)
252 end
remaining_cache_count() click to toggle source
    # File lib/good_job/scheduler.rb
258 def remaining_cache_count
259   @max_cache - cache_count
260 end