class Bud::BudCollection

Attributes

accumulate_tick_deltas[RW]
invalidated[RW]
is_source[RW]
rescan[RW]
scanner_cnt[R]
struct[R]
wired_by[R]

Private Class Methods

parse_schema(given_schema) click to toggle source

The user-specified schema might come in two forms: a hash of Array => Array (key_cols => remaining columns), or simply an Array of columns (if no key_cols were specified). Return a pair: [list of (all) columns, list of key columns]

# File lib/bud/collections.rb, line 84
# Normalize a user-supplied schema into [all_columns, key_columns].
#
# Accepted forms:
# * {[key cols] => [value cols]} -- a Hash with exactly one entry
# * [cols]                       -- an Array; every column is a key column
#
# Raises Bud::Error if the Hash form has more than one entry, if either side
# of the schema is not an Array, if a column name is not a Symbol, or if a
# column name appears more than once.
def self.parse_schema(given_schema)
  if given_schema.respond_to? :keys
    # This is a class method, so the instance-level tabname is not reachable
    # here; report the offending schema itself instead.
    raise Bud::Error, "invalid schema #{given_schema.inspect}" if given_schema.length != 1
    key_cols = given_schema.keys.first
    val_cols = given_schema.values.first
  else
    key_cols = given_schema
    val_cols = []
  end

  # Without this check a schema such as {:k => :v} fails with a bare
  # NoMethodError on "+" below.
  unless key_cols.kind_of?(Array) and val_cols.kind_of?(Array)
    raise Bud::Error, "invalid schema #{given_schema.inspect}"
  end

  cols = key_cols + val_cols
  cols.each do |c|
    if c.class != Symbol
      raise Bud::Error, "invalid column name \"#{c}\", type \"#{c.class}\""
    end
  end
  if cols.uniq.length < cols.length
    raise Bud::Error, "schema #{given_schema.inspect} contains duplicate names"
  end

  return [cols, key_cols]
end

Public Instance Methods

*(collection) click to toggle source

form a collection containing all pairs of items in self and items in collection

# File lib/bud/collections.rb, line 774
# Cartesian product: delegate to the join of this collection's scanner
# element with +collection+.
def *(collection)
  scanner = to_push_elem
  scanner.join(collection)
end
<<(item) click to toggle source

instantaneously place an individual item from rhs into collection on lhs

# File lib/bud/collections.rb, line 505
# Insert a single item immediately; this is just a shorthand for insert.
def <<(item)
  inserted = insert(item)
  inserted
end
<=(collection) click to toggle source

instantaneously merge the items of collection into this collection (only legal inside a bloom block; use <+ otherwise)

# File lib/bud/collections.rb, line 601
# Merge +collection+ into this one immediately. Only legal while the toplevel
# Bud instance is inside a tick; otherwise Bud::CompileError is raised.
def <=(collection)
  in_tick = bud_instance.toplevel.inside_tick
  raise Bud::CompileError, "illegal use of <= outside of bloom block, use <+ instead" unless in_tick
  merge(collection)
end
[](k) click to toggle source

return item with key k

# File lib/bud/collections.rb, line 359
# Look up the tuple stored under key +k+, checking storage first and then
# the current delta.
# NOTE(review): assumes a key lives in storage or delta but not both; it is
# unclear whether do_insert enforces that.
def [](k)
  check_enumerable(k)
  found = @storage[k]
  found = @delta[k] if found.nil?
  found
end
add_rescan_invalidate(rescan, invalidate) click to toggle source
# File lib/bud/collections.rb, line 681
# Base-class hook: does nothing. Most collections only need to rescan when
# negated and have no cache to invalidate; Scratches and PushElements
# override this method.
def add_rescan_invalidate(rescan, invalidate)
  nil
end
argagg(aggname, gbkey_cols, collection, &blk) click to toggle source

a generalization of argmin/argmax to arbitrary exemplary aggregates. for each distinct value of the grouping key columns, return the items in that group that have the value of the exemplary aggregate aggname

