module Sorta::Parallel::Reactor
Public Class Methods
queues()
click to toggle source
# File lib/sorta/parallel/reactor.rb, line 18 def self.queues Ractor.current[:queues] ||= {} end
register(task)
click to toggle source
# File lib/sorta/parallel/reactor.rb, line 22 def self.register(task) queues[task[1].object_id] = task[1] safe_task = task.dup safe_task[1] = task[1].object_id request_pipe << [safe_task, result_pipe] end
request_pipe()
click to toggle source
# File lib/sorta/parallel/reactor.rb, line 6 def self.request_pipe Ractor.current[:request_pipe] ||= Ractor.new { loop { Ractor.yield(Ractor.receive) } } end
result_pipe()
click to toggle source
# File lib/sorta/parallel/reactor.rb, line 10 def self.result_pipe Ractor.current[:result_pipe] ||= Ractor.new { loop { Ractor.yield(Ractor.receive) } } end
start!()
click to toggle source
# File lib/sorta/parallel/reactor.rb, line 29 def self.start! request_pipe result_pipe workers Ractor.current[:thread] ||= Thread.new do loop do msg = result_pipe.take result_queue = queues[msg[1]] result_queue.push(msg) rescue => e puts e.message puts e.backtrace.join("\n") next end end end
workers()
click to toggle source
# File lib/sorta/parallel/reactor.rb, line 14 def self.workers Ractor.current[:workers] ||= Etc.nprocessors.times.map { Worker.new(request_pipe) }.freeze end