class Bud::PushElement
p.insert(1) p.insert(nil)
Attributes
elem_name[RW]
found_delta[R]
invalidated[RW]
outputs[R]
pendings[R]
rescan[RW]
wired_by[R]
Public Class Methods
new(name_in, bud_instance, collection_name=nil, given_schema=nil, defer_schema=false, &blk)
click to toggle source
Calls superclass method
# File lib/bud/executor/elements.rb, line 17
# Sets up a push element.
#
# name_in, bud_instance, given_schema and defer_schema are forwarded to the
# superclass constructor; name_in is also kept as the element name.
# collection_name is stored as given, and the optional block becomes the
# element's per-item block (the one push_out invokes).
#
# A new element starts with empty output/pending/delete/delete-by-key target
# sets, no recorded upstream elements, and with both @invalidated and @rescan
# set to true.
def initialize(name_in, bud_instance, collection_name=nil, given_schema=nil, defer_schema=false, &blk)
  super(name_in, bud_instance, given_schema, defer_schema)

  # Identity
  @elem_name = name_in
  @collection_name = collection_name
  @blk = blk

  # Downstream targets, one set per wiring kind (see wire_to)
  @outputs = Set.new
  @pendings = Set.new
  @deletes = Set.new
  @delete_keys = Set.new

  # Upstream elements that have wired themselves to this one
  @wired_by = []

  # Per-tick state flags
  @found_delta = false
  @invalidated = true
  @rescan = true
end
Public Instance Methods
*(elem2, &blk)
click to toggle source
# File lib/bud/executor/elements.rb, line 234
# Operator shorthand for join: forwards elem2 and the optional block
# unchanged and returns whatever join returns.
def *(elem2, &blk)
  join(elem2, &blk)
end
<<(i)
click to toggle source
# File lib/bud/executor/elements.rb, line 184
# Operator form of insert: pushes a single item into this element with an
# explicit nil source. Returns whatever insert returns.
def <<(i)
  insert(i, nil)
end
add_rescan_invalidate(rescan, invalidate)
click to toggle source
default for stateless elements
# File lib/bud/executor/elements.rb, line 151
# Default rescan/invalidate propagation for stateless elements.
#
# rescan and invalidate are mutable collections (they must support member?,
# << and merge); rescan is updated in place.
# Returns the result of rescan.merge when this node ends up in rescan,
# otherwise nil.
def add_rescan_invalidate(rescan, invalidate)
  srcs = non_temporal_predecessors

  # If any sources are in rescan mode, then put this node in rescan
  rescan << self if srcs.any? { |src| rescan.member? src }

  # Pass the current state to each output collection and see if they end up
  # marking this node for rescan
  invalidate_tables(rescan, invalidate)

  # Finally, if this node is in rescan, pass the request on to all source
  # elements
  rescan.merge(srcs) if rescan.member? self
end
all?(name=nil, bud_instance=nil, the_schema=nil, &blk)
click to toggle source
# File lib/bud/executor/elements.rb, line 329
# Attaches an :all? predicate element downstream of this one.
# All arguments and the block are handed to push_predicate untouched; the
# return value is whatever push_predicate returns.
def all?(name=nil, bud_instance=nil, the_schema=nil, &blk)
  push_predicate(:all?, name, bud_instance, the_schema, &blk)
end
any?(name=nil, bud_instance=nil, the_schema=nil, &blk)
click to toggle source
# File lib/bud/executor/elements.rb, line 332
# Attaches an :any? predicate element downstream of this one.
# All arguments and the block are handed to push_predicate untouched; the
# return value is whatever push_predicate returns.
def any?(name=nil, bud_instance=nil, the_schema=nil, &blk)
  push_predicate(:any?, name, bud_instance, the_schema, &blk)
