class Delayed::Master::JobChecker

Public Class Methods

new(master) click to toggle source
# File lib/delayed/master/job_checker.rb, line 6
def initialize(master)
  @master = master
  @config = master.config
  @spec_names = target_spec_names

  define_models
  extend_after_fork_callback
end

Public Instance Methods

check() click to toggle source
# File lib/delayed/master/job_checker.rb, line 15
def check
  workers = []
  mon = Monitor.new

  threads = @spec_names.map do |spec_name|
    Thread.new(spec_name) do |spec_name|
      find_jobs_in_db(spec_name) do |setting|
        mon.synchronize do
          workers << Worker.new(index: @master.workers.size + workers.size, database: spec_name, setting: setting)
        end
      end
    end
  end

  threads.each(&:join)

  workers
end

Private Instance Methods

define_models() click to toggle source
# File lib/delayed/master/job_checker.rb, line 36
def define_models
  @spec_names.each do |spec_name|
    klass = Class.new(Delayed::Job)
    klass_name = "DelayedJob#{spec_name.capitalize}"
    unless Delayed::Master.const_defined?(klass_name)
      Delayed::Master.const_set(klass_name, klass)
      Delayed::Master.const_get(klass_name).establish_connection(spec_name)
    end
  end
end
extend_after_fork_callback() click to toggle source
# File lib/delayed/master/job_checker.rb, line 51
def extend_after_fork_callback
  prc = @config.after_fork
  @config.after_fork do |master, worker|
    prc.call(master, worker)
    ActiveRecord::Base.establish_connection(worker.database) if worker.database
  end
end
find_jobs_in_db(spec_name) { |setting| ... } click to toggle source
# File lib/delayed/master/job_checker.rb, line 81
def find_jobs_in_db(spec_name)
  counter = JobCounter.new(model_for(spec_name))

  @config.worker_settings.each do |setting|
    count = @master.workers.count { |worker| worker.setting.queues == setting.queues }
    slot = setting.count - count
    if slot > 0 && (job_count = counter.count(setting)) > 0
      [slot, job_count].min.times do
        yield setting
      end
    end
  end
end
has_delayed_job_table?(spec_name) click to toggle source
# File lib/delayed/master/job_checker.rb, line 76
def has_delayed_job_table?(spec_name)
  ActiveRecord::Base.establish_connection(spec_name)
  ActiveRecord::Base.connection.tables.include?('delayed_jobs')
end
load_spec_names() click to toggle source
# File lib/delayed/master/job_checker.rb, line 67
def load_spec_names
  if Rails::VERSION::MAJOR >= 6
    configs = ActiveRecord::Base.configurations.configs_for(env_name: Rails.env)
    configs.reject(&:replica?).map { |c| c.spec_name.to_sym }
  else
    [Rails.env.to_sym]
  end
end
model_for(spec_name) click to toggle source
# File lib/delayed/master/job_checker.rb, line 47
def model_for(spec_name)
  Delayed::Master.const_get("DelayedJob#{spec_name.capitalize}")
end
target_spec_names() click to toggle source
# File lib/delayed/master/job_checker.rb, line 59
def target_spec_names
  if @config.databases.nil? || @config.databases.empty?
    load_spec_names.select { |spec_name| has_delayed_job_table?(spec_name) }
  else
    @config.databases
  end
end