class Computable::Variable

Attributes

calc_method[RW]
count[RW]
expired_from[RW]
in_process[RW]
name[RW]
recalc_error[RW]
used_for[RW]
value[RW]
value_calced[RW]

Public Class Methods

new(name, calc_method, comp, mutex) click to toggle source
# File lib/computable.rb, line 20
def initialize(name, calc_method, comp, mutex)
  @name = name
  @calc_method = calc_method
  @comp = comp
  @mutex = mutex
  @used_for = {}
  @expired_from = {}
  @count = 0
  @value = Unknown
  @in_process = false
  @recalc_error = nil
end

Public Instance Methods

assign_value(value) click to toggle source
# File lib/computable.rb, line 184
def assign_value(value)
  unless self.value == value
    expire_value
    expired_from.clear
    used_for.clear
    self.value = value
  end
  self.value_calced = false
end
calc!() click to toggle source
# File lib/computable.rb, line 38
def calc!
  self.count += 1
  self.value_calced = true
  @mutex.unlock
  begin
    calc_method.call(self)
  ensure
    @mutex.lock
  end
end
expire_value() click to toggle source
# File lib/computable.rb, line 49
def expire_value
  return if used_for.empty?

  puts "expire #{inspect}" if @comp.computable_debug
  used_for.each do |name2, v2|
    if v2.value_calced && !v2.expired_from[name]
      v2.expire_value
      v2.expired_from[name] = self
    end
  end
end
find_recalcable() click to toggle source
# File lib/computable.rb, line 171
def find_recalcable
  if !value_calced || expired_from.empty? || in_process
    nil
  elsif expired_from.all?{ |_, v2| !v2.value_calced || v2.expired_from.empty? }
    self
  else
    expired_from.each do |_, v2|
      node = v2.find_recalcable and return node
    end
    nil
  end
end
inspect() click to toggle source
# File lib/computable.rb, line 33
def inspect
  has = @recalc_error ? "error!" : "value:#{Unknown!=value}"
  "<Variable #{name} used_for:#{used_for.keys} expired_from:#{expired_from.keys} has #{has} value_calced:#{value_calced.inspect}>"
end
master_loop(max_threads, workers, from_workers, to_workers) click to toggle source
# File lib/computable.rb, line 129
def master_loop(max_threads, workers, from_workers, to_workers)
  num_working = 0
  loop do
    if num_working == max_threads || !(node = find_recalcable)
      #
      # maxed out or no nodes available -- wait for results
      #
      return if num_working == 0

      puts "recalc join" if @comp.computable_debug
      @mutex.unlock
      begin
        node, recalced_value, err = from_workers.pop
      ensure
        @mutex.lock
      end
      node.in_process = false
      num_working -= 1

      if err
        # Add the backtrace of the caller to the small in-thread backtrace for better debugging
        err.set_backtrace(err.backtrace + caller)
      end

      node.process_recalced_value(recalced_value, err)
    else
      #
      # not maxed out and found a node -- compute it
      #
      if (max_threads && workers.size < max_threads) ||
         (!max_threads && num_working == workers.size)
        workers << new_worker(from_workers, to_workers)
      end
      node.in_process = true
      node.count += 1
      node.value_calced = true
      num_working += 1
      to_workers.push(node)
    end
  end
end
new_worker(from_workers, to_workers) click to toggle source
# File lib/computable.rb, line 104
def new_worker(from_workers, to_workers)
  Thread.new do
    while v = to_workers.pop
      puts "recalc parallel #{v.inspect}" if @comp.computable_debug
      err = nil
      begin
        recalced_value = v.calc_method.call(v)
      rescue Exception => err
      end
      from_workers.push([v, recalced_value, err])
    end
  end
end
process_recalced_value(recalced_value, err) click to toggle source
# File lib/computable.rb, line 72
def process_recalced_value(recalced_value, err)
  if err
    self.recalc_error = err
    self.value = Unknown
    used_for.clear
  elsif self.value == recalced_value
    revoke_expire
  else
    self.recalc_error = nil
    self.value = recalced_value
    used_for.clear
  end
  expired_from.clear
end
query_value(kaller) click to toggle source
# File lib/computable.rb, line 194
def query_value(kaller)
  if kaller
    v2 = used_for[kaller.name]
    if v2
      if Unknown==value && Unknown==v2.value && value_calced && v2.value_calced
        raise RecursionDetected, "#{v2.name} depends on #{name}, but #{name} could not be computed without #{v2.name}"
      end
    else
      used_for[kaller.name] = kaller
    end
  end

  max_threads = @comp.computable_max_threads
  if !max_threads || max_threads > 0
    recalc_parallel(max_threads)
  else
    recalc_value
  end

  raise recalc_error if recalc_error
  self.value = calc! if Unknown==value
  self.value
end
recalc_parallel(max_threads) click to toggle source
# File lib/computable.rb, line 118
def recalc_parallel(max_threads)
  workers = []
  from_workers = Queue.new
  to_workers = Queue.new

  master_loop(max_threads, workers, from_workers, to_workers)

  to_workers.close
  workers.each { |t| t.join }
end
recalc_value() click to toggle source
# File lib/computable.rb, line 87
def recalc_value
  return if !value_calced || expired_from.empty?

  puts "recalc #{inspect}" if @comp.computable_debug
  expired_from.each do |name2, v2|
    v2.recalc_value
  end

  unless expired_from.empty?
    begin
      recalced_value = self.calc!
    rescue Exception => err
    end
    process_recalced_value(recalced_value, err)
  end
end
revoke_expire() click to toggle source
# File lib/computable.rb, line 61
def revoke_expire
  return if used_for.empty?

  puts "revoke expire #{inspect}" if @comp.computable_debug
  used_for.each do |name2, v2|
    if v2.value_calced && v2.expired_from.delete(name) && v2.expired_from.empty?
      v2.revoke_expire
    end
  end
end