class Que::Job
Constants
- MAXIMUM_TAGS_COUNT
- MAXIMUM_TAG_LENGTH
Attributes
maximum_retry_count[RW]
Job class configuration options.
priority[RW]
Job class configuration options.
queue[RW]
Job class configuration options.
retry_interval[RW]
Job class configuration options.
run_at[RW]
Job class configuration options.
run_synchronously[RW]
Job class configuration options.
que_attrs[R]
que_error[RW]
que_resolved[RW]
Public Class Methods
_bulk_enqueue_insert(args_and_kwargs_array, job_options: {}, notify:)
click to toggle source
# File lib/que/job.rb, line 155
#
# Inserts (or, in synchronous mode, immediately runs) a batch of jobs that
# all share the same job class and job options.
#
# args_and_kwargs_array - Array of Hashes, one per job. Each hash may carry
#                         :args (defaults to []) and :kwargs (defaults to {}).
# job_options:          - Hash of per-batch options read here: :tags, :queue,
#                         :priority, :run_at and :job_class.
# notify:               - when falsy, `que.skip_notify` is set locally inside
#                         the insert transaction.
#
# Returns an array of job instances: one per job run (synchronous mode) or
# one per row returned by the :bulk_insert_jobs statement.
#
# Raises RuntimeError when args_and_kwargs_array is not an Array of Hashes,
# and Que::Error when the tags exceed MAXIMUM_TAGS_COUNT / MAXIMUM_TAG_LENGTH
# or when the job class has no name and no :job_class option was given.
def _bulk_enqueue_insert(args_and_kwargs_array, job_options: {}, notify:)
  well_formed = args_and_kwargs_array.is_a?(Array) && args_and_kwargs_array.all? { |a| a.is_a?(Hash) }
  raise 'Unexpected bulk args format' unless well_formed

  tags = job_options[:tags]

  if tags
    if tags.length > MAXIMUM_TAGS_COUNT
      raise Que::Error, "Can't enqueue a job with more than #{MAXIMUM_TAGS_COUNT} tags! (passed #{tags.length})"
    end

    tags.each do |tag|
      if tag.length > MAXIMUM_TAG_LENGTH
        # Report the configured limit instead of a hard-coded number so the
        # message can never drift away from the check above.
        raise Que::Error, "Can't enqueue a job with a tag longer than #{MAXIMUM_TAG_LENGTH} characters! (\"#{tag}\")"
      end
    end
  end

  # Normalize every entry so that both :args and :kwargs are always present.
  args_and_kwargs_array = args_and_kwargs_array.map do |args_and_kwargs|
    args_and_kwargs.merge(
      args: args_and_kwargs.fetch(:args, []),
      kwargs: args_and_kwargs.fetch(:kwargs, {}),
    )
  end

  attrs = {
    queue:    job_options[:queue]    || resolve_que_setting(:queue) || Que.default_queue,
    priority: job_options[:priority] || resolve_que_setting(:priority),
    run_at:   job_options[:run_at]   || resolve_que_setting(:run_at),
    args_and_kwargs_array: args_and_kwargs_array,
    data:     tags ? { tags: tags } : {},
    job_class: \
      job_options[:job_class] || name ||
        raise(Que::Error, "Can't enqueue an anonymous subclass of Que::Job"),
  }

  if attrs[:run_at].nil? && resolve_que_setting(:run_synchronously)
    # Synchronous mode: round-trip the arguments through JSON and run each
    # job right away. The delete keeps the batch array out of the per-job
    # attrs that are merged below.
    args_and_kwargs_array = Que.deserialize_json(Que.serialize_json(attrs.delete(:args_and_kwargs_array)))
    args_and_kwargs_array.map do |args_and_kwargs|
      _run_attrs(
        attrs.merge(
          args: args_and_kwargs.fetch(:args),
          kwargs: args_and_kwargs.fetch(:kwargs),
        ),
      )
    end
  else
    attrs.merge!(
      args_and_kwargs_array: Que.serialize_json(attrs[:args_and_kwargs_array]),
      data: Que.serialize_json(attrs[:data]),
    )
    values_array =
      Que.transaction do
        Que.execute('SET LOCAL que.skip_notify TO true') unless notify
        Que.execute(
          :bulk_insert_jobs,
          attrs.values_at(:queue, :priority, :run_at, :job_class, :args_and_kwargs_array, :data),
        )
      end
    values_array.map(&method(:new))
  end
