class Sidekiq::Job::Setter

This helper class encapsulates the set options for ‘set`, e.g.

SomeJob.set(queue: 'foo').perform_async(....)

Public Class Methods

new(klass, opts) click to toggle source
# File lib/sidekiq/job.rb, line 187
def initialize(klass, opts)
  @klass = klass
  # NB: the internal hash always has stringified keys
  @opts = opts.transform_keys(&:to_s)

  # ActiveJob compatibility
  interval = @opts.delete("wait_until") || @opts.delete("wait")
  at(interval) if interval
end

Public Instance Methods

perform_async(*args) click to toggle source
# File lib/sidekiq/job.rb, line 205
def perform_async(*args)
  if @opts["sync"] == true
    perform_inline(*args)
  else
    @klass.client_push(@opts.merge("args" => args, "class" => @klass))
  end
end
perform_at(interval, *args)
Alias for: perform_in
perform_bulk(args, batch_size: 1_000) click to toggle source
# File lib/sidekiq/job.rb, line 251
def perform_bulk(args, batch_size: 1_000)
  client = @klass.build_client
  client.push_bulk(@opts.merge("class" => @klass, "args" => args, :batch_size => batch_size))
end
perform_in(interval, *args) click to toggle source

interval must be a timestamp, numeric or something that acts

numeric (like an activesupport time interval).
# File lib/sidekiq/job.rb, line 258
def perform_in(interval, *args)
  at(interval).perform_async(*args)
end
Also aliased as: perform_at
perform_inline(*args) click to toggle source

Explicit inline execution of a job. Returns nil if the job did not execute, true otherwise.

# File lib/sidekiq/job.rb, line 215
def perform_inline(*args)
  raw = @opts.merge("args" => args, "class" => @klass)

  # validate and normalize payload
  item = normalize_item(raw)
  queue = item["queue"]

  # run client-side middleware
  cfg = Sidekiq.default_configuration
  result = cfg.client_middleware.invoke(item["class"], item, queue, cfg.redis_pool) do
    item
  end
  return nil unless result

  # round-trip the payload via JSON
  msg = Sidekiq.load_json(Sidekiq.dump_json(item))

  # prepare the job instance
  klass = Object.const_get(msg["class"])
  job = klass.new
  job.jid = msg["jid"]
  job.bid = msg["bid"] if job.respond_to?(:bid)

  # run the job through server-side middleware
  result = cfg.server_middleware.invoke(job, msg, msg["queue"]) do
    # perform it
    job.perform(*msg["args"])
    true
  end
  return nil unless result
  # jobs do not return a result. they should store any
  # modified state.
  true
end
Also aliased as: perform_sync
perform_sync(*args)
Alias for: perform_inline
set(options) click to toggle source
# File lib/sidekiq/job.rb, line 197
def set(options)
  hash = options.transform_keys(&:to_s)
  interval = hash.delete("wait_until") || @opts.delete("wait")
  @opts.merge!(hash)
  at(interval) if interval
  self
end

Private Instance Methods

at(interval) click to toggle source
# File lib/sidekiq/job.rb, line 265
def at(interval)
  int = interval.to_f
  now = Time.now.to_f
  ts = ((int < 1_000_000_000) ? now + int : int)
  # Optimization to enqueue something now that is scheduled to go out now or in the past
  @opts["at"] = ts if ts > now
  self
end