# File lib/bud/collections.rb, line 746
# Exemplary aggregate: for each group (gbkey_cols), keep the items whose
# +collection+ column matches aggregate +aggname+. Column references are
# canonicalized first, and the result is given this collection's schema
# accessors.
def argagg(aggname, gbkey_cols, collection, &blk)
  pusher = to_push_elem
  unless gbkey_cols.nil?
    gbkey_cols = gbkey_cols.map { |col| canonicalize_col(col) }
  end
  target = canonicalize_col(collection)
  pusher.argagg(aggname, gbkey_cols, target, &blk).tap do |result|
    # the push element inherits the schema accessors of this collection
    result.extend(@cols_access)
  end
end
argmax(gbkey_cols, col, &blk) click to toggle source

for each distinct value of the grouping key columns, return the items in that group that have the maximum value of the attribute col. Note that multiple tuples might be returned.

# File lib/bud/collections.rb, line 767
# argmax is argagg with the :max exemplary aggregate.
def argmax(gbkey_cols, col, &blk)
  agg_name = :max
  argagg(agg_name, gbkey_cols, col, &blk)
end
argmin(gbkey_cols, col, &blk) click to toggle source

for each distinct value of the grouping key columns, return the items in that group that have the minimum value of the attribute col. Note that multiple tuples might be returned.

# File lib/bud/collections.rb, line 759
# argmin is argagg with the :min exemplary aggregate.
def argmin(gbkey_cols, col, &blk)
  agg_name = :min
  argagg(agg_name, gbkey_cols, col, &blk)
end
bootstrap() click to toggle source
# File lib/bud/collections.rb, line 687
# Promote any pending tuples to the delta, leaving a fresh empty pending
# buffer. Does nothing when nothing is pending.
def bootstrap
  return if @pending.empty?
  @delta = @pending
  @pending = {}
end
canonicalize_col(col) click to toggle source
# File lib/bud/collections.rb, line 801
# A Symbol names a column accessor on this collection, so resolve it by
# calling that accessor; any other value is already canonical.
def canonicalize_col(col)
  return col unless col.is_a?(Symbol)
  send(col)
end
do_insert(t, store) click to toggle source
# File lib/bud/collections.rb, line 440
# Insert tuple +t+ into buffer +store+ (storage, pending, delta or new_delta).
# The tuple is normalized with prep_tuple and merged under its key values.
# With $BUD_DEBUG set, the insertion is logged first, labelled with the
# buffer it targets.
def do_insert(t, store)
  if $BUD_DEBUG
    labels = [["storage", @storage], ["pending", @pending],
              ["delta", @delta], ["new_delta", @new_delta]]
    match = labels.find { |_, buf| buf.equal?(store) }
    storetype = match && match.first
    puts "#{qualified_tabname}.#{storetype} ==> #{t.inspect}"
  end
  # nils come from map predicates that failed; drop them silently
  return if t.nil?
  tuple = prep_tuple(t)
  key = get_key_vals(tuple)
  merge_to_buf(store, key, tuple, store[key])
end
each_delta(&block) click to toggle source
# File lib/bud/collections.rb, line 281
# Iterate over the tuples in the current delta buffer only.
def each_delta(&block)
  buffers = [@delta]
  each_from(buffers, &block)
end
each_raw(&block) click to toggle source
# File lib/bud/collections.rb, line 276
# Iterate over the tuples in storage only (no delta).
def each_raw(&block)
  buffers = [@storage]
  each_from(buffers, &block)
end
each_tick_delta(&block) click to toggle source
# File lib/bud/collections.rb, line 286
# Iterate over the tuples accumulated in @tick_delta.
def each_tick_delta(&block)
  deltas = @tick_delta
  deltas.each(&block)
end
each_with_index(the_name=tabname, the_schema=schema, &blk) click to toggle source

XXX: Although we support each_with_index over Bud collections, using it is probably not a great idea: the index assigned to a given collection member is not defined by the language semantics.

