class QueueManager::Task

Constants

MARKED_REGEXP
MARKER

Attributes

id[R]

An instance of QueueManager::Task provides detailed information about a task and lets you manage it: you can change the task's job, pass additional parameters, or delete the task.

score[R]

An instance of QueueManager::Task provides detailed information about a task and lets you manage it: you can change the task's job, pass additional parameters, or delete the task.

Public Class Methods

add(id, job:, **options) click to toggle source

Add a new task to the queue

@param id [String] The unique identifier of the task @param job [Symbol] Job class name @param options [Hash] Hash of additional options

@return [QueueManager::Task] Instance of QueueManager::Task

# File lib/queue_manager/task.rb, line 23
# Enqueue a task under +id+ and record which job should process it.
#
# The score starts from the score of the marked entry ("#{MARKER}#{id}") when
# redis.zscore returns one, otherwise from +timestamp+; +config.delay+ is
# added on top. The unmarked +id+ is then written to the sorted set.
#
# @param id [String] The unique identifier of the task
# @param job [Symbol] Job class name
# @param options [Hash] Additional options, stored on the task as JSON
# @return [QueueManager::Task] The task that was built for +id+
# @raise [ArgumentError] When +job+ is nil or false
def self.add(id, job:, **options)
  job || raise(ArgumentError, 'Job should be present')

  transaction do
    base_time = redis.zscore(config.queue, "#{MARKER}#{id}")
    base_time ||= timestamp
    score = base_time + config.delay

    task = new(id, score).tap do |fresh|
      fresh.job = job
      fresh.options = options.to_json
    end

    redis.multi { redis.zadd(config.queue, score, id) }

    logger.info "Add new task \"#{id}\" with job: \"#{job}\""
    # Non-local return: leaves the block and the method at once.
    return task
  end
end
handling_queue() click to toggle source

Check the queue and run tasks

# File lib/queue_manager/task.rb, line 46
# Check the head of the queue and, when it is due, launch its job.
#
# The first entry of the sorted set is read together with its score. Nothing
# happens when the queue is empty or the score is still in the future.
# Otherwise the entry receives a new score of +timestamp + config.timeout+:
# an entry that already matches MARKED_REGEXP is only rescheduled, while an
# unmarked entry is replaced by its marked counterpart. The job named by the
# task is then enqueued through +perform_later+.
#
# @return [false, Object] false when nothing was due; otherwise the result of
#   the final logger call
def self.handling_queue
  # Return the first element from range
  id, score = redis.zrange(config.queue, 0, 0, with_scores: true).flatten

  # Bail out when either half of the pair is missing, so the comparison
  # below never runs against nil.
  return false if id.blank? || score.blank?
  return false if score > timestamp

  new_score = timestamp + config.timeout

  if MARKED_REGEXP =~ id
    redis.zadd(config.queue, new_score, id)
    logger.info "Time is over for the task \"#{id}\". Updated time"
  else
    redis.zrem(config.queue, id)
    redis.zadd(config.queue, new_score, "#{MARKER}#{id}")
    logger.info "Task \"#{id}\" is taken into work"
  end

  # The marker is always added as a prefix, so strip only that prefix. Ids
  # that contain marker characters elsewhere must stay intact, otherwise the
  # task would be looked up under the wrong key.
  original_id = id.start_with?(MARKER) ? id[MARKER.length..-1] : id
  task = new(original_id, score.to_i)
  task.update_score(new_score)
  # NOTE(review): JSON.load is the permissive loader; the payload is expected
  # to be the JSON written by .add — confirm nothing else writes this key.
  options = JSON.load(task.options).symbolize_keys

  task.job.constantize.perform_later(task, original_id, **options)
  logger.info "Launched job: #{task.job}.perform_later(task, \"#{original_id}\", #{options})"
end
new(id, score) click to toggle source

@param id [String] The unique identifier of the task @param score [Fixnum] Timestamp of the task

# File lib/queue_manager/task.rb, line 83
# Build an in-memory handle for a task; only the two instance variables are set.
#
# @param id [String] The unique identifier of the task
# @param score [Fixnum] Timestamp of the task
def initialize(id, score)
  @id = id
  @score = score
end

Public Instance Methods

delete()
Alias for: remove
done()
Alias for: remove
remove() click to toggle source

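Remove the task from the queue, but only if the score held by this instance still matches the score stored in the queue for the marked task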

@return [Boolean] True or false

# File lib/queue_manager/task.rb, line 115
# Remove the task from the queue, provided the score held by this instance
# still equals the score stored for the marked entry ("#{MARKER}#{id}").
#
# @return [Boolean] true once the entry is removed, false on a score mismatch
def remove
  transaction do
    member = "#{MARKER}#{id}"
    stored_score = redis.zscore(config.queue, member)

    # Scores are compared as integers; a mismatch leaves the method early.
    return false if score.to_i != stored_score.to_i

    redis.multi do
      clear_task
      redis.zrem(config.queue, member)
    end

    logger.info "The task \"#{id}\" is removed from the queue"
  end

  true
end
Also aliased as: done, delete
to_global_id() click to toggle source

Convert task to global id

@return [GlobalID] Instance of GlobalID

# File lib/queue_manager/task.rb, line 139
# Convert the task to a global id.
#
# @return [GlobalID] Result of GlobalID.create for this task, passing
#   config.identifier as +app+ and the task's score as +score+
def to_global_id
  params = { app: config.identifier, score: score }
  GlobalID.create(self, **params)
end
update_score(value) click to toggle source
# File lib/queue_manager/task.rb, line 87
# Give the task a new score while keeping its job and options.
#
# The current job and options are read first. Inside the redis.multi block
# the entries under the current key are cleared (clear_task), @score is
# switched to +value+, and both values are assigned again.
# NOTE(review): the job=/options= writers are defined outside this view;
# presumably they store under the key built from the new score — confirm.
#
# @param value New score assigned to @score
# @return Whatever the +transaction+ helper returns for the block
def update_score(value)
  transaction do
    saved_job = job
    saved_options = options

    redis.multi do
      clear_task
      @score = value
      self.job = saved_job
      self.options = saved_options
    end
  end
end

Private Instance Methods

clear_task() click to toggle source
# File lib/queue_manager/task.rb, line 145
# Delete the "job" and "options" entries stored under this task's key.
#
# @return [Array<String>] The list of suffixes that were iterated
def clear_task
  %w(job options).each { |suffix| redis.del("#{key}/#{suffix}") }
end
key() click to toggle source
# File lib/queue_manager/task.rb, line 151
# Build the key prefix for this task: queue name, id and score joined by "/".
#
# @return [String]
def key
  prefix = "#{config.queue}/#{id}"
  "#{prefix}/#{score}"
end