class SeaMule::Job
Attributes
callback_class_name[R]
id[R]
meta[R]
payload[R]
queue[R]
result[RW]
Public Class Methods
create(queue, klass, payload, meta)
click to toggle source
# File lib/seamule/job.rb, line 25
# Pushes a new job description onto +queue+ through SeaMule.push.
#
# The pushed attributes are the stringified +klass+ (under :class), the given
# +payload+ and +meta+, and a freshly generated SecureRandom.hex value as :id.
# Returns whatever SeaMule.push returns.
def self.create(queue, klass, payload, meta)
  job_attributes = {
    class: klass.to_s,
    payload: payload,
    meta: meta,
    id: SecureRandom.hex
  }

  SeaMule.push(queue, **job_attributes)
end
destroy(queue, klass: nil, payload: nil, id: nil)
click to toggle source
# File lib/seamule/job.rb, line 29
# Removes the jobs on +queue+ that process_queue selects for the given
# +klass+ (stringified), +payload+ and +id+ filters.
#
# For every selected job the block deletes the temp list and records the
# integer returned by the store's +del+; the method returns the sum of those
# integers.
def self.destroy(queue, klass: nil, payload: nil, id: nil)
  codec = SeaMule.coder
  store = SeaMule.backend.store

  removed = process_queue(queue, codec, store, klass.to_s, payload, id) do |_job, _source, scratch_list, _requeue|
    store.del(scratch_list).to_i
  end

  removed.reduce(0) { |total, count| total + count }
end
new(queue, content)
click to toggle source
# File lib/seamule/job.rb, line 17
# Builds a job bound to +queue+ from +content+, a string-keyed collection
# read with the keys 'class', 'id', 'meta' and 'payload'.
def initialize(queue, content)
  @queue = queue

  # Identity of the job and of the class that will handle it.
  @callback_class_name = content['class']
  @id                  = content['id']

  # Data handed to the handler when the job is performed.
  @meta    = content['meta']
  @payload = content['payload']
end
queued(queue, klass: nil, payload: nil, id: nil)
click to toggle source
# File lib/seamule/job.rb, line 41
# Lists the jobs on +queue+ that process_queue selects for the given
# +klass+ (stringified), +payload+ and +id+ filters.
#
# Each selected job is moved from the temp list to the requeue list, so it is
# kept rather than removed, and is wrapped in a new instance of this class.
# Returns the array produced by process_queue.
def self.queued(queue, klass: nil, payload: nil, id: nil)
  codec = SeaMule.coder
  store = SeaMule.backend.store

  process_queue(queue, codec, store, klass.to_s, payload, id) do |job_hash, _source, scratch_list, requeue_list|
    store.rpoplpush(scratch_list, requeue_list)
    new(queue, job_hash)
  end
end
Protected Class Methods
process_queue(queue, coder, redis, klass, payload, id) { |decoded, new_queue, temp_queue, requeue_queue| ... }
click to toggle source
# File lib/seamule/job.rb, line 85
# Walks every entry of "queue:<queue>", yielding the ones that match the
# filters and putting the rest back.
#
# Entries are moved one at a time (RPOPLPUSH) into a temp list. A decoded
# entry matches when
# * +id+ is given and equals the entry's 'id', or
# * the entry's 'class' equals +klass+ and +payload+ is either blank
#   (nil / empty) or equal to the entry's 'payload'.
#
# Matching entries are yielded as (decoded, new_queue, temp_queue,
# requeue_queue); the block decides what happens to the entry that is still in
# the temp list. Non-matching entries are moved to the requeue list. Once the
# source list is drained, the requeue list is pushed back via push_queue.
#
# Returns the block results, most recently processed entry first.
#
# NOTE(review): the temp list name only includes the current second, so two
# callers working on the same queue within one second would share it —
# confirm whether that can happen.
def self.process_queue(queue, coder, redis, klass, payload, id)
  return_array = []
  new_queue = "queue:#{queue}"
  temp_queue = "queue:#{queue}:temp:#{Time.now.to_i}"
  requeue_queue = "#{temp_queue}:requeue"

  # Loop-invariant: a nil or empty payload filter means "any payload".
  # Callers default payload to nil, which has no #empty?.
  any_payload = payload.nil? || (payload.respond_to?(:empty?) && payload.empty?)

  while (string = redis.rpoplpush(new_queue, temp_queue))
    decoded = coder.decode(string)

    # A nil id filter must not match entries that simply have no id.
    id_match = !id.nil? && decoded['id'] == id
    class_match = decoded['class'] == klass && (any_payload || decoded['payload'] == payload)

    if id_match || class_match
      return_array.unshift(yield(decoded, new_queue, temp_queue, requeue_queue))
    else
      redis.rpoplpush(temp_queue, requeue_queue)
    end
  end

  push_queue(redis, requeue_queue, new_queue)
  return_array
end
push_queue(redis, requeue_queue, queue)
click to toggle source
# File lib/seamule/job.rb, line 104
# Drains +requeue_queue+ into +queue+ by calling +redis.rpoplpush+ repeatedly
# until it returns a falsy value. Returns nil.
def self.push_queue(redis, requeue_queue, queue)
  loop do
    moved = redis.rpoplpush(requeue_queue, queue)
    break unless moved
  end
end
Public Instance Methods
==(other)
click to toggle source
# File lib/seamule/job.rb, line 76
# Value equality between jobs.
#
# Two jobs are equal when their queue, callback class, meta and payload are
# all equal; +id+ and +result+ are not part of the comparison. Anything that
# is not a job (nil, strings, ...) compares as not equal instead of raising
# NoMethodError on the missing readers.
def ==(other)
  return false unless other.is_a?(self.class)

  queue == other.queue &&
    callback_class == other.callback_class &&
    meta == other.meta &&
    payload == other.payload
end
callback_class()
click to toggle source
# File lib/seamule/job.rb, line 59
# Resolves the handler class named by +callback_class_name+ and memoizes it
# in @callback_class.
#
# NOTE(review): +constantize+ is not core Ruby — presumably ActiveSupport's
# String#constantize; confirm it is loaded wherever jobs are performed.
def callback_class
  return @callback_class if @callback_class

  @callback_class = callback_class_name.to_s.constantize
end
inspect()
click to toggle source
# File lib/seamule/job.rb, line 71
# One-line description of the job: queue, callback class name, then the
# inspected meta and payload, separated by " | ".
def inspect
  meta_text = @meta.inspect
  payload_text = @payload.inspect

  "Job @#{@queue} | #{@callback_class_name} | #{meta_text} | #{payload_text}"
end
perform()
click to toggle source
# File lib/seamule/job.rb, line 54
# Runs the job: instantiates +callback_class+ with no arguments and calls its
# +perform+ with this job's id, meta, payload and result.
# Returns whatever the handler's +perform+ returns.
def perform
  callback_class.new.perform(id, meta, payload, result)
end
to_h()
click to toggle source
# File lib/seamule/job.rb, line 63
# Hash view of the job with the keys :queue, :run_at and :payload.
#
# :run_at is the current UTC time (at the moment of the call) formatted with
# Time#iso8601, not a value stored on the job.
def to_h
  {
    queue: queue,
    run_at: Time.now.utc.iso8601,
    payload: payload
  }
end