class Cyclop::Job
Attributes
_id[RW]
Unique identifier
attempts[RW]
Number of attempts
created_at[RW]
Time it was created
created_by[RW]
Host it’s added under
delay[RW]
Delay in seconds
delayed_until[RW]
Time until we do start the job
errors[RW]
Backtraces of unsuccessful attempts
failed[RW]
Mark as failed
job_params[RW]
Parameters sent to ‘#perform`
locked_at[RW]
Time when worker started
locked_by[RW]
Worker
unique identifier
queue[RW]
Queue name
retries[RW]
Number of retries before being marked as failed
splay[RW]
Time in seconds between retry
updated_at[RW]
Time it was last updated
Public Class Methods
create(opts={})
click to toggle source
Create a new job and save it to the queue specified in ‘opts`
# File lib/cyclop/job.rb, line 40 def self.create(opts={}) job = new opts job.save job end
failed(opts={})
click to toggle source
Get failed jobs from any ‘opts`
# File lib/cyclop/job.rb, line 90 def self.failed(opts={}) selector = {} # Failed or dead jobs only selector["$or"] = [ {failed: true}, {"$where" => "this.attempts > this.retries"}, ] # Filter by queue if present selector[:queue] = {"$in" => opts[:queues]} if opts[:queues] && !opts[:queues].empty? options = {} options[:skip] = opts[:skip] if opts[:skip] options[:limit] = opts[:limit] if opts[:limit] collection.find(selector, options).collect{|attrs| new attrs} end
find(id)
click to toggle source
# File lib/cyclop/job.rb, line 107 def self.find(id) if doc = collection.find_one(id) new doc end end
new(attrs={})
click to toggle source
# File lib/cyclop/job.rb, line 34 def initialize(attrs={}) raise ArgumentError, ":queue is required" unless attrs["queue"] || attrs[:queue] self.attributes = attrs end
next(opts={})
click to toggle source
Get the next job from any ‘opts` and mark it as locked
# File lib/cyclop/job.rb, line 47 def self.next(opts={}) raise ArgumentError, "locked_by is required" unless opts[:locked_by] time_now = Time.now.utc conditions = {query: {}} # Not failed jobs only conditions[:query][:failed] = false # Only jobs generated by the specified host if present conditions[:query][:created_by] = opts[:host] if opts[:host] # Skip delayed jobs conditions[:query][:delayed_until] = {"$lte" => time_now} # Filter by queue if present conditions[:query][:queue] = {"$in" => opts[:queues]} if opts[:queues] && !opts[:queues].empty? # Skip locked jobs conditions[:query]["$or"] = [{locked_at: {"$lte" => time_now - 1800}}, {locked_at: nil}] # Last chance to skip dead jobs conditions[:query]["$where"] = "this.attempts <= this.retries" # Set `locked_by` with worker id and increment the number of attempts conditions[:update] = { "$set" => { locked_by: opts[:locked_by], locked_at: time_now, }, "$inc" => { attempts: 1 } } # Sort by `created_at` conditions[:sort] = [:created_at, :asc] # Returns the modified job conditions[:new] = true job = collection.find_and_modify conditions new job if job rescue Mongo::OperationFailure nil end
Private Class Methods
collection()
click to toggle source
# File lib/cyclop/job.rb, line 177 def self.collection @@collection ||= Cyclop.db ? Cyclop.db["cyclop_jobs"] : raise(Cyclop::DatabaseNotAvailable) end
Public Instance Methods
==(other)
click to toggle source
# File lib/cyclop/job.rb, line 138 def ==(other) other._id == _id end
complete!()
click to toggle source
Remove successfully processed job from the queue
# File lib/cyclop/job.rb, line 143 def complete! collection.remove _id: _id, locked_by: Cyclop.master_id end
persisted?()
click to toggle source
If we have an id the object is persisted
# File lib/cyclop/job.rb, line 134 def persisted? !!_id end
release!(exception = nil)
click to toggle source
Release job for further processing
# File lib/cyclop/job.rb, line 148 def release!(exception = nil) now = ::Time.at(Time.now.to_i).utc selector = {_id: _id, locked_by: Cyclop.master_id} set = if attempts<=retries {locked_by: nil, locked_at: nil, delayed_until: now+splay} else {failed: true} end update = {"$set" => set} update["$push"] = { :errors => { :locked_by => locked_by, :locked_at => locked_at, :class => exception.class.name, :message => exception.message, :backtrace => exception.backtrace, :created_at => now, }, } if exception collection.update selector, update, :safe => true end
reload()
click to toggle source
# File lib/cyclop/job.rb, line 128 def reload self.attributes = collection.find_one _id self end
requeue()
click to toggle source
# File lib/cyclop/job.rb, line 170 def requeue self.attempts, self.failed, self.locked_at = 0, false, nil update = {attempts: attempts, failed: failed, locked_at: locked_at} collection.update({_id: _id}, {"$set" => update}, :safe => true) end
save()
click to toggle source
Save to queue
# File lib/cyclop/job.rb, line 114 def save self.updated_at = Time.now.utc if persisted? raise NotImplementedError else self.created_at = updated_at self.delayed_until = ::Time.at(created_at.to_i + delay).utc self._id = collection.insert attributes, safe: true end true rescue Mongo::OperationFailure false end
Private Instance Methods
attributes()
click to toggle source
# File lib/cyclop/job.rb, line 186 def attributes { queue: queue, job_params: job_params, delay: delay, delayed_until: delayed_until, retries: retries, splay: splay, created_by: created_by, created_at: created_at, updated_at: updated_at, locked_by: locked_by, locked_at: locked_at, failed: failed, attempts: attempts, errors: errors, } end
attributes=(attrs)
click to toggle source
# File lib/cyclop/job.rb, line 205 def attributes=(attrs) attrs.each do |key, value| send "#{key}=", value end self.delay ||= 0 self.retries ||= 0 self.splay ||= 60 self.created_by ||= Cyclop.host self.failed ||= false self.attempts ||= 0 self.errors ||= [] end
collection()
click to toggle source
# File lib/cyclop/job.rb, line 182 def collection self.class.collection end