module Resque::Plugins::Locket

Constants

VERSION

Public Instance Methods

heartbeat_frequency()
# File lib/resque/plugins/locket/locket.rb, line 52
def heartbeat_frequency
  @heartbeat_frequency || 30
end
heartbeat_frequency=(seconds)

Set how often, in seconds, the job calls Redis to extend its lock.

# File lib/resque/plugins/locket/locket.rb, line 44
def heartbeat_frequency=(seconds)
  if seconds <= 0
    raise ArgumentError, "The heartbeat frequency must be greater than 0 seconds"
  end

  @heartbeat_frequency = seconds
end
job_lock_duration()
# File lib/resque/plugins/locket/locket.rb, line 66
def job_lock_duration
  @job_lock_duration || 35
end
job_lock_duration=(seconds)

Set how long, in seconds, the job lock lasts before it expires. The heartbeat should refresh the lock more often than the lock expires.

# File lib/resque/plugins/locket/locket.rb, line 58
def job_lock_duration=(seconds)
  if !seconds.is_a?(Integer) || seconds <= 0
    raise ArgumentError, "The job lock duration must be an integer greater than 0"
  end

  @job_lock_duration = seconds
end
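
The two setters, together with the private `validate_timing` check, imply one rule: the heartbeat interval has to be shorter than the lock's lifetime. The sketch below illustrates that rule without Resque or Redis. `TimingConfig` and `timing_valid?` are hypothetical names, not part of resque-locket; only the defaults (30 and 35 seconds) and the argument checks are taken from the source shown here.

```ruby
# Hypothetical stand-in that mirrors the timing accessors documented above.
class TimingConfig
  def heartbeat_frequency
    @heartbeat_frequency || 30
  end

  def heartbeat_frequency=(seconds)
    raise ArgumentError, "heartbeat frequency must be greater than 0" if seconds <= 0

    @heartbeat_frequency = seconds
  end

  def job_lock_duration
    @job_lock_duration || 35
  end

  def job_lock_duration=(seconds)
    unless seconds.is_a?(Integer) && seconds > 0
      raise ArgumentError, "job lock duration must be an integer greater than 0"
    end

    @job_lock_duration = seconds
  end

  # True when the heartbeat fires before the lock would expire.
  def timing_valid?
    heartbeat_frequency < job_lock_duration
  end
end

config = TimingConfig.new
config.heartbeat_frequency = 10
config.job_lock_duration = 15
puts config.timing_valid? # true
```

With the defaults, the lock outlives each heartbeat by 5 seconds, which is the margin available for a slow Redis round trip.
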
job_lock_key=(job_lock_proc)
# File lib/resque/plugins/locket/locket.rb, line 70
def job_lock_key=(job_lock_proc)
  @job_lock_proc = job_lock_proc
end
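
Judging from the private `job_lock_key(job)` method, the object assigned here only needs to respond to `call`, take the job, and return the Redis key to lock on. A minimal sketch of such a callable follows. `FakeJob`, `DEFAULT_KEY`, and `CUSTOM_KEY` are hypothetical names used for illustration, and the payload layout (`"class"` and `"args"` keys) is an assumption about how the job payload looks.

```ruby
# FakeJob is a hypothetical stand-in for Resque::Job; only #payload is used here.
FakeJob = Struct.new(:queue, :payload)

# Default key format, copied from job_lock_key in this module.
DEFAULT_KEY = ->(job) { "locket:job_locks:#{job.payload}" }

# Hypothetical custom key: lock on the job class and its first argument only,
# so jobs that differ in later arguments still share one lock.
CUSTOM_KEY = lambda do |job|
  "locket:job_locks:#{job.payload['class']}:#{job.payload['args'].first}"
end

job = FakeJob.new("reports", { "class" => "BuildReport", "args" => [42, "pdf"] })

puts DEFAULT_KEY.call(job)
puts CUSTOM_KEY.call(job) # locket:job_locks:BuildReport:42
```

A narrower key like this is useful when two jobs should count as duplicates even though their full payloads differ.
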
locket!()

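Enable locket and register the after_fork hook. The hook is registered only once, even if locket! is called repeatedly. All queues are watched unless locketed_queues has been set.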

# File lib/resque/plugins/locket/locket.rb, line 30
def locket!
  Resque.after_fork { |job| locket_or_requeue(job) } unless locket_enabled?

  @locket_enabled = true
end
locket_enabled?()

Has resque-locket been enabled?

# File lib/resque/plugins/locket/locket.rb, line 25
def locket_enabled?
  @locket_enabled
end
locketed_queue?(queue)

Check if a queue’s jobs should be unique across workers.

# File lib/resque/plugins/locket/locket.rb, line 6
def locketed_queue?(queue)
  case
  when !locket_enabled?     then false
  when locketed_queues.nil? then true
  else                           locketed_queues.include?(queue)
  end
end
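
The three branches above can be exercised in isolation. The sketch below is a standalone illustration; `QueueFilter` is a hypothetical class, not part of resque-locket, that reproduces the same decision without Resque.

```ruby
# Hypothetical stand-in reproducing the three-way check in locketed_queue?.
class QueueFilter
  attr_accessor :locketed_queues

  def initialize(enabled)
    @enabled = enabled
  end

  def locketed_queue?(queue)
    return false unless @enabled         # locket is off: nothing is locked
    return true if locketed_queues.nil?  # no list set: every queue is locked

    locketed_queues.include?(queue)      # otherwise only the listed queues
  end
end

off = QueueFilter.new(false)
all = QueueFilter.new(true)
some = QueueFilter.new(true)
some.locketed_queues = ["reports"]

p off.locketed_queue?("reports")  # false
p all.locketed_queue?("emails")   # true
p some.locketed_queue?("emails")  # false
p some.locketed_queue?("reports") # true
```
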
locketed_queues()

List all locketed queues.

# File lib/resque/plugins/locket/locket.rb, line 15
def locketed_queues
  @locketed_queues
end
locketed_queues=(queues)

Set which queues' jobs should be unique across workers.

# File lib/resque/plugins/locket/locket.rb, line 20
def locketed_queues=(queues)
  @locketed_queues = queues
end
remove_queue(queue)

When a queue is removed, we also need to remove its lock counters and tell locket to stop tracking it.

Calls superclass method
# File lib/resque/plugins/locket/locket.rb, line 38
def remove_queue(queue)
  super(queue)
  redis.hdel("locket:queue_lock_counters", queue)
end

Private Instance Methods

attach_before_perform_exception(job)
# File lib/resque/plugins/locket/locket.rb, line 106
def attach_before_perform_exception(job)
  job.payload_class.singleton_class.class_eval do
    define_method(:before_perform_raise_exception) do |*args|
      raise Resque::Job::DontPerform
    end
  end
end
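
This method defines a class-level hook at runtime so that the job is skipped instead of performed. The sketch below shows the same `singleton_class.class_eval` and `define_method` technique on its own. `DontPerform`, `BuildReport`, and `run_job` are hypothetical stand-ins (for `Resque::Job::DontPerform`, a payload class, and a worker, respectively), and `run_job` is a deliberately simplified model of how hooks get invoked.

```ruby
# DontPerform is a hypothetical stand-in for Resque::Job::DontPerform.
class DontPerform < StandardError; end

class BuildReport
  def self.perform(*args)
    "performed"
  end
end

# Define a class-level hook at runtime, as attach_before_perform_exception does.
BuildReport.singleton_class.class_eval do
  define_method(:before_perform_raise_exception) do |*args|
    raise DontPerform
  end
end

# Simplified stand-in for a worker: run every before_perform* hook, then perform.
def run_job(klass, *args)
  hooks = klass.methods.grep(/\Abefore_perform/)
  hooks.each { |hook| klass.send(hook, *args) }
  klass.perform(*args)
rescue DontPerform
  :skipped
end

p run_job(BuildReport, 42) # :skipped
```
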
attach_job_expirations(job)
# File lib/resque/plugins/locket/locket.rb, line 136
def attach_job_expirations(job)
  lock_key = job_lock_key(job)

  job.payload_class.singleton_class.class_eval do
    # TODO : should we use around_perform with begin/ensure/end so we expire this on failure?
    define_method(:after_perform_remove_lock) do |*args|
      Resque.redis.del(lock_key)
      Resque.redis.del("locket:queue_lock_counters")
    end

    define_method(:on_failure_remove_lock) do |*args|
      Resque.redis.del(lock_key)
      Resque.redis.del("locket:queue_lock_counters")
    end
  end
end
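
The TODO in the source asks whether an around_perform hook with `ensure` would be a better way to release the lock. One possible shape for that is sketched below; it is not what the library does. `LOCKS` and `FlakyJob` are hypothetical, and the hook is called by hand here rather than by Resque. The sketch relies on the Resque convention that an around_perform hook yields to run the job.

```ruby
# Hypothetical in-memory stand-in for the Redis keys that hold job locks.
LOCKS = { "locket:job_locks:demo" => "" }

class FlakyJob
  # Shaped like a Resque around_perform hook: it yields to run the job,
  # and the ensure clause releases the lock whether or not the job raises.
  def self.around_perform_remove_lock(*args)
    yield
  ensure
    LOCKS.delete("locket:job_locks:demo")
  end

  def self.perform(*args)
    raise "boom"
  end
end

begin
  FlakyJob.around_perform_remove_lock(1) { FlakyJob.perform(1) }
rescue RuntimeError => e
  puts "job failed: #{e.message}"
end

p LOCKS.key?("locket:job_locks:demo") # false
```

A single hook of this kind would cover both the success and failure paths that the two hooks above handle separately.
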
destroy_queue_lock_counters()
# File lib/resque/plugins/locket/locket.rb, line 159
def destroy_queue_lock_counters
  redis.del("locket:queue_lock_counters")
end
increment_queue_lock(job)
# File lib/resque/plugins/locket/locket.rb, line 102
def increment_queue_lock(job)
  redis.hincrby("locket:queue_lock_counters", job.queue, 1)
end
job_lock_key(job)
# File lib/resque/plugins/locket/locket.rb, line 173
def job_lock_key(job)
  if @job_lock_proc
    @job_lock_proc.call(job)
  else
    "locket:job_locks:#{job.payload.to_s}"
  end
end
locket_or_requeue(job)

Check whether the job's queue is locketed and, if so, try to lock the job. If the lock is obtained, keep it alive for the duration of the job; otherwise requeue the job.

# File lib/resque/plugins/locket/locket.rb, line 78
def locket_or_requeue(job)
  return unless locketed_queue?(job.queue)

  obtain_job_lock(job) ? retain_job_lock(job) : requeue_job(job)
end
obtain_job_lock(job)

If a lock doesn’t exist for a job, set an expiring lock. If it does, we can’t obtain the lock, and this will return nil.

# File lib/resque/plugins/locket/locket.rb, line 86
def obtain_job_lock(job)
  lock_key = job_lock_key(job)

  set_expiring_key(job) unless redis.get(lock_key)
end
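
The return-value behaviour described above (truthy when the lock is taken, nil when it is already held) can be seen with an in-memory stand-in for Redis. `FakeStore` and `obtain_lock` are hypothetical names. The store models only GET and SETEX, and its clock is set by hand so that expiry can be shown without sleeping.

```ruby
# Hypothetical in-memory stand-in for the two Redis calls used above
# (GET and SETEX). The clock is injected so expiry can be shown without sleeping.
class FakeStore
  attr_accessor :now

  def initialize
    @data = {}
    @now = 0
  end

  def get(key)
    value, expires_at = @data[key]
    return nil if value.nil? || @now >= expires_at

    value
  end

  def setex(key, ttl, value)
    @data[key] = [value, @now + ttl]
    "OK"
  end
end

# Same shape as obtain_job_lock: truthy when the lock was taken, nil otherwise.
def obtain_lock(store, key, ttl)
  store.setex(key, ttl, "") unless store.get(key)
end

store = FakeStore.new
p obtain_lock(store, "locket:job_locks:demo", 35) # "OK"
p obtain_lock(store, "locket:job_locks:demo", 35) # nil
store.now = 35                                    # the lock has expired
p obtain_lock(store, "locket:job_locks:demo", 35) # "OK"
```

Note that a GET followed by a SETEX is two separate commands, so two workers could in principle both see no lock before either sets it. Redis's `SET` command with the `NX` and `EX` options performs the check and the write in one step, which may be worth considering if that window matters.
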
requeue_job(job)

WHEN A JOB IS LOCKED

Requeue the locked job and increment our lock counter.

# File lib/resque/plugins/locket/locket.rb, line 96
def requeue_job(job)
  attach_before_perform_exception(job)
  job.recreate
  increment_queue_lock(job)
end
retain_job_lock(job)

WHEN A JOB IS NOT LOCKED

Validate the timing settings, clear the queue lock counters, start a heartbeat thread that keeps refreshing the lock in Redis for as long as the job is active, and dynamically attach after_perform and on_failure hooks that remove the lock when the job finishes.

# File lib/resque/plugins/locket/locket.rb, line 120
def retain_job_lock(job)
  validate_timing
  destroy_queue_lock_counters
  spawn_heartbeat_thread(job)
  attach_job_expirations(job)
end
set_expiring_key(job)

INDIVIDUAL JOB LOCK CONVENIENCES

Helpers for setting the lock on a job that is currently being processed.

# File lib/resque/plugins/locket/locket.rb, line 168
def set_expiring_key(job)
  lock_key = job_lock_key(job)
  redis.setex(lock_key, job_lock_duration, "")
end
spawn_heartbeat_thread(job)
# File lib/resque/plugins/locket/locket.rb, line 127
def spawn_heartbeat_thread(job)
  Thread.new do
    loop do
      sleep(heartbeat_frequency)
      set_expiring_key(job)
    end
  end
end
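
The heartbeat is a background thread that sleeps for one interval and then resets the lock's expiry, repeating for as long as the process lives. The sketch below shows that loop with a very short interval. `RefreshCounter` and `spawn_heartbeat` are hypothetical: the counter stands in for the SETEX call so the example runs without Redis.

```ruby
# Hypothetical stand-in: counts refreshes instead of calling SETEX on Redis.
class RefreshCounter
  attr_reader :count

  def initialize
    @count = 0
    @mutex = Mutex.new
  end

  def refresh
    @mutex.synchronize { @count += 1 }
  end
end

# Same loop as spawn_heartbeat_thread, with the interval passed in.
def spawn_heartbeat(counter, interval)
  Thread.new do
    loop do
      sleep(interval)
      counter.refresh
    end
  end
end

counter = RefreshCounter.new
heartbeat = spawn_heartbeat(counter, 0.01)
sleep(0.2)     # stands in for the job doing its work
heartbeat.kill # stop the heartbeat here; the documented code never stops it explicitly
heartbeat.join
puts "lock refreshed #{counter.count} times"
```

Because the loop sleeps before its first refresh, the initial lock set when the job was obtained has to last at least one full heartbeat interval, which is why the lock duration needs to exceed the heartbeat frequency.
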
validate_timing()
# File lib/resque/plugins/locket/locket.rb, line 153
def validate_timing
  if job_lock_duration <= heartbeat_frequency
    raise "A job's heartbeat must be more frequent than its lock expiration."
  end
end