class Que::Worker
Constants
- VALID_LOG_LEVELS
Attributes
priority[R]
thread[R]
Public Class Methods
new( job_buffer:, result_queue:, priority: nil, start_callback: nil )
click to toggle source
# File lib/que/worker.rb, line 21 def initialize( job_buffer:, result_queue:, priority: nil, start_callback: nil ) @priority = Que.assert([NilClass, Integer], priority) @job_buffer = Que.assert(JobBuffer, job_buffer) @result_queue = Que.assert(ResultQueue, result_queue) Que.internal_log(:worker_instantiate, self) do { priority: priority, job_buffer: job_buffer.object_id, result_queue: result_queue.object_id, } end @thread = Thread.new do # An error causing this thread to exit is a bug in Que, which we want # to know about ASAP, so propagate the error if it happens. Thread.current.abort_on_exception = true start_callback.call(self) if start_callback.respond_to?(:call) work_loop end end
Public Instance Methods
wait_until_stopped()
click to toggle source
# File lib/que/worker.rb, line 50 def wait_until_stopped @thread.join end
Private Instance Methods
fetch_next_metajob()
click to toggle source
# File lib/que/worker.rb, line 93 def fetch_next_metajob @job_buffer.shift(*priority) end
work_job(metajob)
click to toggle source
# File lib/que/worker.rb, line 97 def work_job(metajob) job = metajob.job start = Time.now klass = Que.constantize(job.fetch(:job_class)) instance = klass.new(job) Que.run_job_middleware(instance) { instance.tap(&:_run) } elapsed = Time.now - start log_level = if instance.que_error :error else instance.log_level(elapsed) end if VALID_LOG_LEVELS.include?(log_level) log_message = { level: log_level, job: metajob.job, elapsed: elapsed, } if error = instance.que_error log_message[:event] = :job_errored log_message[:error] = "#{error.class}: #{error.message}".slice(0, 500) else log_message[:event] = :job_worked end Que.log(**log_message) end instance rescue => error Que.log( level: :debug, event: :job_errored, job: metajob.job, error: { class: error.class.to_s, message: error.message, backtrace: (error.backtrace || []).join("\n").slice(0, 10000), }, ) Que.notify_error(error) begin # If the Job class couldn't be resolved, use the default retry # backoff logic in Que::Job. job_class = (klass && klass <= Job) ? klass : Job error_count = job.fetch(:error_count) + 1 max_retry_count = job_class.resolve_que_setting(:maximum_retry_count) if max_retry_count && error_count > max_retry_count Que.execute :expire_job, [job.fetch(:id)] else delay = job_class. resolve_que_setting( :retry_interval, error_count, ) Que.execute :set_error, [ delay, "#{error.class}: #{error.message}".slice(0, 500), (error.backtrace || []).join("\n").slice(0, 10000), job.fetch(:id), ] end rescue # If we can't reach the database for some reason, too bad, but # don't let it crash the work loop. end error end
work_loop()
click to toggle source
# File lib/que/worker.rb, line 56 def work_loop # Blocks until a job of the appropriate priority is available. # `fetch_next_metajob` normally returns a job to be processed. # If the queue is shutting down it will return false, which breaks the loop and # lets the thread finish. while (metajob = fetch_next_metajob) != false # If metajob is nil instead of false, we've hit a rare race condition where # there was a job in the buffer when the worker code checked, but the job was # picked up by the time we got around to shifting it off the buffer. # Letting this case go unhandled leads to worker threads exiting pre-maturely, so # we check explicitly and continue the loop. next if metajob.nil? id = metajob.id Que.internal_log(:worker_received_job, self) { {id: id} } if Que.execute(:check_job, [id]).first Que.recursively_freeze(metajob.job) Que.internal_log(:worker_fetched_job, self) { {id: id} } work_job(metajob) else # The job was locked but doesn't exist anymore, due to a race # condition that exists because advisory locks don't obey MVCC. Not # necessarily a problem, but if it happens a lot it may be meaningful. Que.internal_log(:worker_job_lock_race_condition, self) { {id: id} } end Que.internal_log(:worker_pushing_finished_job, self) { {id: id} } @result_queue.push( metajob: metajob, message_type: :job_finished, ) end end