class QPush::Server::Delay
The Delay
worker requeues any jobs that have been delayed on our Redis server. Delayed jobs are pulled by a 'zrangebyscore', with the score representing the time the job should be performed.
Public Class Methods
new()
click to toggle source
# File lib/qpush/server/delay.rb, line 8 def initialize @done = false @conn = nil end
Public Instance Methods
shutdown()
click to toggle source
Shutsdown our delay process.
# File lib/qpush/server/delay.rb, line 27 def shutdown @done = true end
start()
click to toggle source
Starts our delay process. This will run until instructed to stop.
# File lib/qpush/server/delay.rb, line 15 def start until @done Server.redis do |conn| @conn = conn watch_delay { retrieve_delays } end sleep 2 end end
Private Instance Methods
perform_job(json)
click to toggle source
Add a delayed job to the appropriate perform list.
# File lib/qpush/server/delay.rb, line 52 def perform_job(json) job = Job.new(JSON.parse(json)) job.perform rescue => e raise ServerError, e.message end
retrieve_delays()
click to toggle source
Retrieves delayed jobs based on the time they should be performed. If any are found, begin to update them.
# File lib/qpush/server/delay.rb, line 36 def retrieve_delays delays = @conn.zrangebyscore(Server.keys[:delay], 0, Time.now.to_i) delays.any? ? update_delays(delays) : @conn.unwatch end
update_delays(delays)
click to toggle source
Removes jobs that have been retrieved and sets them up to be performed.
# File lib/qpush/server/delay.rb, line 43 def update_delays(delays) @conn.multi do |multi| multi.zrem(Server.keys[:delay], delays) delays.each { |job| perform_job(job) } end end
watch_delay() { || ... }
click to toggle source
Performs a watch on our delay list
# File lib/qpush/server/delay.rb, line 61 def watch_delay @conn.watch(Server.keys[:delay]) do yield if block_given? end end