class BatchAny::Manager
Attributes
exceptions[R]
Public Class Methods
new()
click to toggle source
# File lib/batch_any/manager.rb, line 14 def initialize @computations = [] @awaiting_services = {} end
Public Instance Methods
add_computation(&block)
click to toggle source
# File lib/batch_any/manager.rb, line 19 def add_computation(&block) fiber = Fiber.new do Thread.current[:batch_any_manager] = self block.call end @computations << fiber fiber end
enqueue_item(item)
click to toggle source
# File lib/batch_any/manager.rb, line 53 def enqueue_item(item) service_class = item.service_class @awaiting_services[service_class] ||= [] awaiting_services = @awaiting_services[service_class] service = awaiting_services.find { |service| service.can_serve?(item) } if service service.items << item else awaiting_services << item.service_class.new(item) end Fiber.yield end
run()
click to toggle source
# File lib/batch_any/manager.rb, line 28 def run @exceptions = [] while @computations.any? @computations.each do |computation| begin computation.resume rescue => e @exceptions << FiberError.new(computation, e) end end linear_keep_if!(@computations, &:alive?) @awaiting_services.values.each do |services| services.each do |service| begin service.fetch rescue => e service.items.each { |item| item.exception = e } end end end @awaiting_services.clear end @exceptions end
Private Instance Methods
linear_keep_if!(arr) { |v| ... }
click to toggle source
bugs.ruby-lang.org/issues/10714 github.com/ruby/ruby/commit/5ec029d1ea52224a365a11987379c3e9de74b47a
# File lib/batch_any/manager.rb, line 70 def linear_keep_if!(arr) i = 0 j = 0 while i < arr.length v = arr[i] if yield v arr[j] = v j += 1 end i += 1 end i != j ? arr : nil ensure if i != j arr[j, i-j] = [] end end