class Rjob::WorkerProcess

TODO: find a mechanism to recover jobs that were moved to the working list but never completed or returned.

Constants

HEARTBEAT_TIMEOUT
ITERATION_TIMEOUT
StopSubscription

Attributes

context[R]
leader[R]
state[R]
worker_name[R]

Public Class Methods

new(context) click to toggle source
# File lib/rjob/worker_process.rb, line 16
# Builds a worker bound to +context+.
#
# Reads the key prefix and a dedicated Redis connection (used for pub/sub)
# from the context, generates this worker's name, and prepares the thread
# pool, the subscription bookkeeping and the statistics counters. The worker
# starts in the :new state with no known leader.
#
# context - object providing #prefix, #create_redis_connection and #config.
def initialize(context)
  @context = context
  @prefix = @context.prefix
  @pubsub_redis = @context.create_redis_connection

  init_worker_name

  @iteration_no = 0
  @max_queue_size = 20

  @subscription_mutex = Mutex.new
  @subscription_thread = nil

  pool_size = @context.config.fetch(:max_threads, 2)
  pool_options = {
    # Never keep more than two idle threads, and never more than the maximum.
    min_threads: pool_size < 2 ? pool_size : 2,
    max_threads: pool_size,
    max_queue: @max_queue_size,
    # With :abort, a post that cannot be accepted raises
    # Concurrent::RejectedExecutionError.
    fallback_policy: :abort
  }
  @thread_pool = Concurrent::ThreadPoolExecutor.new(**pool_options)

  @processed_count, @failed_count, @returned_count =
    Array.new(3) { Concurrent::AtomicFixnum.new }

  @leader = nil
  @state = :new
end

Public Instance Methods

run_forever() click to toggle source
# File lib/rjob/worker_process.rb, line 44
# Registers the worker, then runs iterations until the state becomes :exited.
#
# An INT signal moves the worker to :exiting; a second INT while already
# exiting terminates the process immediately with status 1. The worker is
# always unregistered on the way out, including when an error propagates.
def run_forever
  register_worker

  Signal.trap("INT") do
    unless @state == :exiting
      # First interrupt: request a graceful stop.
      @state = :exiting
      puts "Exiting..."
      next
    end

    puts "Force exit requested. Exiting immediately"
    exit 1
  end

  @state = :running
  run_iteration until @state == :exited
ensure
  unregister_worker
end

Private Instance Methods

check_leadership() click to toggle source
# File lib/rjob/worker_process.rb, line 137
# Refreshes this worker's view of who the leader is.
#
# While the worker is running, the :check_leadership script is executed with
# the worker name, the current time, the key prefix and HEARTBEAT_TIMEOUT;
# its result is stored in @leader.
#
# While the worker is shutting down (:exiting or :exited) it no longer
# competes for leadership. If it currently believes it is the leader, it
# deletes the "<prefix>:leaderworker" key exactly once and forgets the
# leadership. Previously @leader was left untouched, so every later
# iteration spent waiting for the thread pool deleted the key again and
# could wipe out a claim made by another worker in the meantime.
#
# Returns nil when shutting down, otherwise the script result.
def check_leadership
  @context.redis do |r|
    if @state == :exiting || @state == :exited
      if leader?
        r.call('del', "#{@prefix}:leaderworker")
        # Forget the leadership so the key is not deleted again later.
        @leader = nil
      end
      return
    end

    @leader = @context.script_runner.exec(r, :check_leadership,
      [], [
        @worker_name,
        Time.now.to_i,
        @prefix,
        HEARTBEAT_TIMEOUT
      ])
  end
end
disable_subscription_thread() click to toggle source
# File lib/rjob/worker_process.rb, line 68
# Stops the pub/sub listener thread, if one is active.
#
# StopSubscription is raised inside the listener while holding
# @subscription_mutex. The listener takes the same mutex around each message
# it handles, so the interruption cannot land in the middle of that section.
# The thread reference is cleared afterwards; the thread is not joined.
def disable_subscription_thread
  listener = @subscription_thread
  return unless listener

  @subscription_mutex.synchronize { listener.raise(StopSubscription.new) }
  @subscription_thread = nil
