class BackRun::Worker
Attributes
queues[R]
Public Class Methods
new(pubsub, queues, threads, pool = nil)
click to toggle source
# File lib/back_run/worker.rb, line 9 def initialize(pubsub, queues, threads, pool = nil) @queues = queues @pubsub = pubsub @pool = pool || Concurrent::ThreadPoolExecutor.new( min_threads: 1, max_threads: threads, max_queue: 1 ) end
Public Instance Methods
kill_job(job)
click to toggle source
# File lib/back_run/worker.rb, line 39 def kill_job(job) @pubsub.kill_job(job) end
message_received(job, ack, modify_ack_deadline)
click to toggle source
# File lib/back_run/worker.rb, line 22 def message_received(job, ack, modify_ack_deadline) if job.should_run_now? ack.call run_job(job) else remaining_seconds = job.remaining_seconds_to_run modify_ack_deadline.call(remaining_seconds) end rescue Concurrent::RejectedExecutionError BackRun.logger.info('Thread pool busy. Republishing the job') retry_job(job) end
retry_job(job)
click to toggle source
# File lib/back_run/worker.rb, line 35 def retry_job(job) @pubsub.publish(job) end
start_listening!()
click to toggle source
# File lib/back_run/worker.rb, line 17 def start_listening! @pubsub.subscribe(self) sleep end
Private Instance Methods
run_job(job)
click to toggle source
# File lib/back_run/worker.rb, line 45 def run_job(job) @pool.post do job.perform(self) MetricsCollector.job_executed(job) end end