module Resque::Scheduler
Constants
- CLI_OPTIONS_ENV_MAPPING
- INTERMITTENT_ERRORS
- VERSION
Attributes
failure_handler (writer): allows the user to set an additional failure handler
scheduled_jobs (reader): the Rufus::Scheduler jobs that are scheduled
Public Class Methods
# File lib/resque/scheduler.rb, line 446
# Shutdown hook: stops the rufus scheduler first, then gives up the
# master lock.
def before_shutdown
  stop_rufus_scheduler
  release_master_lock
end
Stops old rufus scheduler and creates a new one. Returns the new rufus scheduler
# File lib/resque/scheduler.rb, line 360
# Stops the current rufus scheduler, forgets it together with every
# scheduled job, and returns a freshly built scheduler (via the memoizing
# `rufus_scheduler` reader).
def clear_schedule!
  rufus_scheduler.stop
  @scheduled_jobs = {}
  @rufus_scheduler = nil
  rufus_scheduler
end
# File lib/resque/scheduler.rb, line 285
# Enqueues the job described by +config+. Any StandardError raised while
# enqueuing is handed to the configured failure handler instead of
# propagating.
def enqueue(config)
  begin
    enqueue_from_config(config)
  rescue StandardError => error
    Resque::Scheduler.failure_handler.on_enqueue_failure(config, error)
  end
end
Enqueues all delayed jobs for a timestamp
# File lib/resque/scheduler.rb, line 208
# Enqueues all delayed jobs for +timestamp+, in batches of
# +delayed_requeue_batch_size+.
#
# Processing stops when a batch comes back smaller than the batch size
# (the bucket is drained) or when enqueue_items_in_batch_for_timestamp
# returns -1 (the redis transaction was aborted). In the aborted case the
# -1 sentinel is NOT added to the running total and no "finished" message
# is logged, because the timestamp has not actually been finished.
def enqueue_delayed_items_for_timestamp(timestamp)
  count = 0
  batch_size = delayed_requeue_batch_size
  actual_batch_size = nil
  completed = false

  log "Processing delayed items for timestamp #{timestamp}, in batches of #{batch_size}"

  loop do
    handle_shutdown do
      # Continually check that it is still the master
      if am_master
        actual_batch_size = enqueue_items_in_batch_for_timestamp(timestamp,
                                                                 batch_size)
      end
    end

    # -1 signals a failed redis transaction: leave the loop without
    # corrupting the count of jobs that really were queued.
    break if actual_batch_size == -1

    count += actual_batch_size
    log "queued #{count} jobs"

    # Continue processing until there are no more items in this
    # timestamp. If we don't have a full batch, this is the last one.
    if actual_batch_size < batch_size
      completed = true
      break
    end
  end

  log "finished queueing #{count} total jobs for timestamp #{timestamp}" if completed
end
Enqueues a job based on a config hash
# File lib/resque/scheduler.rb, line 298
# Enqueues a job based on a config hash.
#
# +job_config+ may use string or symbol keys for 'args', 'class', 'queue'
# and 'custom_job_class'. When no queue is configured, the queue is
# looked up from the job class via Resque.queue_from_class.
def enqueue_from_config(job_config)
  args = job_config['args'] || job_config[:args]

  klass_name = job_config['class'] || job_config[:class]
  begin
    klass = Resque::Scheduler::Util.constantize(klass_name)
  rescue NameError
    # Class is not loadable in this process; keep the name as-is.
    klass = klass_name
  end

  # A Hash is a single argument; anything else is coerced to an arg list.
  params = args.is_a?(Hash) ? [args] : Array(args)
  queue = job_config['queue'] ||
          job_config[:queue] ||
          Resque.queue_from_class(klass)

  # Support custom job classes like those that inherit from
  # Resque::JobWithStatus (resque-status). Accept a symbol key as well,
  # consistent with the other config keys read above.
  job_klass = job_config['custom_job_class'] || job_config[:custom_job_class]

  if job_klass && job_klass != 'Resque::Job'
    # The custom job class API must offer a static "scheduled" method. If
    # the custom job class can not be constantized (via a requeue call
    # from the web perhaps), fall back to enqueuing normally via
    # Resque::Job.create.
    begin
      Resque::Scheduler::Util.constantize(job_klass).scheduled(
        queue, klass_name, *params
      )
    rescue NameError
      # Note that the custom job class (job_config['custom_job_class'])
      # is the one enqueued
      Resque::Job.create(queue, job_klass, *params)
    end
  elsif Class === klass
    # Hack to avoid havoc for people shoving stuff into queues
    # for non-existent classes (for example: running scheduler in
    # one app that schedules for another).
    Resque::Scheduler::Plugin.run_before_delayed_enqueue_hooks(
      klass, *params
    )

    # If the class is a custom job class, call self#scheduled on it.
    # This allows you to do things like Resque.enqueue_at(timestamp,
    # CustomJobClass). Otherwise, pass off to Resque.
    if klass.respond_to?(:scheduled)
      klass.scheduled(queue, klass_name, *params)
    else
      Resque.enqueue_to(queue, klass, *params)
    end
  else
    # This will not run the before_hooks in rescue, but will at least
    # queue the job.
    Resque::Job.create(queue, klass, *params)
  end
