class Rjob::WorkerProcess
TODO: find a mechanism to recover from jobs that went to working but never returned
Constants
- HEARTBEAT_TIMEOUT
- ITERATION_TIMEOUT
- StopSubscription
Attributes
context[R]
leader[R]
state[R]
worker_name[R]
Public Class Methods
new(context)
click to toggle source
# File lib/rjob/worker_process.rb, line 16
# Builds a worker bound to +context+.
#
# Opens a dedicated Redis connection for pub/sub, generates this worker's
# name, creates the thread pool that runs jobs and zeroes the counters that
# report_stats publishes. The worker starts in the :new state with no known
# leader.
#
# @param context [Object] must respond to #prefix, #create_redis_connection
#   and #config (the latter is read with #fetch)
def initialize(context)
  @context = context
  @prefix = @context.prefix
  @pubsub_redis = @context.create_redis_connection
  init_worker_name

  @iteration_no = 0
  @max_queue_size = 20
  thread_cap = @context.config.fetch(:max_threads, 2)

  @subscription_mutex = Mutex.new
  @subscription_thread = nil

  pool_options = {
    min_threads: [2, thread_cap].min,
    max_threads: thread_cap,
    max_queue: @max_queue_size,
    fallback_policy: :abort # Concurrent::RejectedExecutionError
  }
  @thread_pool = Concurrent::ThreadPoolExecutor.new(**pool_options)

  @processed_count = Concurrent::AtomicFixnum.new
  @failed_count = Concurrent::AtomicFixnum.new
  @returned_count = Concurrent::AtomicFixnum.new

  @leader = nil
  @state = :new
end
Public Instance Methods
run_forever()
click to toggle source
# File lib/rjob/worker_process.rb, line 44
# Registers the worker, installs an INT handler and runs iterations until the
# state becomes :exited. The worker is always unregistered on the way out,
# including when an iteration raises.
#
# The first INT switches the state to :exiting; a second INT received while
# already :exiting terminates the process immediately with status 1.
def run_forever
  begin
    register_worker

    Signal.trap("INT") do
      already_exiting = (@state == :exiting)
      @state = :exiting unless already_exiting
      puts(already_exiting ? "Force exit requested. Exiting immediately" : "Exiting...")
      exit 1 if already_exiting
    end

    @state = :running
    loop do
      break if @state == :exited

      run_iteration
    end
  ensure
    unregister_worker
  end
end
Private Instance Methods
check_leadership()
click to toggle source
# File lib/rjob/worker_process.rb, line 137
# Refreshes @leader, or gives leadership up when the worker is shutting down.
#
# While the worker is running, the :check_leadership script is executed with
# this worker's name, the current time, the key prefix and HEARTBEAT_TIMEOUT,
# and its result is stored in @leader.
#
# Once the worker is :exiting or :exited it no longer takes part in the
# election. If it currently believes it is the leader it deletes the
# "<prefix>:leaderworker" key exactly once and clears @leader. Clearing the
# claim matters: otherwise every later iteration would delete the key again,
# even after another worker has taken leadership, and the final (:exited)
# iteration would run the election script and could re-acquire leadership
# right before the process terminates.
#
# @return [Object, nil] the script result while running, nil otherwise
def check_leadership
  if @state == :exiting || @state == :exited
    return unless leader?

    @context.redis { |r| r.call('del', "#{@prefix}:leaderworker") }
    @leader = nil
    return
  end

  @context.redis do |r|
    @leader = @context.script_runner.exec(r, :check_leadership, [], [
      @worker_name,
      Time.now.to_i,
      @prefix,
      HEARTBEAT_TIMEOUT
    ])
  end
end
disable_subscription_thread()
click to toggle source
# File lib/rjob/worker_process.rb, line 68
# Stops the subscription thread, if one exists, by raising StopSubscription
# inside it and then forgetting the thread.
#
# The raise happens while holding @subscription_mutex, the same mutex the
# subscription thread holds while it dispatches a message.
def disable_subscription_thread
  if @subscription_thread
    @subscription_mutex.synchronize { @subscription_thread.raise(StopSubscription.new) }
    @subscription_thread = nil
  end
