class MiniScheduler::Manager::Runner

Public Class Methods

new(manager) click to toggle source
# File lib/mini_scheduler/manager.rb, line 8
def initialize(manager)
  @stopped = false
  @mutex = Mutex.new
  @queue = Queue.new
  @manager = manager
  @hostname = manager.hostname

  @recovery_thread = Thread.new do
    while !@stopped
      sleep 60

      @mutex.synchronize do
        repair_queue
        reschedule_orphans
      end
    end
  end
  @keep_alive_thread = Thread.new do
    while !@stopped
      @mutex.synchronize do
        keep_alive
      end
      sleep (@manager.keep_alive_duration / 2)
    end
  end
  @threads = []
  manager.workers.times do
    @threads << Thread.new do
      while !@stopped
        process_queue
      end
    end
  end
end

Public Instance Methods

attempts(n) { || ... } click to toggle source
# File lib/mini_scheduler/manager.rb, line 166
def attempts(n)
  n.times {
    begin
      yield; break
    rescue
      sleep Random.rand
    end
  }
end
enq(klass) click to toggle source
# File lib/mini_scheduler/manager.rb, line 150
def enq(klass)
  @queue << klass
end
hostname() click to toggle source
# File lib/mini_scheduler/manager.rb, line 61
def hostname
  @hostname
end
keep_alive() click to toggle source
# File lib/mini_scheduler/manager.rb, line 43
def keep_alive
  @manager.keep_alive
rescue => ex
  MiniScheduler.handle_job_exception(ex, message: "Scheduling manager keep-alive")
end
process_queue() click to toggle source
# File lib/mini_scheduler/manager.rb, line 65
def process_queue

  klass = @queue.deq
  # hack alert, I need to both deq and set @running atomically.
  @running = true

  return if !klass

  failed = false
  start = Time.now.to_f
  info = @mutex.synchronize { @manager.schedule_info(klass) }
  stat = nil
  error = nil

  begin
    info.prev_result = "RUNNING"
    @mutex.synchronize { info.write! }

    if @manager.enable_stats
      stat = MiniScheduler::Stat.create!(
        name: klass.to_s,
        hostname: hostname,
        pid: Process.pid,
        started_at: Time.now,
        live_slots_start: GC.stat[:heap_live_slots]
      )
    end

    klass.new.perform
  rescue => e
    MiniScheduler.handle_job_exception(e, message: "Running a scheduled job", job: { "class" => klass })

    error = "#{e.class}: #{e.message} #{e.backtrace.join("\n")}"
    failed = true
  end
  duration = ((Time.now.to_f - start) * 1000).to_i
  info.prev_duration = duration
  info.prev_result = failed ? "FAILED" : "OK"
  info.current_owner = nil
  if stat
    stat.update!(
      duration_ms: duration,
      live_slots_finish: GC.stat[:heap_live_slots],
      success: !failed,
      error: error
    )
    MiniScheduler.job_ran&.call(stat)
  end
  attempts(3) do
    @mutex.synchronize { info.write! }
  end
rescue => ex
  MiniScheduler.handle_job_exception(ex, message: "Processing scheduled job queue")
ensure
  @running = false
  if defined?(ActiveRecord::Base)
    ActiveRecord::Base.connection_handler.clear_active_connections!
  end
end
repair_queue() click to toggle source
# File lib/mini_scheduler/manager.rb, line 49
def repair_queue
  @manager.repair_queue
rescue => ex
  MiniScheduler.handle_job_exception(ex, message: "Scheduling manager queue repair")
end
reschedule_orphans() click to toggle source
# File lib/mini_scheduler/manager.rb, line 55
def reschedule_orphans
  @manager.reschedule_orphans!
rescue => ex
  MiniScheduler.handle_job_exception(ex, message: "Scheduling manager orphan rescheduler")
end
stop!() click to toggle source
# File lib/mini_scheduler/manager.rb, line 125
def stop!
  return if @stopped

  @mutex.synchronize do
    @stopped = true

    @keep_alive_thread.kill
    @recovery_thread.kill

    @keep_alive_thread.join
    @recovery_thread.join

    enq(nil)

    kill_thread = Thread.new do
      sleep 0.5
      @threads.each(&:kill)
    end

    @threads.each(&:join)
    kill_thread.kill
    kill_thread.join
  end
end
wait_till_done() click to toggle source
# File lib/mini_scheduler/manager.rb, line 154
def wait_till_done
  while !@queue.empty? && !(@queue.num_waiting > 0)
    sleep 0.001
  end
  # this is a hack, but is only used for test anyway
  # if tests fail that depend on this we are forced to increase it.
  sleep 0.010
  while @running
    sleep 0.001
  end
end