class QueueManager::Task
Constants
- MARKED_REGEXP
- MARKER
Attributes
Instance of QueueManager::Task
provides detailed information about the task and allows you to manage it. You can change a job of the task, pass additional parameters or delete the task.
Instance of QueueManager::Task
provides detailed information about the task and allows you to manage it. You can change a job of the task, pass additional parameters or delete the task.
Public Class Methods
Add a new task to the queue
@param id [String] The unique identifier of the task @param job [Symbol] Job class name @param options [Hash] Hash of additional options
@return [QueueManager::Task] Instance of QueueManager::Task
# File lib/queue_manager/task.rb, line 23 def self.add(id, job:, **options) raise ArgumentError, 'Job should be present' unless job transaction do time = redis.zscore(config.queue, "#{MARKER}#{id}") || timestamp score = time + config.delay task = new(id, score) task.job = job task.options = options.to_json redis.multi do redis.zadd(config.queue, score, id) end logger.info "Add new task \"#{id}\" with job: \"#{job}\"" return task end end
Check the queue and run tasks
# File lib/queue_manager/task.rb, line 46 def self.handling_queue # Return the first element from range id, score = redis.zrange(config.queue, 0, 0, with_scores: true).flatten return false if id.blank? && score.blank? return false if score > timestamp new_score = timestamp + config.timeout if MARKED_REGEXP =~ id redis.zadd(config.queue, new_score, id) logger.info "Time is over for the task \"#{id}\". Updated time" else redis.zrem(config.queue, id) redis.zadd(config.queue, new_score, "#{MARKER}#{id}") logger.info "Task \"#{id}\" is taken into work" end original_id = id.tr('*', '') task = new(original_id, score.to_i) task.update_score(new_score) options = JSON.load(task.options).symbolize_keys task.job.constantize.public_send(:perform_later, task, original_id, **options) logger.info "Launched job: #{task.job}.perform_later(task, \"#{original_id}\", #{options})" end
@param id [String] The unique identifier of the task @param score [Fixnum] Timestamp of the task
# File lib/queue_manager/task.rb, line 83 def initialize(id, score) @id, @score = id, score end
Public Instance Methods
Remove task from the queue by score
@return [Boolean] True or false
# File lib/queue_manager/task.rb, line 115 def remove transaction do marked_id = "#{MARKER}#{id}" redis_score = redis.zscore(config.queue, marked_id) return false unless score.to_i == redis_score.to_i redis.multi do clear_task redis.zrem(config.queue, marked_id) end logger.info "The task \"#{id}\" is removed from the queue" end true end
Convert task to global id
@return [GlobalID] Instance of GlobalID
# File lib/queue_manager/task.rb, line 139 def to_global_id GlobalID.create(self, app: config.identifier, score: score) end
# File lib/queue_manager/task.rb, line 87 def update_score(value) transaction do _job, _options = job, options redis.multi do clear_task @score = value self.job = _job self.options = _options end end end
Private Instance Methods
# File lib/queue_manager/task.rb, line 145 def clear_task %w(job options).each do |name| redis.del("#{key}/#{name}") end end
# File lib/queue_manager/task.rb, line 151 def key "#{config.queue}/#{id}/#{score}" end