class Rbgo::CoRun::Scheduler

Attributes

check_interval[RW]
io_machine[RW]
io_machine_init_once[RW]
msg_queue[RW]
num_thread[RW]
supervisor_thread[RW]
task_queues[RW]
task_queues_mutex[RW]
thread_pool[RW]

Public Class Methods

new() click to toggle source
# File lib/rbgo/corun.rb, line 203
# Builds the scheduler state, queues the initial :init message and starts
# the supervisor thread and the periodic check-message generator.
#
# The worker count is taken from System::CPU.count; when that lookup raises
# a StandardError the count falls back to 8.
def initialize
  cpu_count = begin
    System::CPU.count
  rescue StandardError
    8
  end

  self.num_thread           = cpu_count
  self.thread_pool          = []
  self.msg_queue            = Queue.new
  self.task_queues          = Hash.new { |queues, tag| queues[tag] = Queue.new }
  self.task_queues_mutex    = Mutex.new
  self.check_interval       = 0.1
  self.io_machine_init_once = Once.new

  # The :init message makes the supervisor fill the pool on its first pass.
  msg_queue.push([:init])
  create_supervisor_thread
  generate_check_msg
end

Public Instance Methods

io_machine=(machine) click to toggle source
# File lib/rbgo/corun.rb, line 191
# Stores +machine+ in the @io_machine instance variable.
def io_machine=(machine)
  instance_variable_set(:@io_machine, machine)
end
schedule(routine, new_thread: false, queue_tag: :default) click to toggle source
# File lib/rbgo/corun.rb, line 377
# Hands +routine+ to the scheduler.
#
# With new_thread: true a [:new_thread, routine, queue_tag] message is
# posted to msg_queue; otherwise the routine is pushed onto the shared
# queue registered under +queue_tag+.
#
# Always returns nil.
def schedule(routine, new_thread: false, queue_tag: :default)
  if new_thread
    msg_queue.push([:new_thread, routine, queue_tag])
  else
    get_queue(queue_tag).push(routine)
  end

  nil
end

Private Instance Methods

check_thread_pool() click to toggle source

Only called by the supervisor thread.

# File lib/rbgo/corun.rb, line 340
# Prunes the worker pool and resizes it to num_thread.
#
# A worker is kept when its status is 'run', or when it is 'sleep' and its
# :performing thread variable is not set. A worker sleeping while :performing
# is set gets :should_exit and leaves the pool; workers in any other state
# are simply dropped. Missing workers are then created, or surplus workers
# (taken from the front of the pool) are told to exit and removed.
#
# Always returns nil.
def check_thread_pool
  self.thread_pool = thread_pool.select do |worker|
    state = worker.status
    if state == 'run'
      true
    elsif state == 'sleep'
      busy = worker.thread_variable_get(:performing)
      # Sleeping in the middle of a task: let it finish, then exit.
      worker.thread_variable_set(:should_exit, true) if busy
      !busy
    else
      false
    end
  end

  shortage = num_thread - thread_pool.size
  if shortage.positive?
    shortage.times { create_thread }
  elsif shortage.negative?
    surplus = -shortage
    thread_pool.first(surplus).each { |worker| worker.thread_variable_set(:should_exit, true) }
    self.thread_pool = thread_pool.drop(surplus)
  end

  nil
end
create_supervisor_thread() click to toggle source
# File lib/rbgo/corun.rb, line 304
# Starts the supervisor thread and stores it in supervisor_thread.
#
# The supervisor blocks on msg_queue and reacts to the first element of each
# message: :thread_exit, :init and :check trigger check_thread_pool, while
# :new_thread spawns a run-once worker for the task and queue tag carried in
# the message. Other messages are ignored. A warning is logged when the
# thread leaves its loop.
#
# Always returns nil.
def create_supervisor_thread
  supervisor = Thread.new do
    begin
      loop do
        message = msg_queue.deq
        kind    = message[0]
        if kind == :new_thread
          create_thread(run_for_once: true, queue_tag: message[2], init_task: message[1])
        elsif %i[thread_exit init check].include?(kind)
          check_thread_pool
        end
      end
    ensure
      Rbgo.logger&.warn('Rbgo') { 'supervisor thread exit' }
    end
  end
  self.supervisor_thread = supervisor
  nil
end
create_thread(run_for_once: false, queue_tag: :default, init_task: nil) click to toggle source

Only called by the supervisor thread.