end
enable_subscription_thread() click to toggle source
# File lib/rjob/worker_process.rb, line 77
# Starts the pub/sub listener thread unless one is already recorded.
#
# The thread subscribes to "<prefix>:jobs" on the dedicated pub/sub
# connection. Each message carries a bucket number; handling it is
# serialized through @subscription_mutex.
#
# When the worker is no longer :running the listener unsubscribes. Note that
# the message that triggered the unsubscribe is still handed to
# start_processing_message_from_bucket.
#
# StopSubscription (raised by disable_subscription_thread) ends the thread
# quietly after a best-effort disconnect. Any other StandardError is printed
# and re-raised, which terminates the listener thread. The original code had
# an `exit 1` after the re-raise that could never run; it has been removed.
def enable_subscription_thread
  return if @subscription_thread

  @subscription_thread = Thread.new do
    begin
      @pubsub_redis.subscribe("#{@prefix}:jobs") do |on|
        on.message do |_, bucket_no|
          @pubsub_redis.unsubscribe unless @state == :running

          @subscription_mutex.synchronize do
            start_processing_message_from_bucket(bucket_no)
          end
        end
      end
    rescue StopSubscription
      begin
        @pubsub_redis.disconnect
      rescue StandardError
        nil # best-effort: a failed disconnect must not mask the stop request
      end
    rescue StandardError => e
      puts "staaahp -> #{e}"
      raise
    end
  end

  @subscription_thread.run
end
enqueue_recurring_jobs() click to toggle source
# File lib/rjob/worker_process.rb, line 334
# Gives every configured recurring job a chance to enqueue itself.
#
# Does nothing when the context has no recurring jobs. Otherwise each job's
# class is resolved first, so a loading error surfaces before anything is
# sent to Redis; then #maybe_enqueue is called on each job with a Redis
# connection from the context.
def enqueue_recurring_jobs
  jobs = @context.recurring_jobs
  return unless jobs

  # Make sure all classes are loaded without error
  jobs.each { |recurring| recurring.job_class }

  @context.redis do |conn|
    jobs.each { |recurring| recurring.maybe_enqueue(conn) }
  end
end
enqueue_scheduled_jobs() click to toggle source
# File lib/rjob/worker_process.rb, line 348
# Runs the :enqueue_scheduled_jobs script for every bucket.
#
# Each script call receives the bucket's scheduled and jobs keys, the shared
# "<prefix>:jobs" key, a single timestamp taken at the start, the batch size
# (100) and the bucket number. Whenever any bucket reports exactly a full
# batch, the whole sweep is repeated, for at most ten sweeps in total.
def enqueue_scheduled_jobs
  now = Time.now.to_i
  batch_size = 100

  # Let's not be caught in an infinite loop. Thus, sweep at most 10 times
  10.times do
    moved_counts = []

    @context.redis do |conn|
      (0...@context.bucket_count).each do |bucket|
        keys = [
          "#{@prefix}:scheduled:#{bucket}",
          "#{@prefix}:jobs:#{bucket}",
          "#{@prefix}:jobs"
        ]
        moved_counts << @context.script_runner.exec(
          conn, :enqueue_scheduled_jobs, keys, [now, batch_size, bucket]
        )
      end
    end

    # A full batch means more due jobs may remain, so sweep again.
    break unless moved_counts.any? { |count| count == batch_size }
  end
end
exercise_leadership() click to toggle source
# File lib/rjob/worker_process.rb, line 326
# Performs the leader-only duties, in order: promote due scheduled jobs,
# run the bucket scan, then give recurring jobs a chance to enqueue.
def exercise_leadership
  enqueue_scheduled_jobs
  scan_buckets
  enqueue_recurring_jobs
