class Elected::Scheduler::Poller

Attributes

jobs[R]
key[RW]
senado[R]
stats[R]

Public Class Methods

new(key, timeout = nil) click to toggle source
# File lib/elected/scheduler/poller.rb, line 13
# Builds a poller identified by +key+.
#
# key     - stored as the poller's key and handed to Senado.new.
# timeout - optional value handed straight to Senado.new (nil when omitted).
#           NOTE(review): presumably milliseconds, since sleep_for_slave
#           divides a +timeout+ by 1_000.0 -- confirm against Senado.
def initialize(key, timeout = nil)
  @key    = key
  @jobs   = Concurrent::Hash.new
  # `||=` leaves an already-assigned @senado untouched.
  @senado ||= Senado.new(key, timeout)
  @stats  = Stats.new(:processed_job, :no_match, :sleep_slave)
end

Public Instance Methods

<<(job)
Alias for: add
add(job) click to toggle source
# File lib/elected/scheduler/poller.rb, line 20
# Registers +job+ under +job.name+ and returns the poller itself so calls can
# be chained. A job added later under the same name replaces the earlier one.
def add(job)
  @jobs.store(job.name, job)
  self
end
Also aliased as: <<
inspect()
Alias for: to_s
leader?() click to toggle source
# File lib/elected/scheduler/poller.rb, line 63
# Reports whether this instance is currently the leader, as answered by senado.
def leader?
  senado.leader?
end
running?() click to toggle source
# File lib/elected/scheduler/poller.rb, line 31
# True only when the status is exactly :running.
def running?
  status == :running
end
start() click to toggle source
# File lib/elected/scheduler/poller.rb, line 39
# Starts the background polling loop.
#
# Returns false when the poller is not currently stopped, otherwise :running.
# Raises RuntimeError ('No jobs to run!') when no jobs have been added.
def start
  return false unless stopped?
  raise 'No jobs to run!' if jobs.empty?

  debug "#{label} starting ..."
  # The loop thread keeps going only while running? is true, so the status
  # must already read :running when the thread is spawned. Flipping it
  # afterwards lets a fast-starting thread see a non-running status and exit
  # at once, leaving a poller that reports :running with no loop behind it.
  @status = :running
  begin
    start_polling_loop
  rescue StandardError
    # No loop thread exists; go back to a state from which start can be retried.
    @status = :stopped
    raise
  end
  debug "#{label} running poller!"
  @status
end
status() click to toggle source
# File lib/elected/scheduler/poller.rb, line 27
# Current lifecycle state. A poller whose state was never set reports (and
# from then on stores) :stopped.
def status
  return @status if @status

  @status = :stopped
end
stop() click to toggle source
# File lib/elected/scheduler/poller.rb, line 51
# Stops the polling loop and gives up leadership.
#
# Returns false when the poller is not running, otherwise :stopped.
# Any error raised while waiting for the loop thread still propagates, but
# only after leadership has been released and the status reset.
def stop
  return false unless running?

  debug "#{label} stopping poller ..."
  @status = :stopping
  begin
    stop_polling_loop
  ensure
    # Runs even when waiting for the loop thread raises: otherwise the status
    # would stay :stopping (neither start nor stop would act again) and
    # senado.release would be skipped.
    begin
      senado.release
    ensure
      @status = :stopped
    end
  end
  debug "#{label} stopped poller!"
  @status
end
stopped?() click to toggle source
# File lib/elected/scheduler/poller.rb, line 35
# True only when the status is exactly :stopped, which is also what status
# reports for a poller whose state was never set.
def stopped?
  status == :stopped
end
to_s() click to toggle source
# File lib/elected/scheduler/poller.rb, line 67
# One-line summary of the poller: class name, key, timeout and number of
# registered jobs.
def to_s
  format('#<%s key="%s" timeout="%s" jobs=%s>', self.class.name, key, timeout, jobs.size)
end
Also aliased as: inspect

Private Instance Methods

label(cnt = nil) click to toggle source
# File lib/elected/scheduler/poller.rb, line 123
# Builds the "[key:status]" prefix used in debug messages. When +cnt+ is
# truthy it is appended as a third segment: "[key:status:cnt]".
def label(cnt = nil)
  suffix = cnt ? ":#{cnt}" : ''
  "[#{key}:#{status}#{suffix}]"
