class Cyclop::Job

Attributes

_id[RW]

Unique identifier

attempts[RW]

Number of attempts

created_at[RW]

Time it was created

created_by[RW]

Host the job was added under

delay[RW]

Delay in seconds

delayed_until[RW]

Earliest time at which the job may be started

errors[RW]

Backtraces of unsuccessful attempts

failed[RW]

Mark as failed

job_params[RW]

Parameters sent to `#perform`

locked_at[RW]

Time when worker started

locked_by[RW]

Worker unique identifier

queue[RW]

Queue name

retries[RW]

Number of retries before being marked as failed

splay[RW]

Time in seconds between retries

updated_at[RW]

Time it was last updated

Public Class Methods

create(opts={}) click to toggle source

Create a new job and save it to the queue specified in `opts`

# File lib/cyclop/job.rb, line 40
# Build a job from +opts+, try to persist it, and hand it back.
#
# opts - Hash of job attributes, passed unchanged to +new+.
#
# The return value of +save+ is discarded, so the job is returned whether
# or not it was actually stored.
def self.create(opts={})
  new(opts).tap { |job| job.save }
end
failed(opts={}) click to toggle source

Get failed jobs, optionally filtered by queue and paginated via `opts`

# File lib/cyclop/job.rb, line 90
# List jobs that are flagged as failed or that have used up their retries.
#
# opts - Hash of optional filters:
#        :queues - non-empty list of queue names to restrict the search to
#        :skip   - forwarded to the collection's +find+ as :skip
#        :limit  - forwarded to the collection's +find+ as :limit
#
# Returns the matching documents, each wrapped with +new+.
def self.failed(opts={})
  queues = opts[:queues]

  # Failed or dead jobs only
  selector = {
    "$or" => [
      {failed: true},
      {"$where" => "this.attempts > this.retries"},
    ],
  }
  # Filter by queue if present
  selector[:queue] = {"$in" => queues} unless !queues || queues.empty?

  # Only forward the paging options the caller actually supplied
  options = {}
  [:skip, :limit].each do |key|
    options[key] = opts[key] if opts[key]
  end

  collection.find(selector, options).map { |attrs| new(attrs) }
end
find(id) click to toggle source
# File lib/cyclop/job.rb, line 107
# Look up a single job by its identifier.
#
# id - identifier handed to the collection's +find_one+.
#
# Returns a new job built from the stored document, or nil when +id+ is nil
# or no document matches.
def self.find(id)
  # A nil id is not a lookup key. The legacy Mongo driver treats
  # `find_one(nil)` as "no filter" and would hand back an arbitrary job, so
  # bail out before touching the collection.
  return nil if id.nil?

  doc = collection.find_one(id)
  new(doc) if doc
end
new(attrs={}) click to toggle source
# File lib/cyclop/job.rb, line 34
# Build a job from a hash of attributes.
#
# attrs - Hash of attribute names to values; it must carry a truthy
#         "queue" or :queue entry.
#
# Raises ArgumentError when no queue is given.
def initialize(attrs={})
  queue_name = attrs["queue"] || attrs[:queue]
  raise ArgumentError, ":queue is required" unless queue_name

  self.attributes = attrs
end
next(opts={}) click to toggle source

Get the next job matching `opts` and mark it as locked

# File lib/cyclop/job.rb, line 47
# Pick the next runnable job and lock it for a worker through a single
# +find_and_modify+ call on the collection.
#
# opts - Hash of options:
#        :locked_by - worker identifier stored on the job (required)
#        :host      - only consider jobs whose created_by equals this value
#        :queues    - non-empty list of queue names to restrict the search to
#
# Returns a new job built from the modified document, or nil when the call
# returns nothing or raises Mongo::OperationFailure.
# Raises ArgumentError when :locked_by is missing.
def self.next(opts={})
  worker_id = opts[:locked_by]
  raise ArgumentError, "locked_by is required" unless worker_id

  now = Time.now.utc
  queues = opts[:queues]

  # Not failed jobs only
  query = {failed: false}
  # Only jobs generated by the specified host if present
  query[:created_by] = opts[:host] if opts[:host]
  # Skip delayed jobs
  query[:delayed_until] = {"$lte" => now}
  # Filter by queue if present
  query[:queue] = {"$in" => queues} if queues && !queues.empty?
  # Skip locked jobs; a lock taken 1800 seconds ago or earlier no longer counts
  query["$or"] = [{locked_at: {"$lte" => now - 1800}}, {locked_at: nil}]
  # Last chance to skip dead jobs
  query["$where"] = "this.attempts <= this.retries"

  command = {
    query: query,
    # Record the worker and lock time, and count this attempt
    update: {
      "$set" => {locked_by: worker_id, locked_at: now},
      "$inc" => {attempts: 1},
    },
    # Sort by `created_at`
    sort: [:created_at, :asc],
    # Return the modified job
    new: true,
  }

  doc = collection.find_and_modify(command)
  doc ? new(doc) : nil
rescue Mongo::OperationFailure
  nil
end

Private Class Methods

collection() click to toggle source
# File lib/cyclop/job.rb, line 177
# Memoised handle on the "cyclop_jobs" collection of +Cyclop.db+.
#
# Raises Cyclop::DatabaseNotAvailable when +Cyclop.db+ is not set and no
# handle has been memoised yet.
def self.collection
  return @@collection if defined?(@@collection) && @@collection

  raise Cyclop::DatabaseNotAvailable unless Cyclop.db
  @@collection = Cyclop.db["cyclop_jobs"]
end

Public Instance Methods