Calls superclass method
# File lib/bud/collections.rb, line 214
# While wiring, delegate to the scanner element for (the_name, the_schema);
# otherwise fall back to the superclass implementation. Only the block is
# forwarded to super, never the name/schema arguments.
def each_with_index(the_name=tabname, the_schema=schema, &blk)
  return super(&blk) unless @bud_instance.wiring?
  to_push_elem(the_name, the_schema).each_with_index(&blk)
end
empty?() click to toggle source
# File lib/bud/collections.rb, line 380
# True when neither storage nor delta holds any tuple (i.e. length is zero).
def empty?
  length.zero?
end
exists?() { |t| ... } click to toggle source

checks for an item for which block produces a match

# File lib/bud/collections.rb, line 386
# Without a block: true iff the collection is non-empty. With a block: true
# iff some item makes the block return a truthy value.
def exists?(&block)
  return false if length == 0
  return true unless block_given?
  !detect { |t| yield t }.nil?
end
flat_map(&blk) click to toggle source

ruby 1.9 defines flat_map to return “a new array with the concatenated results of running block once for every element”. So we wire the input to a pro(&blk), and wire the output of that pro to a PushElement that pushes out each member of every projected result.

# File lib/bud/collections.rb, line 228
# Outside wiring, flat_map runs directly over storage. While wiring, the input
# is projected through pro(&blk), and the projection feeds a PushElement that
# pushes out every member of each projected value. That element is recorded
# under [object_id, :flatten] in the toplevel's push_elems and returned.
def flat_map(&blk)
  return @storage.flat_map(&blk) unless @bud_instance.wiring?

  projected = pro(&blk)
  top = @bud_instance.toplevel
  flattener = Bud::PushElement.new(tabname, top.this_rule_context, tabname)
  projected.wire_to(flattener)
  flattener.set_block do |t|
    t.each { |i| flattener.push_out(i, false) }
    nil
  end
  top.push_elems[[object_id, :flatten]] = flattener
  flattener
end
flush() click to toggle source
# File lib/bud/collections.rb, line 621
# No-op hook in the base class.
def flush
  nil
end
flush_deltas() click to toggle source
# File lib/bud/collections.rb, line 695
# Move @delta and then @new_delta into storage, clearing both. When
# accumulate_tick_deltas is set, the flushed delta tuples are also appended
# to @tick_delta, which is deliberately kept around for higher strata.
def flush_deltas
  if $BUD_DEBUG
    { "delta" => @delta, "new_delta" => @new_delta }.each do |label, buf|
      puts "#{qualified_tabname}.flush #{label} --> storage" unless buf.empty?
    end
  end

  unless @delta.empty?
    @tick_delta.concat(@delta.values) if accumulate_tick_deltas
    @storage.merge!(@delta)
    @delta.clear
  end

  return if @new_delta.empty?
  @storage.merge!(@new_delta)
  @new_delta.clear
end
group(key_cols, *aggpairs, &blk) click to toggle source
# File lib/bud/collections.rb, line 789
# Group by +key_cols+ and compute the aggregates in +aggpairs+. Column
# references in both are canonicalized before the grouping is delegated to
# the scanner element.
def group(key_cols, *aggpairs, &blk)
  unless key_cols.nil?
    key_cols = key_cols.map { |col| canonicalize_col(col) }
  end
  prepared = prep_aggpairs(aggpairs)
  to_push_elem.group(key_cols, *prepared, &blk)
end
has_key?(k) click to toggle source

checks for key k in the key columns

# File lib/bud/collections.rb, line 351
# True iff +k+ is non-nil and some tuple is stored under that key.
def has_key?(k)
  check_enumerable(k)
  !(k.nil? || self[k].nil?)
end
include?(item) click to toggle source

checks for item in the collection

# File lib/bud/collections.rb, line 369
# Membership test: look up +item+ by its key values and compare the stored
# tuple with it. When key_cols is nil this returns true unconditionally.
def include?(item)
  return true if key_cols.nil?
  return false if item.nil?
  stored = self[get_key_vals(item)]
  item == stored
