class Bud::PushElement

p.insert(1) p.insert(nil)

Attributes

elem_name[RW]
found_delta[R]
invalidated[RW]
outputs[R]
pendings[R]
rescan[RW]
wired_by[R]

Public Class Methods

new(name_in, bud_instance, collection_name=nil, given_schema=nil, defer_schema=false, &blk) click to toggle source
Calls superclass method
# File lib/bud/executor/elements.rb, line 17
def initialize(name_in, bud_instance, collection_name=nil, given_schema=nil, defer_schema=false, &blk)
  super(name_in, bud_instance, given_schema, defer_schema)
  @blk = blk
  @outputs = Set.new
  @pendings = Set.new
  @deletes = Set.new
  @delete_keys = Set.new
  @wired_by = []
  @elem_name = name_in
  @found_delta = false
  @collection_name = collection_name
  @invalidated = true
  @rescan = true
end

Public Instance Methods

*(elem2, &blk) click to toggle source
# File lib/bud/executor/elements.rb, line 234
def *(elem2, &blk)
  join(elem2, &blk)
end
<<(i) click to toggle source
# File lib/bud/executor/elements.rb, line 184
def <<(i)
  insert(i, nil)
end
<=(source)
Alias for: merge
add_rescan_invalidate(rescan, invalidate) click to toggle source

default for stateless elements

# File lib/bud/executor/elements.rb, line 151
def add_rescan_invalidate(rescan, invalidate)
  # If any sources are in rescan mode, then put this node in rescan
  srcs = non_temporal_predecessors
  if srcs.any?{|p| rescan.member? p}
    rescan << self
  end

  # Pass the current state to each output collection and see if they end up
  # marking this node for rescan
  invalidate_tables(rescan, invalidate)

  # Finally, if this node is in rescan, pass the request on to all source
  # elements
  if rescan.member? self
    rescan.merge(srcs)
  end
end
all?(name=nil, bud_instance=nil, the_schema=nil, &blk) click to toggle source
# File lib/bud/executor/elements.rb, line 329
def all?(name=nil, bud_instance=nil, the_schema=nil, &blk)
  push_predicate(:all?, name, bud_instance, the_schema, &blk)
end
any?(name=nil, bud_instance=nil, the_schema=nil, &blk) click to toggle source
# File lib/bud/executor/elements.rb, line 332
def any?(name=nil, bud_instance=nil, the_schema=nil, &blk)
  push_predicate(:any?, name, bud_instance, the_schema, &blk)
end
argagg(aggname, gbkey_cols, collection, &blk) click to toggle source
# File lib/bud/executor/elements.rb, line 294
def argagg(aggname, gbkey_cols, collection, &blk)
  gbkey_cols ||= []
  gbkey_cols = gbkey_cols.map{|c| canonicalize_col(c)}
  collection = canonicalize_col(collection)
  toplevel = @bud_instance.toplevel
  agg = toplevel.send(aggname, collection)[0]
  unless agg.class <= Bud::ArgExemplary
    raise Bud::Error, "#{aggname} not declared exemplary"
  end

  aggpairs = [[agg, collection]]
  aa = Bud::PushArgAgg.new("argagg#{Time.new.tv_usec}".to_sym, toplevel.this_rule_context,
                           @collection_name, gbkey_cols, aggpairs, schema, &blk)
  self.wire_to(aa)
  toplevel.push_elems[[self.object_id, :argagg, gbkey_cols, aggpairs, blk]] = aa
  return aa
end
argmax(gbcols, col, &blk) click to toggle source
# File lib/bud/executor/elements.rb, line 311
def argmax(gbcols, col, &blk)
  argagg(:max, gbcols, col, &blk)
end
argmin(gbcols, col, &blk) click to toggle source
# File lib/bud/executor/elements.rb, line 314
def argmin(gbcols, col, &blk)
  argagg(:min, gbcols, col, &blk)
end
check_wiring() click to toggle source
# File lib/bud/executor/elements.rb, line 69
def check_wiring
  if @blk.nil? and @outputs.empty? and @pendings.empty? and @deletes.empty? and @delete_keys.empty?
    raise Bud::Error, "no output specified for PushElement #{@qualified_tabname}"
  end
end
each(the_name=elem_name, the_schema=schema, &blk)
Alias for: pro
each_with_index(&blk) click to toggle source
# File lib/bud/executor/elements.rb, line 214
def each_with_index(&blk)
  toplevel = @bud_instance.toplevel
  elem = Bud::PushEachWithIndex.new("each_with_index#{object_id}".to_sym,
                                    toplevel.this_rule_context,
                                    @collection_name)
  elem.set_block(&blk)
  self.wire_to(elem)
  toplevel.push_elems[[self.object_id, :each_with_index, blk]] = elem
