class ThomasUtils::Future

Constants

DEFAULT_EXECUTOR
IMMEDIATE_EXECUTOR

Public Class Methods

all(observations)
# File lib/thomas_utils/future.rb, line 22
# Combines several observations into a single one.
#
# The resulting observable is set to an array holding each observation's
# value (in the same order as +observations+) once all of them have
# succeeded, or is failed with the first error that gets reported.
#
# @param observations [Enumerable] objects responding to +on_complete+,
#   which yields +(value, error)+ to the given block
# @return [Observation] built from DEFAULT_EXECUTOR and the combined IVar
def self.all(observations)
  observable = Concurrent::IVar.new
  left = observations.count
  buffer = [nil] * left
  mutex = Mutex.new
  settled = false

  # With nothing to wait for, no callback would ever fire; resolve right away
  # so that callers are not left waiting on an IVar that never completes.
  if left.zero?
    settled = true
    observable.set(buffer)
  end

  observations.each_with_index do |observation, index|
    observation.on_complete do |value, error|
      # Decide under the lock, act outside of it: the IVar must be completed
      # exactly once, and completing it twice raises.
      outcome = mutex.synchronize do
        next if settled

        if error
          settled = true
          :failed
        else
          buffer[index] = value
          left -= 1
          if left.zero?
            settled = true
            :done
          end
        end
      end

      case outcome
      when :failed then observable.fail(error)
      when :done then observable.set(buffer)
      end
    end
  end
  Observation.new(DEFAULT_EXECUTOR, observable)
end
error(error)
# File lib/thomas_utils/future.rb, line 18
# Wraps +error+ in an observation bound to IMMEDIATE_EXECUTOR.
#
# @param error [Object] handed to ConstantVar.error
# @return [Observation]
def self.error(error)
  failed_var = ConstantVar.error(error)
  Observation.new(IMMEDIATE_EXECUTOR, failed_var)
end
immediate(&block)
# File lib/thomas_utils/future.rb, line 10
# Chains +block+ onto the observation returned by +none+.
#
# @yield forwarded unchanged to +then+ on that observation
# @return [Object] whatever +then+ returns
def self.immediate(&block)
  base = none
  base.then(&block)
end
new(options = {}, &block)
Calls superclass method
# File lib/thomas_utils/future.rb, line 45
# Runs +block+ through Concurrent::Future.execute and hands the resulting
# future, together with the executor, to the superclass initializer.
#
# @param options [Hash] +:executor+ selects the executor; DEFAULT_EXECUTOR is
#   used when the key is absent
# @yield the work passed to Concurrent::Future.execute
def initialize(options = {}, &block)
  pool = options.fetch(:executor) { DEFAULT_EXECUTOR }
  super(pool, Concurrent::Future.execute(executor: pool, &block))
end
none()
# File lib/thomas_utils/future.rb, line 14
# Wraps ConstantVar.none in an observation bound to IMMEDIATE_EXECUTOR.
#
# @return [Observation]
def self.none
  empty_var = ConstantVar.none
  Observation.new(IMMEDIATE_EXECUTOR, empty_var)
end
value(value)
# File lib/thomas_utils/future.rb, line 6
# Wraps +value+ in an observation bound to IMMEDIATE_EXECUTOR.
#
# @param value [Object] handed to ConstantVar.value
# @return [Observation]
def self.value(value)
  constant = ConstantVar.value(value)
  Observation.new(IMMEDIATE_EXECUTOR, constant)
end