end
handle_job_processing_failure(bucket, job_processor) click to toggle source
# File lib/rjob/worker_process.rb, line 237
# Decides what happens to a job whose processing failed: retry or dead list.
#
# bucket        - bucket the job was taken from.
# job_processor - processor that ran the job; provides #job, #error,
#                 #job_str and #stop_retry?.
#
# The failure is logged when the context's logger responds to #info. The job
# is scheduled for a retry only when all of the following hold:
#   * the processor did not request that retries stop,
#   * the worker class' retry options have a truthy :retry,
#   * the error class is covered by the :exceptions list
#     (default [StandardError]),
#   * the next retry number does not exceed :max_retries (default 16).
# In every other case the job is moved to the dead list.
#
# When the processor reports no error at all, a placeholder without an
# :error_class is used. Such a failure can never match the :exceptions list,
# so it goes to the dead list. Previously this case evaluated
# `Class >= nil`, which raises TypeError and left the job in the working
# list.
def handle_job_processing_failure(bucket, job_processor)
  job = job_processor.job
  error = job_processor.error || { message: "Unknown error" }

  if @context.logger.respond_to?(:info)
    @context.logger.info("Job '#{job.worker_class_name}' with args '#{job.worker_args}' failed: #{error}")
  end

  if job_processor.stop_retry?
    move_job_to_dead(job_processor.job_str, bucket, error)
    return
  end

  retry_options = job.worker_class.retry_options

  if retry_options[:retry]
    exceptions = retry_options.fetch(:exceptions, [StandardError])
    error_class = error[:error_class]
    # Module#>= raises TypeError for a non-module argument, so only compare
    # when the failure actually carries a class.
    should_handle = error_class.is_a?(Module) && exceptions.any? { |e| e >= error_class }

    # Default backoff in seconds for retry number x.
    retry_proc = retry_options[:next_retry_proc] || (proc { |x| 3 * x ** 4 + 15 })
    # With the default backoff the 16th retry alone waits about 2.3 days;
    # all 16 retries together span roughly 8.5 days.
    max_retries = retry_options.fetch(:max_retries, 16)

    new_retry_num = job.retry_num + 1

    if should_handle && new_retry_num <= max_retries
      next_retry_at = Time.now.to_i + retry_proc.call(new_retry_num)
      retry_job(job, bucket, next_retry_at)
      return
    end
  end

  move_job_to_dead(job_processor.job_str, bucket, error)
end
init_worker_name() click to toggle source
# File lib/rjob/worker_process.rb, line 320
# Generates this worker's name: the machine's host name and a random
# 24-character alphanumeric token, joined by a dash.
def init_worker_name
  name_parts = [Socket.gethostname, SecureRandom.alphanumeric(24)]
  @worker_name = name_parts.join('-')
end
leader?() click to toggle source
# File lib/rjob/worker_process.rb, line 154
# Whether the last known leader is this worker.
#
# Returns the falsy @leader value itself when no leader is known, otherwise
# the result of comparing the leader's name with this worker's name.
def leader?
  return @leader unless @leader

  @leader == @worker_name
end
move_job_to_dead(job_str, bucket, error) click to toggle source

TODO: this should probably be done in a single pipelined Redis operation.

# File lib/rjob/worker_process.rb, line 276
# Moves a failed job from the bucket's working list to the dead list.
#
# The dead-list entry is pushed first and the job is removed from the
# working list afterwards, as two separate steps.
#
# job_str - serialized job.
# bucket  - bucket whose working list holds the job.
# error   - error details recorded with the dead entry.
def move_job_to_dead(job_str, bucket, error)
  push_job_to_dead(job_str, bucket, error)
  remove_job_from_working(job_str, bucket)
end
push_job_to_dead(job_str, bucket, error) click to toggle source
# File lib/rjob/worker_process.rb, line 281
# Records a failed job on the "<prefix>:dead" list.
#
# The entry is a MessagePack-encoded hash holding the current time, the
# error class name, the error message and the serialized job.
#
# job_str - serialized job.
# bucket  - accepted for symmetry with the other job helpers; not used here.
# error   - hash read through its :error_class and :message keys.
def push_job_to_dead(job_str, bucket, error)
  dead_entry = {
    when: Time.now.to_i,
    error_class: error[:error_class].to_s,
    full_message: error[:message],
    job: job_str
  }
  packed_entry = MessagePack.pack(dead_entry)

  @context.redis { |conn| conn.lpush("#{@prefix}:dead", packed_entry) }