end
# File lib/resque/scheduler.rb, line 241
# Enqueues up to +batch_size+ jobs from the delayed bucket for +timestamp+
# and trims them off the bucket inside a redis transaction.
#
# Returns the number of jobs taken from the bucket (0 when none were
# left), or -1 when the watched transaction was aborted.
def enqueue_items_in_batch_for_timestamp(timestamp, batch_size)
  bucket_key = timestamp_key(timestamp)
  encoded_batch = Resque.redis.lrange(bucket_key, 0, batch_size - 1)

  # WATCH guards the bucket against changes by other clients between the
  # watch call and the exec of the multi block. Since the ltrim sits
  # inside the multi block it is aborted together with everything else,
  # so an aborted transaction removes nothing from the bucket.
  transaction_result = Resque.redis.watch(bucket_key) do
    Resque.redis.multi do |pipeline|
      encoded_batch.each do |encoded_job|
        pipeline.srem("timestamps:#{encoded_job}", bucket_key)
        enqueue(Resque.decode(encoded_job))
      end
      pipeline.ltrim(bucket_key, batch_size, -1)
    end
  end

  # A nil result means the bucket was modified while the multi block was
  # being built. Return -1 so the caller stops iterating on this
  # timestamp; it is picked up again by the loop in handle_delayed_items.
  return -1 if transaction_result.nil?

  batch_count = encoded_batch.count

  # A short batch was the last one in this bucket: clean it up.
  Resque.clean_up_timestamp(bucket_key, timestamp) if batch_count < batch_size

  batch_count
end
# File lib/resque/scheduler.rb, line 196
# Fetches the next delayed item for +timestamp+ and, when there is one,
# logs and enqueues it. Returns the item (falsy when none was available).
def enqueue_next_item(timestamp)
  next_item = Resque.next_item_for_timestamp(timestamp)
  return next_item unless next_item

  log "queuing #{next_item['class']} [delayed]"
  enqueue(next_item)
  next_item
end
Returns true if the current env is non-nil and the configured env (a string of names separated by commas and/or whitespace) includes the current env.
# File lib/resque/scheduler.rb, line 179
# Truthy when an env is set and it appears in +configured_env+, a string
# of env names separated by commas and/or whitespace. Returns the falsy
# env itself when no env is set.
def env_matches?(configured_env)
  current_env = env
  return current_env unless current_env

  configured_env.split(/[\s,]+/).include?(current_env)
end
# File lib/resque/scheduler.rb, line 478
# The handler notified of enqueue failures; defaults to
# Resque::Scheduler::FailureHandler when none has been set.
def failure_handler
  @failure_handler || (@failure_handler = Resque::Scheduler::FailureHandler)
end
Handles queueing delayed items. at_time - Time to start scheduling items (default: now).
# File lib/resque/scheduler.rb, line 185
# Enqueues delayed items bucket by bucket for as long as
# Resque.next_delayed_timestamp(at_time) keeps returning a timestamp.
def handle_delayed_items(at_time = nil)
  timestamp = Resque.next_delayed_timestamp(at_time)
  return unless timestamp

  procline 'Processing Delayed Items'
  loop do
    enqueue_delayed_items_for_timestamp(timestamp)
    timestamp = Resque.next_delayed_timestamp(at_time)
    break if timestamp.nil?
  end