end
bulk_enqueue(job_options: {}, notify: false) { || ... }
click to toggle source
# File lib/que/job.rb, line 140
#
# Collects every job enqueued inside the given block and inserts them as a
# single batch through `_bulk_enqueue_insert`.
#
# While the block runs, the pending batch lives in the thread-local
# :que_jobs_to_bulk_insert slot, where `enqueue` appends job attributes.
#
# job_options: - Hash forwarded to `_bulk_enqueue_insert`; when it contains
#                :job_class the insert is dispatched on Que::Job itself,
#                otherwise on the class named by the first collected job.
# notify:      - forwarded to `_bulk_enqueue_insert`.
#
# Returns [] when the block enqueued nothing, otherwise whatever
# `_bulk_enqueue_insert` returns.
#
# Raises Que::Error when called while another bulk_enqueue is active on this
# thread, or when the collected jobs are not all of the same job class.
def bulk_enqueue(job_options: {}, notify: false)
  # Reject nesting *before* entering the ensure-protected region: the cleanup
  # below must only run for the call that installed the thread-local batch,
  # otherwise a rejected nested call would wipe out the outer call's state.
  raise Que::Error, "Can't nest .bulk_enqueue" unless Thread.current[:que_jobs_to_bulk_insert].nil?

  begin
    batch = { jobs_attrs: [], job_options: job_options }
    Thread.current[:que_jobs_to_bulk_insert] = batch

    yield

    jobs_attrs = batch[:jobs_attrs]
    return [] if jobs_attrs.empty?

    unless jobs_attrs.map { |attrs| attrs[:job_class] }.uniq.one?
      raise Que::Error, "When using .bulk_enqueue, all jobs enqueued must be of the same job class"
    end

    args_and_kwargs_array = jobs_attrs.map { |attrs| attrs.slice(:args, :kwargs) }
    klass = job_options[:job_class] ? Que::Job : Que.constantize(jobs_attrs.first[:job_class])
    klass._bulk_enqueue_insert(args_and_kwargs_array, job_options: job_options, notify: notify)
  ensure
    # Always release the slot, including on errors and the early return above.
    Thread.current[:que_jobs_to_bulk_insert] = nil
  end
end
enqueue(*args)
click to toggle source
# File lib/que/job.rb, line 80
#
# Enqueues a single job of this class.
#
# Positional and keyword arguments are separated with
# `Que.split_out_ruby2_keywords`; the :job_options keyword (if any) is
# removed from the job's kwargs and may carry :tags, :queue, :priority,
# :run_at and :job_class.
#
# Depending on state this method does one of three things:
#   * inside `bulk_enqueue` (thread-local batch present): records the job's
#     attributes in the batch and returns a placeholder instance built from {};
#   * when no run_at is set and the :run_synchronously setting resolves
#     truthy: round-trips args/kwargs/data through JSON and runs the job now;
#   * otherwise: executes the :insert_job statement and wraps the first
#     returned row in a new instance.
#
# Raises Que::Error when the tags exceed MAXIMUM_TAGS_COUNT /
# MAXIMUM_TAG_LENGTH, when the class is anonymous and no :job_class option
# was given, or when used inside bulk_enqueue with ActiveJob's wrapper class
# or with per-job job_options.
def enqueue(*args)
  args, kwargs = Que.split_out_ruby2_keywords(args)

  job_options = kwargs.delete(:job_options) || {}
  tags = job_options[:tags]

  if tags
    if tags.length > MAXIMUM_TAGS_COUNT
      raise Que::Error, "Can't enqueue a job with more than #{MAXIMUM_TAGS_COUNT} tags! (passed #{tags.length})"
    end

    tags.each do |tag|
      if tag.length > MAXIMUM_TAG_LENGTH
        # Report the configured limit instead of a hard-coded number so the
        # message can never drift away from the check above.
        raise Que::Error, "Can't enqueue a job with a tag longer than #{MAXIMUM_TAG_LENGTH} characters! (\"#{tag}\")"
      end
    end
  end

  attrs = {
    queue:    job_options[:queue]    || resolve_que_setting(:queue) || Que.default_queue,
    priority: job_options[:priority] || resolve_que_setting(:priority),
    run_at:   job_options[:run_at]   || resolve_que_setting(:run_at),
    args:     args,
    kwargs:   kwargs,
    data:     tags ? { tags: tags } : {},
    job_class: \
      job_options[:job_class] || name ||
        raise(Que::Error, "Can't enqueue an anonymous subclass of Que::Job"),
  }

  if Thread.current[:que_jobs_to_bulk_insert]
    if self.name == 'ActiveJob::QueueAdapters::QueAdapter::JobWrapper'
      raise Que::Error, "Que.bulk_enqueue does not support ActiveJob."
    end

    unless job_options == {}
      raise Que::Error, "When using .bulk_enqueue, job_options must be passed to that method rather than .enqueue"
    end

    # Defer the insert: bulk_enqueue picks the attributes up after its block.
    Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs] << attrs
    new({})
  elsif attrs[:run_at].nil? && resolve_que_setting(:run_synchronously)
    attrs.merge!(
      args: Que.deserialize_json(Que.serialize_json(attrs[:args])),
      kwargs: Que.deserialize_json(Que.serialize_json(attrs[:kwargs])),
      data: Que.deserialize_json(Que.serialize_json(attrs[:data])),
    )
    _run_attrs(attrs)
  else
    attrs.merge!(
      args: Que.serialize_json(attrs[:args]),
      kwargs: Que.serialize_json(attrs[:kwargs]),
      data: Que.serialize_json(attrs[:data]),
    )
    values = Que.execute(
      :insert_job,
      attrs.values_at(:queue, :priority, :run_at, :job_class, :args, :kwargs, :data),
    ).first
    new(values)
  end