end
poll_and_process_jobs() click to toggle source
# File lib/elected/scheduler/poller.rb, line 94
# One polling pass. The leader offers every registered job to process_job
# with a single shared timestamp; any other instance records a :sleep_slave
# stat and backs off through sleep_for_slave.
def poll_and_process_jobs
  unless senado.leader?
    debug "#{label} not a leader, sleeping ..."
    @stats.increment :sleep_slave
    return sleep_for_slave
  end

  debug "#{label} leader, processing jobs ..."
  tick = Time.now
  # Walk a snapshot array (values) rather than the live hash.
  jobs.values.each { |job| process_job job, tick }
end
poll_and_process_loop() click to toggle source
# File lib/elected/scheduler/poller.rb, line 84
# Body of the polling thread: while the poller is running, perform one
# polling pass and then wait for the next clock tick. The pass counter only
# feeds the debug label.
# NOTE(review): nothing here rescues errors from a pass, so an exception
# raised by poll_and_process_jobs leaves this loop -- confirm that is intended.
def poll_and_process_loop
  iteration = 0
  while running?
    iteration += 1
    prefix = label(iteration)
    debug "#{prefix} calling poll_and_process_jobs while running?:#{running?} ..."
    poll_and_process_jobs
    sleep_until_next_tick
  end
end
process_job(job, time) click to toggle source
# File lib/elected/scheduler/poller.rb, line 106
# Runs +job+ in the background when it matches +time+.
#
# Returns false (after counting :no_match) when the job does not match;
# otherwise hands job.execute to Concurrent::Future.execute, counts
# :processed_job and returns whatever @stats.increment returns.
# NOTE(review): the future is discarded, so this method never sees the
# outcome of job.execute -- confirm how failures are meant to surface.
def process_job(job, time)
  if job.matches?(time)
    Concurrent::Future.execute { job.execute }
    @stats.increment :processed_job
  else
    @stats.increment :no_match
    false
  end
end
sleep_for_slave(ratio = 0.25) click to toggle source
# File lib/elected/scheduler/poller.rb, line 132
# Back-off used by non-leader instances: blocks for +ratio+ times the timeout,
# where +timeout+ is divided by 1_000.0 to obtain seconds.
#
# ratio - fraction of the timeout to wait (default 0.25).
#
# Returns nil.
def sleep_for_slave(ratio = 0.25)
  # Durations are measured on the monotonic clock so that a wall-clock
  # adjustment cannot shorten or stretch the back-off.
  clock = Process::CLOCK_MONOTONIC
  deadline = Process.clock_gettime(clock) + (timeout / 1_000.0) * ratio
  loop do
    remaining = deadline - Process.clock_gettime(clock)
    break if remaining <= 0

    # Nap in slices of at most 0.1s, but never past the deadline.
    sleep [remaining, 0.1].min
  end
end
sleep_until_next_tick() click to toggle source
# File lib/elected/scheduler/poller.rb, line 127
# Blocks until the seconds field of the wall clock differs from its value on
# entry, re-checking every 0.1 seconds.
def sleep_until_next_tick
  current_second = Time.now.sec
  sleep 0.1 until Time.now.sec != current_second
end
start_polling_loop() click to toggle source
# File lib/elected/scheduler/poller.rb, line 77
# Spawns the thread that runs poll_and_process_loop, keeps it in
# @polling_loop_thread and returns it.
def start_polling_loop
  debug 'starting process loop ...'
  @polling_loop_thread = Thread.new { poll_and_process_loop }
end
stop_polling_loop() click to toggle source
# File lib/elected/scheduler/poller.rb, line 116
# Waits for the polling thread, if there is one, to finish on its own.
#
# Returns the joined thread, or nil when no thread was started.
# An exception that ended the thread is re-raised here by Thread#join.
def stop_polling_loop
  thread = @polling_loop_thread
  return unless thread

  # join returns only once the thread has finished, so no terminate call is
  # needed afterwards.
  thread.join
ensure
  # Drop the handle of the finished thread.
  @polling_loop_thread = nil
end