end
# File lib/resque/scheduler.rb, line 291
# Runs the given block, exiting the process instead if the shutdown flag
# is set either before or after the block runs.
def handle_shutdown
  if @shutdown
    exit
  end

  yield

  if @shutdown
    exit
  end
end
# File lib/resque/scheduler.rb, line 433
# Runs the optional block and then handle_signals. Returns false when
# both complete normally and true when an Interrupt was raised; on
# Interrupt, before_shutdown is run if the shutdown flag is set.
def handle_signals_with_operation
  begin
    yield if block_given?
    handle_signals
    interrupted = false
  rescue Interrupt
    before_shutdown if @shutdown
    interrupted = true
  end
  interrupted
end
Pulls the schedule from Resque.schedule and loads it into the rufus scheduler instance
# File lib/resque/scheduler.rb, line 97
# Pulls the schedule from Resque.schedule and loads every entry into the
# rufus scheduler, resetting @scheduled_jobs first.
def load_schedule!
  procline 'Loading Schedule'

  # In dynamic mode the schedule has to be loaded from redis first.
  Resque.reload_schedule! if dynamic

  log! 'Schedule empty! Set Resque.schedule' if Resque.schedule.empty?

  @scheduled_jobs = {}
  Resque.schedule.each { |name, config| load_schedule_job(name, config) }

  # Everything was just loaded, so the pending change set is discarded.
  Resque.redis.del(:schedules_changed) if am_master && dynamic

  procline 'Schedules Loaded'
end
Loads a job schedule into the Rufus::Scheduler and stores it in @scheduled_jobs
# File lib/resque/scheduler.rb, line 134
# Loads one job schedule into the Rufus::Scheduler and records the
# resulting job in @scheduled_jobs under +name+.
#
# If `rails_env` or `env` is set in the config, the job is loaded only
# when it matches `Resque::Scheduler.env`; when neither is set, the job is
# scheduled regardless. The first of `cron` / `every` that has a
# non-empty value decides how the job is scheduled.
def load_schedule_job(name, config)
  configured_env = config['rails_env'] || config['env']

  unless configured_env.nil? || env_matches?(configured_env)
    return log("Skipping schedule of #{name} because configured " \
               "env #{configured_env.inspect} does not match current " \
               "env #{env.inspect}")
  end

  log! "Scheduling #{name} "

  interval_types = %w(cron every)
  interval_type = interval_types.find do |type|
    !config[type].nil? && !config[type].empty?
  end

  unless interval_type
    return log!("no #{interval_types.join(' / ')} found for " \
                "#{config['class']} (#{name}) - skipping")
  end

  args = optionizate_interval_value(config[interval_type])
  # A bare String interval still needs `job: true` passed as an option.
  args = [args, nil, job: true] if args.is_a?(::String)

  @scheduled_jobs[name] = rufus_scheduler.send(interval_type, *args) do
    enqueue_recurring(name, config)
  end
  nil
end
# File lib/resque/scheduler.rb, line 467
# Logs +msg+ at debug level.
def log(msg)
  logger.debug do
    msg
  end
end
# File lib/resque/scheduler.rb, line 459
# Logs +msg+ at info level.
def log!(msg)
  logger.info do
    msg
  end
end
# File lib/resque/scheduler.rb, line 463
# Logs +msg+ at error level.
def log_error(msg)
  logger.error do
    msg
  end
end
# File lib/resque/scheduler.rb, line 482
# Memoized logger, built by Resque::Scheduler::LoggerBuilder from the
# quiet / verbose / logfile / logformat settings.
def logger
  @logger ||= begin
    builder = Resque::Scheduler::LoggerBuilder.new(
      quiet: quiet,
      verbose: verbose,
      log_dev: logfile,
      format: logformat
    )
    builder.build
  end
end
Converts an interval type value into a value with options, if options are available.
# File lib/resque/scheduler.rb, line 115
# Converts an interval config value into arguments for the scheduler.
#
# * A non-Array value is returned unchanged.
# * An Array with more than two elements, or whose last element is not a
#   Hash, yields just its first element.
# * A two-element [interval, options] Array yields
#   [interval, nil, options] with the option keys symbolized and
#   `job: true` added.
#
# The input is never modified: a new Array and Hash are built, so calling
# this again with the same config value gives the same result.
def optionizate_interval_value(value)
  return value unless value.is_a?(::Array)
  return value.first if value.size > 2 || !value.last.is_a?(::Hash)

  interval, raw_options = value

  # symbolize keys of hash for options
  options = raw_options.each_with_object({}) do |(key, option), symbolized|
    symbolized[(key.respond_to?(:to_sym) ? key.to_sym : key) || key] = option
  end
  options[:job] = true

  [interval, nil, options]