end
init_schema(given_schema) click to toggle source
# File lib/bud/collections.rb, line 45
# Install +given_schema+ (default {[:key] => [:val]}) on this collection.
# Sets @given_schema, @cols, @key_cols and the key/value column positions.
# Also builds the tuple struct and installs the column accessors.
# An empty column list is recorded as @cols = nil (schemaless).
def init_schema(given_schema)
  @given_schema = given_schema || {[:key]=>[:val]}
  @cols, @key_cols = BudCollection.parse_schema(@given_schema)

  # Location specifiers ("@col") belong to channels only; channel schemas have
  # already had theirs stripped before reaching this point.
  offending = @cols.detect { |col| col.to_s.start_with?("@") }
  unless offending.nil?
    raise Bud::CompileError, "illegal use of location specifier (@) in column #{offending} of non-channel collection #{tabname}"
  end

  @key_colnums = @key_cols.map { |col| @cols.index(col) }
  @val_colnums = val_cols.map { |col| @cols.index(col) }

  if @cols.empty?
    @cols = nil
  else
    @struct = Bud::TupleStruct.new_struct(@cols)
    @structlen = @struct.members.length
  end
  setup_accessors
end
inspect() click to toggle source
# File lib/bud/collections.rb, line 75
# Short identity string: "Class:hex-object-id [qualified table name]".
def inspect
  format("%s:%s [%s]", self.class, object_id.to_s(16), qualified_tabname)
end
inspected() click to toggle source

map each item in the collection to a one-element array holding its inspect string, suitable for placement in stdio

# File lib/bud/collections.rb, line 189
# Project each item to a one-element array holding its inspect string.
def inspected
  pro { |tup| [tup.inspect] }
end
invalidate_at_tick() click to toggle source
# File lib/bud/collections.rb, line 291
# Conservative default: always report that invalidation is needed each tick.
def invalidate_at_tick
  invalidate = true
  invalidate
end
keys() click to toggle source

project the collection to its key attributes

# File lib/bud/collections.rb, line 177
# Project every tuple onto its key columns.
def keys
  pro { |tup| get_key_vals(tup) }
end
length() click to toggle source
# File lib/bud/collections.rb, line 376
# Number of tuples in storage plus the number in the current delta.
def length
  stored = @storage.length
  pending_delta = @delta.length
  stored + pending_delta
end
non_temporal_predecessors() click to toggle source
# File lib/bud/collections.rb, line 296
# Elements wired into this collection that list it among their outputs.
def non_temporal_predecessors
  @wired_by.select { |elem| elem.outputs.include?(self) }
end
notin(collection, *preds, &blk) click to toggle source
# File lib/bud/collections.rb, line 795
# Anti-join: items of this collection with no match in +collection+ under
# +preds+ / +blk+, computed by the scanner elements of both sides.
def notin(collection, *preds, &blk)
  left = to_push_elem
  right = collection.to_push_elem
  left.notin(right, *preds, &blk)
end
null_tuple() click to toggle source

generate a tuple with the schema of this collection and nil values in each attribute

# File lib/bud/collections.rb, line 171
# A fresh tuple of this collection's struct type with every field nil.
def null_tuple
  tuple_class = @struct
  tuple_class.new
end
positive_predecessors() click to toggle source
# File lib/bud/collections.rb, line 301
# Elements wired into this collection that list it among their outputs or
# their pendings.
def positive_predecessors
  @wired_by.select do |elem|
    elem.outputs.include?(self) || elem.pendings.include?(self)
  end
end
prep_aggpairs(aggpairs) click to toggle source
# File lib/bud/collections.rb, line 778
# Each aggpair is [aggregate, *columns]; keep the aggregate and canonicalize
# every column reference that follows it.
def prep_aggpairs(aggpairs)
  aggpairs.map do |agg, *rest|
    [agg, *rest.map { |col| canonicalize_col(col) }]
  end
end
pro(the_name=tabname, the_schema=schema, &blk) click to toggle source

projection

