class BackRun::Worker

Attributes

queues[R]

Public Class Methods

new(pubsub, queues, threads, pool = nil) click to toggle source
# File lib/back_run/worker.rb, line 9
def initialize(pubsub, queues, threads, pool = nil)
  @queues = queues
  @pubsub = pubsub
  @pool = pool || Concurrent::ThreadPoolExecutor.new(
    min_threads: 1, max_threads: threads, max_queue: 1
  )
end

Public Instance Methods

kill_job(job) click to toggle source
# File lib/back_run/worker.rb, line 39
def kill_job(job)
  @pubsub.kill_job(job)
end
message_received(job, ack, modify_ack_deadline) click to toggle source
# File lib/back_run/worker.rb, line 22
def message_received(job, ack, modify_ack_deadline)
  if job.should_run_now?
    ack.call
    run_job(job)
  else
    remaining_seconds = job.remaining_seconds_to_run
    modify_ack_deadline.call(remaining_seconds)
  end
rescue Concurrent::RejectedExecutionError
  BackRun.logger.info('Thread pool busy. Republishing the job')
  retry_job(job)
end
retry_job(job) click to toggle source
# File lib/back_run/worker.rb, line 35
def retry_job(job)
  @pubsub.publish(job)
end
start_listening!() click to toggle source
# File lib/back_run/worker.rb, line 17
def start_listening!
  @pubsub.subscribe(self)
  sleep
end

Private Instance Methods

run_job(job) click to toggle source
# File lib/back_run/worker.rb, line 45
def run_job(job)
  @pool.post do
    job.perform(self)
    MetricsCollector.job_executed(job)
  end
end