class Rbgo::CoRun::Scheduler
Attributes
check_interval[RW]
io_machine[RW]
io_machine_init_once[RW]
msg_queue[RW]
num_thread[RW]
supervisor_thread[RW]
task_queues[RW]
task_queues_mutex[RW]
thread_pool[RW]
Public Class Methods
new()
click to toggle source
# File lib/rbgo/corun.rb, line 203
# Sets up the scheduler's state, then starts the supervisor thread and the
# thread that periodically posts :check messages.
def initialize
  # Use the CPU count as the pool size; fall back to 8 if the lookup raises.
  self.num_thread = begin
    System::CPU.count
  rescue StandardError
    8
  end
  self.thread_pool = []
  self.msg_queue = Queue.new
  # Each queue tag gets its own Queue, created the first time it is looked up.
  self.task_queues = Hash.new do |queues, tag|
    queues[tag] = Queue.new
  end
  self.task_queues_mutex = Mutex.new
  self.check_interval = 0.1
  self.io_machine_init_once = Once.new

  # Queue the :init message before the supervisor starts so the first thing
  # it does is run check_thread_pool.
  msg_queue << [:init]
  create_supervisor_thread
  generate_check_msg
end
Public Instance Methods
io_machine=(machine)
click to toggle source
# File lib/rbgo/corun.rb, line 191
# Stores +machine+ in @io_machine.
#
# machine - the object to keep as this scheduler's IO machine.
def io_machine=(machine)
  @io_machine = machine
end
schedule(routine, new_thread: false, queue_tag: :default)
click to toggle source
# File lib/rbgo/corun.rb, line 377
# Hands +routine+ to the scheduler.
#
# routine    - the task object to run.
# new_thread - when truthy, a [:new_thread, routine, queue_tag] message is
#              posted to msg_queue instead of queueing the routine directly.
# queue_tag  - tag of the task queue that receives the routine.
#
# Always returns nil.
def schedule(routine, new_thread: false, queue_tag: :default)
  unless new_thread
    # Common path: append to the shared queue for this tag.
    get_queue(queue_tag) << routine
    return nil
  end

  msg_queue << [:new_thread, routine, queue_tag]
  nil
end
Private Instance Methods
check_thread_pool()
click to toggle source
Only called by the supervisor thread.
# File lib/rbgo/corun.rb, line 340
# Prunes the thread pool and then resizes it to num_thread entries.
#
# Threads whose status is 'run' are kept. Threads whose status is 'sleep'
# are kept only when their :performing thread variable is falsy; a sleeping
# thread that is performing a task is flagged with :should_exit and removed
# from the pool. Threads in any other status are removed.
#
# Always returns nil.
def check_thread_pool
  kept = thread_pool.select do |worker|
    case worker.status
    when 'run'
      true
    when 'sleep'
      busy = worker.thread_variable_get(:performing)
      worker.thread_variable_set(:should_exit, true) if busy
      !busy
    else
      false
    end
  end
  self.thread_pool = kept

  missing = num_thread - thread_pool.size
  if missing.positive?
    # Too few workers: start the missing ones.
    missing.times { create_thread }
  elsif missing.negative?
    # Too many workers: flag the first `surplus` for exit and forget them.
    surplus = -missing
    thread_pool.take(surplus).each do |worker|
      worker.thread_variable_set(:should_exit, true)
    end
    self.thread_pool = thread_pool.drop(surplus)
  end
  nil
end
create_supervisor_thread()
click to toggle source
# File lib/rbgo/corun.rb, line 304
# Starts the supervisor thread and stores it in supervisor_thread.
#
# The supervisor blocks on msg_queue and reacts to each message by its first
# element: :thread_exit, :init and :check run check_thread_pool; :new_thread
# starts a run-once worker for the task and queue tag carried in the message.
# Any other message is ignored. A warning is logged when the thread ends.
#
# Always returns nil.
def create_supervisor_thread
  self.supervisor_thread = Thread.new do
    begin
      loop do
        message = msg_queue.deq
        kind = message[0]
        if kind == :thread_exit || kind == :init || kind == :check
          check_thread_pool
        elsif kind == :new_thread
          routine = message[1]
          queue_tag = message[2]
          create_thread(run_for_once: true, queue_tag: queue_tag, init_task: routine)
        end
      end
    ensure
      Rbgo.logger&.warn('Rbgo') { 'supervisor thread exit' }
    end
  end
  nil
end
create_thread(run_for_once: false, queue_tag: :default, init_task: nil)
click to toggle source
Only called by the supervisor thread.
# File lib/rbgo/corun.rb, line 221 def create_thread(run_for_once: false, queue_tag: :default, init_task: nil) begin thread_pool << Thread.new do Thread.current.report_on_exception = false begin should_exit = false yield_task_queue = Queue.new pending_io_task_queue = Queue.new local_task_queue = Queue.new Thread.current.thread_variable_set(LOCAL_TASK_QUEUES, [yield_task_queue, pending_io_task_queue, local_task_queue]) task_queue = get_queue(queue_tag) local_task_queue << init_task if init_task loop do task = nil if local_task_queue.empty? task = task_queue.deq(true) rescue nil if task.nil? task = yield_task_queue.deq unless yield_task_queue.empty? if task.nil? task = pending_io_task_queue.deq unless pending_io_task_queue.empty? if task.nil? task = task_queue.deq else # only pending io tasks in queue receipt = task.send(:io_receipt) if receipt && !receipt.done_flag sleep 0.1 end end else receipt = task.send(:io_receipt) if receipt pending_io_task_queue << task unless receipt.done_flag next end end end local_task_queue << task local_task_queue << yield_task_queue.deq unless yield_task_queue.empty? local_task_queue << pending_io_task_queue.deq unless pending_io_task_queue.empty? end task = local_task_queue.deq begin Thread.current.thread_variable_set(:performing, true) task.send :perform rescue Exception => ex task.error = ex Rbgo.logger&.debug('Rbgo') { "#{ex.message}\n#{ex.backtrace}" } next ensure Thread.current.thread_variable_set(:performing, false) end if task.alive? yield_task_queue << task end if run_for_once should_exit = yield_task_queue.empty? && pending_io_task_queue.empty? && local_task_queue.empty? else should_exit = Thread.current.thread_variable_get(:should_exit) && yield_task_queue.empty? && pending_io_task_queue.empty? && local_task_queue.empty? end break if should_exit end ensure msg_queue << [:thread_exit] unless should_exit end end rescue Exception => ex Rbgo.logger&.error('Rbgo') { "#{ex.message}\n#{ex.backtrace}" } end nil end
generate_check_msg()
click to toggle source
# File lib/rbgo/corun.rb, line 325
# Starts a background thread that posts a [:check] message to msg_queue,
# sleeps for check_interval seconds, and repeats. A warning is logged when
# that thread ends.
#
# Always returns nil.
def generate_check_msg
  Thread.new do
    begin
      loop do
        msg_queue.push([:check])
        sleep(check_interval)
      end
    ensure
      Rbgo.logger&.warn('Rbgo') { 'check generator thread exit' }
    end
  end
  nil
end
get_queue(tag)
click to toggle source
# File lib/rbgo/corun.rb, line 369
# Looks up the task queue registered under +tag+ while holding
# task_queues_mutex.
#
# tag - key into task_queues.
#
# Returns whatever task_queues[tag] yields.
def get_queue(tag)
  task_queues_mutex.synchronize { task_queues[tag] }
end