# File lib/bud/collections.rb, line 195
# Projection.
#
# While wiring, returns the scanner element for (the_name, the_schema),
# projected through +blk+ when one is given.
# Otherwise evaluates eagerly: maps every tuple through +blk+ and collects
# the non-nil results into an Array.
# Without a block the non-wiring path is the identity projection, mirroring
# the wiring path, which uses the scanner directly. Previously it crashed
# calling nil.call. Nil tuples are still dropped.
def pro(the_name=tabname, the_schema=schema, &blk)
  if @bud_instance.wiring?
    pusher = to_push_elem(the_name, the_schema)
    # If there is no code block to evaluate, use the scanner directly
    blk.nil? ? pusher : pusher.pro(&blk)
  else
    rv = []
    each do |t|
      t = blk.call(t) unless blk.nil?
      rv << t unless t.nil?
    end
    rv
  end
end
qualified_tabname() click to toggle source
# File lib/bud/collections.rb, line 71
# Memoized table name: the bare tabname for a toplevel instance, otherwise
# the symbol :"<qualified instance name>.<tabname>".
def qualified_tabname
  return @qualified_tabname if @qualified_tabname
  @qualified_tabname =
    if @bud_instance.toplevel?
      tabname
    else
      :"#{@bud_instance.qualified_name}.#{tabname}"
    end
end
reduce(initial, &blk) click to toggle source
# File lib/bud/collections.rb, line 805
# Fold over the collection, starting from +initial+, via the scanner element.
def reduce(initial, &blk)
  scanner = to_push_elem
  scanner.reduce(initial, &blk)
end
register_coll_expr(expr) click to toggle source
# File lib/bud/collections.rb, line 592
# Register +expr+ as a collection expression named "expr_<object id>".
# Its columns are :c1..:cN, matching this collection's width, or nil when
# this collection is schemaless. Returns the collection the Bud instance
# exposes under that name.
def register_coll_expr(expr)
  name = :"expr_#{expr.object_id}"
  cols = @cols.nil? ? nil : Array.new(@cols.length) { |i| :"c#{i + 1}" }
  @bud_instance.coll_expr(name, expr, cols)
  @bud_instance.send(name)
end
rename(the_name, the_schema=nil, &blk) click to toggle source
# File lib/bud/collections.rb, line 258
# Project this collection under a new name (and optionally a new schema).
# Only valid while wiring. The rewriter must already have defined a scratch
# called +the_name+ on the Bud instance.
def rename(the_name, the_schema=nil, &blk)
  raise Bud::Error unless @bud_instance.wiring?
  scratch_defined = @bud_instance.respond_to?(the_name)
  raise Bud::Error, "rename failed to define a scratch named #{the_name}" unless scratch_defined
  pro(the_name, the_schema, &blk)
end
schema() click to toggle source

produces the schema in a format that is useful as the schema specification for another table

# File lib/bud/collections.rb, line 109
# Schema in the form accepted by another collection's declaration:
# nil when schemaless, the key column list when there are no value columns,
# otherwise {key_cols => val_cols}.
def schema
  return nil if @cols.nil?
  val_cols.empty? ? key_cols : { key_cols => val_cols }
end
sort(&blk) click to toggle source
# File lib/bud/collections.rb, line 249
# Outside wiring, sort the stored tuples directly. While wiring, build a sort
# element named "sort<object id>" on top of the plain projection.
def sort(&blk)
  return @storage.values.sort(&blk) unless @bud_instance.wiring?
  pro.sort(:"sort#{object_id}", @bud_instance, @cols, &blk)
end
tick() click to toggle source
# File lib/bud/collections.rb, line 651
# Abstract: every concrete collection class must provide its own tick.
def tick
  message = "tick must be overriden in #{self.class}"
  raise Bud::Error, message
end
tick_metrics() click to toggle source
# File lib/bud/collections.rb, line 306
# Increment the per-collection counter in bud_instance.metrics[:collections].
# The counter is keyed by address, qualified table name and current stratum.
# The address is nil when the instance has no port.
def tick_metrics
  inst = bud_instance
  strat_num = inst.this_stratum
  addr = inst.port.nil? ? nil : inst.ip_port
  key = { :addr=>addr, :tabname=>qualified_tabname, :strat_num=>strat_num }
  counts = (inst.metrics[:collections] ||= {})
  counts[key] = (counts[key] || 0) + 1
