class GoodJob::Job

ActiveRecord model that represents an ActiveJob job. Parent class can be configured with GoodJob.active_record_parent_class. @!parse

class Job < ActiveRecord::Base; end

Constants

DEFAULT_PRIORITY

ActiveJob jobs without a priority attribute are given this priority.

DEFAULT_QUEUE_NAME

ActiveJob jobs without a queue_name attribute are placed on this queue.

PreviouslyPerformedError

Raised if something attempts to execute a previously completed Job again.

Public Class Methods

enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false) click to toggle source

Places an ActiveJob job on a queue by creating a new {Job} record. @param active_job [ActiveJob::Base]

The job to enqueue.

@param scheduled_at [Float]

Epoch timestamp when the job should be executed.

@param create_with_advisory_lock [Boolean]

Whether to establish a lock on the {Job} record after it is created.

@return [Job]

The new {Job} instance representing the queued ActiveJob job.
    # File lib/good_job/job.rb
208 def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false)
209   ActiveSupport::Notifications.instrument("enqueue_job.good_job", { active_job: active_job, scheduled_at: scheduled_at, create_with_advisory_lock: create_with_advisory_lock }) do |instrument_payload|
210     good_job_args = {
211       active_job_id: active_job.job_id,
212       queue_name: active_job.queue_name.presence || DEFAULT_QUEUE_NAME,
213       priority: active_job.priority || DEFAULT_PRIORITY,
214       serialized_params: active_job.serialize,
215       scheduled_at: scheduled_at,
216       create_with_advisory_lock: create_with_advisory_lock,
217     }
218 
219     good_job_args[:concurrency_key] = active_job.good_job_concurrency_key if active_job.respond_to?(:good_job_concurrency_key)
220 
221     if CurrentExecution.cron_key
222       good_job_args[:cron_key] = CurrentExecution.cron_key
223     elsif CurrentExecution.active_job_id == active_job.job_id
224       good_job_args[:cron_key] = CurrentExecution.good_job.cron_key
225     end
226 
227     good_job = GoodJob::Job.new(**good_job_args)
228 
229     instrument_payload[:good_job] = good_job
230 
231     good_job.save!
232     active_job.provider_job_id = good_job.id
233 
234     CurrentExecution.good_job.retried_good_job_id = good_job.id if CurrentExecution.good_job && CurrentExecution.good_job.active_job_id == active_job.job_id
235 
236     good_job
237   end
238 end
next_scheduled_at(after: nil, limit: 100, now_limit: nil) click to toggle source

Fetches the scheduled execution time of the next eligible Job(s). @param after [DateTime] @param limit [Integer] @param now_limit [Integer, nil] @return [Array<DateTime>]

    # File lib/good_job/job.rb
184 def self.next_scheduled_at(after: nil, limit: 100, now_limit: nil)
185   query = advisory_unlocked.unfinished.schedule_ordered
186 
187   after ||= Time.current
188   after_query = query.where('scheduled_at > ?', after).or query.where(scheduled_at: nil).where('created_at > ?', after)
189   after_at = after_query.limit(limit).pluck(:scheduled_at, :created_at).map { |timestamps| timestamps.compact.first }
190 
191   if now_limit&.positive?
192     now_query = query.where('scheduled_at < ?', Time.current).or query.where(scheduled_at: nil)
193     now_at = now_query.limit(now_limit).pluck(:scheduled_at, :created_at).map { |timestamps| timestamps.compact.first }
194   end
195 
196   Array(now_at) + after_at
197 end
perform_with_advisory_lock() click to toggle source

Finds the next eligible Job, acquire an advisory lock related to it, and executes the job. @return [ExecutionResult, nil]

If a job was executed, returns an array with the {Job} record, the
return value for the job's +#perform+ method, and the exception the job
raised, if any (if the job raised, then the second array entry will be
+nil+). If there were no jobs to execute, returns +nil+.
    # File lib/good_job/job.rb
163 def self.perform_with_advisory_lock
164   unfinished.priority_ordered.only_scheduled.limit(1).with_advisory_lock(unlock_session: true) do |good_jobs|
165     good_job = good_jobs.first
166     break if good_job.blank?
167     break :unlocked unless good_job&.executable?
168 
169     begin
170       good_job.with_advisory_lock(key: "good_jobs-#{good_job.active_job_id}") do
171         good_job.perform
172       end
173     rescue RecordAlreadyAdvisoryLockedError => e
174       ExecutionResult.new(value: nil, handled_error: e)
175     end
176   end
177 end
queue_parser(string) click to toggle source

Parse a string representing a group of queues into a more readable data structure. @param string [String] Queue string @return [Hash]

How to match a given queue. It can have the following keys and values:
- +{ all: true }+ indicates that all queues match.
- +{ exclude: Array<String> }+ indicates the listed queue names should
  not match.
- +{ include: Array<String> }+ indicates the listed queue names should
  match.

@example

GoodJob::Job.queue_parser('-queue1,queue2')
=> { exclude: [ 'queue1', 'queue2' ] }
   # File lib/good_job/job.rb
34 def self.queue_parser(string)
35   string = string.presence || '*'
36 
37   if string.first == '-'
38     exclude_queues = true
39     string = string[1..-1]
40   end
41 
42   queues = string.split(',').map(&:strip)
43 
44   if queues.include?('*')
45     { all: true }
46   elsif exclude_queues
47     { exclude: queues }
48   else
49     { include: queues }
50   end
51 end

Public Instance Methods

executable?() click to toggle source

Tests whether this job is safe to be executed by this thread. @return [Boolean]

    # File lib/good_job/job.rb
270 def executable?
271   self.class.unscoped.unfinished.owns_advisory_locked.exists?(id: id)
272 end
perform() click to toggle source

Execute the ActiveJob job this {Job} represents. @return [ExecutionResult]

An array of the return value of the job's +#perform+ method and the
exception raised by the job, if any. If the job completed successfully,
the second array entry (the exception) will be +nil+ and vice versa.
    # File lib/good_job/job.rb
245 def perform
246   raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed' if finished_at
247 
248   self.performed_at = Time.current
249   save! if GoodJob.preserve_job_records
250 
251   result = execute
252 
253   job_error = result.handled_error || result.unhandled_error
254   self.error = "#{job_error.class}: #{job_error.message}" if job_error
255 
256   if result.unhandled_error && GoodJob.retry_on_unhandled_error
257     save!
258   elsif GoodJob.preserve_job_records == true || (result.unhandled_error && GoodJob.preserve_job_records == :on_unhandled_error)
259     self.finished_at = Time.current
260     save!
261   else
262     destroy!
263   end
264 
265   result
266 end

Private Instance Methods

execute() click to toggle source

@return [ExecutionResult]

    # File lib/good_job/job.rb
277 def execute
278   GoodJob::CurrentExecution.reset
279   GoodJob::CurrentExecution.good_job = self
280 
281   job_data = serialized_params.deep_dup
282   job_data["provider_job_id"] = id
283 
284   ActiveSupport::Notifications.instrument("perform_job.good_job", { good_job: self, process_id: GoodJob::CurrentExecution.process_id, thread_name: GoodJob::CurrentExecution.thread_name }) do
285     value = ActiveJob::Base.execute(job_data)
286 
287     if value.is_a?(Exception)
288       handled_error = value
289       value = nil
290     end
291     handled_error ||= GoodJob::CurrentExecution.error_on_retry || GoodJob::CurrentExecution.error_on_discard
292 
293     ExecutionResult.new(value: value, handled_error: handled_error)
294   rescue StandardError => e
295     ExecutionResult.new(value: nil, unhandled_error: e)
296   end
297 end