end
argagg(aggname, gbkey_cols, collection, &blk)
click to toggle source
# File lib/bud/executor/elements.rb, line 294
# Builds an "arg-aggregate" element (e.g. argmin/argmax) downstream of this
# one.
#
# aggname    - name of the aggregate; it is sent to the toplevel Bud instance
#              with the canonicalized column, and the first element of the
#              result is used as the aggregate object.
# gbkey_cols - group-by columns (nil is treated as an empty list); each is
#              canonicalized.
# collection - the column being aggregated; canonicalized.
# blk        - optional block handed to the new Bud::PushArgAgg.
#
# Raises Bud::Error when the aggregate's class is not Bud::ArgExemplary or a
# subclass of it.
# Returns the new Bud::PushArgAgg, which is also recorded in
# toplevel.push_elems.
def argagg(aggname, gbkey_cols, collection, &blk)
  gbkey_cols = (gbkey_cols || []).map { |c| canonicalize_col(c) }
  collection = canonicalize_col(collection)

  toplevel = @bud_instance.toplevel
  agg = toplevel.send(aggname, collection)[0]
  raise Bud::Error, "#{aggname} not declared exemplary" unless agg.class <= Bud::ArgExemplary

  aggpairs = [[agg, collection]]
  # The element name is derived from the current microsecond count.
  elem_id = "argagg#{Time.new.tv_usec}".to_sym
  aa = Bud::PushArgAgg.new(elem_id, toplevel.this_rule_context,
                           @collection_name, gbkey_cols, aggpairs, schema, &blk)
  wire_to(aa)
  toplevel.push_elems[[object_id, :argagg, gbkey_cols, aggpairs, blk]] = aa
  aa
end
argmax(gbcols, col, &blk)
click to toggle source
# File lib/bud/executor/elements.rb, line 311
# Convenience wrapper: argagg with the :max aggregate over col, grouped by
# gbcols. Returns whatever argagg returns.
def argmax(gbcols, col, &blk)
  argagg(:max, gbcols, col, &blk)
end
argmin(gbcols, col, &blk)
click to toggle source
# File lib/bud/executor/elements.rb, line 314
# Convenience wrapper: argagg with the :min aggregate over col, grouped by
# gbcols. Returns whatever argagg returns.
def argmin(gbcols, col, &blk)
  argagg(:min, gbcols, col, &blk)
end
check_wiring()
click to toggle source
# File lib/bud/executor/elements.rb, line 69
# Sanity check: an element must either carry a block or be wired to at least
# one target of some kind.
#
# Raises Bud::Error when there is no block and all four target sets are empty.
# Returns nil otherwise.
def check_wiring
  return unless @blk.nil?
  return unless [@outputs, @pendings, @deletes, @delete_keys].all?(&:empty?)

  raise Bud::Error, "no output specified for PushElement #{@qualified_tabname}"
end
each_with_index(&blk)
click to toggle source
# File lib/bud/executor/elements.rb, line 214
# Creates a Bud::PushEachWithIndex element downstream of this one, installs
# blk as its block, and records it in toplevel.push_elems.
#
# Returns the new element (the value of the final assignment).
def each_with_index(&blk)
  toplevel = @bud_instance.toplevel
  elem_id = "each_with_index#{object_id}".to_sym
  elem = Bud::PushEachWithIndex.new(elem_id, toplevel.this_rule_context, @collection_name)
  elem.set_block(&blk)
  wire_to(elem)
  toplevel.push_elems[[object_id, :each_with_index, blk]] = elem