end
flush() click to toggle source
# File lib/bud/executor/elements.rb, line 189
def flush
end
group(keycols, *aggpairs, &blk) click to toggle source
# File lib/bud/executor/elements.rb, line 269
def group(keycols, *aggpairs, &blk)
  # establish schema
  keycols ||= []
  keycols = keycols.map{|c| canonicalize_col(c)}
  keynames = keycols.map{|k| k[2]}
  aggcolsdups = aggpairs.map{|ap| ap[0].class.name.split("::").last}
  aggcols = []
  aggcolsdups.each_with_index do |n, i|
    aggcols << "#{n.downcase}_#{i}".to_sym
  end
  if aggcols.empty?
    the_schema = keynames
  else
    the_schema = { keynames => aggcols }
  end

  aggpairs = prep_aggpairs(aggpairs)
  toplevel = @bud_instance.toplevel
  g = Bud::PushGroup.new("grp#{Time.new.tv_usec}".to_sym, toplevel.this_rule_context,
                         @collection_name, keycols, aggpairs, the_schema, &blk)
  self.wire_to(g)
  toplevel.push_elems[[self.object_id, :group, keycols, aggpairs, blk]] = g
  return g
end
include?(name=nil, bud_instance=nil, the_schema=nil, &blk) click to toggle source
# File lib/bud/executor/elements.rb, line 335
def include?(name=nil, bud_instance=nil, the_schema=nil, &blk)
  push_predicate(:include?, name, bud_instance, the_schema, &blk)
end
insert(item, source=nil) click to toggle source
# File lib/bud/executor/elements.rb, line 104
def insert(item, source=nil)
  push_out(item)
end
inspected() click to toggle source
# File lib/bud/executor/elements.rb, line 366
def inspected
  toplevel = @bud_instance.toplevel
  if toplevel.push_elems[[self.object_id, :inspected]].nil?
    ins = pro{|i| [i.inspect]}
    self.wire_to(ins)
    toplevel.push_elems[[self.object_id, :inspected]] = ins
  end
  toplevel.push_elems[[self.object_id, :inspected]]
end
invalidate_cache() click to toggle source
# File lib/bud/executor/elements.rb, line 191
def invalidate_cache
end
invalidate_tables(rescan, invalidate) click to toggle source
# File lib/bud/executor/elements.rb, line 169
def invalidate_tables(rescan, invalidate)
  # Exchange rescan and invalidate information with tables. If this node is
  # in rescan, it may invalidate an output table (if it is a scratch). And
  # if the output table is going to be invalidated, this node marks itself
  # for rescan to enable a refill of that table at run-time.
  [@outputs, @pendings].each do |v|
    v.each do |o|
      unless o.class <= PushElement
        o.add_rescan_invalidate(rescan, invalidate)
        rescan << self if invalidate.member? o
      end
    end
  end
end
join(elem2, &blk) click to toggle source
# File lib/bud/executor/elements.rb, line 224
def join(elem2, &blk)
  elem2 = elem2.to_push_elem unless elem2.kind_of? PushElement
  toplevel = @bud_instance.toplevel
  join = Bud::PushSHJoin.new([self, elem2], toplevel.this_rule_context, [])
  self.wire_to(join)
  elem2.wire_to(join)
  toplevel.push_elems[[self.object_id, :join, [self, elem2], toplevel, blk]] = join
  toplevel.push_joins[toplevel.this_stratum] << join
  return join
end
member?(name=nil, bud_instance=nil, the_schema=nil, &blk) click to toggle source
# File lib/bud/executor/elements.rb, line 338
def member?(name=nil, bud_instance=nil, the_schema=nil, &blk)
  push_predicate(:member?, name, bud_instance, the_schema, &blk)
end
merge(source) click to toggle source
# File lib/bud/executor/elements.rb, line 248
def merge(source)
  if source.class <= PushElement and @bud_instance.wiring?
    source.wire_to(self)
  else
    source.each {|i| self << i}
  end
end
Also aliased as: <=
none?(name=nil, bud_instance=nil, the_schema=nil, &blk) click to toggle source
# File lib/bud/executor/elements.rb, line 341
def none?(name=nil, bud_instance=nil, the_schema=nil, &blk)
  push_predicate(:none?, name, bud_instance, the_schema, &blk)
end
notin(elem2, *preds, &blk) click to toggle source
# File lib/bud/executor/elements.rb, line 238
def notin(elem2, *preds, &blk)
  elem2 = elem2.to_push_elem unless elem2.kind_of? PushElement
  toplevel = @bud_instance.toplevel
  notin_elem = Bud::PushNotIn.new([self, elem2], toplevel.this_rule_context, preds, &blk)
  self.wire_to(notin_elem)
  elem2.wire_to(notin_elem)
  toplevel.push_elems[[self.object_id, :notin, [self, elem2], toplevel, blk]] = notin_elem
  return notin_elem
