class Scheduler::Manager
Attributes
random_ratio[RW]
redis[RW]
Public Class Methods
current()
click to toggle source
# File lib/scheduler/manager.rb, line 132 def self.current @current end
current=(manager)
click to toggle source
# File lib/scheduler/manager.rb, line 136 def self.current=(manager) @current = manager end
discover_schedules()
click to toggle source
# File lib/scheduler/manager.rb, line 255 def self.discover_schedules # hack for developemnt reloader is crazytown # multiple classes with same name can be in # object space unique = Set.new schedules = [] ObjectSpace.each_object(Scheduler::Schedule) do |schedule| if schedule.scheduled? next if unique.include?(schedule.to_s) schedules << schedule unique << schedule.to_s end end schedules end
lock_key()
click to toggle source
# File lib/scheduler/manager.rb, line 283 def self.lock_key "_scheduler_lock_" end
new(redis = nil, options=nil)
click to toggle source
# File lib/scheduler/manager.rb, line 120 def initialize(redis = nil, options=nil) @redis = $redis || redis @random_ratio = 0.1 unless options && options[:skip_runner] @runner = Runner.new(self) self.class.current = self end @hostname = options && options[:hostname] @manager_id = SecureRandom.hex end
queue_key(hostname=nil)
click to toggle source
# File lib/scheduler/manager.rb, line 287 def self.queue_key(hostname=nil) if hostname "_scheduler_queue_#{hostname}_" else "_scheduler_queue_" end end
schedule_key(klass,hostname=nil)
click to toggle source
# File lib/scheduler/manager.rb, line 295 def self.schedule_key(klass,hostname=nil) if hostname "_scheduler_#{klass}_#{hostname}" else "_scheduler_#{klass}" end end
seq()
click to toggle source
# File lib/scheduler/manager.rb, line 272 def self.seq @mutex.synchronize do @i ||= 0 @i += 1 end end
without_runner(redis=nil)
click to toggle source
# File lib/scheduler/manager.rb, line 116 def self.without_runner(redis=nil) self.new(redis, skip_runner: true) end
Public Instance Methods
blocking_tick()
click to toggle source
# File lib/scheduler/manager.rb, line 230 def blocking_tick tick @runner.wait_till_done end
ensure_schedule!(klass)
click to toggle source
# File lib/scheduler/manager.rb, line 152 def ensure_schedule!(klass) lock do schedule_info(klass).schedule! end end
get_klass(name)
click to toggle source
# File lib/scheduler/manager.rb, line 187 def get_klass(name) name.constantize rescue NameError nil end
hostname()
click to toggle source
# File lib/scheduler/manager.rb, line 140 def hostname @hostname ||= `hostname`.strip end
identity_key()
click to toggle source
# File lib/scheduler/manager.rb, line 279 def identity_key @identity_key ||= "_scheduler_#{hostname}:#{Process.pid}:#{self.class.seq}:#{SecureRandom.hex}" end
keep_alive()
click to toggle source
# File lib/scheduler/manager.rb, line 244 def keep_alive redis.setex identity_key, keep_alive_duration, "" end
keep_alive_duration()
click to toggle source
# File lib/scheduler/manager.rb, line 240 def keep_alive_duration 60 end
lock() { || ... }
click to toggle source
# File lib/scheduler/manager.rb, line 248 def lock DistributedMutex.new(Manager.lock_key).synchronize do yield end end
next_run(klass)
click to toggle source
# File lib/scheduler/manager.rb, line 148 def next_run(klass) schedule_info(klass).next_run end
remove(klass)
click to toggle source
# File lib/scheduler/manager.rb, line 159 def remove(klass) lock do schedule_info(klass).del! end end
reschedule_orphans!()
click to toggle source
# File lib/scheduler/manager.rb, line 165 def reschedule_orphans! lock do reschedule_orphans_on! reschedule_orphans_on!(hostname) end end
reschedule_orphans_on!(hostname=nil)
click to toggle source
# File lib/scheduler/manager.rb, line 172 def reschedule_orphans_on!(hostname=nil) redis.zrange(Manager.queue_key(hostname), 0, -1).each do |key| klass = get_klass(key) next unless klass info = schedule_info(klass) if ['QUEUED', 'RUNNING'].include?(info.prev_result) && (info.current_owner.blank? || !redis.get(info.current_owner)) info.prev_result = 'ORPHAN' info.next_run = Time.now.to_i info.write! end end end
schedule_info(klass)
click to toggle source
# File lib/scheduler/manager.rb, line 144 def schedule_info(klass) ScheduleInfo.new(klass, self) end
schedule_next_job(hostname=nil)
click to toggle source
# File lib/scheduler/manager.rb, line 200 def schedule_next_job(hostname=nil) (key, due), _ = redis.zrange Manager.queue_key(hostname), 0, 0, withscores: true return unless key if due.to_i <= Time.now.to_i klass = get_klass(key) unless klass # corrupt key, nuke it (renamed job or something) redis.zrem Manager.queue_key(hostname), key return end unless klass.respond_to?(:daily) && klass.respond_to?(:every) # job klass exists but no longer extends from the base redis.zrem Manager.queue_key(hostname), key return end info = schedule_info(klass) info.prev_run = Time.now.to_i info.prev_result = "QUEUED" info.prev_duration = -1 info.next_run = nil info.current_owner = identity_key info.schedule! @runner.enq(klass) end end
stop!()
click to toggle source
# File lib/scheduler/manager.rb, line 235 def stop! @runner.stop! self.class.current = nil end
tick()
click to toggle source
# File lib/scheduler/manager.rb, line 193 def tick lock do schedule_next_job schedule_next_job(hostname) end end