end
flush()
click to toggle source
# File lib/bud/executor/elements.rb, line 189
# No-op in this class; returns nil.
# NOTE(review): presumably a hook for subclasses that buffer items — confirm.
def flush
end
group(keycols, *aggpairs, &blk)
click to toggle source
# File lib/bud/executor/elements.rb, line 269
# Builds a Bud::PushGroup (group-by / aggregation) element downstream of this
# one.
#
# keycols  - grouping columns (nil is treated as an empty list); each is
#            canonicalized, and element [2] of each canonical form is used as
#            the key's name in the output schema.
# aggpairs - aggregate specifications; the class name of each pair's first
#            element (last "::" segment, downcased) plus its position gives the
#            output column name, e.g. :max_0.
# blk      - optional block handed to the new Bud::PushGroup.
#
# Returns the new element, which is also recorded in toplevel.push_elems.
def group(keycols, *aggpairs, &blk)
  # establish schema
  keycols = (keycols || []).map { |c| canonicalize_col(c) }
  keynames = keycols.map { |k| k[2] }
  aggcols = aggpairs.each_with_index.map do |ap, i|
    agg_name = ap[0].class.name.split("::").last
    "#{agg_name.downcase}_#{i}".to_sym
  end
  # With no aggregates the schema is just the keys; otherwise keys => aggregates
  the_schema = aggcols.empty? ? keynames : { keynames => aggcols }

  aggpairs = prep_aggpairs(aggpairs)
  toplevel = @bud_instance.toplevel
  # The element name is derived from the current microsecond count.
  elem_id = "grp#{Time.new.tv_usec}".to_sym
  g = Bud::PushGroup.new(elem_id, toplevel.this_rule_context,
                         @collection_name, keycols, aggpairs, the_schema, &blk)
  wire_to(g)
  toplevel.push_elems[[object_id, :group, keycols, aggpairs, blk]] = g
  g
end
include?(name=nil, bud_instance=nil, the_schema=nil, &blk)
click to toggle source
# File lib/bud/executor/elements.rb, line 335
# Attaches an :include? predicate element downstream of this one.
# All arguments and the block are handed to push_predicate untouched; the
# return value is whatever push_predicate returns.
def include?(name=nil, bud_instance=nil, the_schema=nil, &blk)
  push_predicate(:include?, name, bud_instance, the_schema, &blk)
end
insert(item, source=nil)
click to toggle source
# File lib/bud/executor/elements.rb, line 104
# Entry point for items arriving at this element: hands the item straight to
# push_out. The source argument is accepted but not used here.
# Returns whatever push_out returns.
def insert(item, source=nil)
  push_out(item)
end
inspected()
click to toggle source
# File lib/bud/executor/elements.rb, line 366
# Returns an element that emits [item.inspect] for every item pushed through
# this one. The element is created once per receiver and then served from
# toplevel.push_elems under the key [object_id, :inspected].
def inspected
  toplevel = @bud_instance.toplevel
  key = [object_id, :inspected]

  if toplevel.push_elems[key].nil?
    ins = pro { |i| [i.inspect] }
    # NOTE(review): pro already wires the new element to self, so this call
    # repeats that wiring — confirm whether the duplicate wired_by entry matters.
    wire_to(ins)
    toplevel.push_elems[key] = ins
  end

  toplevel.push_elems[key]
end
invalidate_cache()
click to toggle source
# File lib/bud/executor/elements.rb, line 191
# No-op in this class; returns nil. tick calls this when @invalidated is set.
# NOTE(review): presumably overridden by elements that keep cached state —
# confirm.
def invalidate_cache
end
invalidate_tables(rescan, invalidate)
click to toggle source
# File lib/bud/executor/elements.rb, line 169
# Exchange rescan and invalidate information with tables. If this node is
# in rescan, it may invalidate an output table (if it is a scratch). And
# if the output table is going to be invalidated, this node marks itself
# for rescan to enable a refill of that table at run-time.
#
# Only the output and pending targets are visited, and targets that are
# themselves PushElements are skipped. rescan is updated in place.
def invalidate_tables(rescan, invalidate)
  [@outputs, @pendings].each do |targets|
    targets.each do |o|
      next if o.class <= PushElement

      o.add_rescan_invalidate(rescan, invalidate)
      rescan << self if invalidate.member? o
    end
  end
end
join(elem2, &blk)
click to toggle source
# File lib/bud/executor/elements.rb, line 224
# Joins this element with elem2 through a new Bud::PushSHJoin.
#
# elem2 is converted with to_push_elem unless it already is a PushElement.
# Both inputs are wired to the join, which is recorded in toplevel.push_elems
# and appended to toplevel.push_joins for the current stratum.
# The block is used only as part of the push_elems key; it is not passed to
# the join element here.
#
# Returns the new join element.
def join(elem2, &blk)
  elem2 = elem2.to_push_elem unless elem2.kind_of? PushElement
  toplevel = @bud_instance.toplevel

  joined = Bud::PushSHJoin.new([self, elem2], toplevel.this_rule_context, [])
  [self, elem2].each { |input| input.wire_to(joined) }

  toplevel.push_elems[[object_id, :join, [self, elem2], toplevel, blk]] = joined
  toplevel.push_joins[toplevel.this_stratum] << joined
  joined