end
enable_subscription_thread()
click to toggle source
# File lib/rjob/worker_process.rb, line 77
# Starts the background thread that subscribes to "<prefix>:jobs" and hands
# every received bucket number to start_processing_message_from_bucket.
# Does nothing when a subscription thread already exists.
#
# Inside the thread:
# * each message is dispatched while holding @subscription_mutex;
# * if the worker is no longer :running when a message arrives, the
#   connection is asked to unsubscribe (the message itself is still
#   dispatched);
# * StopSubscription (raised by disable_subscription_thread) ends the thread
#   after a best-effort disconnect of the pub/sub connection;
# * any other StandardError is printed and re-raised, terminating the thread.
def enable_subscription_thread
  return if @subscription_thread

  @subscription_thread = Thread.new do
    begin
      @pubsub_redis.subscribe("#{@prefix}:jobs") do |on|
        on.message do |_, bucket_no|
          @pubsub_redis.unsubscribe unless @state == :running
          @subscription_mutex.synchronize do
            start_processing_message_from_bucket(bucket_no)
          end
        end
      end
    rescue StopSubscription
      begin
        @pubsub_redis.disconnect
      rescue StandardError
        # Best effort: the connection is being discarded anyway.
        nil
      end
    rescue StandardError => e
      puts "staaahp -> #{e}"
      # Re-raise the same exception; nothing after this point can run.
      raise
    end
  end
  @subscription_thread.run
end
enqueue_recurring_jobs()
click to toggle source
# File lib/rjob/worker_process.rb, line 334
# Gives every configured recurring job the chance to enqueue itself.
#
# Does nothing when the context has no recurring jobs. Before touching Redis,
# #job_class is called on every entry so that a class that fails to load
# raises before anything is enqueued.
def enqueue_recurring_jobs
  recurring = @context.recurring_jobs
  if recurring
    # Make sure all classes are loaded without error
    recurring.each { |entry| entry.job_class }

    @context.redis do |conn|
      recurring.each { |entry| entry.maybe_enqueue(conn) }
    end
  end
end
enqueue_scheduled_jobs()
click to toggle source
# File lib/rjob/worker_process.rb, line 348
# Runs the :enqueue_scheduled_jobs script for every bucket, passing the
# bucket's scheduled key, its jobs key and the shared "<prefix>:jobs" key,
# plus the current time, a per-call limit of 100 and the bucket number.
#
# If any bucket reports exactly the limit, the whole sweep is repeated,
# up to 10 sweeps in total, using the same timestamp throughout.
def enqueue_scheduled_jobs
  now = Time.now.to_i
  batch_size = 100

  # Let's not be caught in an infinite loop. Thus, loop max 10 times
  10.times do
    moved_counts = []

    @context.redis do |conn|
      (0...@context.bucket_count).each do |bucket|
        script_keys = [
          "#{@prefix}:scheduled:#{bucket}",
          "#{@prefix}:jobs:#{bucket}",
          "#{@prefix}:jobs"
        ]
        moved_counts << @context.script_runner.exec(
          conn, :enqueue_scheduled_jobs, script_keys, [now, batch_size, bucket]
        )
      end
    end

    break unless moved_counts.any? { |moved| moved == batch_size }
  end
end
exercise_leadership()
click to toggle source
# File lib/rjob/worker_process.rb, line 326 def exercise_leadership enqueue_scheduled_jobs scan_buckets enqueue_recurring_jobs end
handle_job_processing_failure(bucket, job_processor)
click to toggle source
# File lib/rjob/worker_process.rb, line 237
# Decides what happens to a job whose processing failed: schedule a retry or
# move it to the dead list.
#
# The job is retried only when all of the following hold:
# * the processor does not report stop_retry?;
# * the worker class's retry_options has a truthy :retry;
# * the error carries an :error_class that is a Module and is covered by one
#   of retry_options[:exceptions] (default: [StandardError]);
# * the next retry number does not exceed retry_options[:max_retries]
#   (default: 16).
# The retry delay in seconds comes from retry_options[:next_retry_proc], or
# 3 * n**4 + 15 for retry number n by default.
#
# In every other case the job goes to the dead list. That includes failures
# without error details: the fallback error has no :error_class, and
# comparing a class against nil would raise TypeError and abort this handler,
# leaving the job in the working list.
#
# @param bucket [Object] bucket the job was taken from
# @param job_processor [Object] must respond to #job, #error, #job_str and
#   #stop_retry?
def handle_job_processing_failure(bucket, job_processor)
  job = job_processor.job
  error = job_processor.error || { message: "Unknown error" }

  logger = @context.logger
  if logger.respond_to?(:info)
    logger.info("Job '#{job.worker_class_name}' with args '#{job.worker_args}' failed: #{error}")
  end

  if job_processor.stop_retry?
    move_job_to_dead(job_processor.job_str, bucket, error)
    return
  end

  retry_options = job.worker_class.retry_options
  error_class = error[:error_class]

  # Module#>= only accepts classes/modules, so an unknown or non-class
  # error_class is never eligible for a retry.
  if retry_options[:retry] && error_class.is_a?(Module)
    exceptions = retry_options.fetch(:exceptions, [StandardError])
    should_handle = exceptions.any? { |e| e >= error_class }

    retry_proc = retry_options[:next_retry_proc] || proc { |x| 3 * x ** 4 + 15 }
    max_retries = retry_options.fetch(:max_retries, 16) # retry for ~2 days
    new_retry_num = job.retry_num + 1

    if should_handle && new_retry_num <= max_retries
      next_retry_at = Time.now.to_i + retry_proc.call(new_retry_num)
      retry_job(job, bucket, next_retry_at)
      return
    end
  end

  move_job_to_dead(job_processor.job_str, bucket, error)
