module Resque::Scheduler

Constants

CLI_OPTIONS_ENV_MAPPING
INTERMITTENT_ERRORS
VERSION

Attributes

failure_handler[W]

allow user to set an additional failure handler

logger[W]
scheduled_jobs[R]

the Rufus::Scheduler jobs that are scheduled

Public Class Methods

before_shutdown() click to toggle source
# File lib/resque/scheduler.rb, line 446
def before_shutdown
  stop_rufus_scheduler
  release_master_lock
end
clear_schedule!() click to toggle source

Stops old rufus scheduler and creates a new one. Returns the new rufus scheduler

# File lib/resque/scheduler.rb, line 360
def clear_schedule!
  rufus_scheduler.stop
  @rufus_scheduler = nil
  @scheduled_jobs = {}
  rufus_scheduler
end
enqueue(config) click to toggle source
# File lib/resque/scheduler.rb, line 285
def enqueue(config)
  enqueue_from_config(config)
rescue => e
  Resque::Scheduler.failure_handler.on_enqueue_failure(config, e)
end
enqueue_delayed_items_for_timestamp(timestamp) click to toggle source

Enqueues all delayed jobs for a timestamp

# File lib/resque/scheduler.rb, line 208
def enqueue_delayed_items_for_timestamp(timestamp)
  count = 0
  batch_size = delayed_requeue_batch_size
  actual_batch_size = nil

  log "Processing delayed items for timestamp #{timestamp}, in batches of #{batch_size}"

  loop do
    handle_shutdown do
      # Continually check that it is still the master
      if am_master
        actual_batch_size = enqueue_items_in_batch_for_timestamp(timestamp,
                                                                 batch_size)
      end
    end

    count += actual_batch_size
    log "queued #{count} jobs" if actual_batch_size != -1

    # continue processing until there are no more items in this
    # timestamp. If we don't have a full batch, this is the last one.
    # This also breaks us in the event of a redis transaction failure
    # i.e. enqueue_items_in_batch_for_timestamp returned -1
    break if actual_batch_size < batch_size
  end

  log "finished queueing #{count} total jobs for timestamp #{timestamp}" if count != -1
end
enqueue_from_config(job_config) click to toggle source

Enqueues a job based on a config hash

# File lib/resque/scheduler.rb, line 298
def enqueue_from_config(job_config)
  args = job_config['args'] || job_config[:args]

  klass_name = job_config['class'] || job_config[:class]
  begin
    klass = Resque::Scheduler::Util.constantize(klass_name)
  rescue NameError
    klass = klass_name
  end

  params = args.is_a?(Hash) ? [args] : Array(args)
  queue = job_config['queue'] ||
          job_config[:queue] ||
          Resque.queue_from_class(klass)
  # Support custom job classes like those that inherit from
  # Resque::JobWithStatus (resque-status)
  job_klass = job_config['custom_job_class']
  if job_klass && job_klass != 'Resque::Job'
    # The custom job class API must offer a static "scheduled" method. If
    # the custom job class can not be constantized (via a requeue call
    # from the web perhaps), fall back to enqueuing normally via
    # Resque::Job.create.
    begin
      Resque::Scheduler::Util.constantize(job_klass).scheduled(
        queue, klass_name, *params
      )
    rescue NameError
      # Note that the custom job class (job_config['custom_job_class'])
      # is the one enqueued
      Resque::Job.create(queue, job_klass, *params)
    end
  else
    # Hack to avoid havoc for people shoving stuff into queues
    # for non-existent classes (for example: running scheduler in
    # one app that schedules for another.
    if Class === klass
      Resque::Scheduler::Plugin.run_before_delayed_enqueue_hooks(
        klass, *params
      )

      # If the class is a custom job class, call self#scheduled on it.
      # This allows you to do things like Resque.enqueue_at(timestamp,
      # CustomJobClass). Otherwise, pass off to Resque.
      if klass.respond_to?(:scheduled)
        klass.scheduled(queue, klass_name, *params)
      else
        Resque.enqueue_to(queue, klass, *params)
      end
    else
      # This will not run the before_hooks in rescue, but will at least
      # queue the job.
      Resque::Job.create(queue, klass, *params)
    end
  end