end
member?(name=nil, bud_instance=nil, the_schema=nil, &blk)
click to toggle source
# File lib/bud/executor/elements.rb, line 338
# Attaches a :member? predicate element downstream of this one.
# All arguments and the block are handed to push_predicate untouched; the
# return value is whatever push_predicate returns.
def member?(name=nil, bud_instance=nil, the_schema=nil, &blk)
  push_predicate(:member?, name, bud_instance, the_schema, &blk)
end
merge(source)
click to toggle source
# File lib/bud/executor/elements.rb, line 248
# Feeds source into this element.
#
# During the wiring phase a PushElement source is wired to this element;
# in every other case source is enumerated and each item is inserted via <<.
# Returns the result of wire_to or of source.each respectively.
def merge(source)
  wire_source = source.class <= PushElement && @bud_instance.wiring?
  return source.wire_to(self) if wire_source

  source.each { |item| self << item }
end
Also aliased as: <=
none?(name=nil, bud_instance=nil, the_schema=nil, &blk)
click to toggle source
# File lib/bud/executor/elements.rb, line 341
# Attaches a :none? predicate element downstream of this one.
# All arguments and the block are handed to push_predicate untouched; the
# return value is whatever push_predicate returns.
def none?(name=nil, bud_instance=nil, the_schema=nil, &blk)
  push_predicate(:none?, name, bud_instance, the_schema, &blk)
end
notin(elem2, *preds, &blk)
click to toggle source
# File lib/bud/executor/elements.rb, line 238
# Builds a Bud::PushNotIn element fed by this element and elem2.
#
# elem2 is converted with to_push_elem unless it already is a PushElement.
# preds and the block are handed to the new element. Both inputs are wired to
# it and it is recorded in toplevel.push_elems.
#
# Returns the new element.
def notin(elem2, *preds, &blk)
  elem2 = elem2.to_push_elem unless elem2.kind_of? PushElement
  toplevel = @bud_instance.toplevel

  notin_elem = Bud::PushNotIn.new([self, elem2], toplevel.this_rule_context, preds, &blk)
  [self, elem2].each { |input| input.wire_to(notin_elem) }

  toplevel.push_elems[[object_id, :notin, [self, elem2], toplevel, blk]] = notin_elem
  notin_elem
end
on_include?(item, &blk)
click to toggle source
# File lib/bud/executor/elements.rb, line 357
# Returns an element that calls blk with item whenever an incoming item
# equals item (nothing is called when no block was given).
# The element is created on first use and afterwards served from
# toplevel.push_elems, keyed by receiver, item and block.
def on_include?(item, &blk)
  toplevel = @bud_instance.toplevel
  key = [object_id, :on_include?, item, blk]

  if toplevel.push_elems[key].nil?
    inc = pro { |i| blk.call(item) if i == item && !blk.nil? }
    wire_to(inc)
    toplevel.push_elems[key] = inc
  end

  toplevel.push_elems[key]
end
one?(name=nil, bud_instance=nil, the_schema=nil, &blk)
click to toggle source
# File lib/bud/executor/elements.rb, line 344
# Attaches a :one? predicate element downstream of this one.
# All arguments and the block are handed to push_predicate untouched; the
# return value is whatever push_predicate returns.
def one?(name=nil, bud_instance=nil, the_schema=nil, &blk)
  push_predicate(:one?, name, bud_instance, the_schema, &blk)
