class SidekiqScheduler::Scheduler
Attributes
Set to true to have the schedule updated at runtime, re-checked at a regular interval.
Set the interval at which the schedule is dynamically updated at runtime.
Set to enable or disable the scheduler.
Set to schedule only the jobs that will be pushed to queues listened to by Sidekiq.
Set custom options for rufus scheduler, like max_work_threads.
Private Class Methods
# File lib/sidekiq-scheduler/scheduler.rb, line 40
# Returns the shared scheduler instance, building it with +new+ the first
# time it is requested and reusing it afterwards.
#
# @return [Object] the memoized instance
def instance
  @instance ||= new
end
# File lib/sidekiq-scheduler/scheduler.rb, line 45 def instance=(value) @instance = value end
# File lib/sidekiq-scheduler/scheduler.rb, line 49
# Delegates calls that name a public instance method to the shared
# +instance+, forwarding both the positional arguments and the caller's
# block. Any other call falls through to +super+.
#
# @param method [Symbol] name of the missing method
# @param arguments [Array] positional arguments of the original call
# @param block [Proc, nil] block of the original call, passed on to the instance
def method_missing(method, *arguments, &block)
  if instance_methods.include?(method)
    instance.public_send(method, *arguments, &block)
  else
    super
  end
end

# Keeps +respond_to?+ consistent with what +method_missing+ delegates.
#
# @param method [Symbol] name being queried
# @param include_private [Boolean] passed through to +super+
# @return [Boolean]
def respond_to_missing?(method, include_private = false)
  instance_methods.include?(method) || super
end
# File lib/sidekiq-scheduler/scheduler.rb, line 54 def initialize(config = SidekiqScheduler::Config.new(without_defaults: true)) @scheduler_config = config self.enabled = config.enabled? self.dynamic = config.dynamic? self.dynamic_every = config.dynamic_every? self.listened_queues_only = config.listened_queues_only? self.rufus_scheduler_options = config.rufus_scheduler_options || {} end
Private Instance Methods
Adds a Hash with schedule metadata as the last argument to call the worker. It currently carries the schedule time as a Float number representing the seconds since epoch, rounded to millisecond precision.
@example with hash argument
arguments_with_metadata({value: 1}, scheduled_at: Time.now.to_f.round(3)) #=> [{value: 1}, {scheduled_at: <seconds since epoch, millisecond precision>}]
@param args [Array|Hash] @param metadata [Hash] @return [Array] arguments with added metadata
# File lib/sidekiq-scheduler/scheduler.rb, line 316
# Appends +metadata+ as the final worker argument.
#
# An Array of arguments gets the metadata added as one more element; any
# other value is wrapped together with the metadata in a two-element Array.
# The given +args+ object is not modified.
#
# @param args [Array, Hash, Object] arguments configured for the job
# @param metadata [Hash] metadata to append
# @return [Array] arguments followed by the metadata
def arguments_with_metadata(args, metadata)
  return args + [metadata] if args.is_a?(Array)

  [args, metadata]
end
Stops old rufus scheduler and creates a new one. Returns the new rufus scheduler
@param [Symbol] stop_option The option to be passed to Rufus::Scheduler#stop
# File lib/sidekiq-scheduler/scheduler.rb, line 193
# Stops the current rufus scheduler (if one exists), forgets the scheduled
# jobs and hands back a scheduler obtained from +rufus_scheduler+.
#
# @param stop_option [Symbol] passed to the current scheduler's +stop+
# @return [Object] whatever +rufus_scheduler+ returns after the reset
def clear_schedule!(stop_option = :wait)
  @rufus_scheduler.stop(stop_option) if @rufus_scheduler
  # Dropping the reference makes the memoized reader build a new scheduler.
  @rufus_scheduler = nil
  @scheduled_jobs = {}

  rufus_scheduler
end
Returns true if a job’s queue is included in the array of queues
If queues are empty, returns true.
@param [String] job_queue Job’s queue name @param [Array<String>] queues
@return [Boolean]
# File lib/sidekiq-scheduler/scheduler.rb, line 332
# Tells whether a job's queue is accepted by the given queue list.
# An empty list accepts every queue.
#
# @param job_queue [String] the job's queue name
# @param queues [Array<String>] queue names to check against
# @return [Boolean]
def enabled_queue?(job_queue, queues)
  return true if queues.empty?

  queues.include?(job_queue)
