class Elected::Scheduler::Poller
Attributes
jobs[R]
key[RW]
senado[R]
stats[R]
Public Class Methods
new(key, timeout = nil)
click to toggle source
# File lib/elected/scheduler/poller.rb, line 13
# Sets up a poller for the given key.
#
# @param key [Object] stored as-is and also handed to Senado.new
# @param timeout [Object, nil] forwarded untouched to Senado.new
#   (presumably milliseconds, given the division by 1_000.0 in
#   sleep_for_slave -- TODO confirm against Senado)
def initialize(key, timeout = nil)
  @key = key
  # `||=` keeps any @senado that is already present on the instance.
  @senado ||= Senado.new(key, timeout)
  @jobs = Concurrent::Hash.new
  # One counter per outcome the poller records.
  @stats = Stats.new(:processed_job, :no_match, :sleep_slave)
end
Public Instance Methods
add(job)
click to toggle source
# File lib/elected/scheduler/poller.rb, line 20
# Registers a job under its name; a job already stored under the same
# name is replaced.
#
# @param job [#name] the job to register
# @return [self] so that calls can be chained
def add(job)
  tap { @jobs[job.name] = job }
end
Also aliased as: <<
leader?()
click to toggle source
# File lib/elected/scheduler/poller.rb, line 63
# Reports whether this poller currently holds leadership.
#
# @return [Object] whatever `senado.leader?` answers (presumably a boolean)
def leader?
  # Pure delegation: the senado object is the source of truth.
  senado.leader?
end
running?()
click to toggle source
# File lib/elected/scheduler/poller.rb, line 31
# True only while the poller is in the :running state.
#
# @return [Boolean]
def running?
  status == :running
end
start()
click to toggle source
# File lib/elected/scheduler/poller.rb, line 39
# Starts the background polling loop.
#
# The status is switched to :running *before* the polling thread is
# spawned. The loop body only iterates `while running?`, so spawning the
# thread while the status is still an intermediate value lets the new
# thread observe "not running" and exit immediately, leaving a poller that
# reports :running but never polls.
#
# @return [Symbol, false] :running once started, false when the poller is
#   not currently stopped
# @raise [RuntimeError] 'No jobs to run!' when no job has been added
def start
  return false unless stopped?
  raise 'No jobs to run!' if jobs.empty?

  debug "#{label} starting ..."
  @status = :running
  begin
    start_polling_loop
  rescue StandardError
    # Roll back so the poller is not left in a state from which it can
    # neither be started nor stopped; the caller still sees the error.
    @status = :stopped
    raise
  end
  debug "#{label} running poller!"
  @status
end
status()
click to toggle source
# File lib/elected/scheduler/poller.rb, line 27
# Current lifecycle state; lazily defaults to :stopped on first read.
#
# @return [Symbol]
def status
  @status = :stopped unless @status
  @status
end
stop()
click to toggle source
# File lib/elected/scheduler/poller.rb, line 51
# Stops the polling loop and releases the senado.
#
# Cleanup is wrapped in `ensure`: stop_polling_loop joins the polling
# thread, and Thread#join re-raises an exception that terminated that
# thread. Without the `ensure`, such an error would skip `senado.release`
# and leave the status stuck at :stopping, after which neither start nor
# stop would do anything. Any error still propagates to the caller.
#
# @return [Symbol, false] :stopped once stopped, false when the poller is
#   not currently running
def stop
  return false unless running?

  debug "#{label} stopping poller ..."
  # Leaving :running is what makes the polling loop exit.
  @status = :stopping
  begin
    stop_polling_loop
  ensure
    begin
      senado.release
    ensure
      @status = :stopped
    end
  end
  debug "#{label} stopped poller!"
  @status
end
stopped?()
click to toggle source
# File lib/elected/scheduler/poller.rb, line 35
# True only while the poller is in the :stopped state (which is also the
# state reported before the poller was ever started).
#
# @return [Boolean]
def stopped?
  status == :stopped