end
to_push_elem(the_name=tabname, the_schema=schema) click to toggle source
# File lib/bud/collections.rb, line 713
# Return the scanner element for this collection under +the_name+ in the
# current stratum. On first request it is created and registered both as a
# scanner and as a push source, and @scanner_cnt is bumped.
def to_push_elem(the_name=tabname, the_schema=schema)
  toplevel = @bud_instance.toplevel
  stratum = toplevel.this_stratum
  key = [self.object_id, the_name]
  cached = toplevel.scanners[stratum][key]
  return cached if cached

  scanner = Bud::ScannerElement.new(the_name, @bud_instance, self, the_schema)
  toplevel.scanners[stratum][key] = scanner
  toplevel.push_sources[stratum][key] = scanner
  @scanner_cnt += 1
  toplevel.scanners[stratum][key]
end
values() click to toggle source

project the collection to its non-key attributes

# File lib/bud/collections.rb, line 183
# Project every tuple onto its non-key columns: the positions from
# key_cols.length through cols.length - 1.
def values
  pro do |tup|
    first_val = key_cols.length
    last_val = cols.length - 1
    (first_val..last_val).map { |idx| tup[idx] }
  end
end

Protected Instance Methods

add_merge_target() click to toggle source
# File lib/bud/collections.rb, line 550
# Once the toplevel has finished bootstrapping, record this collection as a
# merge target of the current stratum.
def add_merge_target
  toplevel = @bud_instance.toplevel
  return unless toplevel.done_bootstrap
  toplevel.merge_targets[toplevel.this_stratum] << self
end

Private Instance Methods

agg_in() click to toggle source

we only do grouping during first iteration of stratum. group and argagg should never deal with deltas. This assumes that stratification is done right, and it will be sensitive to bugs in the stratification!

# File lib/bud/collections.rb, line 734
# Input for grouping: self when there is no (non-nil) bud_instance to answer,
# otherwise an empty Array.
def agg_in
  has_instance = respond_to?(:bud_instance) && !bud_instance.nil?
  has_instance ? [] : self
end
check_enumerable(o) click to toggle source
# File lib/bud/collections.rb, line 510
# Accept nil, instances of classes that include Enumerable, and Procs;
# anything else raises Bud::TypeError.
def check_enumerable(o)
  return if o.nil?
  klass = o.class
  return if klass < Enumerable || klass <= Proc
  raise Bud::TypeError, "collection #{qualified_tabname} expected Enumerable value, not #{o.inspect} (class = #{o.class})"
end
deduce_schema(o) click to toggle source

Copy over the schema from o if available

# File lib/bud/collections.rb, line 534
# If this collection has no schema yet (deferred), copy the columns of +o+
# when +o+ is a Bud collection that has them. Otherwise leave @cols alone.
def deduce_schema(o)
  return unless @cols.nil?
  return unless o.class <= Bud::BudCollection
  return if o.cols.nil?
  init_schema(o.cols)
end
establish_schema(o) click to toggle source

Assign self a schema, by hook or by crook. If o is schemaless and empty, will leave @cols as is.

# File lib/bud/collections.rb, line 519
# Give this collection a schema if it lacks one. First try copying it from
# +o+; failing that, size it from the first non-nil tuple of +o+. If +o+ has
# neither, @cols stays nil.
def establish_schema(o)
  deduce_schema(o)
  return unless @cols.nil?

  o.each do |tup|
    unless tup.nil?
      fit_schema(tup.size)
      break
    end
  end
end
fit_schema(arity) click to toggle source

manufacture schema of the form [:c0, :c1, …] with width = arity

# File lib/bud/collections.rb, line 544
# Install a positional schema [:c0, :c1, ...] of width +arity+. Used when
# the right-hand side is schemaless.
def fit_schema(arity)
  init_schema((0..arity - 1).map { |idx| :"c#{idx}" })