==(other) click to toggle source
# File lib/cyclop/job.rb, line 138
# Two jobs are equal when they carry the same +_id+.
#
# other - any object. Objects that do not expose +_id+ (nil, strings, ...)
#         are never equal to a job; the comparison returns false instead of
#         raising NoMethodError.
#
# Returns true or false.
def ==(other)
  other.respond_to?(:_id) && other._id == _id
end
complete!() click to toggle source

Remove successfully processed job from the queue

# File lib/cyclop/job.rb, line 143
# Remove this job from the collection once it has been processed.
#
# The selector also requires +locked_by+ to equal +Cyclop.master_id+, so a
# document locked under a different identifier is left alone.
#
# Returns whatever the collection's +remove+ returns.
def complete!
  owned_job = {_id: _id, locked_by: Cyclop.master_id}
  collection.remove(owned_job)
end
persisted?() click to toggle source

If we have an id the object is persisted

# File lib/cyclop/job.rb, line 134
# A job counts as persisted as soon as it carries an +_id+.
#
# Returns true or false.
def persisted?
  _id ? true : false
end
release!(exception = nil) click to toggle source

Release job for further processing

# File lib/cyclop/job.rb, line 148
# Hand a job back after a processing attempt.
#
# While +attempts+ has not exceeded +retries+ the lock is cleared and the job
# is delayed by +splay+ seconds; otherwise it is flagged as failed. When an
# exception is given, a record describing it is pushed onto +errors+.
#
# exception - optional exception raised by the attempt.
#
# The update only targets the document whose +locked_by+ equals
# +Cyclop.master_id+. Returns whatever the collection's +update+ returns.
def release!(exception = nil)
  # Whole-second UTC timestamp shared by the delay and the error record
  stamp = ::Time.at(Time.now.to_i).utc
  criteria = {_id: _id, locked_by: Cyclop.master_id}

  changes = {}
  changes["$set"] =
    if attempts <= retries
      {locked_by: nil, locked_at: nil, delayed_until: stamp + splay}
    else
      {failed: true}
    end

  if exception
    failure = {
      locked_by: locked_by,
      locked_at: locked_at,
      class: exception.class.name,
      message: exception.message,
      backtrace: exception.backtrace,
      created_at: stamp,
    }
    changes["$push"] = {errors: failure}
  end

  collection.update(criteria, changes, safe: true)
end
reload() click to toggle source
# File lib/cyclop/job.rb, line 128
# Refresh this job's attributes from the stored document.
#
# Nothing is loaded when the job has no +_id+ yet: the legacy Mongo driver
# treats `find_one(nil)` as "no filter", which would copy some other job's
# attributes onto this one. Likewise, when the document can no longer be
# found the current attributes are left untouched rather than passing nil
# to +attributes=+.
#
# Returns self.
def reload
  return self unless _id

  doc = collection.find_one(_id)
  self.attributes = doc if doc
  self
end
requeue() click to toggle source
# File lib/cyclop/job.rb, line 170
# Make the job runnable again: reset +attempts+ to 0, clear the +failed+ flag
# and the +locked_at+ stamp, both in memory and on the stored document.
#
# Returns whatever the collection's +update+ returns.
def requeue
  self.attempts = 0
  self.failed = false
  self.locked_at = nil

  reset = {"$set" => {attempts: attempts, failed: failed, locked_at: locked_at}}
  collection.update({_id: _id}, reset, safe: true)
end
save() click to toggle source

Save to queue

# File lib/cyclop/job.rb, line 114
# Insert a new job into the collection.
#
# Stamps +updated_at+ and +created_at+ with the current UTC time, sets
# +delayed_until+ to the creation time plus +delay+ seconds, and stores the
# inserted document's id in +_id+.
#
# Returns true on success, false when the insert raises
# Mongo::OperationFailure.
# Raises NotImplementedError for a job that is already persisted.
def save
  self.updated_at = Time.now.utc
  # Updating an existing document is not supported
  raise NotImplementedError if persisted?

  self.created_at = updated_at
  run_at = created_at.to_i + delay
  self.delayed_until = ::Time.at(run_at).utc
  self._id = collection.insert(attributes, safe: true)
  true
rescue Mongo::OperationFailure
  false
end

Private Instance Methods

attributes() click to toggle source
# File lib/cyclop/job.rb, line 186
# Snapshot of the job as a Hash, keyed by attribute name.
#
# +_id+ is not part of the snapshot. Keys appear in the order listed below.
def attributes
  fields = [
    :queue, :job_params, :delay, :delayed_until, :retries, :splay,
    :created_by, :created_at, :updated_at, :locked_by, :locked_at,
    :failed, :attempts, :errors,
  ]
  fields.each_with_object({}) { |field, doc| doc[field] = send(field) }
end
attributes=(attrs) click to toggle source
# File lib/cyclop/job.rb, line 205
# Assign every key/value pair of +attrs+ through the matching writer, then
# fill in defaults for whatever is still unset.
#
# attrs - Hash (String or Symbol keys) of attribute names to values; each
#         key must have a corresponding "<key>=" writer.
#
# Defaults: delay 0, retries 0, splay 60, created_by Cyclop.host,
# failed false, attempts 0, errors [].
def attributes=(attrs)
  attrs.each { |name, value| send("#{name}=", value) }

  self.delay = 0 unless delay
  self.retries = 0 unless retries
  self.splay = 60 unless splay
  # Cyclop.host is only consulted when no creator was supplied
  self.created_by = Cyclop.host unless created_by
  self.failed = false unless failed
  self.attempts = 0 unless attempts
  self.errors = [] unless errors
  errors
end
collection() click to toggle source
# File lib/cyclop/job.rb, line 182
# Instance-level shortcut to the collection exposed by the job's class.
def collection
  job_class = self.class
  job_class.collection
end