end
print_wiring(depth=0, accum="")
click to toggle source
# File lib/bud/executor/elements.rb, line 37
# Prints this element and everything wired downstream of it to standard
# output, one line per node, indented by one space per depth level.
#
# depth - indentation level for this node.
# accum - arrow prefix printed before this node; children are printed with
#         "=> " (outputs), "+> " (pendings) or "-> " (deletes and
#         delete-by-key targets).
#
# Targets that respond to print_wiring are printed recursively; anything else
# gets a single summary line.
# Returns the array of the four target sets.
def print_wiring(depth=0, accum="")
  depth.times { print " " }
  puts "#{accum} #{(self.object_id*2).to_s(16)}: #{qualified_tabname} (#{self.class})"

  kinds = [@outputs, @pendings, @deletes, @delete_keys]
  arrows = ["=> ", "+> ", "-> ", "-> "]

  kinds.zip(arrows) do |targets, next_accum|
    targets.each do |o|
      if o.respond_to?(:print_wiring)
        o.print_wiring(depth + 1, next_accum)
        next
      end

      # Leaf target: print one summary line
      (depth + 1).times { print " " }
      print "#{next_accum} "
      if o.class <= Bud::BudCollection
        puts "#{(o.object_id*2).to_s(16)}: #{o.qualified_tabname} (#{o.class})"
      elsif o.class <= Bud::LatticeWrapper
        puts "#{o.inspect}"
      else
        puts "#{(o.object_id*2).to_s(16)}: (#{o.class.name})"
      end
    end
  end

  kinds
end
pro(the_name=elem_name, the_schema=schema, &blk)
click to toggle source
And now, the Bloom-facing methods. XXX: the “the_name” parameter is unused.
# File lib/bud/executor/elements.rb, line 200
# Projection: creates a new Bud::PushElement downstream of this one whose
# per-item block is blk, wires this element to it, and records it in
# toplevel.push_elems.
#
# the_name   - accepted but not used in the body; the new element is always
#              named "project<object_id>".
# the_schema - schema handed to the new element (defaults to this element's
#              schema).
#
# Returns the new element.
def pro(the_name=elem_name, the_schema=schema, &blk)
  toplevel = @bud_instance.toplevel
  elem_id = "project#{object_id}".to_sym
  elem = Bud::PushElement.new(elem_id, toplevel.this_rule_context, @collection_name, the_schema)

  wire_to(elem)
  elem.set_block(&blk)

  toplevel.push_elems[[object_id, :pro, blk]] = elem
  elem
end
Also aliased as: each, on_exists?
push_out(item, do_block=true)
click to toggle source
# File lib/bud/executor/elements.rb, line 116
# Sends one item to every wired target.
#
# item     - the item to emit.
# do_block - when true (the default) and the element has a block, the item is
#            first passed through that block (converted with to_a when the
#            block's arity is greater than 1). A nil block result drops the
#            item and nothing is emitted.
#
# Raises Bud::Error when an output is neither a Bud::PushElement, a
# Bud::BudCollection nor a Bud::LatticeWrapper.
# Returns nil when the item was dropped, otherwise the pendings set.
def push_out(item, do_block=true)
  if do_block && @blk
    item = item.to_a if @blk.arity > 1
    item = @blk.call item
    return if item.nil?
  end

  @outputs.each do |ou|
    target_class = ou.class
    if target_class <= Bud::PushElement
      ou.insert(item, self)
    elsif target_class <= Bud::BudCollection
      ou.do_insert(item, ou.new_delta)
    elsif target_class <= Bud::LatticeWrapper
      ou.insert(item, self)
    else
      raise Bud::Error, "unexpected output target: #{target_class}"
    end
  end

  # for the following, o is a BudCollection
  @deletes.each { |o| o.pending_delete([item]) }
  @delete_keys.each { |o| o.pending_delete_keys([item]) }

  # o is a LatticeWrapper or a BudCollection
  @pendings.each do |o|
    if o.class <= Bud::LatticeWrapper
      # Ruby parses this as o.<(+item); presumably the wrapper's deferred
      # merge operator — confirm against LatticeWrapper.
      o <+ item
    else
      o.pending_merge([item])
    end
  end