end
get_key_vals(t) click to toggle source
# File lib/bud/collections.rb, line 435
# Extract the key-column values of tuple +t+, in key-column order.
def get_key_vals(t)
  key_positions = @key_colnums
  t.values_at(*key_positions)
end
init_buffers() click to toggle source
# File lib/bud/collections.rb, line 38
# Reset every tuple buffer: storage, then pending, then the delta buffers.
def init_buffers
  init_storage
  init_pending
  init_deltas # last, so its result is what this method returns
end
init_deltas() click to toggle source
# File lib/bud/collections.rb, line 339
# Fresh, independent delta buffers: two hashes plus the tick-delta array.
def init_deltas
  @delta = Hash.new
  @new_delta = Hash.new
  @tick_delta = Array.new
end
init_pending() click to toggle source
# File lib/bud/collections.rb, line 334
# Fresh, empty pending buffer.
def init_pending
  @pending = Hash.new
end
init_storage() click to toggle source
# File lib/bud/collections.rb, line 329
# Fresh, empty storage buffer.
def init_storage
  @storage = Hash.new
end
is_lattice_val(v) click to toggle source
# File lib/bud/collections.rb, line 403
# True when +v+ is a Bud::Lattice (or subclass) instance.
def is_lattice_val(v)
  v.kind_of?(Bud::Lattice)
end
merge_to_buf(buf, key, tup, old) click to toggle source

Merge “tup” with key values “key” into “buf”. “old” is an existing tuple with the same key columns as “tup” (if any such tuple exists). If “old” exists and “tup” is not a duplicate, check whether the two tuples disagree on a non-key, non-lattice value; if so, raise a PK error. Otherwise, construct a merged tuple using lattice merge functions and store it in “buf”, but only if merging changed at least one lattice value.

# File lib/bud/collections.rb, line 462
# Merge tuple +tup+ (whose key values are +key+) into buffer +buf+.
#
# +old+ is the tuple already stored under +key+, or nil. Outcomes:
# * no old tuple: +tup+ is stored as-is;
# * +tup+ == +old+: duplicate, nothing changes;
# * some non-key column differs and the two values are not both lattices:
#   raise_pk_error raises a key-constraint error;
# * otherwise a new tuple is built column by column, with differing lattice
#   values combined via their merge method. It replaces the stored tuple only
#   if at least one merged value reveals differently from the old one.
def merge_to_buf(buf, key, tup, old)
  if old.nil?               # no matching tuple found
    buf[key] = tup
    return
  end
  return if tup == old      # ignore duplicates

  # Check for PK violation
  # A non-key column may differ only when both sides hold lattice values.
  @val_colnums.each do |i|
    old_v = old[i]
    new_v = tup[i]

    unless old_v == new_v || (is_lattice_val(old_v) && is_lattice_val(new_v))
      raise_pk_error(tup, old)
    end
  end

  # Construct new tuple version. We discard the newly-constructed tuple if
  # merging every lattice field doesn't yield a new value.
  new_t = null_tuple
  saw_change = false
  @val_colnums.each do |i|
    if old[i] == tup[i]
      new_t[i] = old[i]
    else
      # Both values are lattices here (guaranteed by the PK check above).
      new_t[i] = old[i].merge(tup[i])
      saw_change = true if new_t[i].reveal != old[i].reveal
    end
  end

  if saw_change
    # null_tuple left the key columns nil; copy them over from the old tuple.
    @key_colnums.each {|k| new_t[k] = old[k]}
    buf[key] = new_t
  end
