module Resque::Plugins::Locket
Constants
- VERSION
Public Instance Methods
# File lib/resque/plugins/locket/locket.rb, line 52 def heartbeat_frequency @heartbeat_frequency || 30 end
Adjust how often the job will call to redis to extend the job lock.
# File lib/resque/plugins/locket/locket.rb, line 44 def heartbeat_frequency=(seconds) if seconds <= 0 raise ArgumentError, "The heartbeat frequency cannot be 0 seconds" end @heartbeat_frequency = seconds end
# File lib/resque/plugins/locket/locket.rb, line 66 def job_lock_duration @job_lock_duration || 35 end
Adjust how long the duration of the lock will be set to. The heartbeat should refresh the lock at a rate faster than its expiration.
# File lib/resque/plugins/locket/locket.rb, line 58 def job_lock_duration=(seconds) if !seconds.is_a?(Integer) || seconds <= 0 raise ArgumentError, "The job lock duration must be an integer greater than 0" end @job_lock_duration = seconds end
# File lib/resque/plugins/locket/locket.rb, line 70 def job_lock_key=(job_lock_proc) @job_lock_proc = job_lock_proc end
Enable locket. Set all queues to be watched, and register the after_fork hook.
# File lib/resque/plugins/locket/locket.rb, line 30 def locket! Resque.after_fork { |job| locket_or_requeue(job) } unless locket_enabled? @locket_enabled = true end
Has resque-locket been enabled?
# File lib/resque/plugins/locket/locket.rb, line 25 def locket_enabled? @locket_enabled end
Check if a queue’s jobs should be unique across workers.
# File lib/resque/plugins/locket/locket.rb, line 6 def locketed_queue?(queue) case when !locket_enabled? then false when locketed_queues.nil? then true else locketed_queues.include?(queue) end end
List all locketed queues.
# File lib/resque/plugins/locket/locket.rb, line 15 def locketed_queues @locketed_queues end
Set which queues jobs should be unique across workers.
# File lib/resque/plugins/locket/locket.rb, line 20 def locketed_queues=(queues) @locketed_queues = queues end
When a queue is removed, we also need to remove its lock counters and tell locket to stop tracking it.
# File lib/resque/plugins/locket/locket.rb, line 38 def remove_queue(queue) super(queue) redis.hdel("locket:queue_lock_counters", queue) end
Private Instance Methods
# File lib/resque/plugins/locket/locket.rb, line 106 def attach_before_perform_exception(job) job.payload_class.singleton_class.class_eval do define_method(:before_perform_raise_exception) do |*args| raise Resque::Job::DontPerform end end end
# File lib/resque/plugins/locket/locket.rb, line 136 def attach_job_expirations(job) lock_key = job_lock_key(job) job.payload_class.singleton_class.class_eval do # TODO : should we use around_perform with begin/ensure/end so we expire this on failure? define_method(:after_perform_remove_lock) do |*args| Resque.redis.del(lock_key) Resque.redis.del("locket:queue_lock_counters") end define_method(:on_failure_remove_lock) do |*args| Resque.redis.del(lock_key) Resque.redis.del("locket:queue_lock_counters") end end end
# File lib/resque/plugins/locket/locket.rb, line 159 def destroy_queue_lock_counters redis.del("locket:queue_lock_counters") end
# File lib/resque/plugins/locket/locket.rb, line 102 def increment_queue_lock(job) redis.hincrby("locket:queue_lock_counters", job.queue, 1) end
# File lib/resque/plugins/locket/locket.rb, line 173 def job_lock_key(job) if @job_lock_proc @job_lock_proc.call(job) else "locket:job_locks:#{job.payload.to_s}" end end
Check if a queue is locketed, and if so, validate the job of that queue’s availability for locking.
# File lib/resque/plugins/locket/locket.rb, line 78 def locket_or_requeue(job) return unless locketed_queue?(job.queue) obtain_job_lock(job) ? retain_job_lock(job) : requeue_job(job) end
If a lock doesn’t exist for a job, set an expiring lock. If it does, we can’t obtain the lock, and this will return nil.
# File lib/resque/plugins/locket/locket.rb, line 86 def obtain_job_lock(job) lock_key = job_lock_key(job) set_expiring_key(job) unless redis.get(lock_key) end
WHEN A JOB IS LOCKED ——————————————————————————–
Requeue the locked job and increment our lock counter.
# File lib/resque/plugins/locket/locket.rb, line 96 def requeue_job(job) attach_before_perform_exception(job) job.recreate increment_queue_lock(job) end
WHEN A JOB IS NOT LOCKED —————————————————————————-
Clear our queue lock counters, begin a thread to start a heartbeat to redis that will hold the lock as long as we’re active, and dynamically attach an after_perform hook that will manually remove the lock.
# File lib/resque/plugins/locket/locket.rb, line 120 def retain_job_lock(job) validate_timing destroy_queue_lock_counters spawn_heartbeat_thread(job) attach_job_expirations(job) end
INDIVIDUAL JOB LOCK CONVENIENCES ——————————————————————–
A couple quickies to make our life easier when dealing with setting a lock for a job that is currently being processed.
# File lib/resque/plugins/locket/locket.rb, line 168 def set_expiring_key(job) lock_key = job_lock_key(job) redis.setex(lock_key, job_lock_duration, "") end
# File lib/resque/plugins/locket/locket.rb, line 127 def spawn_heartbeat_thread(job) Thread.new do loop do sleep(heartbeat_frequency) set_expiring_key(job) end end end
# File lib/resque/plugins/locket/locket.rb, line 153 def validate_timing if job_lock_duration < heartbeat_frequency raise "A job's heartbeat must be more frequent than its lock expiration." end end