end
init_worker_name()
click to toggle source
# File lib/rjob/worker_process.rb, line 320
# Sets @worker_name to "<hostname>-<24 random alphanumeric characters>".
def init_worker_name
  @worker_name = "#{Socket.gethostname}-#{SecureRandom.alphanumeric(24)}"
end
leader?()
click to toggle source
# File lib/rjob/worker_process.rb, line 154
# Whether this worker is the currently known leader.
#
# @return [Boolean, nil] falsy when no leader is known; otherwise whether
#   @leader equals this worker's name
def leader?
  return @leader unless @leader

  @leader == @worker_name
end
move_job_to_dead(job_str, bucket, error)
click to toggle source
TODO: this should probably be in a single redis pipelined operation
# File lib/rjob/worker_process.rb, line 276 def move_job_to_dead(job_str, bucket, error) push_job_to_dead(job_str, bucket, error) remove_job_from_working(job_str, bucket) end
push_job_to_dead(job_str, bucket, error)
click to toggle source
# File lib/rjob/worker_process.rb, line 281
# Records a failed job on the "<prefix>:dead" list.
#
# The entry is a MessagePack-encoded hash with the current time, the error
# class name, the error message and the serialized job.
#
# @param job_str [String] serialized job
# @param bucket [Object] not used by this method
# @param error [Hash] reads :error_class and :message
def push_job_to_dead(job_str, bucket, error)
  dead_entry = {
    when: Time.now.to_i,
    error_class: error[:error_class].to_s,
    full_message: error[:message],
    job: job_str
  }
  packed_entry = MessagePack.pack(dead_entry)

  @context.redis { |conn| conn.lpush("#{@prefix}:dead", packed_entry) }
end
register_worker()
click to toggle source
# File lib/rjob/worker_process.rb, line 305
# Publishes the worker's initial stats, then adds its name to the
# "<prefix>:workers" list.
def register_worker
  report_stats

  workers_key = "#{@prefix}:workers"
  @context.redis { |conn| conn.lpush(workers_key, @worker_name) }
end
remove_job_from_working(job_str, bucket)
click to toggle source
# File lib/rjob/worker_process.rb, line 217
# Removes one occurrence of +job_str+ from the bucket's working list
# ("<prefix>:jobs:<bucket>:working").
def remove_job_from_working(job_str, bucket)
  working_key = "#{@prefix}:jobs:#{bucket}:working"
  @context.redis { |conn| conn.lrem(working_key, 1, job_str) }
end
report_stats()
click to toggle source
# File lib/rjob/worker_process.rb, line 158
# Writes this worker's heartbeat, queue length, counters and state to the
# "<prefix>:worker:<worker_name>" hash. Every field is sent as its own HSET
# inside a single pipeline, with the value converted to a string.
def report_stats
  snapshot = {
    heartbeat: Time.now.to_i,
    queue_length: @thread_pool.queue_length,
    processed: @processed_count.value,
    failed: @failed_count.value,
    returned: @returned_count.value,
    state: @state
  }
  stats_key = "#{@prefix}:worker:#{@worker_name}"

  @context.redis do |conn|
    conn.pipelined do |pipeline|
      snapshot.each_pair { |field, value| pipeline.hset(stats_key, field, value.to_s) }
    end
  end
end
retry_job(job, bucket, next_retry_at)
click to toggle source
# File lib/rjob/worker_process.rb, line 223
# Runs the :retry_job script for +job+.
#
# The script receives, in order: the retry time, the job's current retry
# number, the bucket and the job id (all as strings), followed by the job
# payload and the key prefix.
def retry_job(job, bucket, next_retry_at)
  @context.redis do |conn|
    script_args = [next_retry_at, job.retry_num, bucket, job.id].map { |arg| arg.to_s }
    script_args.push(job.payload, @prefix)
    @context.script_runner.exec(conn, :retry_job, [], script_args)
  end
