module Sidekiq::Job::Iterable

Constants

CANCELLATION_PERIOD

Three days is the longest period you generally need to wait for a retry to execute when using the default retry scheme. We don’t want to “forget” the job is cancelled before it has a chance to execute and cancel itself.

STATE_FLUSH_INTERVAL
STATE_TTL

The iteration state needs to be kept around for as long as the job might be retrying.

Public Class Methods

included(base) click to toggle source

@api private

# File lib/sidekiq/job/iterable.rb, line 13
# Inclusion hook: extends +base+ with ClassMethods so the including
# class also receives the class-level helpers.
#
# @api private
# @param base [Module] the class or module that included this module
# @return [Module] +base+ itself (the result of Object#extend)
def self.included(base)
  base.extend ClassMethods
end
new() click to toggle source

@api private

Calls superclass method
# File lib/sidekiq/job/iterable.rb, line 26
# Runs the superclass initializer, then resets the per-instance
# iteration bookkeeping to its pristine state.
#
# @api private
def initialize
  super

  # Execution count and accumulated runtime both start from zero.
  @_executions, @_runtime = 0, 0
  # Cursor, start time, arguments and cancellation flag are unknown until
  # the job actually runs.
  @_cursor = @_start_time = @_args = @_cancelled = nil
end

Public Instance Methods

arguments() click to toggle source
# File lib/sidekiq/job/iterable.rb, line 37
# Returns the job arguments captured by #perform (a frozen copy of the
# argument list), or nil when #perform has not run on this instance yet.
#
# @return [Array, nil]
def arguments
  @_args
end
around_iteration() { || ... } click to toggle source

A hook to override that will be called around each iteration.

Can be useful for some metrics collection, performance tracking etc.

# File lib/sidekiq/job/iterable.rb, line 80
# Hook wrapped around every single iteration; override it to add
# instrumentation such as metrics or timing. The default implementation
# just yields.
#
# @yield runs the #each_iteration call for the current item
def around_iteration
  yield
end
build_enumerator(*) click to toggle source

The enumerator to be iterated over.

@return [Enumerator]

@raise [NotImplementedError] with a message advising subclasses to

implement an override for this method.
# File lib/sidekiq/job/iterable.rb, line 108
# The enumerator to be iterated over. Including classes must override
# this; the default implementation always raises.
#
# @return [Enumerator]
# @raise [NotImplementedError] naming the class that forgot to implement it
def build_enumerator(*)
  message = "#{self.class.name} must implement a '#build_enumerator' method"
  raise NotImplementedError, message
end
cancel!() click to toggle source

Set a flag in Redis to mark this job as cancelled. Cancellation is asynchronous and is checked at the start of iteration and every 5 seconds thereafter as part of the recurring state flush.

# File lib/sidekiq/job/iterable.rb, line 49
# Set a flag in Redis to mark this job as cancelled. Cancellation is
# asynchronous: the running job notices it at the start of iteration and
# on each periodic state flush.
#
# The flag is written through #iteration_key, i.e. into the same hash that
# the state flush, state fetch and cleanup use, so the writer and the
# readers of the "cancelled" field can never disagree about the key.
#
# @return [Object] the already-known cancellation value when this instance
#   was cancelled before, otherwise the stored cancellation time as an
#   Integer (epoch seconds)
def cancel!
  return @_cancelled if cancelled?

  key = iteration_key
  _, cancelled_at, _ = Sidekiq.redis do |conn|
    conn.pipelined do |pipe|
      # HSETNX only writes when the field is missing, so an earlier
      # cancellation timestamp is preserved; HGET then reads whichever won.
      pipe.hsetnx(key, "cancelled", Time.now.to_i)
      pipe.hget(key, "cancelled")
      # TODO When Redis 7.2 is required
      # pipe.expire(key, Sidekiq::Job::Iterable::STATE_TTL, "nx")
      pipe.expire(key, Sidekiq::Job::Iterable::STATE_TTL)
    end
  end
  @_cancelled = cancelled_at.to_i
end
cancelled?() click to toggle source
# File lib/sidekiq/job/iterable.rb, line 65
# Reports whether this instance already knows the job to be cancelled.
# It only returns the cached @_cancelled value (truthy once a cancellation
# has been recorded or observed, nil otherwise) and does not query Redis.
def cancelled?
  @_cancelled
end
each_iteration(*) click to toggle source

The action to be performed on each item from the enumerator.

@return [void]

@raise [NotImplementedError] with a message advising subclasses to

implement an override for this method.
# File lib/sidekiq/job/iterable.rb, line 119
# The action to be performed on each item from the enumerator. Including
# classes must override this; the default implementation always raises.
#
# @return [void]
# @raise [NotImplementedError] naming the class that forgot to implement it
def each_iteration(*)
  message = "#{self.class.name} must implement an '#each_iteration' method"
  raise NotImplementedError, message
