class Rjob::Context
Attributes
bucket_count[R]
config[R]
job_wrapper_proc[R]
logger[R]
prefix[R]
recurring_jobs[R]
script_runner[R]
Public Class Methods
configure() { |config| ... }
click to toggle source
Available options:
:redis - (passed to Redis.new) :max_threads - paralallelism :bucket_count - defaults to 32 :redis_pool_size - redis connection pool size. Defaults to 10 :prefix - defaults to “rjob” :job_wrapper_proc - defaults to none :logger - duck-typed Logger, defaults to nil
# File lib/rjob/context.rb, line 31 def self.configure raise "Already configured!: #{@instance}" if @instance config = {} yield(config) set_instance(new(config)) end
instance()
click to toggle source
# File lib/rjob/context.rb, line 12 def self.instance return @instance if @instance raise "Rjob is not configured. Please call Rjob.configure first" end
new(config)
click to toggle source
# File lib/rjob/context.rb, line 38 def initialize(config) @config = config.dup @pool_size = @config.fetch(:redis_pool_size, 10) @bucket_count = config.fetch(:bucket_count, 32) @prefix = config.fetch(:prefix, 'rjob') @logger = config[:logger] @job_wrapper_proc = config[:job_wrapper_proc] @script_runner = Rjob::Scripts::ScriptRunner.new @recurring_jobs = nil if config.key?(:recurring_jobs) require "rjob/recurring" @recurring_jobs = config[:recurring_jobs].map do |defn| Rjob::RecurringJob.from_definition(self, defn) end end initialize_connection_pool load_redis_scripts end
set_instance(instance)
click to toggle source
# File lib/rjob/context.rb, line 17 def self.set_instance(instance) @instance = instance end
Public Instance Methods
create_redis_connection()
click to toggle source
# File lib/rjob/context.rb, line 94 def create_redis_connection redis_args = @config[:redis] Redis.new(redis_args) end
demodularize_class(name)
click to toggle source
# File lib/rjob/context.rb, line 86 def demodularize_class(name) const = Kernel name.split('::').each do |n| const = const.const_get(n) end const end
enqueue_job(job_class, args)
click to toggle source
# File lib/rjob/context.rb, line 65 def enqueue_job(job_class, args) redis(&method(:enqueue_job_with_redis).curry[job_class, args]) end
enqueue_job_with_redis(job_class, args, r)
click to toggle source
# File lib/rjob/context.rb, line 69 def enqueue_job_with_redis(job_class, args, r) job_data = MessagePack.pack([job_class.to_s, args]) @script_runner.exec(r, :enqueue_job, [], [@prefix, @bucket_count, job_data]) end
fetch_worker_class(class_name:)
click to toggle source
# File lib/rjob/context.rb, line 82 def fetch_worker_class(class_name:) demodularize_class(class_name) end
redis(&block)
click to toggle source
# File lib/rjob/context.rb, line 61 def redis(&block) @pool.with(&block) end
schedule_job_at(timestamp, job_class, args)
click to toggle source
# File lib/rjob/context.rb, line 74 def schedule_job_at(timestamp, job_class, args) job_data = MessagePack.pack([job_class.to_s, args]) redis do |r| @script_runner.exec(r, :schedule_job_at, [], [timestamp.to_s, job_data, @prefix, @bucket_count]) end end
Private Instance Methods
initialize_connection_pool()
click to toggle source
# File lib/rjob/context.rb, line 107 def initialize_connection_pool @pool = ConnectionPool.new(size: @pool_size) { create_redis_connection } end
load_redis_scripts()
click to toggle source
# File lib/rjob/context.rb, line 101 def load_redis_scripts @pool.with do |redis| @script_runner.load_all_scripts(redis) end end