class Computable::Variable
Attributes
calc_method[RW]
count[RW]
expired_from[RW]
in_process[RW]
name[RW]
recalc_error[RW]
used_for[RW]
value[RW]
value_calced[RW]
Public Class Methods
new(name, calc_method, comp, mutex)
click to toggle source
# File lib/computable.rb, line 20 def initialize(name, calc_method, comp, mutex) @name = name @calc_method = calc_method @comp = comp @mutex = mutex @used_for = {} @expired_from = {} @count = 0 @value = Unknown @in_process = false @recalc_error = nil end
Public Instance Methods
assign_value(value)
click to toggle source
# File lib/computable.rb, line 184 def assign_value(value) unless self.value == value expire_value expired_from.clear used_for.clear self.value = value end self.value_calced = false end
calc!()
click to toggle source
# File lib/computable.rb, line 38 def calc! self.count += 1 self.value_calced = true @mutex.unlock begin calc_method.call(self) ensure @mutex.lock end end
expire_value()
click to toggle source
# File lib/computable.rb, line 49 def expire_value return if used_for.empty? puts "expire #{inspect}" if @comp.computable_debug used_for.each do |name2, v2| if v2.value_calced && !v2.expired_from[name] v2.expire_value v2.expired_from[name] = self end end end
find_recalcable()
click to toggle source
# File lib/computable.rb, line 171 def find_recalcable if !value_calced || expired_from.empty? || in_process nil elsif expired_from.all?{ |_, v2| !v2.value_calced || v2.expired_from.empty? } self else expired_from.each do |_, v2| node = v2.find_recalcable and return node end nil end end
inspect()
click to toggle source
# File lib/computable.rb, line 33 def inspect has = @recalc_error ? "error!" : "value:#{Unknown!=value}" "<Variable #{name} used_for:#{used_for.keys} expired_from:#{expired_from.keys} has #{has} value_calced:#{value_calced.inspect}>" end
master_loop(max_threads, workers, from_workers, to_workers)
click to toggle source
# File lib/computable.rb, line 129 def master_loop(max_threads, workers, from_workers, to_workers) num_working = 0 loop do if num_working == max_threads || !(node = find_recalcable) # # maxed out or no nodes available -- wait for results # return if num_working == 0 puts "recalc join" if @comp.computable_debug @mutex.unlock begin node, recalced_value, err = from_workers.pop ensure @mutex.lock end node.in_process = false num_working -= 1 if err # Add the backtrace of the caller to the small in-thread backtrace for better debugging err.set_backtrace(err.backtrace + caller) end node.process_recalced_value(recalced_value, err) else # # not maxed out and found a node -- compute it # if (max_threads && workers.size < max_threads) || (!max_threads && num_working == workers.size) workers << new_worker(from_workers, to_workers) end node.in_process = true node.count += 1 node.value_calced = true num_working += 1 to_workers.push(node) end end end
new_worker(from_workers, to_workers)
click to toggle source
# File lib/computable.rb, line 104 def new_worker(from_workers, to_workers) Thread.new do while v = to_workers.pop puts "recalc parallel #{v.inspect}" if @comp.computable_debug err = nil begin recalced_value = v.calc_method.call(v) rescue Exception => err end from_workers.push([v, recalced_value, err]) end end end
process_recalced_value(recalced_value, err)
click to toggle source
# File lib/computable.rb, line 72 def process_recalced_value(recalced_value, err) if err self.recalc_error = err self.value = Unknown used_for.clear elsif self.value == recalced_value revoke_expire else self.recalc_error = nil self.value = recalced_value used_for.clear end expired_from.clear end
query_value(kaller)
click to toggle source
# File lib/computable.rb, line 194 def query_value(kaller) if kaller v2 = used_for[kaller.name] if v2 if Unknown==value && Unknown==v2.value && value_calced && v2.value_calced raise RecursionDetected, "#{v2.name} depends on #{name}, but #{name} could not be computed without #{v2.name}" end else used_for[kaller.name] = kaller end end max_threads = @comp.computable_max_threads if !max_threads || max_threads > 0 recalc_parallel(max_threads) else recalc_value end raise recalc_error if recalc_error self.value = calc! if Unknown==value self.value end
recalc_parallel(max_threads)
click to toggle source
# File lib/computable.rb, line 118 def recalc_parallel(max_threads) workers = [] from_workers = Queue.new to_workers = Queue.new master_loop(max_threads, workers, from_workers, to_workers) to_workers.close workers.each { |t| t.join } end
recalc_value()
click to toggle source
# File lib/computable.rb, line 87 def recalc_value return if !value_calced || expired_from.empty? puts "recalc #{inspect}" if @comp.computable_debug expired_from.each do |name2, v2| v2.recalc_value end unless expired_from.empty? begin recalced_value = self.calc! rescue Exception => err end process_recalced_value(recalced_value, err) end end
revoke_expire()
click to toggle source
# File lib/computable.rb, line 61 def revoke_expire return if used_for.empty? puts "revoke expire #{inspect}" if @comp.computable_debug used_for.each do |name2, v2| if v2.value_calced && v2.expired_from.delete(name) && v2.expired_from.empty? v2.revoke_expire end end end