class QPush::Server::Delay

The Delay worker requeues jobs that have been delayed on the Redis server. Delayed jobs are kept in a sorted set and fetched with ZRANGEBYSCORE, where each job's score is the Unix time at which it should be performed.
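To make the scoring idea concrete, here is a minimal sketch in plain Ruby that imitates selecting due jobs by score. It does not talk to Redis: `DelaySet`, its methods, and the `klass` payload field are hypothetical names invented for this illustration, and the hash only approximates what a sorted set does.

```ruby
# Plain-Ruby stand-in for a Redis sorted set: member (job JSON) => score (Unix time).
# DelaySet and its methods are hypothetical; they are not part of QPush or redis-rb.
class DelaySet
  def initialize
    @scores = {}
  end

  # Rough analogue of ZADD: schedule a job payload for a given Unix time.
  def add(score, member)
    @scores[member] = score
  end

  # Rough analogue of ZRANGEBYSCORE: members whose score lies within
  # [min, max], ordered by ascending score.
  def range_by_score(min, max)
    @scores.select { |_, score| score >= min && score <= max }
           .sort_by { |_, score| score }
           .map(&:first)
  end
end

now = 1_000 # fixed "current time" so the example is deterministic
set = DelaySet.new
set.add(now - 30, '{"klass":"Mailer"}') # already due
set.add(now + 60, '{"klass":"Report"}') # due in the future

# Mirrors the range 0..Time.now.to_i used by retrieve_delays.
due = set.range_by_score(0, now)
```

Only the job whose score is at or before `now` is returned; the future job stays in the set until a later poll.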

Public Class Methods

new()
# File lib/qpush/server/delay.rb, line 8
def initialize
  @done = false
  @conn = nil
end

Public Instance Methods

shutdown()

Shuts down the delay process by setting the flag that the start loop checks.

# File lib/qpush/server/delay.rb, line 27
def shutdown
  @done = true
end

start()

Starts the delay process. The loop checks Redis for due jobs, sleeps for two seconds, and repeats until shutdown is called.

# File lib/qpush/server/delay.rb, line 15
def start
  until @done
    Server.redis do |conn|
      @conn = conn
      watch_delay { retrieve_delays }
    end
    sleep 2
  end
end
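The start/shutdown pairing is a cooperative stop flag: one thread loops while another flips the flag. The sketch below shows that pattern in isolation, assuming a hypothetical `PollingWorker` class with a counter in place of the Redis work; it is not the QPush implementation.

```ruby
# Hypothetical worker mirroring the start/shutdown pattern shown above.
# The Redis polling step is replaced by a simple counter.
class PollingWorker
  attr_reader :ticks

  def initialize(interval)
    @done = false
    @interval = interval
    @ticks = 0
  end

  # Repeats the unit of work until shutdown flips the flag.
  def start
    until @done
      @ticks += 1 # placeholder for the polling step
      sleep @interval
    end
  end

  # Requests a stop; the loop exits after its current iteration.
  def shutdown
    @done = true
  end
end

worker = PollingWorker.new(0.01)
thread = Thread.new { worker.start }
sleep 0.05       # let the loop run a few times
worker.shutdown
thread.join      # returns once the loop notices the flag
```

Because the flag is only checked at the top of the loop, a stop request takes effect after the current sleep finishes. With the two-second sleep used in `start`, shutdown can therefore lag by up to about two seconds.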

Private Instance Methods

perform_job(json)

Builds a Job from its JSON payload and adds it to the appropriate perform list. Any error raised along the way is re-raised as a ServerError.

# File lib/qpush/server/delay.rb, line 52
def perform_job(json)
  job = Job.new(JSON.parse(json))
  job.perform
rescue => e
  raise ServerError, e.message
end

retrieve_delays()

Retrieves delayed jobs whose scheduled time has passed, that is, jobs with a score between 0 and the current Unix time. If any are found, they are passed to update_delays; otherwise the watch is released with unwatch.

# File lib/qpush/server/delay.rb, line 36
def retrieve_delays
  delays = @conn.zrangebyscore(Server.keys[:delay], 0, Time.now.to_i)
  delays.any? ? update_delays(delays) : @conn.unwatch
end

update_delays(delays)

Removes the retrieved jobs from the delay set inside a MULTI block and sets each one up to be performed.

# File lib/qpush/server/delay.rb, line 43
def update_delays(delays)
  @conn.multi do |multi|
    multi.zrem(Server.keys[:delay], delays)
    delays.each { |job| perform_job(job) }
  end
end

watch_delay() { || ... }

Performs a WATCH on the delay set and yields to the given block, if any.

# File lib/qpush/server/delay.rb, line 61
def watch_delay
  @conn.watch(Server.keys[:delay]) do
    yield if block_given?
  end
end
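Taken together, watch_delay, retrieve_delays, and update_delays follow the Redis optimistic-locking pattern: watch a key, read it, then apply changes in a transaction that is aborted if the key changed in the meantime. The sketch below models that behaviour in memory so it runs without a server. `TinyStore` and its methods are hypothetical stand-ins, not the redis-rb API, and the version counter is a simplification of how Redis tracks watched keys.

```ruby
# In-memory model of Redis-style optimistic locking. TinyStore is a hypothetical
# stand-in for illustration only; it tracks one set and a version counter.
class TinyStore
  attr_reader :members

  def initialize(members)
    @members = members.dup
    @version = 0
    @watched = nil
  end

  # Analogue of WATCH: remember the version seen at watch time.
  def watch
    @watched = @version
  end

  # Analogue of UNWATCH: forget the remembered version.
  def unwatch
    @watched = nil
  end

  # Any write bumps the version, like a modification of a watched key.
  def remove(items)
    @members -= items
    @version += 1
  end

  # Analogue of MULTI/EXEC: run the block only if nothing changed since watch.
  # Returns true when applied and nil when aborted.
  def transaction
    changed = @watched && @watched != @version
    @watched = nil
    return nil if changed

    yield self
    true
  end
end

store = TinyStore.new(%w[job-a job-b])

# Uncontended: watch, then remove inside the transaction.
store.watch
first = store.transaction { |s| s.remove(%w[job-a]) }

# Contended: another writer touches the set between watch and transaction.
store.watch
store.remove(%w[job-b]) # simulated competing worker
second = store.transaction { |s| s.remove(%w[job-b]) }
```

In the contended case the transaction is skipped, so two workers polling the same delay set would not both claim the same job. The aborted worker simply tries again on its next poll.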