end
to_s()
click to toggle source
# File lib/elected/scheduler/poller.rb, line 67
# Short human-readable summary: class name, key, timeout and job count.
#
# @return [String]
def to_s
  class_name = self.class.name
  %{#<#{class_name} key="#{key}" timeout="#{timeout}" jobs=#{jobs.size}>}
end
Also aliased as: inspect
Private Instance Methods
label(cnt = nil)
click to toggle source
# File lib/elected/scheduler/poller.rb, line 123
# Builds the "[key:status]" prefix used in debug messages, with an optional
# ":cnt" suffix inside the brackets.
#
# @param cnt [Object, nil] appended after the status when truthy
# @return [String]
def label(cnt = nil)
  suffix = cnt ? ":#{cnt}" : ''
  "[#{key}:#{status}#{suffix}]"
end
poll_and_process_jobs()
click to toggle source
# File lib/elected/scheduler/poller.rb, line 94
# Runs one polling pass. A leader offers every registered job the same
# timestamp; a non-leader records a :sleep_slave stat and sleeps.
def poll_and_process_jobs
  unless senado.leader?
    debug "#{label} not a leader, sleeping ..."
    @stats.increment :sleep_slave
    return sleep_for_slave
  end

  debug "#{label} leader, processing jobs ..."
  # A single timestamp is taken up front so all jobs are matched against
  # the same instant.
  tick_time = Time.now
  jobs.values.each do |job|
    process_job job, tick_time
  end
end
poll_and_process_loop()
click to toggle source
# File lib/elected/scheduler/poller.rb, line 84
# Body of the polling thread: keeps polling, then waiting for the next
# tick, for as long as the poller reports running?.
def poll_and_process_loop
  iteration = 0
  while running?
    iteration += 1
    tag = label(iteration)
    debug "#{tag} calling poll_and_process_jobs while running?:#{running?} ..."
    poll_and_process_jobs
    sleep_until_next_tick
  end
end
process_job(job, time)
click to toggle source
# File lib/elected/scheduler/poller.rb, line 106
# Executes the job asynchronously when it matches the given time and
# records the outcome in the stats.
#
# @param job [#matches?, #execute] the candidate job
# @param time [Time] the instant the job is matched against
# @return [Object, false] the result of the :processed_job increment when
#   the job was dispatched, false when it did not match
def process_job(job, time)
  if job.matches?(time)
    # Dispatched on a future so the polling pass does not wait for the job.
    Concurrent::Future.execute { job.execute }
    @stats.increment :processed_job
  else
    @stats.increment :no_match
    false
  end
end
sleep_for_slave(ratio = 0.25)
click to toggle source
# File lib/elected/scheduler/poller.rb, line 132
# Sleeps in 0.1 s steps for a fraction of the timeout.
#
# @param ratio [Numeric] fraction of the timeout to sleep for
# NOTE(review): `timeout` is divided by 1_000.0, so it is presumably in
# milliseconds and must be numeric here -- confirm where it is defined.
def sleep_for_slave(ratio = 0.25)
  wait_seconds = (timeout / 1_000.0) * ratio
  wake_at = Time.now + wait_seconds
  sleep 0.1 until Time.now >= wake_at
end
sleep_until_next_tick()
click to toggle source
# File lib/elected/scheduler/poller.rb, line 127
# Sleeps in 0.1 s steps until the wall clock's seconds field differs from
# the one observed on entry.
def sleep_until_next_tick
  entry_second = Time.now.sec
  sleep 0.1 until Time.now.sec != entry_second
end
start_polling_loop()
click to toggle source
# File lib/elected/scheduler/poller.rb, line 77
# Spawns the thread that runs poll_and_process_loop and remembers it in
# @polling_loop_thread so stop_polling_loop can join it later.
#
# @return [Thread] the newly created polling thread
def start_polling_loop
  debug 'starting process loop ...'
  @polling_loop_thread = Thread.new { poll_and_process_loop }
end
stop_polling_loop()
click to toggle source
# File lib/elected/scheduler/poller.rb, line 116
# Waits for the polling thread to finish and then terminates it. Does
# nothing when no polling thread was ever started.
#
# @return [Thread, nil] the polling thread, or nil when there is none
def stop_polling_loop
  worker = @polling_loop_thread
  return unless worker

  # join has no time limit, so this blocks until the loop has exited.
  worker.join
  worker.terminate
end