end
name_reserved?(colname) click to toggle source
# File lib/bud/collections.rb, line 154
# Report whether +colname+ cannot be used as a column name because it
# already means something in this collection's scope (method, constant,
# keyword, ...).
#
# A method with arity 0 or -1 is probed by calling it. If it returns an array
# shaped like a schema accessor result for this table, the name comes from an
# earlier schema definition and is NOT considered reserved. setup_accessors
# now returns [qualified_tabname, index, name, counter]; older accessors
# returned [tabname, index, name].
#
# NOTE(review): eval runs in this method's binding, so the names of its own
# locals (colname, reserved, ret) probably probe as local variables and are
# reported as reserved.
def name_reserved?(colname)
  begin
    reserved = eval "defined?(#{colname})"
  rescue SyntaxError
    # Keywords such as :end or :class cannot even be parsed inside defined?;
    # report them as reserved instead of leaking a SyntaxError to the caller.
    return true
  end
  return false if reserved.nil?
  if reserved == "method" and (method(colname).arity == 0 or method(colname).arity == -1)
    begin
      ret = eval("#{colname}")
      if ret.kind_of? Array and [3, 4].include?(ret.size) and
         (ret[0] == tabname or ret[0] == qualified_tabname)
        return false # schema redefinition (see setup_accessors), so name is not considered reserved
      end
    rescue # best effort: calling the method may itself raise
    end
  end
  return true
end
prep_tuple(o) click to toggle source
# File lib/bud/collections.rb, line 408
# Convert +o+ (an Array or Struct) into an instance of this collection's
# tuple struct.
# * If no schema exists yet, one is inferred: :c1..:cN for Arrays, or the
#   member names for Structs.
# * Raises Bud::TypeError for other types, for a lattice value in a key
#   column, or for more columns than the schema has.
# * Values already of the struct type are returned untouched.
def prep_tuple(o)
  return o if o.class == @struct

  if o.kind_of?(Array)
    init_schema((1..o.length).map { |i| :"c#{i}" }) if @struct.nil?
  elsif o.kind_of?(Struct)
    init_schema(o.members.map(&:to_sym)) if @struct.nil?
  else
    raise Bud::TypeError, "array or struct type expected in \"#{qualified_tabname}\": #{o.inspect}"
  end

  lattice_key = @key_colnums.find { |i| i < o.length && is_lattice_val(o[i]) }
  unless lattice_key.nil?
    raise Bud::TypeError, "lattice value cannot be a key for #{qualified_tabname}: #{o[lattice_key].inspect}"
  end
  if o.length > @structlen
    raise Bud::TypeError, "too many columns for \"#{qualified_tabname}\": #{o.inspect}"
  end

  @struct.new(*o)
end
raise_pk_error(new, old) click to toggle source
# File lib/bud/collections.rb, line 397
# Raise Bud::KeyConstraintError describing the clash between +new+ and the
# existing tuple +old+, which share the same key values.
def raise_pk_error(new, old)
  conflicting_key = get_key_vals(old)
  message = "key conflict inserting #{new.inspect} into \"#{qualified_tabname}\": existing tuple #{old.inspect}, key = #{conflicting_key.inspect}"
  raise Bud::KeyConstraintError, message
end
setup_accessors() click to toggle source

define methods to turn 'table.col' into a [qualified table name, column index, column name, unique counter] array, e.g. to support something like

j = join link, path, {link.to => path.from}
# File lib/bud/collections.rb, line 125
# Define one accessor method per column on this collection (via a module it
# extends).
# The same table/column may appear several times on the LHS of one join,
# e.g. (foo * bar).combos(foo.x => bar.y, foo.x => bar.z). Join predicates are
# hash keys, so each call must produce a distinct value. The accessor
# therefore returns [qualified_tabname, column index, column name, counter],
# where the counter increases on every call.
# Raises Bud::Error if a column name is reserved; does nothing when schemaless.
def setup_accessors
  columns = @cols
  return if columns.nil?

  columns.each do |col|
    next unless name_reserved?(col)
    raise Bud::Error, "symbol :#{col} reserved, cannot be used as column name for #{tabname}"
  end

  accessors = Module.new do
    columns.each_with_index do |col, idx|
      define_method(col) do
        @counter = (@counter || 0) + 1
        [qualified_tabname, idx, col, @counter]
      end
    end
  end
  @cols_access = accessors
  extend(accessors)
end