end
push_predicate(pred_symbol, name=nil, bud_instance=nil, the_schema=nil, &blk)
click to toggle source
# File lib/bud/executor/elements.rb, line 322
# Shared implementation behind all?, any?, include?, member?, none? and one?:
# creates a Bud::PushPredicate for pred_symbol with the given arguments and
# block, wires this element to it, and returns it.
def push_predicate(pred_symbol, name=nil, bud_instance=nil, the_schema=nil, &blk)
  Bud::PushPredicate.new(pred_symbol, name, bud_instance, the_schema, &blk).tap do |elem|
    wire_to(elem)
  end
end
reduce(initial, &blk)
click to toggle source
# File lib/bud/executor/elements.rb, line 348
# Creates a Bud::PushReduce downstream of this element, seeded with initial
# and driven by blk, wires this element to it, and returns it.
# The element name is derived from the current microsecond count.
def reduce(initial, &blk)
  elem_id = "reduce#{Time.new.tv_usec}".to_sym
  Bud::PushReduce.new(elem_id, @bud_instance, @collection_name, schema, initial, &blk).tap do |elem|
    wire_to(elem)
  end
end
rescan_at_tick()
click to toggle source
# File lib/bud/executor/elements.rb, line 100
# Always false for this class.
# NOTE(review): presumably tells the executor whether the element must be
# rescanned every tick — confirm against the caller.
def rescan_at_tick
  false
end
set_block(&blk)
click to toggle source
# File lib/bud/executor/elements.rb, line 75
# Replaces the element's per-item block with blk (nil when no block is
# given). Returns the stored block.
def set_block(&blk)
  @blk = blk
end
sort(name=nil, bud_instance=nil, the_schema=nil, &blk)
click to toggle source
# File lib/bud/executor/elements.rb, line 317
# Creates a Bud::PushSort with the given arguments and block, wires this
# element to it, and returns it.
def sort(name=nil, bud_instance=nil, the_schema=nil, &blk)
  Bud::PushSort.new(name, bud_instance, the_schema, &blk).tap do |elem|
    wire_to(elem)
  end
end
stratum_end()
click to toggle source
# File lib/bud/executor/elements.rb, line 193
# No-op in this class; returns nil.
# NOTE(review): presumably a hook invoked when a stratum finishes — confirm.
def stratum_end
end
tick()
click to toggle source
# File lib/bud/executor/elements.rb, line 108
# Per-tick hook: clears cached state via invalidate_cache, but only when the
# element is flagged as invalidated. Returns nil when nothing was done.
def tick
  return unless @invalidated

  invalidate_cache
end
tick_deltas()
click to toggle source
# File lib/bud/executor/elements.rb, line 112
# Resets the found_delta flag to false and returns false.
def tick_deltas
  @found_delta = false
end
wire_to(element, kind=:output)
click to toggle source
# File lib/bud/executor/elements.rb, line 79
# Registers element as a downstream target of this element.
#
# element - the target to wire.
# kind    - which target set to add it to:
#           :output        -> @outputs
#           :pending       -> @pendings
#           :delete        -> @deletes
#           :delete_by_key -> @delete_keys
#
# If element responds to wired_by, this element is appended to that list.
#
# Raises Bud::Error when called outside the wiring phase or with an
# unrecognized kind.
# Returns element.wired_by after the append, or nil when element has no
# wired_by.
def wire_to(element, kind=:output)
  unless @bud_instance.wiring?
    raise Bud::Error, "wire_to called outside wiring phase"
  end

  case kind
  when :output
    @outputs << element
  when :pending
    @pendings << element
  when :delete
    @deletes << element
  when :delete_by_key
    @delete_keys << element
  else
    raise Bud::Error, "unrecognized wiring kind: #{kind}"
  end

  # wirings memoizes the union of the target sets in @wirings; drop that
  # cached value so a later call reflects the target just added.
  @wirings = nil

  element.wired_by << self if element.respond_to? :wired_by
end
wirings()
click to toggle source
# File lib/bud/executor/elements.rb, line 32
# Returns the union of all four target sets (outputs, pendings, deletes and
# delete-by-key targets).
# The result is computed on the first call and cached in @wirings; later
# calls return the cached object for as long as @wirings stays set.
def wirings
  @wirings ||= [@outputs, @pendings, @deletes, @delete_keys].inject(:+)
end