end
enqueue_items_in_batch_for_timestamp(timestamp, batch_size) click to toggle source
# File lib/resque/scheduler.rb, line 241
def enqueue_items_in_batch_for_timestamp(timestamp, batch_size)
  timestamp_bucket_key = timestamp_key(timestamp)

  encoded_jobs_to_requeue = Resque.redis.lrange(timestamp_bucket_key, 0, batch_size - 1)

  # Watch is used to ensure that the timestamp bucket we are operating on
  # is not altered by any other clients between the watch call and when we call exec
  # (to execute the multi block). We should error catch on the redis.exec return value
  # as that will indicate if the entire transaction was aborted or not. Though we should
  # be safe as our ltrim is inside the multi block and therefore also would have been
  # aborted. So nothing would have been queued, but also nothing lost from the bucket.
  watch_result = Resque.redis.watch(timestamp_bucket_key) do
    Resque.redis.multi do |pipeline|
      encoded_jobs_to_requeue.each do |encoded_job|
        pipeline.srem("timestamps:#{encoded_job}", timestamp_bucket_key)

        decoded_job = Resque.decode(encoded_job)
        enqueue(decoded_job)
      end

      pipeline.ltrim(timestamp_bucket_key, batch_size, -1)
    end
  end

  # Did the multi block successfully remove from this timestamp and enqueue the jobs?
  success = !watch_result.nil?

  # If this was the last batch in this timestamp bucket, clean up
  if success && encoded_jobs_to_requeue.count < batch_size
    Resque.clean_up_timestamp(timestamp_bucket_key, timestamp)
  end

  unless success
    # Our batched transaction failed in Redis due to the timestamp_bucket_key value
    # being modified while we built our multi block. We return -1 to ensure we break
    # out of the loop iterating on this timestamp so it can be re-processed via the
    # loop in handle_delayed_items.
    return -1
  end

  # will return 0 if none were left to batch
  encoded_jobs_to_requeue.count
end
enqueue_next_item(timestamp) click to toggle source
# File lib/resque/scheduler.rb, line 196
def enqueue_next_item(timestamp)
  item = Resque.next_item_for_timestamp(timestamp)

  if item
    log "queuing #{item['class']} [delayed]"
    enqueue(item)
  end

  item
end
env_matches?(configured_env) click to toggle source

Returns true if the current env is non-nil and the configured env (which is a comma-split string) includes the current env.

# File lib/resque/scheduler.rb, line 179
def env_matches?(configured_env)
  env && configured_env.split(/[\s,]+/).include?(env)
end
failure_handler() click to toggle source
# File lib/resque/scheduler.rb, line 478
def failure_handler
  @failure_handler ||= Resque::Scheduler::FailureHandler
end
handle_delayed_items(at_time = nil) click to toggle source

Handles queueing delayed items at_time - Time to start scheduling items (default: now).

# File lib/resque/scheduler.rb, line 185
def handle_delayed_items(at_time = nil)
  timestamp = Resque.next_delayed_timestamp(at_time)
  if timestamp
    procline 'Processing Delayed Items'
    until timestamp.nil?
      enqueue_delayed_items_for_timestamp(timestamp)
      timestamp = Resque.next_delayed_timestamp(at_time)
    end
  end
end
handle_shutdown() { || ... } click to toggle source
# File lib/resque/scheduler.rb, line 291
def handle_shutdown
  exit if @shutdown
  yield
  exit if @shutdown
end
handle_signals_with_operation() { || ... } click to toggle source
# File lib/resque/scheduler.rb, line 433
def handle_signals_with_operation
  yield if block_given?
  handle_signals
  false
rescue Interrupt
  before_shutdown if @shutdown
  true
end
load_schedule!() click to toggle source

Pulls the schedule from Resque.schedule and loads it into the rufus scheduler instance

# File lib/resque/scheduler.rb, line 97
def load_schedule!
  procline 'Loading Schedule'

  # Need to load the schedule from redis for the first time if dynamic
  Resque.reload_schedule! if dynamic

  log! 'Schedule empty! Set Resque.schedule' if Resque.schedule.empty?

  @scheduled_jobs = {}

  Resque.schedule.each do |name, config|
    load_schedule_job(name, config)
  end
  Resque.redis.del(:schedules_changed) if am_master && dynamic
  procline 'Schedules Loaded'