end
iteration_key() click to toggle source
# File lib/sidekiq/job/iterable.rb, line 123
# Name of the Redis key under which this job's iteration state is stored,
# built from the job id.
#
# @return [String]
def iteration_key
  "it-#{jid}"
end
on_complete() click to toggle source

A hook to override that will be called when the job has finished iterating.

# File lib/sidekiq/job/iterable.rb, line 98
# Completion hook; does nothing unless overridden. #perform calls it after
# #on_stop once the iteration is treated as completed, right before the
# stored iteration state is cleaned up.
def on_complete
end
on_resume() click to toggle source

A hook to override that will be called when the job resumes iterating.

# File lib/sidekiq/job/iterable.rb, line 86
# Resume hook; does nothing unless overridden. #perform calls it instead of
# #on_start whenever the execution counter shows this is not the job's
# first execution.
def on_resume
end
on_start() click to toggle source

A hook to override that will be called when the job starts iterating.

It is called only once, on the job's first execution.

# File lib/sidekiq/job/iterable.rb, line 73
# Start hook; does nothing unless overridden. #perform calls it only when
# the execution counter equals 1, i.e. on the job's first execution.
def on_start
end
on_stop() click to toggle source

A hook to override that will be called each time the job is interrupted.

This can be due to interruption or Sidekiq stopping. Note that it is also called when iteration ends normally, just before `on_complete`.

# File lib/sidekiq/job/iterable.rb, line 93
# Stop hook; does nothing unless overridden. #perform calls it every time
# the iteration loop returns or is aborted via +throw :abort+, whether or
# not the job completed.
def on_stop
end
perform(*args) click to toggle source

@api private

# File lib/sidekiq/job/iterable.rb, line 128
# Entry point invoked for the job. Restores any saved iteration state,
# builds the enumerator, runs the lifecycle hooks around the iteration loop
# and finally either cleans up (completed) or re-enqueues (interrupted).
#
# @api private
# @param args [Array] the job arguments; a frozen copy is kept for #arguments
def perform(*args)
  @_args = args.dup.freeze
  fetch_previous_iteration_state

  @_executions += 1
  @_start_time = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)

  enumerator = build_enumerator(*args, cursor: @_cursor)
  unless enumerator
    # A falsy enumerator is the job's way of saying there is nothing to do.
    logger.info("'#build_enumerator' returned nil, skipping the job.")
    return
  end

  assert_enumerator!(enumerator)

  # The first execution starts the job, every later one resumes it.
  (@_executions == 1) ? on_start : on_resume

  # `throw :abort, value` from inside the iteration lands here.
  outcome = catch(:abort) { iterate_with_enumerator(enumerator, args) }

  on_stop

  if handle_completed(outcome)
    on_complete
    cleanup
  else
    reenqueue_iteration_job
  end
end

Private Instance Methods

assert_enumerator!(enum) click to toggle source
# File lib/sidekiq/job/iterable.rb, line 242
      # Guards against #build_enumerator returning something that is not an
      # Enumerator.
      #
      # @param enum [Object] the value returned by #build_enumerator
      # @return [nil] when +enum+ is an Enumerator
      # @raise [ArgumentError] with a usage example otherwise
      def assert_enumerator!(enum)
        return if enum.is_a?(Enumerator)

        raise ArgumentError, <<~MSG
          #build_enumerator must return an Enumerator, but returned #{enum.class}.
          Example:
            def build_enumerator(params, cursor:)
              active_record_records_enumerator(
                Shop.find(params["shop_id"]).products,
                cursor: cursor
              )
            end
        MSG
      end
cleanup() click to toggle source
# File lib/sidekiq/job/iterable.rb, line 274
# Logs a completion summary and removes this job's iteration state hash
# from Redis.
#
# @return [Object] whatever the UNLINK call returns
def cleanup
  logger.debug do
    format("Completed iteration. executions=%d runtime=%.3f", @_executions, @_runtime)
  end

  Sidekiq.redis do |redis|
    redis.unlink(iteration_key)
  end
end
fetch_previous_iteration_state() click to toggle source
# File lib/sidekiq/job/iterable.rb, line 170
# Restores execution count, cursor and accumulated runtime from the state
# hash a previous execution flushed to Redis. Leaves the instance untouched
# when no state is stored.
#
# @return [void]
def fetch_previous_iteration_state
  state = Sidekiq.redis { |conn| conn.hgetall(iteration_key) }
  return if state.empty?

  @_executions = state["ex"].to_i

  # The hash can exist with only the "cancelled" field: #cancel! writes that
  # field on its own, possibly before any state was ever flushed. There is
  # no serialized cursor to decode then, and handing nil to the JSON loader
  # would raise before the job gets a chance to notice the cancellation.
  serialized_cursor = state["c"]
  @_cursor = Sidekiq.load_json(serialized_cursor) if serialized_cursor

  @_runtime = state["rt"].to_f