end
return_job_execution(job, bucket)
click to toggle source
Called when a job previously went to the working state and we want to put it back (re-enqueue it).
This mostly happens when we picked a job for processing but realized that we don’t actually have the resources to process it at the moment.
# File lib/rjob/worker_process.rb, line 299
# Runs the :return_job_execution script with the job, its bucket and the key
# prefix. Used to hand back a job that was picked up but could not be
# scheduled for execution.
def return_job_execution(job, bucket)
  script_args = [job, bucket, @prefix]
  @context.redis { |conn| @context.script_runner.exec(conn, :return_job_execution, [], script_args) }
end
run_iteration()
click to toggle source
# File lib/rjob/worker_process.rb, line 103 def run_iteration stop_threshold = (@max_queue_size * 0.7).to_i if @thread_pool.queue_length >= stop_threshold || @state != :running disable_subscription_thread elsif @state == :running if !@subscription_thread enable_subscription_thread sleep(ITERATION_TIMEOUT) scan_buckets end end if @state == :exiting if @thread_pool.shutdown? @state = :exited elsif !@thread_pool.shuttingdown? @thread_pool.shutdown else puts "Waiting shutdown..." end end report_stats check_leadership if leader? && @state == :running exercise_leadership if @iteration_no % 2 == 0 end @iteration_no += 1 sleep(ITERATION_TIMEOUT) unless @state == :exited end
scan_buckets()
click to toggle source
# File lib/rjob/worker_process.rb, line 178
# Runs the :scan_buckets script with the key prefix and the number of buckets.
def scan_buckets
  @context.redis do |conn|
    script_args = [@prefix, @context.bucket_count]
    @context.script_runner.exec(conn, :scan_buckets, [], script_args)
  end
end
start_processing_message_from_bucket(bucket)
click to toggle source
# File lib/rjob/worker_process.rb, line 184
# Takes the next job from +bucket+ and submits it to the thread pool.
#
# The job is moved from "<prefix>:jobs:<bucket>" to that bucket's ":working"
# list with RPOPLPUSH. Inside the pool the job runs through
# using_app_wrapper; on success it is removed from the working list,
# otherwise the failure counter is incremented and
# handle_job_processing_failure decides what to do with it.
#
# If the pool rejects the task, the returned counter is incremented and the
# job is handed back through return_job_execution. The processed counter is
# incremented whenever a job was taken from the bucket, whether it was
# accepted by the pool or rejected.
#
# @return [false, Object] false when the bucket had no job
def start_processing_message_from_bucket(bucket)
  pending_key = "#{@prefix}:jobs:#{bucket}"
  fetched = @context.redis { |conn| conn.rpoplpush(pending_key, "#{pending_key}:working") }
  return false if fetched.nil?

  job_str = fetched.b

  # move to inside thread
  job_processor = Rjob::JobProcessor.new(context, job_str)

  # Keep the begin/ensure local so the early return above never bumps the
  # processed counter.
  begin
    @thread_pool.post do
      using_app_wrapper { job_processor.run }

      if job_processor.success?
        remove_job_from_working(job_str, bucket)
      else
        @failed_count.increment
        handle_job_processing_failure(bucket, job_processor)
      end
    end
  rescue Concurrent::RejectedExecutionError
    @returned_count.increment
    return_job_execution(job_str, bucket)
  ensure
    @processed_count.increment
  end
end
unregister_worker()
click to toggle source
# File lib/rjob/worker_process.rb, line 313
# Removes this worker's name from the "<prefix>:workers" list and deletes its
# "<prefix>:worker:<worker_name>" stats hash.
def unregister_worker
  workers_key = "#{@prefix}:workers"
  stats_key = "#{@prefix}:worker:#{@worker_name}"

  @context.redis do |conn|
    conn.lrem(workers_key, 1, @worker_name)
    conn.del(stats_key)
  end
end
using_app_wrapper(&blk)
click to toggle source
# File lib/rjob/worker_process.rb, line 375
# Runs the given block inside the application's wrappers.
#
# When the context defines a job_wrapper_proc, that proc is called with the
# block as its argument (and is responsible for calling it). When ::Rails is
# defined, the whole call additionally goes through
# Rails.application.executor.wrap.
#
# @return [Object] whatever the outermost wrapper (or the block) returns
def using_app_wrapper(&blk)
  runner =
    if @context.job_wrapper_proc.nil?
      blk
    else
      proc { @context.job_wrapper_proc.call(blk) }
    end

  return runner.call unless defined?(::Rails)

  ::Rails.application.executor.wrap(&runner)
end