end
Enqueue a job based on a config hash
@param job_config [Hash] the job configuration @param time [Time] time the job is enqueued
# File lib/sidekiq-scheduler/scheduler.rb, line 171
# Enqueues a job described by a config hash.
#
# Works on a shallow copy of +job_config+. When the config carries a truthy
# 'include_metadata' entry, that entry is removed and a metadata hash with
# the enqueue time (epoch seconds as a Float rounded to 3 decimals) is
# appended to the job arguments.
#
# @param job_config [Hash] the job configuration
# @param time [Time] time the job is enqueued
def enqueue_job(job_config, time = Time.now)
  config = prepare_arguments(job_config.dup)

  if config.delete('include_metadata')
    metadata = { "scheduled_at" => time.to_f.round(3) }
    config['args'] = arguments_with_metadata(config['args'], metadata)
  end

  utils = SidekiqScheduler::Utils
  return utils.enqueue_with_active_job(config) if utils.active_job_enqueue?(config['class'])

  utils.enqueue_with_sidekiq(config)
end
# File lib/sidekiq-scheduler/scheduler.rb, line 364
# Runs the given block and turns any StandardError into a log entry
# ("<ErrorClass>: <message>" at info level) instead of letting it propagate.
#
# @yield the work to protect
# @return [Object] the block's value, or the logger call's result on error
def handle_errors
  yield
rescue StandardError => error
  Sidekiq.logger.info "#{error.class.name}: #{error.message}"
end
Pushes the job into Sidekiq
if not already pushed for the given time
@param [String] job_name The job’s name @param [Time] time The time when the job got cleared for triggering @param [Hash] config Job’s config hash
# File lib/sidekiq-scheduler/scheduler.rb, line 153
# Pushes the job into Sidekiq unless an instance of it was already
# registered for the given time.
#
# @param job_name [String] the job's name
# @param time [Time] the time when the job got cleared for triggering
# @param config [Hash] the job's config hash
def idempotent_job_enqueue(job_name, time, config)
  unless SidekiqScheduler::RedisManager.register_job_instance(job_name, time)
    return Sidekiq.logger.debug { "Ignoring #{job_name} job as it has been already enqueued" }
  end

  Sidekiq.logger.info "queueing #{config['class']} (#{job_name})"

  # Enqueue failures are logged by handle_errors, so the cleanup below
  # still runs afterwards.
  handle_errors { enqueue_job(config, time) }

  SidekiqScheduler::RedisManager.remove_elder_job_instances(job_name)
end
# File lib/sidekiq-scheduler/scheduler.rb, line 234
# Tells whether the named job is enabled.
#
# The stored schedule state wins; when it has no 'enabled' entry the job's
# own 'enabled' setting is used, defaulting to true.
#
# @param name [String] the schedule's name
# @return [Boolean, nil] nil when the schedule has no such job
def job_enabled?(name)
  job = Sidekiq.schedule[name]
  return unless job

  state = schedule_state(name)
  state.fetch('enabled', job.fetch('enabled', true))
end
Pulls the schedule from Sidekiq.schedule and loads it into the rufus scheduler instance
# File lib/sidekiq-scheduler/scheduler.rb, line 81
# Pulls the schedule from Sidekiq.schedule and loads it into the rufus
# scheduler instance. Does nothing but log when the scheduler is disabled.
def load_schedule!
  return Sidekiq.logger.info('SidekiqScheduler is disabled') unless enabled

  Sidekiq.logger.info 'Loading Schedule'

  # Load schedule from redis for the first time if dynamic
  if dynamic
    Sidekiq.reload_schedule!
    @current_changed_score = Time.now.to_f
    # Periodically pick up schedule changes.
    rufus_scheduler.every(dynamic_every) { update_schedule }
  end

  Sidekiq.logger.info 'Schedule empty! Set Sidekiq.schedule' if Sidekiq.schedule.empty?

  @scheduled_jobs = {}
  queues = scheduler_config.sidekiq_queues

  Sidekiq.schedule.each do |name, config|
    # With listened_queues_only set, skip jobs whose queue is not listed.
    if listened_queues_only && !enabled_queue?(config['queue'].to_s, queues)
      Sidekiq.logger.info { "Ignoring #{name}, job's queue is not enabled." }
      next
    end

    load_schedule_job(name, config)
  end

  Sidekiq.logger.info 'Schedules Loaded'
