class SidekiqFastEnq

Implementation of the Sidekiq::Scheduled::Enq class that uses a server side Lua script to atomically get the next scheduled job to run and then pops it from the list. This works much better in large sidekiq deployments with many processes because it eliminates race conditions checking the scheduled queues.

Constants

DEFAULT_BATCH_SIZE

Public Class Methods

new(batch_size = nil) click to toggle source
# File lib/sidekiq-fast-enq.rb, line 10
def initialize(batch_size = nil)
  batch_size ||= (Sidekiq.options[:fast_enq_batch_size] || DEFAULT_BATCH_SIZE)
  @script = lua_script(batch_size)
  Sidekiq.redis do |conn|
    @script_sha_1 = conn.script(:load, @script)
  end
end

Public Instance Methods

enqueue_jobs(now = Time.now.to_f.to_s, sorted_sets = nil) click to toggle source
# File lib/sidekiq-fast-enq.rb, line 18
def enqueue_jobs(now = Time.now.to_f.to_s, sorted_sets = nil)
  sorted_sets ||= Sidekiq::Scheduled::SETS
  logger = Sidekiq.logger

  # A job's "score" in Redis is the time at which it should be processed.
  # Just check Redis for the set of jobs with a timestamp before now.
  Sidekiq.redis do |conn|
    namespace = conn.namespace if conn.respond_to?(:namespace)
    sorted_sets.each do |sorted_set|
      redis_set = (namespace ? "#{namespace}:#{sorted_set}" : sorted_set)
      jobs_count = 0
      start_time = Time.now
      pop_time = 0.0
      enqueue_time = 0.0

      # Get the next item in the queue if it's score (time to execute) is <= now.
      # We need to go through the list one at a time to reduce the risk of something
      # going wrong between the time jobs are popped from the scheduled queue and when
      # they are pushed onto a work queue and losing the jobs.
      loop do
        t = Time.now
        job = pop_job(conn, redis_set, now)
        pop_time += (Time.now - t)
        break if job.nil?
        t = Time.now
        Sidekiq::Client.push(Sidekiq.load_json(job))
        enqueue_time += (Time.now - t)
        jobs_count += 1
        logger.debug("enqueued #{sorted_set}: #{job}") if logger && logger.debug?
      end

      if jobs_count > 0 && logger && logger.info?
        loop_time = Time.now - start_time
        logger.info("SidekiqFastEnq enqueued #{jobs_count} from #{sorted_set} in #{loop_time.round(3)}s (pop: #{pop_time.round(3)}s; enqueue: #{enqueue_time.round(3)}s)")
      end
    end
  end
end

Private Instance Methods

eval_script(conn, script, sha1, argv=[]) click to toggle source

Evaluate and execute a Lua script on the redis server.

# File lib/sidekiq-fast-enq.rb, line 66
def eval_script(conn, script, sha1, argv=[])
  begin
    conn.evalsha(sha1, [], argv)
  rescue Redis::CommandError => e
    if e.message.include?('NOSCRIPT'.freeze)
      t = Time.now
      sha1 = conn.script(:load, script)
      Sidekiq::Logging.logger.info("loaded script #{sha1} in #{Time.now - t}s")
      retry
    else
      raise e
    end
  end
end
lua_script(batch_size) click to toggle source

Lua script that will atomically get the next element from the sorted set of scheduled jobs and pop it from the list.

# File lib/sidekiq-fast-enq.rb, line 83
  def lua_script(batch_size)
    batch_size = batch_size.to_i
    batch_size = DEFAULT_BATCH_SIZE if batch_size <= 0
    <<-LUA
    local sorted_set = ARGV[1]
    local now = tonumber(ARGV[2])
    local ready_cache = sorted_set .. '.cache'

    while true do
      -- Check a cached list of jobs that are ready to execute
      local job = redis.call('lpop', ready_cache)
      if not job then
        -- If no jobs in the cache then get the next 100 jobs ready to be executed
        local ready_jobs = redis.call('zrangebyscore', sorted_set, '-inf', now, 'LIMIT', 0, #{batch_size})
        if #ready_jobs == 1 then
          job = ready_jobs[1]
        elseif #ready_jobs > 1 then
          -- If more than one job is ready, throw them in the cache which is faster to access than the sorted set
          redis.call('rpush', ready_cache, unpack(ready_jobs))
          -- Set an expiration on the cache since it's just a cache. The sorted set is still the canonical list.
          redis.call('expire', ready_cache, 60)
          job = redis.call('lpop', ready_cache)
        end
      end

      if job then
        -- Verify that the job was still in the sorted set when we remove. Could happen if
        -- another sidekiq process is still using the standard Enq mechanism.
        local removed = redis.call('zrem', sorted_set, job)
        if removed > 0 then
          return job
        end
      else
        return nil
      end
    end
    LUA
  end
pop_job(conn, sorted_set, now) click to toggle source

Invoke a Lua script on the server to pop the next job from a sorted set that should have been run before “now”.

# File lib/sidekiq-fast-enq.rb, line 61
def pop_job(conn, sorted_set, now)
  eval_script(conn, @script, @script_sha_1, [sorted_set, now])
end