end
new(attrs)
click to toggle source
# File lib/que/job.rb, line 53
#
# Builds a job instance around the given attributes.
#
# attrs - stored as-is in @que_attrs (exposed through the que_attrs reader)
#         and also handed to Que.internal_log for the :job_instantiate event.
def initialize(attrs)
  @que_attrs = attrs

  Que.internal_log(:job_instantiate, self) do
    attrs
  end
end
resolve_que_setting(setting, *args)
click to toggle source
# File lib/que/job.rb, line 227
#
# Looks up a job-class setting, walking up the superclass chain.
#
# setting - name of the setting method to read on this class.
# args    - passed through to the setting when it is callable.
#
# If this class responds to the setting and its value is not nil, that value
# is the answer; a value responding to :call is invoked with args and its
# result is returned instead. Otherwise the lookup is delegated to the
# superclass, provided it responds to resolve_que_setting. Returns nil when
# nothing in the chain supplies a value.
def resolve_que_setting(setting, *args)
  own_value = respond_to?(setting) ? send(setting) : nil

  unless own_value.nil?
    return own_value.call(*args) if own_value.respond_to?(:call)

    return own_value
  end

  parent = superclass
  parent.resolve_que_setting(setting, *args) if parent.respond_to?(:resolve_que_setting)
end
run(*args)
click to toggle source
# File lib/que/job.rb, line 215
#
# Runs a job of this class immediately with the given arguments, going
# through _run_attrs instead of inserting a row.
#
# Positional and keyword arguments are split apart and each is serialized to
# JSON and parsed back, so the job sees the same values it would have seen
# after a round-trip to the DB. Returns whatever _run_attrs returns.
def run(*args)
  positional, keywords = Que.split_out_ruby2_keywords(args)

  # Mimic the DB round-trip for a single value.
  json_round_trip = ->(value) { Que.deserialize_json(Que.serialize_json(value)) }

  # Should not fail if there's no DB connection.
  _run_attrs(
    args: json_round_trip.call(positional),
    kwargs: json_round_trip.call(keywords),
  )
end
Private Class Methods
_run_attrs(attrs)
click to toggle source
# File lib/que/job.rb, line 242
#
# Instantiates a job from attrs and runs it in-process.
#
# attrs - Hash of job attributes. It is modified in place: :error_count is
#         set to 0 and the hash is then passed to Que.recursively_freeze.
#
# The job's _run is invoked with reraise_errors: true inside
# Que.run_job_middleware. Returns the job instance.
def _run_attrs(attrs)
  attrs[:error_count] = 0
  Que.recursively_freeze(attrs)

  job = new(attrs)
  Que.run_job_middleware(job) { job._run(reraise_errors: true) }
  job
end
Public Instance Methods
run(*args)
click to toggle source
Subclasses should define their own run methods, but keep an empty one here so that Que::Job.enqueue can queue an empty job in testing.
# File lib/que/job.rb, line 60
#
# Default no-op job body: accepts any arguments, does nothing and returns
# nil. Subclasses are expected to supply their own implementation.
def run(*args)
  nil
end
Private Instance Methods
que_target()
click to toggle source
Have the job helper methods act on this object.
# File lib/que/job.rb, line 66
#
# The object the job helper methods act on: the job instance itself.
def que_target
  itself
end