class Sidekiq::WebCustom::Processor

Public Class Methods

__processor__(queue:, options: Sidekiq.options) click to toggle source
# File lib/sidekiq/web_custom/processor.rb, line 17
def self.__processor__(queue:, options: Sidekiq.options)
  options_temp = options.clone
  queue = queue.is_a?(String) ? Sidekiq::Queue.new(queue) : queue

  options_temp[:queues] = [queue.name]
  klass = options_temp[:fetch]&.class || BasicFetch
  options_temp[:fetch] = klass.new(options_temp)
  new(manager: nil, options: options_temp, queue: queue)
end
execute(max:, queue:, options: Sidekiq.options) click to toggle source
# File lib/sidekiq/web_custom/processor.rb, line 7
# Processes up to +max+ items from +queue+ using a queue-scoped processor.
#
# @param max upper bound on the number of items to process
# @param queue queue name or queue object, forwarded to __processor__
# @param options forwarded to __processor__; defaults to Sidekiq.options
# @return whatever the processor's #__execute returns
def self.execute(max:, queue:, options: Sidekiq.options)
  runner = __processor__(queue: queue, options: options)
  runner.__execute(max: max)
end
execute_job(job:, options: Sidekiq.options) click to toggle source
# File lib/sidekiq/web_custom/processor.rb, line 11
# Runs one specific job through a processor scoped to the job's queue.
#
# @param job object responding to #queue; handed to the processor's #__execute_job
# @param options forwarded to __processor__; defaults to Sidekiq.options
# @return the result of #__execute_job, or false when a StandardError is raised
def self.execute_job(job:, options: Sidekiq.options)
  runner = __processor__(queue: job.queue, options: options)
  runner.__execute_job(job: job)
rescue StandardError
  # The failure is reported as false here; the error is logged downstream.
  false
end
new(manager:, options:, queue:) click to toggle source
Calls superclass method
# File lib/sidekiq/web_custom/processor.rb, line 27
# Sets up a processor bound to one queue, then defers to the superclass.
#
# @param manager forwarded unchanged to the superclass constructor
# @param options forwarded to the superclass; options[:fetch] is inspected to
#   record whether the fetch strategy is exactly BasicFetch (not a subclass)
# @param queue queue object kept for later use by #__execute
def initialize(manager:, options:, queue:)
  @__basic_fetch = options[:fetch].instance_of?(BasicFetch)
  @__queue = queue

  super(manager, options)
end

Public Instance Methods

__execute(max:) click to toggle source
# File lib/sidekiq/web_custom/processor.rb, line 56
# Processes items from the bound queue one at a time, up to +max+ of them.
#
# Stops early when the queue reports no remaining items or when the
# thread-local Sidekiq::WebCustom::BREAK_BIT flag is set. Any exception raised
# while processing is logged and swallowed; the count reached so far is
# returned instead.
#
# @param max maximum number of items to attempt
# @return the number of items for which process_one completed
def __execute(max:)
  processed = 0

  max.times do
    break unless @__queue.size.positive?

    if Thread.current[Sidekiq::WebCustom::BREAK_BIT]
      Sidekiq.logger.warn "Yikes -- Break bit has been set. Attempting to return in time. Completed #{processed} of attempted #{max}"
      break
    end

    Sidekiq.logger.info { "Manually processing next item in queue:[#{@__queue.name}]" }
    process_one
    processed += 1
  end

  processed
rescue Exception => error
  # NOTE(review): rescuing Exception also traps interrupts and exit requests;
  # this looks intentional (best-effort return of the count) -- confirm.
  # Presumably a job already fetched via BasicFetch cannot be recovered -- confirm.
  Sidekiq.logger.fatal "Processor Execution interrupted. Lost Job #{@job.job}" if @job && @__basic_fetch
  Sidekiq.logger.warn "Manual execution has terminated. Received error [#{error.message}]"
  processed
end
__execute_job(job:) click to toggle source
# File lib/sidekiq/web_custom/processor.rb, line 34
# Processes one specific job immediately, then deletes it from its queue.
#
# The job is wrapped in a BasicFetch unit of work and passed to #process.
# It is deleted only after processing succeeds; a failure in either step is
# logged and re-raised.
#
# @param job object responding to #queue, #item and #delete
# @return [true] when the job was processed and deleted
# @raise [StandardError] the error raised by #process or by job.delete
def __execute_job(job:)
  unit = Sidekiq::BasicFetch::UnitOfWork.new("queue:#{job.queue}", job.item.to_json)

  begin
    Sidekiq.logger.info "Manually processing individual work unit for #{unit.queue_name}"
    process(unit)
  rescue StandardError => process_error
    Sidekiq.logger.error "Manually processed work unit failed with #{process_error.message}. Work unit will not be dequeued"
    raise
  end

  # Reached only when processing succeeded, so the job is safe to remove.
  begin
    job.delete
    Sidekiq.logger.info { "Manually processed work unit sucessfully dequeued." }
  rescue StandardError => delete_error
    Sidekiq.logger.fatal "Manually processed work unit failed to be dequeued. #{delete_error.message}."
    raise
  end

  true
end