end
register_worker() click to toggle source
# File lib/rjob/worker_process.rb, line 305
# Announces this worker: publishes its initial stats, then pushes its name
# onto the "<prefix>:workers" list.
def register_worker
  report_stats

  @context.redis { |conn| conn.lpush("#{@prefix}:workers", @worker_name) }
end
remove_job_from_working(job_str, bucket) click to toggle source
# File lib/rjob/worker_process.rb, line 217
# Removes one occurrence of the serialized job from the bucket's working
# list ("<prefix>:jobs:<bucket>:working").
#
# job_str - serialized job, exactly as stored in the list.
# bucket  - bucket whose working list is affected.
def remove_job_from_working(job_str, bucket)
  working_key = "#{@prefix}:jobs:#{bucket}:working"

  @context.redis { |conn| conn.lrem(working_key, 1, job_str) }
end
report_stats() click to toggle source
# File lib/rjob/worker_process.rb, line 158
# Publishes this worker's heartbeat and counters to the Redis hash
# "<prefix>:worker:<worker_name>".
#
# Every value is written as a string, one HSET per field, all within a
# single pipeline.
def report_stats
  stats_key = "#{@prefix}:worker:#{@worker_name}"
  fields = {
    heartbeat: Time.now.to_i,
    queue_length: @thread_pool.queue_length,
    processed: @processed_count.value,
    failed: @failed_count.value,
    returned: @returned_count.value,
    state: @state
  }

  @context.redis do |conn|
    conn.pipelined do |pipeline|
      fields.each_pair do |field, value|
        pipeline.hset(stats_key, field, value.to_s)
      end
    end
  end
end
retry_job(job, bucket, next_retry_at) click to toggle source
# File lib/rjob/worker_process.rb, line 223
# Hands a failed job to the :retry_job script.
#
# The script receives, in order: the retry time, the job's current retry
# number, the bucket and the job id (all as strings), followed by the job
# payload and the key prefix.
#
# job           - job to retry; read through #retry_num, #id and #payload.
# bucket        - bucket the job belongs to.
# next_retry_at - time of the next attempt, as computed by the caller.
def retry_job(job, bucket, next_retry_at)
  script_args = [next_retry_at, job.retry_num, bucket, job.id].map(&:to_s)
  script_args.push(job.payload, @prefix)

  @context.redis do |conn|
    @context.script_runner.exec(conn, :retry_job, [], script_args)
  end
end
return_job_execution(job, bucket) click to toggle source

Used when a job has already been moved to the working state and we want to put it back (re-enqueue it).

This mostly happens when we picked a job for processing but realized that we don’t actually have the resources to process it at the moment.

# File lib/rjob/worker_process.rb, line 299
# Gives a job back by running the :return_job_execution script with the
# job, its bucket and the key prefix.
#
# job    - the job as it was taken from the bucket.
# bucket - bucket the job was taken from.
def return_job_execution(job, bucket)
  script_args = [job, bucket, @prefix]

  @context.redis do |conn|
    @context.script_runner.exec(conn, :return_job_execution, [], script_args)
  end
