class Scheduler::Manager

Attributes

random_ratio[RW]
redis[RW]

Public Class Methods

current() click to toggle source
# File lib/scheduler/manager.rb, line 132
def self.current
  @current
end
current=(manager) click to toggle source
# File lib/scheduler/manager.rb, line 136
def self.current=(manager)
  @current = manager
end
discover_schedules() click to toggle source
# File lib/scheduler/manager.rb, line 255
def self.discover_schedules
  # hack for developemnt reloader is crazytown
  # multiple classes with same name can be in
  # object space
  unique = Set.new
  schedules = []
  ObjectSpace.each_object(Scheduler::Schedule) do |schedule|
    if schedule.scheduled?
      next if unique.include?(schedule.to_s)
      schedules << schedule
      unique << schedule.to_s
    end
  end
  schedules
end
lock_key() click to toggle source
# File lib/scheduler/manager.rb, line 283
def self.lock_key
  "_scheduler_lock_"
end
new(redis = nil, options=nil) click to toggle source
# File lib/scheduler/manager.rb, line 120
def initialize(redis = nil, options=nil)
  @redis = $redis || redis
  @random_ratio = 0.1
  unless options && options[:skip_runner]
    @runner = Runner.new(self)
    self.class.current = self
  end

  @hostname = options && options[:hostname]
  @manager_id = SecureRandom.hex
end
queue_key(hostname=nil) click to toggle source
# File lib/scheduler/manager.rb, line 287
def self.queue_key(hostname=nil)
  if hostname
    "_scheduler_queue_#{hostname}_"
  else
    "_scheduler_queue_"
  end
end
schedule_key(klass,hostname=nil) click to toggle source
# File lib/scheduler/manager.rb, line 295
def self.schedule_key(klass,hostname=nil)
  if hostname
    "_scheduler_#{klass}_#{hostname}"
  else
    "_scheduler_#{klass}"
  end
end
seq() click to toggle source
# File lib/scheduler/manager.rb, line 272
def self.seq
  @mutex.synchronize do
    @i ||= 0
    @i += 1
  end
end
without_runner(redis=nil) click to toggle source
# File lib/scheduler/manager.rb, line 116
def self.without_runner(redis=nil)
  self.new(redis, skip_runner: true)
end

Public Instance Methods

blocking_tick() click to toggle source
# File lib/scheduler/manager.rb, line 230
def blocking_tick
  tick
  @runner.wait_till_done
end
ensure_schedule!(klass) click to toggle source
# File lib/scheduler/manager.rb, line 152
def ensure_schedule!(klass)
  lock do
    schedule_info(klass).schedule!
  end

end
get_klass(name) click to toggle source
# File lib/scheduler/manager.rb, line 187
def get_klass(name)
  name.constantize
rescue NameError
  nil
end
hostname() click to toggle source
# File lib/scheduler/manager.rb, line 140
def hostname
  @hostname ||= `hostname`.strip
end
identity_key() click to toggle source
# File lib/scheduler/manager.rb, line 279
def identity_key
  @identity_key ||= "_scheduler_#{hostname}:#{Process.pid}:#{self.class.seq}:#{SecureRandom.hex}"
end
keep_alive() click to toggle source
# File lib/scheduler/manager.rb, line 244
def keep_alive
  redis.setex identity_key, keep_alive_duration, ""
end
keep_alive_duration() click to toggle source
# File lib/scheduler/manager.rb, line 240
def keep_alive_duration
  60
end
lock() { || ... } click to toggle source
# File lib/scheduler/manager.rb, line 248
def lock
  DistributedMutex.new(Manager.lock_key).synchronize do
    yield
  end
end
next_run(klass) click to toggle source
# File lib/scheduler/manager.rb, line 148
def next_run(klass)
  schedule_info(klass).next_run
end
remove(klass) click to toggle source
# File lib/scheduler/manager.rb, line 159
def remove(klass)
  lock do
    schedule_info(klass).del!
  end
end
reschedule_orphans!() click to toggle source
# File lib/scheduler/manager.rb, line 165
def reschedule_orphans!
  lock do
    reschedule_orphans_on!
    reschedule_orphans_on!(hostname)
  end
end
reschedule_orphans_on!(hostname=nil) click to toggle source
# File lib/scheduler/manager.rb, line 172
def reschedule_orphans_on!(hostname=nil)
  redis.zrange(Manager.queue_key(hostname), 0, -1).each do |key|
    klass = get_klass(key)
    next unless klass
    info = schedule_info(klass)

    if ['QUEUED', 'RUNNING'].include?(info.prev_result) &&
      (info.current_owner.blank? || !redis.get(info.current_owner))
      info.prev_result = 'ORPHAN'
      info.next_run = Time.now.to_i
      info.write!
    end
  end
end
schedule_info(klass) click to toggle source
# File lib/scheduler/manager.rb, line 144
def schedule_info(klass)
  ScheduleInfo.new(klass, self)
end
schedule_next_job(hostname=nil) click to toggle source
# File lib/scheduler/manager.rb, line 200
def schedule_next_job(hostname=nil)
  (key, due), _ = redis.zrange Manager.queue_key(hostname), 0, 0, withscores: true

  return unless key
  if due.to_i <= Time.now.to_i
    klass = get_klass(key)
    unless klass
      # corrupt key, nuke it (renamed job or something)
      redis.zrem Manager.queue_key(hostname), key
      return
    end

    unless klass.respond_to?(:daily) &&
           klass.respond_to?(:every)
      # job klass exists but no longer extends from the base
      redis.zrem Manager.queue_key(hostname), key
      return
    end

    info = schedule_info(klass)
    info.prev_run = Time.now.to_i
    info.prev_result = "QUEUED"
    info.prev_duration = -1
    info.next_run = nil
    info.current_owner = identity_key
    info.schedule!
    @runner.enq(klass)
  end
end
stop!() click to toggle source
# File lib/scheduler/manager.rb, line 235
def stop!
  @runner.stop!
  self.class.current = nil
end
tick() click to toggle source
# File lib/scheduler/manager.rb, line 193
def tick
  lock do
    schedule_next_job
    schedule_next_job(hostname)
  end
end