end
Sleeps and returns true
# File lib/resque/scheduler.rb, line 400
# Sleeps for one poll interval (shutdown-aware) and returns true. The
# @sleeping flag is always cleared afterwards, even if the sleep loop
# raises.
def poll_sleep
  handle_shutdown do
    begin
      poll_sleep_loop
    ensure
      # Reset no matter how the sleep loop ended.
      @sleeping = false
    end
  end

  true
end
# File lib/resque/scheduler.rb, line 411
# Marks the scheduler as sleeping and sleeps until poll_sleep_amount
# seconds have elapsed or handle_signals_with_operation reports an
# interrupt (returns true). With a non-positive poll_sleep_amount only
# the signal handling runs.
#
# Elapsed time is measured on the monotonic clock so that wall-clock
# adjustments (NTP steps, manual changes) can neither stretch nor
# shorten the sleep.
def poll_sleep_loop
  @sleeping = true
  if poll_sleep_amount > 0
    started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    loop do
      elapsed_sleep = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
      remaining_sleep = poll_sleep_amount - elapsed_sleep
      @do_break = false
      if remaining_sleep <= 0
        @do_break = true
      else
        # Sleep may be cut short by a signal; loop again for the remainder
        # unless the operation reports an interrupt.
        @do_break = handle_signals_with_operation do
          sleep(remaining_sleep)
        end
      end
      break if @do_break
    end
  else
    handle_signals_with_operation
  end
end
# File lib/resque/scheduler.rb, line 85
# Logs one line per scheduled rufus job: its schedule and last run time.
#
# Handles both shapes of `rufus_scheduler.jobs`: an Array of jobs and a
# Hash of id => job. Per job, `original` / `last_time` are used when the
# job responds to them, falling back to `t` / `last` otherwise.
def print_schedule
  return unless rufus_scheduler

  log! "Scheduling Info\tLast Run"

  scheduler_jobs = rufus_scheduler.jobs
  scheduler_jobs = scheduler_jobs.values if scheduler_jobs.is_a?(::Hash)

  scheduler_jobs.each do |job|
    schedule = job.respond_to?(:original) ? job.original : job.t
    last_run = job.respond_to?(:last_time) ? job.last_time : job.last
    log! "#{schedule}\t#{last_run}\t"
  end
end
# File lib/resque/scheduler.rb, line 471
# Logs +string+ and sets the process title ($0) to the procline built
# from it.
def procline(string)
  log! string
  process_title = build_procline(string)
  log "Setting procline #{process_title.inspect}"
  $0 = process_title
end
Returns true if the given schedule config hash matches the current env
# File lib/resque/scheduler.rb, line 170
# Deprecated in favor of env_matches?. Emits a deprecation warning, then
# checks whether config['rails_env'] (names separated by commas and/or
# whitespace) includes the current env. Falsy when either is missing.
def rails_env_matches?(config)
  warn '`Resque::Scheduler.rails_env_matches?` is deprecated. ' \
       'Please use `Resque::Scheduler.env_matches?` instead.'

  configured = config['rails_env']
  return configured unless configured

  current_env = env
  return current_env unless current_env

  configured.split(/[\s,]+/).include?(current_env)
end
# File lib/resque/scheduler.rb, line 367
# Discards the current schedule and loads it again from scratch.
def reload_schedule!
  procline('Reloading Schedule')
  clear_schedule!
  load_schedule!
end
# File lib/resque/scheduler.rb, line 354
# Memoized Rufus::Scheduler instance, created on first use.
def rufus_scheduler
  @rufus_scheduler || (@rufus_scheduler = Rufus::Scheduler.new)