end
Loads a job schedule into the Rufus::Scheduler and stores it in @scheduled_jobs
# File lib/sidekiq-scheduler/scheduler.rb, line 114
# Loads a job schedule into the Rufus::Scheduler and stores it in
# @scheduled_jobs.
#
# @param name [String] the schedule's name
# @param config [Hash] the job's config hash
def load_schedule_job(name, config)
  # If rails_env is set in the config, enforce ENV['RAILS_ENV'] as
  # required for the jobs to be scheduled. If rails_env is missing, the
  # job should be scheduled regardless of what ENV['RAILS_ENV'] is set
  # to.
  return unless config['rails_env'].nil? || rails_env_matches?(config)

  Sidekiq.logger.info "Scheduling #{name} #{config}"

  interval_types = %w(cron every at in interval)

  # Only the first interval type that carries a non-empty value is used.
  interval_type = interval_types.find do |type|
    value = config[type]
    !value.nil? && value.length > 0
  end

  unless interval_type
    return Sidekiq.logger.info("no #{interval_types.join(' / ')} found for #{config['class']} (#{name}) - skipping")
  end

  schedule, options = SidekiqScheduler::RufusUtils.normalize_schedule_options(config[interval_type])
  rufus_job = new_job(name, interval_type, config, schedule, options)
  return unless rufus_job

  @scheduled_jobs[name] = rufus_job
  SidekiqScheduler::Utils.update_job_next_time(name, rufus_job.next_time)
  nil
end
# File lib/sidekiq-scheduler/scheduler.rb, line 263
# Registers a job with the rufus scheduler whose callback enqueues the
# Sidekiq job, provided the job is enabled at trigger time.
#
# @param name [String] the schedule's name, also used as the rufus tag
# @param interval_type [String] rufus scheduling method to call
# @param config [Hash] the job's config hash
# @param schedule [Object] schedule value handed to the rufus method
# @param options [Hash] extra options for the rufus method
# @return [Object] whatever the rufus scheduling call returns
def new_job(name, interval_type, config, schedule, options)
  job_options = options.merge(job: true, tags: [name])

  rufus_scheduler.send(interval_type, schedule, job_options) do |job, time|
    next unless job_enabled?(name)

    conf = SidekiqScheduler::Utils.sanitize_job_config(config)

    # Cron jobs get their run time recomputed from the cron line.
    run_time =
      if job.is_a?(Rufus::Scheduler::CronJob)
        SidekiqScheduler::Utils.calc_cron_run_time(job.cron_line, time.to_t)
      else
        time.to_t
      end

    idempotent_job_enqueue(name, run_time, conf)
  end
end
Convert the given arguments in the format expected to be enqueued.
@param [Hash] config the options to be converted @option config [String] class the job class @option config [Hash/Array] args the arguments to be passed to the job class
@return [Hash]
# File lib/sidekiq-scheduler/scheduler.rb, line 344
# Converts the given config into the format expected for enqueueing.
# The hash is modified in place and returned.
#
# @param config [Hash] the options to be converted
# @option config [String] class the job class
# @option config [Hash, Array] args the arguments to be passed to the job
# @return [Hash] the same +config+ object
def prepare_arguments(config)
  config['class'] = SidekiqScheduler::Utils.try_to_constantize(config['class'])

  args = config['args']
  if args.is_a?(Hash)
    # Hash arguments stay a Hash; keys are symbolized only when the Hash
    # offers symbolize_keys!.
    args.symbolize_keys! if args.respond_to?(:symbolize_keys!)
  else
    config['args'] = Array(args)
  end

  config
end
# File lib/sidekiq-scheduler/scheduler.rb, line 69
# Logs one info line per job known to the rufus scheduler: the job's
# original schedule string and the time it last ran, tab separated.
#
# The scheduler's +jobs+ are iterated as an Array and read through
# +original+ / +last_time+.
def print_schedule
  return unless rufus_scheduler

  Sidekiq.logger.info "Scheduling Info\tLast Run"

  rufus_scheduler.jobs.each do |job|
    Sidekiq.logger.info "#{job.original}\t#{job.last_time}\t"
  end
end
Returns true if the given schedule config hash matches the current ENV @param [Hash] config The schedule job configuration
@return [Boolean] true if the schedule config matches the current ENV
# File lib/sidekiq-scheduler/scheduler.rb, line 360
# Checks the job's 'rails_env' setting against ENV['RAILS_ENV'].
#
# The setting is a comma separated list of environment names; whitespace
# in it is ignored.
#
# @param config [Hash] the schedule job configuration
# @return [Boolean, nil] true when ENV['RAILS_ENV'] is in the list; falsy
#   when it is not, or when either value is missing
def rails_env_matches?(config)
  configured = config['rails_env']
  current = ENV['RAILS_ENV']

  configured && current && configured.gsub(/\s/, '').split(',').include?(current)
