class KueRuby

Interface with the Automattic Kue redis store

Constants

VERSION

Attributes

prefix[R]
redis[R]

Public Class Methods

new(options = {}) click to toggle source

Create a new client instance

@param Hash options @option options Redis :redis an instance of `redis` @option options [String] :prefix namespace in redis (default is q)

@return [KueRuby] a new kue client instance

Calls superclass method
# File lib/kue_ruby.rb, line 16
def initialize(options = {})
  raise(ArgumentError, ':redis Redis required', caller) unless options[:redis]
  @redis = options[:redis]
  @prefix = options[:prefix] ? options[:prefix] : 'q'
  super()
end

Public Instance Methods

create_fifo(id = 1) click to toggle source

Create FIFO id for zset to preserve order

@param Integer id

@return String

# File lib/kue_ruby.rb, line 28
def create_fifo(id = 1)
  id_len = id.to_s.length.to_s
  len = 2 - id_len.length
  while len > 0
    id_len = '0' + id_len
    len -= 1
  end
  id_len.to_s + '|' + id.to_s
end
create_job(options = {}) click to toggle source

Enqueue a job

@param Hash options @option options String :type name of the queue for the job @option options Hash :data hash of job data @option options [Integer] :max_attempts max attempts for the job @option options [Integer] :priority default is 0/normal @option options [Integer] :ttl expiry value for time the job can live in active state (ms)

@return [KueJob] a new kue job

# File lib/kue_ruby.rb, line 48
def create_job(options = {})
  raise(ArgumentError, ':type String required', caller) unless options[:type]
  raise(ArgumentError, ':data Hash required', caller) unless options[:data]
  begin
    return create_job! options
  rescue
    return nil
  end
end
create_job!(options = {}) click to toggle source

Enqueue a job

@param Hash options @option options String :type name of the queue for the job @option options Hash :data hash of job data @option options [Integer] :max_attempts max attempts for the job @option options [Integer] :priority default is 0/normal @option options [Integer] :ttl expiry value for time the job can live in active state (ms)

@return KueJob a new kue job, throwing on exception

# File lib/kue_ruby.rb, line 68
def create_job!(options = {})
  raise(ArgumentError, ':type String required', caller) unless options[:type]
  raise(ArgumentError, ':data Hash required', caller) unless options[:data]
  job = KueJob.new
  job.type = options[:type]
  job.data = options[:data]
  job.ttl = options[:ttl] if options[:ttl]
  job.priority = options[:priority] ? options[:priority] : 0
  job.max_attempts = options[:max_attempts] ? options[:max_attempts] : 1
  job.state = 'inactive'
  job.created_at = Time.now
  job.backoff = { delay: 60 * 1000, type: 'exponential' }
  job.id = @redis.incr "#{@prefix}.ids"
  job.zid = create_fifo job.id
  @redis.sadd "#{@prefix}:job:types", job.type
  job.save! self
  @redis.zadd("#{@prefix}:jobs", job.priority, job.zid)
  @redis.zadd("#{@prefix}:jobs:inactive", job.priority, job.zid)
  @redis.zadd("#{@prefix}:jobs:#{job.type}:inactive", job.priority, job.zid)
  @redis.lpush("#{@prefix}:#{job.type}:jobs", 1)
  job
end