end
load_schedule_job(name, config) click to toggle source

Loads a job schedule into the Rufus::Scheduler and stores it in @scheduled_jobs

# File lib/resque/scheduler.rb, line 134
def load_schedule_job(name, config)
  # If `rails_env` or `env` is set in the config, load jobs only if they
  # are meant to be loaded in `Resque::Scheduler.env`.  If `rails_env` or
  # `env` is missing, the job should be scheduled regardless of the value
  # of `Resque::Scheduler.env`.

  configured_env = config['rails_env'] || config['env']

  if configured_env.nil? || env_matches?(configured_env)
    log! "Scheduling #{name} "
    interval_defined = false
    interval_types = %w(cron every)
    interval_types.each do |interval_type|
      next unless !config[interval_type].nil? && !config[interval_type].empty?
      args = optionizate_interval_value(config[interval_type])
      args = [args, nil, job: true] if args.is_a?(::String)

      job = rufus_scheduler.send(interval_type, *args) do
        enqueue_recurring(name, config)
      end
      @scheduled_jobs[name] = job
      interval_defined = true
      break
    end
    unless interval_defined
      log! "no #{interval_types.join(' / ')} found for " \
           "#{config['class']} (#{name}) - skipping"
    end
  else
    log "Skipping schedule of #{name} because configured " \
        "env #{configured_env.inspect} does not match current " \
        "env #{env.inspect}"
  end
end
log(msg) click to toggle source
# File lib/resque/scheduler.rb, line 467
def log(msg)
  logger.debug { msg }
end
log!(msg) click to toggle source
# File lib/resque/scheduler.rb, line 459
def log!(msg)
  logger.info { msg }
end
log_error(msg) click to toggle source
# File lib/resque/scheduler.rb, line 463
def log_error(msg)
  logger.error { msg }
end
logger() click to toggle source
# File lib/resque/scheduler.rb, line 482
def logger
  @logger ||= Resque::Scheduler::LoggerBuilder.new(
    quiet: quiet,
    verbose: verbose,
    log_dev: logfile,
    format: logformat
  ).build
end
optionizate_interval_value(value) click to toggle source

modify interval type value to value with options if options available

# File lib/resque/scheduler.rb, line 115
def optionizate_interval_value(value)
  args = value
  if args.is_a?(::Array)
    return args.first if args.size > 2 || !args.last.is_a?(::Hash)
    # symbolize keys of hash for options
    args[2] = args[1].reduce({}) do |m, i|
      key, value = i
      m[(key.respond_to?(:to_sym) ? key.to_sym : key) || key] = value
      m
    end

    args[2][:job] = true
    args[1] = nil
  end
  args
end
poll_sleep() click to toggle source

Sleeps and returns true

# File lib/resque/scheduler.rb, line 400
def poll_sleep
  handle_shutdown do
    begin
      poll_sleep_loop
    ensure
      @sleeping = false
    end
  end
  true
end
poll_sleep_loop() click to toggle source
# File lib/resque/scheduler.rb, line 411
def poll_sleep_loop
  @sleeping = true
  if poll_sleep_amount > 0
    start = Time.now
    loop do
      elapsed_sleep = (Time.now - start)
      remaining_sleep = poll_sleep_amount - elapsed_sleep
      @do_break = false
      if remaining_sleep <= 0
        @do_break = true
      else
        @do_break = handle_signals_with_operation do
          sleep(remaining_sleep)
        end
      end
      break if @do_break
    end
  else
    handle_signals_with_operation
  end
end
print_schedule() click to toggle source
procline(string) click to toggle source
# File lib/resque/scheduler.rb, line 471
def procline(string)
  log! string
  argv0 = build_procline(string)
  log "Setting procline #{argv0.inspect}"
  $0 = argv0
end
rails_env_matches?(config) click to toggle source

Returns true if the given schedule config hash matches the current env

# File lib/resque/scheduler.rb, line 170
def rails_env_matches?(config)
  warn '`Resque::Scheduler.rails_env_matches?` is deprecated. ' \
       'Please use `Resque::Scheduler.env_matches?` instead.'
  config['rails_env'] && env &&
    config['rails_env'].split(/[\s,]+/).include?(env)