end
run_iteration() click to toggle source
# File lib/rjob/worker_process.rb, line 103
# Runs one pass of the worker's main loop.
#
# 1. Subscription control: the listener is stopped when the pool's queue has
#    reached 70% of @max_queue_size or the worker is not :running. Otherwise,
#    if no listener is recorded, one is started, and after one
#    ITERATION_TIMEOUT pause the bucket scan is run.
# 2. Shutdown progress while :exiting: the pool is asked to shut down once,
#    and the state becomes :exited when the pool reports it has shut down.
# 3. Stats are reported and leadership is checked.
# 4. On every even-numbered iteration, a running leader performs the leader
#    duties.
# 5. The iteration counter advances, followed by an ITERATION_TIMEOUT pause
#    unless the worker has just exited.
def run_iteration
  backlog_limit = (@max_queue_size * 0.7).to_i
  backlogged = @thread_pool.queue_length >= backlog_limit

  if backlogged || @state != :running
    disable_subscription_thread
  elsif !@subscription_thread
    enable_subscription_thread
    sleep(ITERATION_TIMEOUT)
    scan_buckets
  end

  if @state == :exiting
    if @thread_pool.shutdown?
      @state = :exited
    elsif @thread_pool.shuttingdown?
      puts "Waiting shutdown..."
    else
      @thread_pool.shutdown
    end
  end

  report_stats
  check_leadership

  exercise_leadership if leader? && @state == :running && @iteration_no.even?

  @iteration_no += 1
  sleep(ITERATION_TIMEOUT) if @state != :exited
end
scan_buckets() click to toggle source
# File lib/rjob/worker_process.rb, line 178
# Runs the :scan_buckets script with the key prefix and the number of
# buckets configured on the context.
def scan_buckets
  @context.redis do |conn|
    script_args = [@prefix, @context.bucket_count]
    @context.script_runner.exec(conn, :scan_buckets, [], script_args)
  end
end
start_processing_message_from_bucket(bucket) click to toggle source
# File lib/rjob/worker_process.rb, line 184
# Takes one job from the given bucket and schedules it on the thread pool.
#
# The job is moved from "<prefix>:jobs:<bucket>" to the bucket's working
# list with RPOPLPUSH. Returns false when the bucket is empty.
#
# Inside the pool the job runs within the application wrapper. On success it
# is removed from the working list; on failure the failed counter is bumped
# and the failure handler decides between retry and dead list.
#
# When the pool rejects the task, the returned counter is bumped and the job
# is handed back. The processed counter is bumped for every job taken from
# the bucket, whether it was accepted or rejected.
def start_processing_message_from_bucket(bucket)
  queue_key = "#{@prefix}:jobs:#{bucket}"
  popped = @context.redis do |conn|
    conn.rpoplpush(queue_key, "#{queue_key}:working")
  end

  return false if popped.nil?

  # Work on a binary-encoded copy of the serialized job.
  job_str = popped.b

  # move to inside thread
  job_processor = Rjob::JobProcessor.new(context, job_str)

  begin
    @thread_pool.post do
      using_app_wrapper { job_processor.run }

      if job_processor.success?
        remove_job_from_working(job_str, bucket)
      else
        @failed_count.increment
        handle_job_processing_failure(bucket, job_processor)
      end
    end
  rescue Concurrent::RejectedExecutionError
    @returned_count.increment
    return_job_execution(job_str, bucket)
  ensure
    @processed_count.increment
  end
end
unregister_worker() click to toggle source
# File lib/rjob/worker_process.rb, line 313
# Withdraws this worker: removes one occurrence of its name from the
# "<prefix>:workers" list and deletes its stats hash.
def unregister_worker
  workers_key = "#{@prefix}:workers"
  stats_key = "#{@prefix}:worker:#{@worker_name}"

  @context.redis do |conn|
    conn.lrem(workers_key, 1, @worker_name)
    conn.del(stats_key)
  end
end
using_app_wrapper(&blk) click to toggle source
# File lib/rjob/worker_process.rb, line 375
# Runs the given block inside the application's wrappers.
#
# When the context defines a job wrapper proc, that proc is called with the
# block and becomes responsible for invoking it. When Rails is loaded, the
# whole call additionally runs inside the Rails application executor.
#
# Returns whatever the outermost wrapper (or the block itself) returns.
def using_app_wrapper(&blk)
  runner =
    if @context.job_wrapper_proc.nil?
      blk
    else
      proc { @context.job_wrapper_proc.call(blk) }
    end

  return runner.call unless defined?(::Rails)

  ::Rails.application.executor.wrap(&runner)
end