class Rjob::Context

Attributes

bucket_count[R]
config[R]
job_wrapper_proc[R]
logger[R]
prefix[R]
recurring_jobs[R]
script_runner[R]

Public Class Methods

configure() { |config| ... } click to toggle source

Available options:

- :redis - (passed to Redis.new)
- :max_threads - parallelism
- :bucket_count - defaults to 32
- :redis_pool_size - Redis connection pool size; defaults to 10
- :prefix - defaults to “rjob”
- :job_wrapper_proc - defaults to none
- :logger - duck-typed Logger; defaults to nil

# File lib/rjob/context.rb, line 31
# Builds and registers the shared context.
#
# Yields an empty Hash for the caller to populate, constructs a new instance
# from that Hash and hands it to +set_instance+.
#
# Raises RuntimeError when an instance has already been registered.
# Returns whatever +set_instance+ returns.
def self.configure
  raise "Already configured!: #{@instance}" if @instance

  # The caller mutates the yielded hash in place; tap hands the same hash back.
  options = {}.tap { |settings| yield(settings) }
  set_instance(new(options))
end
instance() click to toggle source
# File lib/rjob/context.rb, line 12
# Returns the registered instance.
#
# Raises RuntimeError when no instance has been registered yet.
def self.instance
  @instance || raise("Rjob is not configured. Please call Rjob.configure first")
end
new(config) click to toggle source
# File lib/rjob/context.rb, line 38
# Sets up a context from a configuration hash.
#
# Keys read from +config+:
#   :redis_pool_size  - pool size, 10 when absent
#   :bucket_count     - 32 when absent
#   :prefix           - 'rjob' when absent
#   :logger           - nil when absent
#   :job_wrapper_proc - nil when absent
#   :recurring_jobs   - optional list of definitions; when the key is present
#                       each entry is turned into a Rjob::RecurringJob
#
# A shallow copy of +config+ is kept in @config. Finishes by creating the
# connection pool and loading the Redis scripts.
def initialize(config)
  @config = config.dup
  @pool_size = @config.fetch(:redis_pool_size, 10)
  @bucket_count = config.fetch(:bucket_count, 32)
  @prefix = config.fetch(:prefix, 'rjob')
  @logger, @job_wrapper_proc = config.values_at(:logger, :job_wrapper_proc)
  @script_runner = Rjob::Scripts::ScriptRunner.new
  @recurring_jobs = nil

  if config.key?(:recurring_jobs)
    # Loaded lazily: only needed when recurring jobs are configured.
    require "rjob/recurring"

    # Note: the definitions receive this context before the pool exists.
    definitions = config[:recurring_jobs]
    @recurring_jobs = definitions.map { |definition| Rjob::RecurringJob.from_definition(self, definition) }
  end

  initialize_connection_pool
  load_redis_scripts
end
set_instance(instance) click to toggle source
# File lib/rjob/context.rb, line 17
# Stores +instance+ as the registered instance, replacing any previous one
# without checking. Returns +instance+.
def self.set_instance(instance)
  @instance = instance
end

Public Instance Methods

create_redis_connection() click to toggle source
# File lib/rjob/context.rb, line 94
# Opens a new Redis connection using the options stored under +:redis+ in
# the configuration.
#
# When +:redis+ was not configured an empty options hash is passed instead of
# nil, so Redis.new applies its own defaults rather than failing on nil.
#
# Returns the new Redis client.
def create_redis_connection
  Redis.new(@config[:redis] || {})
end
demodularize_class(name) click to toggle source
# File lib/rjob/context.rb, line 86
# Resolves a (possibly namespaced) constant name such as "Foo::Bar" to the
# constant it names.
#
# A leading "::" is accepted and ignored, so "::Foo::Bar" resolves the same
# way as "Foo::Bar".
#
# Raises NameError when +name+ is empty or any segment cannot be resolved.
# Returns the resolved constant.
def demodularize_class(name)
  segments = name.to_s.sub(/\A::/, '').split('::')

  # Without this guard an empty name would fall through and return Kernel.
  raise NameError, "invalid class name: #{name.inspect}" if segments.empty?

  segments.reduce(Kernel) { |scope, segment| scope.const_get(segment) }
end
enqueue_job(job_class, args) click to toggle source
# File lib/rjob/context.rb, line 65
# Enqueues a job by running +enqueue_job_with_redis+ inside a +redis+ block.
#
# job_class - job class (or name); passed through unchanged
# args      - job arguments; passed through unchanged
#
# Returns whatever +redis+ returns for the block.
def enqueue_job(job_class, args)
  redis { |conn| enqueue_job_with_redis(job_class, args, conn) }
end
enqueue_job_with_redis(job_class, args, r) click to toggle source
# File lib/rjob/context.rb, line 69
# Enqueues a job using an already checked-out connection.
#
# The job is serialized with MessagePack as [job_class.to_s, args] and handed
# to the :enqueue_job script together with the prefix and bucket count.
#
# job_class - job class (or name); converted with to_s
# args      - job arguments, serialized as given
# r         - connection passed to the script runner
#
# Returns the result of the script runner's +exec+.
def enqueue_job_with_redis(job_class, args, r)
  payload = MessagePack.pack([job_class.to_s, args])
  script_args = [@prefix, @bucket_count, payload]
  @script_runner.exec(r, :enqueue_job, [], script_args)
end
fetch_worker_class(class_name:) click to toggle source
# File lib/rjob/context.rb, line 82
# Looks up the worker class named by +class_name+ by delegating to
# +demodularize_class+.
#
# class_name - constant name, e.g. "Foo::Bar" (keyword argument)
#
# Returns whatever +demodularize_class+ returns for that name.
def fetch_worker_class(class_name:)
  demodularize_class(class_name)
end
redis(&block) click to toggle source
# File lib/rjob/context.rb, line 61
# Forwards the given block to the connection pool's +with+.
# Presumably the block receives a checked-out Redis connection and its value
# is returned — confirm against ConnectionPool#with.
def redis(&block)
  @pool.with(&block)
end
schedule_job_at(timestamp, job_class, args) click to toggle source
# File lib/rjob/context.rb, line 74
# Schedules a job for a given time.
#
# The job is serialized with MessagePack as [job_class.to_s, args] and handed
# to the :schedule_job_at script along with the timestamp (as a string), the
# prefix and the bucket count.
#
# timestamp - when to run the job; converted with to_s
# job_class - job class (or name); converted with to_s
# args      - job arguments, serialized as given
#
# Returns whatever +redis+ returns for the block.
def schedule_job_at(timestamp, job_class, args)
  payload = MessagePack.pack([job_class.to_s, args])
  script_args = [timestamp.to_s, payload, @prefix, @bucket_count]

  redis { |conn| @script_runner.exec(conn, :schedule_job_at, [], script_args) }
end

Private Instance Methods

initialize_connection_pool() click to toggle source
# File lib/rjob/context.rb, line 107
# Creates the connection pool and stores it in @pool.
#
# The pool is sized by @pool_size and builds each connection by calling
# +create_redis_connection+.
def initialize_connection_pool
  @pool = ConnectionPool.new(size: @pool_size) do
    create_redis_connection
  end
end
load_redis_scripts() click to toggle source
# File lib/rjob/context.rb, line 101
# Asks the script runner to load all of its scripts, using a connection
# obtained from the pool.
def load_redis_scripts
  @pool.with { |conn| @script_runner.load_all_scripts(conn) }
end