end
reload_schedule!() click to toggle source
# File lib/resque/scheduler.rb, line 367
def reload_schedule!
  procline 'Reloading Schedule'
  clear_schedule!
  load_schedule!
end
rufus_scheduler() click to toggle source
# File lib/resque/scheduler.rb, line 354
def rufus_scheduler
  @rufus_scheduler ||= Rufus::Scheduler.new
end
run() click to toggle source

Schedule all jobs and continually look for delayed jobs (never returns)

# File lib/resque/scheduler.rb, line 39
def run
  procline 'Starting'

  # trap signals
  register_signal_handlers

  # Quote from the resque/worker.
  # Fix buffering so we can `rake resque:scheduler > scheduler.log` and
  # get output from the child in there.
  $stdout.sync = true
  $stderr.sync = true

  was_master = nil

  begin
    @th = Thread.current

    # Now start the scheduling part of the loop.
    loop do
      begin
        # Check on changes to master/child
        @am_master = master?
        if am_master != was_master
          procline am_master ? 'Master scheduler' : 'Child scheduler'

          # Load schedule because changed
          reload_schedule!
        end

        if am_master
          handle_delayed_items
          update_schedule if dynamic
        end
        was_master = am_master
      rescue *INTERMITTENT_ERRORS => e
        log! e.message
        release_master_lock
      end
      poll_sleep
    end

  rescue Interrupt
    log 'Exiting'
  end
end
shutdown() click to toggle source

Sets the shutdown flag, clean schedules and exits if sleeping

# File lib/resque/scheduler.rb, line 452
def shutdown
  return if @shutdown
  @shutdown = true
  log!('Shutting down')
  @th.raise Interrupt if @sleeping
end
stop_rufus_scheduler() click to toggle source
# File lib/resque/scheduler.rb, line 442
def stop_rufus_scheduler
  rufus_scheduler.shutdown(:wait)
end
timestamp_key(timestamp) click to toggle source
# File lib/resque/scheduler.rb, line 237
def timestamp_key(timestamp)
  "delayed:#{timestamp.to_i}"
end
unschedule_job(name) click to toggle source
# File lib/resque/scheduler.rb, line 391
def unschedule_job(name)
  if scheduled_jobs[name]
    log "Removing schedule #{name}"
    scheduled_jobs[name].unschedule
    @scheduled_jobs.delete(name)
  end
end
update_schedule() click to toggle source
# File lib/resque/scheduler.rb, line 373
def update_schedule
  if Resque.redis.scard(:schedules_changed) > 0
    procline 'Updating schedule'
    loop do
      schedule_name = Resque.redis.spop(:schedules_changed)
      break unless schedule_name
      Resque.reload_schedule!
      if Resque.schedule.keys.include?(schedule_name)
        unschedule_job(schedule_name)
        load_schedule_job(schedule_name, Resque.schedule[schedule_name])
      else
        unschedule_job(schedule_name)
      end
    end
    procline 'Schedules Loaded'
  end
end

Private Class Methods

am_master() click to toggle source
# File lib/resque/scheduler.rb, line 517
def am_master
  @am_master = master? unless defined?(@am_master)
  @am_master
end
app_str() click to toggle source
# File lib/resque/scheduler.rb, line 501
def app_str
  app_name ? "[#{app_name}]" : ''
end
build_procline(string) click to toggle source
# File lib/resque/scheduler.rb, line 509
def build_procline(string)
  "#{internal_name}#{app_str}#{env_str}: #{string}"
end
enqueue_recurring(name, config) click to toggle source
# File lib/resque/scheduler.rb, line 493
def enqueue_recurring(name, config)
  if am_master
    log! "queueing #{config['class']} (#{name})"
    enqueue(config)
    Resque.last_enqueued_at(name, Time.now.to_s)
  end
end
env_str() click to toggle source
# File lib/resque/scheduler.rb, line 505
def env_str
  env ? "[#{env}]" : ''
end
internal_name() click to toggle source
# File lib/resque/scheduler.rb, line 513
def internal_name
  "resque-scheduler-#{Resque::Scheduler::VERSION}"
end