class KueRuby
Interface with the Automattic Kue Redis store
Constants
- VERSION
Attributes
Public Class Methods
Create a new client instance
@param [Hash] options
@option options [Redis] :redis an instance of `redis` (required)
@option options [String] :prefix namespace in redis (default is q)
@return [KueRuby] a new kue client instance
@raise [ArgumentError] if :redis is missing
# File lib/kue_ruby.rb, line 16
def initialize(options = {})
  raise(ArgumentError, ':redis Redis required', caller) unless options[:redis]
  @redis = options[:redis]
  @prefix = options[:prefix] ? options[:prefix] : 'q'
  super()
end
Public Instance Methods
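Create a FIFO id for the zset so that jobs keep their insertion order
@param [Integer] id job id (default is 1)
@return [String] the id prefixed with its zero-padded digit count, e.g. 01|7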
# File lib/kue_ruby.rb, line 28
def create_fifo(id = 1)
  id_len = id.to_s.length.to_s
  len = 2 - id_len.length
  while len > 0
    id_len = '0' + id_len
    len -= 1
  end
  id_len.to_s + '|' + id.to_s
end
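The reason for the length prefix is easier to see with a few ids side by side. Redis orders sorted-set members with equal scores lexicographically, so bare numeric strings would put 10 before 2. The following is a minimal standalone sketch, not part of the library: `fifo_id` is a hypothetical helper that restates the padding logic with `format`.

```ruby
# Hypothetical standalone restatement of the padding logic (not library code).
def fifo_id(id)
  digits = id.to_s
  # Two-character, zero-padded digit count, then '|', then the id itself.
  format('%02d|%s', digits.length, digits)
end

ids = [1, 2, 10, 100]

# A plain string sort places "10" and "100" before "2".
plain = ids.map(&:to_s).sort
# => ["1", "10", "100", "2"]

# With the length prefix, string order matches numeric order.
fifo = ids.map { |id| fifo_id(id) }.sort
# => ["01|1", "01|2", "02|10", "03|100"]
```

Because the digit count sorts first, ids with fewer digits always come before longer ones, and ids of the same length compare digit by digit.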
Enqueue a job, returning nil if the job cannot be created
@param [Hash] options
@option options [String] :type name of the queue for the job
@option options [Hash] :data hash of job data
@option options [Integer] :max_attempts max attempts for the job (default is 1)
@option options [Integer] :priority default is 0/normal
@option options [Integer] :ttl expiry value for time the job can live in active state (ms)
@return [KueJob, nil] a new kue job, or nil if an error occurs while enqueueing
@raise [ArgumentError] if :type or :data is missing
# File lib/kue_ruby.rb, line 48
def create_job(options = {})
  raise(ArgumentError, ':type String required', caller) unless options[:type]
  raise(ArgumentError, ':data Hash required', caller) unless options[:data]
  begin
    return create_job! options
  rescue
    return nil
  end
end
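The listing above checks its arguments outside the begin block, so a missing option still raises while later failures are turned into nil. The sketch below illustrates that split without Redis. `enqueue!` and `enqueue` are hypothetical stand-ins for the bang and non-bang methods, and the `:fail` option exists only to simulate a runtime error.

```ruby
# Hypothetical stand-in for the raising variant; returns a plain Hash.
def enqueue!(options = {})
  raise ArgumentError, ':type String required' unless options[:type]
  raise 'store unavailable' if options[:fail] # simulated runtime failure
  { type: options[:type], state: 'inactive' }
end

# Hypothetical stand-in for the nil-returning variant.
def enqueue(options = {})
  # Validation sits outside the begin block, so it is never swallowed.
  raise ArgumentError, ':type String required' unless options[:type]
  begin
    enqueue!(options)
  rescue StandardError
    nil
  end
end

ok     = enqueue(type: 'email')             # a job hash
failed = enqueue(type: 'email', fail: true) # nil, the error is swallowed
# enqueue({}) would still raise ArgumentError.
# enqueue!(type: 'email', fail: true) would raise RuntimeError.
```

A caller that needs to know why enqueueing failed would use the raising variant; the nil-returning one suits fire-and-forget call sites.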
Enqueue a job, raising on failure
@param [Hash] options
@option options [String] :type name of the queue for the job
@option options [Hash] :data hash of job data
@option options [Integer] :max_attempts max attempts for the job (default is 1)
@option options [Integer] :priority default is 0/normal
@option options [Integer] :ttl expiry value for time the job can live in active state (ms)
@return [KueJob] a new kue job
@raise [ArgumentError] if :type or :data is missing
# File lib/kue_ruby.rb, line 68
def create_job!(options = {})
  raise(ArgumentError, ':type String required', caller) unless options[:type]
  raise(ArgumentError, ':data Hash required', caller) unless options[:data]
  job = KueJob.new
  job.type = options[:type]
  job.data = options[:data]
  job.ttl = options[:ttl] if options[:ttl]
  job.priority = options[:priority] ? options[:priority] : 0
  job.max_attempts = options[:max_attempts] ? options[:max_attempts] : 1
  job.state = 'inactive'
  job.created_at = Time.now
  job.backoff = { delay: 60 * 1000, type: 'exponential' }
  job.id = @redis.incr "#{@prefix}.ids"
  job.zid = create_fifo job.id
  @redis.sadd "#{@prefix}:job:types", job.type
  job.save! self
  @redis.zadd("#{@prefix}:jobs", job.priority, job.zid)
  @redis.zadd("#{@prefix}:jobs:inactive", job.priority, job.zid)
  @redis.zadd("#{@prefix}:jobs:#{job.type}:inactive", job.priority, job.zid)
  @redis.lpush("#{@prefix}:#{job.type}:jobs", 1)
  job
end
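To make the key layout easier to scan, the index updates in the listing above can be written out as data. This is a rough sketch only: `enqueue_commands` is a hypothetical helper, not a library method, and it leaves out the id allocation and the `job.save!` call.

```ruby
# Hypothetical helper: the index writes from the listing, in order, as
# [command, key, *args] arrays. Id allocation and job.save! are omitted.
def enqueue_commands(prefix:, type:, priority:, zid:)
  [
    [:sadd,  "#{prefix}:job:types", type],                     # register the job type
    [:zadd,  "#{prefix}:jobs", priority, zid],                 # all jobs
    [:zadd,  "#{prefix}:jobs:inactive", priority, zid],        # all inactive jobs
    [:zadd,  "#{prefix}:jobs:#{type}:inactive", priority, zid], # inactive jobs of this type
    [:lpush, "#{prefix}:#{type}:jobs", 1]                      # one list entry per queued job
  ]
end

commands = enqueue_commands(prefix: 'q', type: 'email', priority: 0, zid: '01|7')
commands.each { |cmd| puts cmd.join(' ') }
```

Each sorted set uses the job priority as the score and the FIFO id as the member, so jobs with equal priority fall back to id order.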