end
# File lib/sidekiq-scheduler/scheduler.rb, line 204
# Clears the current schedule and loads it again. Only logs a message when
# the scheduler is disabled.
def reload_schedule!
  return Sidekiq.logger.info('SidekiqScheduler is disabled') unless enabled

  Sidekiq.logger.info 'Reloading Schedule'
  clear_schedule!
  load_schedule!
end
# File lib/sidekiq-scheduler/scheduler.rb, line 185
# Returns the current rufus scheduler, creating one from
# +rufus_scheduler_options+ when none is held yet.
#
# @return [Object] the memoized scheduler
def rufus_scheduler
  return @rufus_scheduler if @rufus_scheduler

  @rufus_scheduler = SidekiqScheduler::Utils.new_rufus_scheduler(rufus_scheduler_options)
end
Retrieves a schedule state
@param name [String] with the schedule’s name @return [Hash] with the schedule’s state
# File lib/sidekiq-scheduler/scheduler.rb, line 291
# Retrieves a schedule's state as stored through
# SidekiqScheduler::RedisManager, parsed from JSON.
#
# @param name [String] the schedule's name
# @return [Hash] the schedule's state; empty when nothing is stored
def schedule_state(name)
  raw_state = SidekiqScheduler::RedisManager.get_job_state(name)
  return {} unless raw_state

  JSON.parse(raw_state)
end
the Rufus::Scheduler jobs that are scheduled
# File lib/sidekiq-scheduler/scheduler.rb, line 65 def scheduled_jobs @scheduled_jobs end
Saves a schedule state
@param name [String] with the schedule’s name @param state [Hash] with the schedule’s state
# File lib/sidekiq-scheduler/scheduler.rb, line 301 def set_schedule_state(name, state) SidekiqScheduler::RedisManager.set_job_state(name, state) end
# File lib/sidekiq-scheduler/scheduler.rb, line 253 def to_hash { scheduler_config: @scheduler_config.to_hash } end
# File lib/sidekiq-scheduler/scheduler.rb, line 245
# Sets the 'enabled' flag of every job in the schedule to +new_state+,
# keeping the rest of each job's stored state.
#
# @param new_state [Boolean] value stored under 'enabled'
def toggle_all_jobs(new_state)
  Sidekiq.schedule!.keys.each do |name|
    updated_state = schedule_state(name).merge('enabled' => new_state)
    set_schedule_state(name, updated_state)
  end
end
# File lib/sidekiq-scheduler/scheduler.rb, line 239
# Flips the named job's 'enabled' flag and saves the resulting state.
#
# @param name [String] the schedule's name
def toggle_job_enabled(name)
  flipped_state = schedule_state(name).merge('enabled' => !job_enabled?(name))
  set_schedule_state(name, flipped_state)
end
# File lib/sidekiq-scheduler/scheduler.rb, line 279
# Unschedules the named job and drops it from +scheduled_jobs+.
# Does nothing when no job is held under that name.
#
# @param name [String] the schedule's name
def unschedule_job(name)
  job = scheduled_jobs[name]
  return unless job

  Sidekiq.logger.debug "Removing schedule #{name}"
  job.unschedule
  scheduled_jobs.delete(name)
end
# File lib/sidekiq-scheduler/scheduler.rb, line 214
# Applies schedule changes recorded since the previous check.
#
# The window runs from the previously stored score to now. Every changed
# schedule is unscheduled; those still present in Sidekiq.schedule after
# the reload are then scheduled again from their current config.
def update_schedule
  last_changed_score = @current_changed_score
  @current_changed_score = Time.now.to_f

  schedule_changes = SidekiqScheduler::RedisManager.get_schedule_changes(last_changed_score, @current_changed_score)
  return unless schedule_changes.size > 0

  Sidekiq.logger.info 'Updating schedule'
  Sidekiq.reload_schedule!

  schedule_changes.each do |schedule_name|
    still_scheduled = Sidekiq.schedule.keys.include?(schedule_name)

    unschedule_job(schedule_name)
    load_schedule_job(schedule_name, Sidekiq.schedule[schedule_name]) if still_scheduled
  end

  Sidekiq.logger.info 'Schedule updated'
end