class GoodJob::Job
ActiveRecord model that represents an ActiveJob job. The parent class can be configured with GoodJob.active_record_parent_class.
@!parse
  class Job < ActiveRecord::Base; end
Constants
Public Class Methods
Places an ActiveJob job on a queue by creating a new {Job} record.
@param active_job [ActiveJob::Base]
The job to enqueue.
@param scheduled_at [Float]
Epoch timestamp when the job should be executed.
@param create_with_advisory_lock [Boolean]
Whether to establish a lock on the {Job} record after it is created.
@return [Job]
The new {Job} instance representing the queued ActiveJob job.
# File lib/good_job/job.rb, line 208
#
# Places an ActiveJob job on a queue by creating a new {Job} record.
# @param active_job [ActiveJob::Base]
#   The job to enqueue.
# @param scheduled_at [Float]
#   Epoch timestamp when the job should be executed.
# @param create_with_advisory_lock [Boolean]
#   Whether to establish a lock on the {Job} record after it is created.
# @return [Job]
#   The new {Job} instance representing the queued ActiveJob job.
def self.enqueue(active_job, scheduled_at: nil, create_with_advisory_lock: false)
  notification_payload = {
    active_job: active_job,
    scheduled_at: scheduled_at,
    create_with_advisory_lock: create_with_advisory_lock,
  }

  ActiveSupport::Notifications.instrument("enqueue_job.good_job", notification_payload) do |instrument_payload|
    attributes = {
      active_job_id: active_job.job_id,
      queue_name: active_job.queue_name.presence || DEFAULT_QUEUE_NAME,
      priority: active_job.priority || DEFAULT_PRIORITY,
      serialized_params: active_job.serialize,
      scheduled_at: scheduled_at,
      create_with_advisory_lock: create_with_advisory_lock,
    }

    # The concurrency key is only copied when the job responds to
    # good_job_concurrency_key.
    if active_job.respond_to?(:good_job_concurrency_key)
      attributes[:concurrency_key] = active_job.good_job_concurrency_key
    end

    # Prefer the cron key set on the current execution context; otherwise,
    # when the job being enqueued is the same ActiveJob that is currently
    # executing, carry over the cron key of the executing record.
    if CurrentExecution.cron_key
      attributes[:cron_key] = CurrentExecution.cron_key
    elsif CurrentExecution.active_job_id == active_job.job_id
      attributes[:cron_key] = CurrentExecution.good_job.cron_key
    end

    record = GoodJob::Job.new(**attributes)

    # The record is added to the notification payload before it is saved.
    instrument_payload[:good_job] = record

    record.save!
    active_job.provider_job_id = record.id

    # When the currently executing record belongs to this same ActiveJob,
    # store the new record's id on it as retried_good_job_id.
    executing = CurrentExecution.good_job
    executing.retried_good_job_id = record.id if executing && executing.active_job_id == active_job.job_id

    record
  end
end
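Fetches the scheduled execution time of the next eligible Job(s).
@param after [DateTime]
  Only future times later than this are returned; defaults to the current time.
@param limit [Integer]
  Maximum number of future times to return.
@param now_limit [Integer, nil]
  Maximum number of times to return for jobs that are already due; skipped when +nil+ or not positive.
@return [Array<DateTime>]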
# File lib/good_job/job.rb, line 184
#
# Fetches the scheduled execution time of the next eligible Job(s).
# @param after [DateTime]
#   Lower bound for future times; falls back to +Time.current+ when +nil+.
# @param limit [Integer]
#   Maximum number of future times to fetch.
# @param now_limit [Integer, nil]
#   Maximum number of times to fetch for jobs that are already due. The
#   "due now" query is skipped unless this is a positive number.
# @return [Array<DateTime>]
#   Times for jobs that are due now (if requested) followed by future times.
def self.next_scheduled_at(after: nil, limit: 100, now_limit: nil)
  candidates = advisory_unlocked.unfinished.schedule_ordered

  after ||= Time.current
  upcoming = candidates.where('scheduled_at > ?', after).or(
    candidates.where(scheduled_at: nil).where('created_at > ?', after)
  )
  # A job with no scheduled_at is represented by its created_at instead.
  after_at = upcoming.limit(limit).pluck(:scheduled_at, :created_at).map do |scheduled_at, created_at|
    scheduled_at || created_at
  end

  now_at =
    if now_limit&.positive?
      due = candidates.where('scheduled_at < ?', Time.current).or(candidates.where(scheduled_at: nil))
      due.limit(now_limit).pluck(:scheduled_at, :created_at).map do |scheduled_at, created_at|
        scheduled_at || created_at
      end
    else
      []
    end

  now_at + after_at
end
Finds the next eligible Job, acquires an advisory lock related to it, and executes the job.
@return [ExecutionResult, nil]
If a job was executed, returns an {ExecutionResult} holding the return value of the job's +#perform+ method and the exception the job raised, if any (if the job raised, then the value will be +nil+). If there were no jobs to execute, returns +nil+.
# File lib/good_job/job.rb, line 163
#
# Finds the next eligible Job, acquires an advisory lock related to it, and
# executes the job.
# @return [ExecutionResult, nil]
#   The result of performing the job, or +nil+ when there was no job to
#   execute. The block also breaks out with +:unlocked+ when the selected
#   job is not executable.
def self.perform_with_advisory_lock
  next_job_scope = unfinished.priority_ordered.only_scheduled.limit(1)

  next_job_scope.with_advisory_lock(unlock_session: true) do |locked_jobs|
    job = locked_jobs.first
    break if job.blank?
    break :unlocked unless job.executable?

    begin
      # A second advisory lock is taken on a key derived from the ActiveJob id
      # for the duration of the perform call.
      job.with_advisory_lock(key: "good_jobs-#{job.active_job_id}") { job.perform }
    rescue RecordAlreadyAdvisoryLockedError => e
      # Being unable to take that lock is reported as a handled error.
      ExecutionResult.new(value: nil, handled_error: e)
    end
  end
end
Parse a string representing a group of queues into a more readable data structure. @param string [String] Queue string @return [Hash]
How to match a given queue. It can have the following keys and values: - +{ all: true }+ indicates that all queues match. - +{ exclude: Array<String> }+ indicates the listed queue names should not match. - +{ include: Array<String> }+ indicates the listed queue names should match.
@example
GoodJob::Job.queue_parser('-queue1,queue2') => { exclude: [ 'queue1', 'queue2' ] }
# File lib/good_job/job.rb, line 34
#
# Parse a string representing a group of queues into a more readable data
# structure.
# @param string [String, nil]
#   Queue string. +nil+ or a blank string is treated as +'*'+ (all queues).
#   Surrounding whitespace is ignored, so a leading +-+ is recognised even
#   when the string starts with spaces.
# @return [Hash]
#   How to match a given queue. It can have the following keys and values:
#   - +{ all: true }+ indicates that all queues match.
#   - +{ exclude: Array<String> }+ indicates the listed queue names should
#     not match.
#   - +{ include: Array<String> }+ indicates the listed queue names should
#     match.
# @example
#   GoodJob::Job.queue_parser('-queue1,queue2')
#   => { exclude: [ 'queue1', 'queue2' ] }
def self.queue_parser(string)
  # Trim first so the exclusion prefix is detected in input such as " -queue1".
  spec = string.to_s.strip
  # Same blank test as before (whitespace-only counts as blank), without
  # relying on ActiveSupport.
  spec = '*' if spec.match?(/\A[[:space:]]*\z/)

  exclude_queues = spec.start_with?('-')
  spec = spec[1..-1] if exclude_queues

  queues = spec.split(',').map(&:strip)

  if queues.include?('*')
    { all: true }
  elsif exclude_queues
    { exclude: queues }
  else
    { include: queues }
  end
end
Public Instance Methods
Tests whether this job is safe to be executed by this thread. @return [Boolean]
# File lib/good_job/job.rb, line 270
#
# Tests whether this job is safe to be executed by this thread.
# @return [Boolean]
#   Whether a record with this id exists among the unfinished records
#   selected by the +owns_advisory_locked+ scope (default scopes are
#   bypassed via +unscoped+).
def executable?
  owned_unfinished = self.class.unscoped.unfinished.owns_advisory_locked
  owned_unfinished.exists?(id: id)
end
Executes the ActiveJob job this {Job} represents.
@return [ExecutionResult]
The return value of the job's +#perform+ method and the exception raised by the job, if any. If the job completed successfully, the exception will be +nil+, and vice versa.
# File lib/good_job/job.rb, line 245
#
# Executes the ActiveJob job this {Job} represents.
# @return [ExecutionResult]
#   The value returned by the job and the error it raised, if any.
# @raise [PreviouslyPerformedError]
#   When +finished_at+ is already set on this record.
def perform
  if finished_at
    raise PreviouslyPerformedError, 'Cannot perform a job that has already been performed'
  end

  self.performed_at = Time.current
  save! if GoodJob.preserve_job_records

  result = execute

  unhandled = result.unhandled_error
  job_error = result.handled_error || unhandled
  self.error = "#{job_error.class}: #{job_error.message}" if job_error

  # Retry path: the record is saved without setting finished_at.
  if unhandled && GoodJob.retry_on_unhandled_error
    save!
    return result
  end

  # preserve_job_records is compared against +true+ explicitly because it is
  # also compared against the symbol :on_unhandled_error.
  preserve = GoodJob.preserve_job_records
  if preserve == true || (unhandled && preserve == :on_unhandled_error)
    self.finished_at = Time.current
    save!
  else
    destroy!
  end

  result
end
Private Instance Methods
@return [ExecutionResult]
# File lib/good_job/job.rb, line 277
#
# Runs the serialized ActiveJob inside a "perform_job.good_job" notification
# and wraps the outcome.
# @return [ExecutionResult]
#   Carries the job's value and any handled error, or an unhandled error when
#   a StandardError escapes the execution.
def execute
  GoodJob::CurrentExecution.reset
  GoodJob::CurrentExecution.good_job = self

  # Work on a copy so the stored serialized_params are not mutated.
  job_data = serialized_params.deep_dup
  job_data["provider_job_id"] = id

  notification_payload = {
    good_job: self,
    process_id: GoodJob::CurrentExecution.process_id,
    thread_name: GoodJob::CurrentExecution.thread_name,
  }

  ActiveSupport::Notifications.instrument("perform_job.good_job", notification_payload) do
    outcome = ActiveJob::Base.execute(job_data)

    if outcome.is_a?(Exception)
      # An exception returned (not raised) by the execution counts as handled.
      handled_error = outcome
      outcome = nil
    else
      handled_error = GoodJob::CurrentExecution.error_on_retry || GoodJob::CurrentExecution.error_on_discard
    end

    ExecutionResult.new(value: outcome, handled_error: handled_error)
  rescue StandardError => e
    ExecutionResult.new(value: nil, unhandled_error: e)
  end
end