# File lib/rbgo/corun.rb, line 221
# Starts one worker thread and appends it to thread_pool.
#
# Only called by the supervisor thread.
#
# Each worker owns three queues, published through the LOCAL_TASK_QUEUES
# thread variable (in this order):
# * yield_task_queue      - tasks that were still alive after being performed
# * pending_io_task_queue - tasks whose IO receipt is not done yet
# * local_task_queue      - tasks this worker is about to perform
#
# When the local queue is empty the worker looks for work in this order:
# shared queue (non-blocking), yield queue, pending-IO queue, and finally a
# blocking read on the shared queue.
#
# run_for_once - when true the worker exits as soon as its three queues are
#                empty; otherwise it also waits for its :should_exit thread
#                variable to be set.
# queue_tag    - key of the shared queue (see get_queue) the worker reads.
# init_task    - optional first task, placed on the local queue.
#
# A [:thread_exit] message is posted to msg_queue when the worker ends
# without having decided to exit itself. Failures while creating the thread
# are logged. Always returns nil.
def create_thread(run_for_once: false, queue_tag: :default, init_task: nil)
  begin
    thread_pool << Thread.new do
      Thread.current.report_on_exception = false
      begin
        should_exit           = false
        yield_task_queue      = Queue.new
        pending_io_task_queue = Queue.new
        local_task_queue      = Queue.new
        Thread.current.thread_variable_set(LOCAL_TASK_QUEUES, [yield_task_queue, pending_io_task_queue, local_task_queue])
        task_queue = get_queue(queue_tag)
        local_task_queue << init_task if init_task
        loop do
          task = nil
          if local_task_queue.empty?
            # Non-blocking read: an empty shared queue raises and yields nil.
            task = task_queue.deq(true) rescue nil
            if task.nil?
              task = yield_task_queue.deq unless yield_task_queue.empty?
              if task.nil?
                task = pending_io_task_queue.deq unless pending_io_task_queue.empty?
                if task.nil?
                  # Nothing to do anywhere: block until shared work arrives.
                  task = task_queue.deq
                else
                  # only pending io tasks in queue
                  receipt = task.send(:io_receipt)
                  if receipt && !receipt.done_flag
                    sleep 0.1
                  end
                end
              else
                receipt = task.send(:io_receipt)
                # Park the task only while its IO is still outstanding. A task
                # whose receipt is already done falls through and is performed
                # (it used to be skipped here without being re-queued).
                if receipt && !receipt.done_flag
                  pending_io_task_queue << task
                  next
                end
              end
            end

            local_task_queue << task
            local_task_queue << yield_task_queue.deq unless yield_task_queue.empty?
            local_task_queue << pending_io_task_queue.deq unless pending_io_task_queue.empty?

          end

          task   = local_task_queue.deq
          failed = false

          begin
            Thread.current.thread_variable_set(:performing, true)
            task.send :perform
          rescue Exception => ex
            task.error = ex
            Rbgo.logger&.debug('Rbgo') { "#{ex.message}\n#{ex.backtrace}" }
            # Do not jump to the next iteration: the exit check below must
            # still run, otherwise a run-once worker whose task raised would
            # block on the shared queue instead of terminating.
            failed = true
          ensure
            Thread.current.thread_variable_set(:performing, false)
          end

          # A failed task is never re-queued.
          yield_task_queue << task if !failed && task.alive?

          exit_requested = run_for_once || Thread.current.thread_variable_get(:should_exit)
          should_exit = exit_requested &&
            yield_task_queue.empty? &&
            pending_io_task_queue.empty? &&
            local_task_queue.empty?
          break if should_exit
        end
      ensure
        # Tell the supervisor about unexpected terminations only.
        msg_queue << [:thread_exit] unless should_exit
      end
    end
  rescue Exception => ex
    Rbgo.logger&.error('Rbgo') { "#{ex.message}\n#{ex.backtrace}" }
  end
  nil
end
generate_check_msg() click to toggle source
# File lib/rbgo/corun.rb, line 325
# Starts a background thread that posts a [:check] message to msg_queue and
# then sleeps for check_interval seconds, forever. A warning is logged when
# that thread leaves its loop.
#
# Always returns nil; the thread itself is not retained.
def generate_check_msg
  ticker = lambda do
    begin
      loop do
        msg_queue.push([:check])
        sleep(check_interval)
      end
    ensure
      Rbgo.logger&.warn('Rbgo') { 'check generator thread exit' }
    end
  end

  Thread.new(&ticker)
  nil
end
get_queue(tag) click to toggle source
# File lib/rbgo/corun.rb, line 369
# Returns the shared task queue stored in task_queues under +tag+.
# The lookup runs while holding task_queues_mutex.
def get_queue(tag)
  task_queues_mutex.synchronize { task_queues[tag] }
end