end
flush_state() click to toggle source
# File lib/sidekiq/job/iterable.rb, line 257
# Persists the current execution count, cursor and accumulated runtime to
# the job's Redis hash, refreshes the hash's expiry and reads back the
# "cancelled" field, all inside one MULTI transaction.
#
# @return [Array] the transaction replies in order: HSET, EXPIRE, HGET
def flush_state
  redis_key = iteration_key
  snapshot = {
    "ex" => @_executions,
    "c" => Sidekiq.dump_json(@_cursor),
    "rt" => @_runtime
  }

  Sidekiq.redis do |conn|
    conn.multi do |transaction|
      transaction.hset(redis_key, snapshot)
      transaction.expire(redis_key, STATE_TTL)
      # Piggy-back the cancellation check on the flush round trip.
      transaction.hget(redis_key, "cancelled")
    end
  end
end
handle_completed(completed) click to toggle source
# File lib/sidekiq/job/iterable.rb, line 281
# Normalises the value produced by the iteration (its return value or
# whatever was thrown to :abort) into a completed/not-completed boolean.
#
# @param completed [true, false, nil] nil means the job was aborted but
#   still wants the on_complete callback to run
# @return [Boolean]
# @raise [RuntimeError] for any other value
def handle_completed(completed)
  # Identity checks with the literal as receiver mirror `case`/`when`.
  return true if nil.equal?(completed) || true.equal?(completed)
  return false if false.equal?(completed)

  raise "Unexpected thrown value: #{completed.inspect}"
end
is_cancelled?() click to toggle source
# File lib/sidekiq/job/iterable.rb, line 166
# Queries Redis for the job's "cancelled" field and caches the reply in
# @_cancelled. The hash is addressed through #iteration_key so this lookup
# reads from the same key the state flush and cleanup use.
#
# @return [Object, nil] the raw field value reported by the Redis client;
#   presumably nil when the field is not set
def is_cancelled?
  @_cancelled = Sidekiq.redis { |conn| conn.hget(iteration_key, "cancelled") }
end
iterate_with_enumerator(enumerator, arguments) click to toggle source
# File lib/sidekiq/job/iterable.rb, line 185
# Drives the enumerator: runs #each_iteration for every item, periodically
# flushes state to Redis, and stops early on interruption or cancellation.
#
# @param enumerator [Enumerator] yields [object, cursor] pairs
# @param arguments [Array] job arguments forwarded to #each_iteration
# @return [Boolean] true when iteration is finished (exhausted or
#   cancelled), false when it was interrupted and must be resumed later
def iterate_with_enumerator(enumerator, arguments)
  if is_cancelled?
    logger.info { "Job cancelled" }
    return true
  end

  time_limit = Sidekiq.default_configuration[:timeout]
  found_record = false
  state_flushed_at = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)

  enumerator.each do |object, cursor|
    found_record = true
    @_cursor = cursor

    is_interrupted = interrupted?
    # Flush when the flush interval has elapsed, and always right before
    # stopping because of an interruption so the cursor is not lost.
    if ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) - state_flushed_at >= STATE_FLUSH_INTERVAL || is_interrupted
      _, _, cancelled = flush_state
      state_flushed_at = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
      # The third reply is the HGET of the "cancelled" field: the stored
      # cancellation timestamp when set, nil otherwise. It is never the
      # integer 1, so test for presence rather than equality.
      if cancelled
        @_cancelled = true
        logger.info { "Job cancelled" }
        return true
      end
    end

    return false if is_interrupted

    verify_iteration_time(time_limit, object) do
      around_iteration do
        each_iteration(object, *arguments)
      end
    end
  end

  logger.debug("Enumerator found nothing to iterate!") unless found_record
  true
ensure
  # Account for the time spent on every exit path, including early returns
  # and exceptions.
  @_runtime += (::Process.clock_gettime(::Process::CLOCK_MONOTONIC) - @_start_time)
end
reenqueue_iteration_job() click to toggle source
# File lib/sidekiq/job/iterable.rb, line 235
# Saves the current iteration state, then raises Interrupted to signal
# that the job stopped early and has to be run again.
#
# @raise [Interrupted] always
def reenqueue_iteration_job
  flush_state

  logger.debug do
    "Interrupting job (cursor=#{@_cursor.inspect})"
  end

  raise Interrupted
end
verify_iteration_time(time_limit, object) { || ... } click to toggle source
# File lib/sidekiq/job/iterable.rb, line 225
# Times the given block and logs a warning when it ran longer than
# +time_limit+ seconds.
#
# @param time_limit [Numeric] threshold in seconds (callers pass Sidekiq's
#   shutdown timeout)
# @param object [Object] the item being processed, used only in the warning
# @yield the iteration to be timed
# @return [Object, nil] nil when within the limit, otherwise whatever the
#   logger call returns
def verify_iteration_time(time_limit, object)
  started_at = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
  yield
  elapsed = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) - started_at
  return unless elapsed > time_limit

  logger.warn do
    format(
      "Iteration took longer (%.2f) than Sidekiq's shutdown timeout (%d) when processing `%s`. This can lead to job processing problems during deploys",
      elapsed, time_limit, object
    )
  end
end