class SeaMule::Job

Attributes

callback_class_name[R]
id[R]
meta[R]
payload[R]
queue[R]
result[RW]

Public Class Methods

create(queue, klass, payload, meta) click to toggle source
# File lib/seamule/job.rb, line 25
# Enqueues a new job on +queue+.
#
# The job is handed to SeaMule.push as keyword-style options: the callback
# class name (+klass+ converted with +to_s+), the +payload+, the +meta+ data
# and a freshly generated SecureRandom hex id.
#
# Returns whatever SeaMule.push returns.
def self.create(queue, klass, payload, meta)
  job_attributes = {
    class: klass.to_s,
    payload: payload,
    meta: meta,
    id: SecureRandom.hex
  }
  SeaMule.push(queue, **job_attributes)
end
destroy(queue, klass: nil, payload: nil, id: nil) click to toggle source
# File lib/seamule/job.rb, line 29
# Removes matching jobs from +queue+ and returns how many were deleted.
#
# Matching is delegated to process_queue, which receives +klass+ converted
# with +to_s+ together with the +payload+ and +id+ filters. For every match
# the temporary list holding that entry is deleted from the store, and the
# integer results of those deletions are added up.
def self.destroy(queue, klass: nil, payload: nil, id: nil)
  codec = SeaMule.coder
  store = SeaMule.backend.store
  class_name = klass.to_s

  deletions = process_queue(queue, codec, store, class_name, payload, id) do |_decoded, _source, holding_list, _requeue_list|
    store.del(holding_list).to_i
  end

  deletions.reduce(0) { |total, removed| total + removed }
end
new(queue, content) click to toggle source
# File lib/seamule/job.rb, line 17
# Builds a job for +queue+ from a decoded +content+ mapping.
#
# +content+ is read with string keys: 'payload', 'meta', 'id' and 'class'
# (the latter is stored as the callback class name). Keys that are absent
# simply leave the corresponding attribute set to whatever +content[key]+
# returns.
def initialize(queue, content)
  @queue = queue
  @payload, @meta, @id, @callback_class_name =
    %w[payload meta id class].map { |key| content[key] }
end
queued(queue, klass: nil, payload: nil, id: nil) click to toggle source
# File lib/seamule/job.rb, line 41
# Returns the jobs on +queue+ that match the given filters, leaving them queued.
#
# Matching is delegated to process_queue, which receives +klass+ converted
# with +to_s+ together with the +payload+ and +id+ filters. Each matching
# entry is moved from the temporary list to the requeue list and wrapped in
# a new job instance; the collected instances are returned.
def self.queued(queue, klass: nil, payload: nil, id: nil)
  codec = SeaMule.coder
  store = SeaMule.backend.store
  class_name = klass.to_s

  process_queue(queue, codec, store, class_name, payload, id) do |job_data, _source, holding_list, requeue_list|
    # Matched entries are kept, not dropped: pass them on to the requeue list.
    store.rpoplpush(holding_list, requeue_list)
    new(queue, job_data)
  end
end

Protected Class Methods

process_queue(queue, coder, redis, klass, payload, id) { |decoded, new_queue, temp_queue, requeue_queue| ... } click to toggle source
# File lib/seamule/job.rb, line 85
# Walks every entry of "queue:<queue>", yielding the ones that match.
#
# Entries are moved one at a time (via +rpoplpush+) into a temporary list
# whose name is suffixed with the current epoch second. Each entry is decoded
# with +coder+ and considered a match when either
#
# * its 'id' equals +id+, or
# * its 'class' equals +klass+ and the payload filter accepts it. A +nil+ or
#   empty +payload+ accepts any payload; otherwise the entry's 'payload' must
#   equal +payload+.
#
# Matching entries are yielded as (decoded, new_queue, temp_queue,
# requeue_queue); the block decides what happens to the entry sitting in the
# temporary list. Non-matching entries are moved to the requeue list. Once the
# source list is exhausted, the requeue list is pushed back onto the queue.
#
# Returns an array of the block's results. Each result is prepended, so the
# array is in reverse processing order.
def self.process_queue(queue, coder, redis, klass, payload, id)
  return_array = []
  new_queue = "queue:#{queue}"
  temp_queue = "queue:#{queue}:temp:#{Time.now.to_i}"
  requeue_queue = "#{temp_queue}:requeue"

  # destroy/queued default the payload filter to nil, so calling +empty?+ on it
  # unguarded raised NoMethodError as soon as an entry's class matched.
  any_payload = payload.nil? || (payload.respond_to?(:empty?) && payload.empty?)

  while (string = redis.rpoplpush(new_queue, temp_queue))
    decoded = coder.decode(string)
    if decoded['id'] == id || (decoded['class'] == klass && (any_payload || decoded['payload'] == payload))
      return_array.unshift(yield(decoded, new_queue, temp_queue, requeue_queue))
    else
      redis.rpoplpush(temp_queue, requeue_queue)
    end
  end
  push_queue(redis, requeue_queue, new_queue)

  return_array
end
push_queue(redis, requeue_queue, queue) click to toggle source
# File lib/seamule/job.rb, line 104
# Moves every entry of +requeue_queue+ onto +queue+.
#
# Calls +rpoplpush+ repeatedly, one entry per call, and stops at the first
# falsy return value. Returns nil.
def self.push_queue(redis, requeue_queue, queue)
  while redis.rpoplpush(requeue_queue, queue)
    # nothing to do here: the condition itself transfers one entry per pass
  end
end

Public Instance Methods

==(other) click to toggle source
# File lib/seamule/job.rb, line 76
# Compares two jobs by content: queue, callback class, meta and payload.
# The job id is not part of the comparison.
#
# Returns false (rather than raising NoMethodError) when +other+ does not
# expose the compared readers, e.g. +nil+ or an unrelated object.
def ==(other)
  comparable = %i[queue callback_class meta payload].all? { |reader| other.respond_to?(reader) }
  return false unless comparable

  queue == other.queue &&
    callback_class == other.callback_class &&
    meta == other.meta &&
    payload == other.payload
end
callback_class() click to toggle source
# File lib/seamule/job.rb, line 59
# Resolves the callback class from its stored name.
#
# The name is converted with +to_s+ and resolved through +constantize+; the
# result is memoised in @callback_class, so later calls reuse it.
def callback_class
  return @callback_class if @callback_class

  @callback_class = callback_class_name.to_s.constantize
end
inspect() click to toggle source
# File lib/seamule/job.rb, line 71
# Human-readable one-line summary: queue, callback class name, then the
# inspected meta and payload, separated by " | ".
def inspect
  format(
    'Job @%s | %s | %s | %s',
    @queue,
    @callback_class_name,
    @meta.inspect,
    @payload.inspect
  )
end
perform() click to toggle source
# File lib/seamule/job.rb, line 54
# Runs the job: instantiates the callback class without arguments and calls
# its +perform+ with this job's id, meta, payload and result.
#
# Returns whatever the callback's +perform+ returns.
def perform
  callback_class.new.perform(id, meta, payload, result)
end
to_h() click to toggle source
# File lib/seamule/job.rb, line 63
# Hash summary of the job with the keys :queue, :run_at and :payload.
#
# :run_at is the current UTC time (at the moment of the call) formatted as
# ISO 8601.
def to_h
  summary = { queue: queue }
  summary[:run_at] = Time.now.utc.iso8601
  summary[:payload] = payload
  summary
end