class MiniScheduler::Manager

Attributes

enable_stats[RW]
queue[RW]
random_ratio[RW]
redis[RW]
workers[RW]

Public Class Methods

current() click to toggle source
# File lib/mini_scheduler/manager.rb, line 202
def self.current
  @current ||= {}
end
discover_queues() click to toggle source
# File lib/mini_scheduler/manager.rb, line 327
def self.discover_queues
  ObjectSpace.each_object(MiniScheduler::Schedule).map(&:queue).to_set
end
discover_schedules() click to toggle source
# File lib/mini_scheduler/manager.rb, line 331
def self.discover_schedules
  # hack for developemnt reloader is crazytown
  # multiple classes with same name can be in
  # object space
  unique = Set.new
  schedules = []
  ObjectSpace.each_object(MiniScheduler::Schedule) do |schedule|
    if schedule.scheduled?
      next if unique.include?(schedule.to_s)
      schedules << schedule
      unique << schedule.to_s
    end
  end
  schedules
end
lock_key(queue) click to toggle source
# File lib/mini_scheduler/manager.rb, line 359
def self.lock_key(queue)
  "_scheduler_lock_#{queue}_"
end
new(options = nil) click to toggle source
# File lib/mini_scheduler/manager.rb, line 182
def initialize(options = nil)
  @queue = options && options[:queue] || "default"
  @workers = options && options[:workers] || 1
  @redis = MiniScheduler.redis
  @random_ratio = 0.1
  unless options && options[:skip_runner]
    @runner = Runner.new(self)
    self.class.current[@queue] = self
  end

  @hostname = options && options[:hostname]
  @manager_id = SecureRandom.hex

  if options && options.key?(:enable_stats)
    @enable_stats = options[:enable_stats]
  else
    @enable_stats = !!defined?(MiniScheduler::Stat)
  end
end
queue_key(queue, hostname = nil) click to toggle source
# File lib/mini_scheduler/manager.rb, line 363
def self.queue_key(queue, hostname = nil)
  if hostname
    "_scheduler_queue_#{queue}_#{hostname}_"
  else
    "_scheduler_queue_#{queue}_"
  end
end
schedule_key(klass, hostname = nil) click to toggle source
# File lib/mini_scheduler/manager.rb, line 371
def self.schedule_key(klass, hostname = nil)
  if hostname
    "_scheduler_#{klass}_#{hostname}"
  else
    "_scheduler_#{klass}"
  end
end
seq() click to toggle source
# File lib/mini_scheduler/manager.rb, line 348
def self.seq
  @mutex.synchronize do
    @i ||= 0
    @i += 1
  end
end
without_runner() click to toggle source
# File lib/mini_scheduler/manager.rb, line 178
def self.without_runner
  self.new(skip_runner: true)
end

Public Instance Methods

blocking_tick() click to toggle source
# File lib/mini_scheduler/manager.rb, line 303
def blocking_tick
  tick
  @runner.wait_till_done
end
ensure_schedule!(klass) click to toggle source
# File lib/mini_scheduler/manager.rb, line 222
def ensure_schedule!(klass)
  lock do
    schedule_info(klass).schedule!
  end
end
get_klass(name) click to toggle source
# File lib/mini_scheduler/manager.rb, line 256
def get_klass(name)
  name.constantize
rescue NameError
  nil
end
hostname() click to toggle source
# File lib/mini_scheduler/manager.rb, line 206
def hostname
  @hostname ||= begin
                  `hostname`.strip
                rescue
                  "unknown"
                end
end
identity_key() click to toggle source
# File lib/mini_scheduler/manager.rb, line 355
def identity_key
  @identity_key ||= "_scheduler_#{hostname}:#{Process.pid}:#{self.class.seq}:#{SecureRandom.hex}"
end
keep_alive() click to toggle source
# File lib/mini_scheduler/manager.rb, line 317
def keep_alive
  redis.setex identity_key, keep_alive_duration, ""
end
keep_alive_duration() click to toggle source
# File lib/mini_scheduler/manager.rb, line 313
def keep_alive_duration
  60
end
lock() { || ... } click to toggle source
# File lib/mini_scheduler/manager.rb, line 321
def lock
  MiniScheduler::DistributedMutex.synchronize(Manager.lock_key(queue), MiniScheduler.redis) do
    yield
  end
end
next_run(klass) click to toggle source
# File lib/mini_scheduler/manager.rb, line 218
def next_run(klass)
  schedule_info(klass).next_run
end
remove(klass) click to toggle source
# File lib/mini_scheduler/manager.rb, line 228
def remove(klass)
  lock do
    schedule_info(klass).del!
  end
end
repair_queue() click to toggle source
# File lib/mini_scheduler/manager.rb, line 262
def repair_queue
  return if redis.exists?(self.class.queue_key(queue)) ||
    redis.exists?(self.class.queue_key(queue, hostname))

  self.class.discover_schedules
    .select { |schedule| schedule.queue == queue }
    .each { |schedule| ensure_schedule!(schedule) }
end
reschedule_orphans!() click to toggle source
# File lib/mini_scheduler/manager.rb, line 234
def reschedule_orphans!
  lock do
    reschedule_orphans_on!
    reschedule_orphans_on!(hostname)
  end
end
reschedule_orphans_on!(hostname = nil) click to toggle source
# File lib/mini_scheduler/manager.rb, line 241
def reschedule_orphans_on!(hostname = nil)
  redis.zrange(Manager.queue_key(queue, hostname), 0, -1).each do |key|
    klass = get_klass(key)
    next unless klass
    info = schedule_info(klass)

    if ['QUEUED', 'RUNNING'].include?(info.prev_result) &&
      (info.current_owner.blank? || !redis.get(info.current_owner))
      info.prev_result = 'ORPHAN'
      info.next_run = Time.now.to_i
      info.write!
    end
  end
end
schedule_info(klass) click to toggle source
# File lib/mini_scheduler/manager.rb, line 214
def schedule_info(klass)
  MiniScheduler::ScheduleInfo.new(klass, self)
end
schedule_next_job(hostname = nil) click to toggle source
# File lib/mini_scheduler/manager.rb, line 278
def schedule_next_job(hostname = nil)
  (key, due), _ = redis.zrange Manager.queue_key(queue, hostname), 0, 0, withscores: true
  return unless key

  if due.to_i <= Time.now.to_i
    klass = get_klass(key)
    if !klass || (
      (klass.is_per_host && !hostname) || (hostname && !klass.is_per_host)
    )
      # corrupt key, nuke it (renamed job or something)
      redis.zrem Manager.queue_key(queue, hostname), key
      return
    end

    info = schedule_info(klass)
    info.prev_run = Time.now.to_i
    info.prev_result = "QUEUED"
    info.prev_duration = -1
    info.next_run = nil
    info.current_owner = identity_key
    info.schedule!
    @runner.enq(klass)
  end
end
stop!() click to toggle source
# File lib/mini_scheduler/manager.rb, line 308
def stop!
  @runner.stop!
  self.class.current.delete(@queue)
end
tick() click to toggle source
# File lib/mini_scheduler/manager.rb, line 271
def tick
  lock do
    schedule_next_job
    schedule_next_job(hostname)
  end
end