end
Schedule all jobs and continually look for delayed jobs (never returns)
# File lib/resque/scheduler.rb, line 39
# Main loop: schedules all jobs and keeps looking for delayed jobs.
# Only leaves the loop when an Interrupt is raised.
def run
  procline 'Starting'

  # trap signals
  register_signal_handlers

  # Quote from the resque/worker.
  # Fix buffering so we can `rake resque:scheduler > scheduler.log` and
  # get output from the child in there.
  $stdout.sync = true
  $stderr.sync = true

  previous_master_state = nil

  begin
    # Remembered so that `shutdown` can raise Interrupt in this thread.
    @th = Thread.current

    # Now start the scheduling part of the loop.
    loop do
      begin
        # Check on changes to master/child
        @am_master = master?

        unless am_master == previous_master_state
          procline(am_master ? 'Master scheduler' : 'Child scheduler')

          # Load schedule because changed
          reload_schedule!
        end

        if am_master
          handle_delayed_items
          update_schedule if dynamic
        end

        previous_master_state = am_master
      rescue *INTERMITTENT_ERRORS => error
        log! error.message
        release_master_lock
      end

      poll_sleep
    end
  rescue Interrupt
    log 'Exiting'
  end
end
Sets the shutdown flag, cleans schedules, and exits if sleeping
# File lib/resque/scheduler.rb, line 452
# Sets the shutdown flag (once) and, when the scheduler is currently
# sleeping, raises Interrupt in the run thread to wake it up.
def shutdown
  unless @shutdown
    @shutdown = true
    log!('Shutting down')
    @th.raise(Interrupt) if @sleeping
  end
end
# File lib/resque/scheduler.rb, line 442
# Shuts the rufus scheduler down, passing the :wait option.
def stop_rufus_scheduler
  scheduler = rufus_scheduler
  scheduler.shutdown(:wait)
end
# File lib/resque/scheduler.rb, line 237
# Builds the "delayed:<integer timestamp>" key string for +timestamp+.
def timestamp_key(timestamp)
  seconds = timestamp.to_i
  "delayed:#{seconds}"
end
# File lib/resque/scheduler.rb, line 391
# Unschedules the rufus job registered under +name+ (if any) and removes
# it from @scheduled_jobs.
def unschedule_job(name)
  return unless scheduled_jobs[name]

  log "Removing schedule #{name}"
  scheduled_jobs[name].unschedule
  @scheduled_jobs.delete(name)
end
# File lib/resque/scheduler.rb, line 373
# Applies pending schedule changes: pops each name from the
# :schedules_changed set, reloads Resque.schedule, unschedules the job
# and re-loads it if it is still present in the schedule.
def update_schedule
  return unless Resque.redis.scard(:schedules_changed) > 0

  procline 'Updating schedule'

  while (schedule_name = Resque.redis.spop(:schedules_changed))
    Resque.reload_schedule!
    still_scheduled = Resque.schedule.key?(schedule_name)

    # The old job goes away in both cases; it is only re-created when the
    # name still exists in the reloaded schedule.
    unschedule_job(schedule_name)
    if still_scheduled
      load_schedule_job(schedule_name, Resque.schedule[schedule_name])
    end
  end

  procline 'Schedules Loaded'
end
Private Class Methods
# File lib/resque/scheduler.rb, line 517
# Cached master status; master? is consulted only while @am_master has
# never been assigned.
def am_master
  return @am_master if defined?(@am_master)

  @am_master = master?
end
# File lib/resque/scheduler.rb, line 501
# "[app_name]" when an app name is set, otherwise an empty string.
def app_str
  return '' unless app_name

  "[#{app_name}]"
end
# File lib/resque/scheduler.rb, line 509
# Builds the process title: internal name, app and env tags, then
# ": " followed by +string+.
def build_procline(string)
  prefix = "#{internal_name}#{app_str}#{env_str}"
  "#{prefix}: #{string}"
end
# File lib/resque/scheduler.rb, line 493
# Enqueues the recurring job +name+ and records the enqueue time, but
# only when this process is the master.
def enqueue_recurring(name, config)
  return unless am_master

  log! "queueing #{config['class']} (#{name})"
  enqueue(config)
  Resque.last_enqueued_at(name, Time.now.to_s)
end
# File lib/resque/scheduler.rb, line 505
# "[env]" when an env is set, otherwise an empty string.
def env_str
  return '' unless env

  "[#{env}]"
end
# File lib/resque/scheduler.rb, line 513
# Process name prefix that includes the resque-scheduler version.
def internal_name
  version = Resque::Scheduler::VERSION
  "resque-scheduler-#{version}"
end