end
on_exists?(the_name=elem_name, the_schema=schema, &blk)
Alias for: pro
on_include?(item, &blk) click to toggle source
# File lib/bud/executor/elements.rb, line 357
def on_include?(item, &blk)
  toplevel = @bud_instance.toplevel
  if toplevel.push_elems[[self.object_id, :on_include?, item, blk]].nil?
    inc = pro{|i| blk.call(item) if i == item and not blk.nil?}
    wire_to(inc)
    toplevel.push_elems[[self.object_id, :on_include?, item, blk]] = inc
  end
  toplevel.push_elems[[self.object_id, :on_include?, item, blk]]
end
one?(name=nil, bud_instance=nil, the_schema=nil, &blk) click to toggle source
# File lib/bud/executor/elements.rb, line 344
def one?(name=nil, bud_instance=nil, the_schema=nil, &blk)
  push_predicate(:one?, name, bud_instance, the_schema, &blk)
end
print_wiring(depth=0, accum="") click to toggle source
pro(the_name=elem_name, the_schema=schema, &blk) click to toggle source

and now, the Bloom-facing methods XXX: “the_name” parameter is unused

# File lib/bud/executor/elements.rb, line 200
def pro(the_name=elem_name, the_schema=schema, &blk)
  toplevel = @bud_instance.toplevel
  elem = Bud::PushElement.new("project#{object_id}".to_sym,
                              toplevel.this_rule_context,
                              @collection_name, the_schema)
  self.wire_to(elem)
  elem.set_block(&blk)
  toplevel.push_elems[[self.object_id, :pro, blk]] = elem
  return elem
end
Also aliased as: each, on_exists?
push_out(item, do_block=true) click to toggle source
# File lib/bud/executor/elements.rb, line 116
def push_out(item, do_block=true)
  if do_block && @blk
    item = item.to_a if @blk.arity > 1
    item = @blk.call item
    return if item.nil?
  end

  @outputs.each do |ou|
    if ou.class <= Bud::PushElement
      ou.insert(item, self)
    elsif ou.class <= Bud::BudCollection
      ou.do_insert(item, ou.new_delta)
    elsif ou.class <= Bud::LatticeWrapper
      ou.insert(item, self)
    else
      raise Bud::Error, "unexpected output target: #{ou.class}"
    end
  end

  # for the following, o is a BudCollection
  @deletes.each{|o| o.pending_delete([item])}
  @delete_keys.each{|o| o.pending_delete_keys([item])}

  # o is a LatticeWrapper or a BudCollection
  @pendings.each do |o|
    if o.class <= Bud::LatticeWrapper
      o <+ item
    else
      o.pending_merge([item])
    end
  end
end
push_predicate(pred_symbol, name=nil, bud_instance=nil, the_schema=nil, &blk) click to toggle source
# File lib/bud/executor/elements.rb, line 322
def push_predicate(pred_symbol, name=nil, bud_instance=nil,
                   the_schema=nil, &blk)
  elem = Bud::PushPredicate.new(pred_symbol, name, bud_instance,
                                the_schema, &blk)
  wire_to(elem)
  elem
end
reduce(initial, &blk) click to toggle source
# File lib/bud/executor/elements.rb, line 348
def reduce(initial, &blk)
  retval = Bud::PushReduce.new("reduce#{Time.new.tv_usec}".to_sym,
                               @bud_instance, @collection_name,
                               schema, initial, &blk)
  self.wire_to(retval)
  retval
end
rescan_at_tick() click to toggle source
# File lib/bud/executor/elements.rb, line 100
def rescan_at_tick
  false
end
set_block(&blk) click to toggle source
# File lib/bud/executor/elements.rb, line 75
def set_block(&blk)
  @blk = blk
end
sort(name=nil, bud_instance=nil, the_schema=nil, &blk) click to toggle source
# File lib/bud/executor/elements.rb, line 317
def sort(name=nil, bud_instance=nil, the_schema=nil, &blk)
  elem = Bud::PushSort.new(name, bud_instance, the_schema, &blk)
  wire_to(elem)
  elem
end
stratum_end() click to toggle source
# File lib/bud/executor/elements.rb, line 193
def stratum_end
end
tick() click to toggle source
# File lib/bud/executor/elements.rb, line 108
def tick
  invalidate_cache if @invalidated
end
tick_deltas() click to toggle source
# File lib/bud/executor/elements.rb, line 112
def tick_deltas
  @found_delta = false
end
wire_to(element, kind=:output) click to toggle source
# File lib/bud/executor/elements.rb, line 79
def wire_to(element, kind=:output)
  unless @bud_instance.wiring?
    raise Bud::Error, "wire_to called outside wiring phase"
  end

  case kind
  when :output
    @outputs << element
  when :pending
    @pendings << element
  when :delete
    @deletes << element
  when :delete_by_key
    @delete_keys << element
  else
    raise Bud::Error, "unrecognized wiring kind: #{kind}"
  end

  element.wired_by << self if element.respond_to? :wired_by
end
wirings() click to toggle source
# File lib/bud/executor/elements.rb, line 32
def wirings
  @wirings ||= @outputs + @pendings + @deletes + @delete_keys
end