class Bud::BudCollection
Attributes
Private Class Methods
The user-specified schema may come in two forms: a Hash of Array => Array (key_cols => remaining columns), or simply an Array of columns (if no key_cols were specified). Returns a pair: [list of all columns, list of key columns].
# File lib/bud/collections.rb, line 84
# Parse a user-specified schema into a pair [all_columns, key_columns].
#
# The schema may be a single-entry Hash (key_cols => val_cols) or a plain
# Array of column symbols (in which case all columns are keys).
#
# Raises Bud::Error for a multi-entry Hash, non-Symbol column names, or
# duplicate column names.
def self.parse_schema(given_schema)
  if given_schema.respond_to? :keys
    # BUGFIX: this message previously interpolated +tabname+, which is an
    # instance method and is not defined in class-method scope, so taking
    # this error path raised NameError instead of Bud::Error. Describe the
    # offending schema instead.
    raise Bud::Error, "invalid schema #{given_schema.inspect}" if given_schema.length != 1
    key_cols = given_schema.keys.first
    val_cols = given_schema.values.first
  else
    key_cols = given_schema
    val_cols = []
  end

  cols = key_cols + val_cols
  cols.each do |c|
    if c.class != Symbol
      raise Bud::Error, "invalid column name \"#{c}\", type \"#{c.class}\""
    end
  end
  if cols.uniq.length < cols.length
    raise Bud::Error, "schema #{given_schema.inspect} contains duplicate names"
  end

  [cols, key_cols]
end
Public Instance Methods
form a collection containing all pairs of items in self
and items in collection
# Cartesian product: yields a push element joining every item of self with
# every item of +collection+.
def *(collection)
  to_push_elem.join(collection)
end
instantaneously place an individual item from rhs into collection on lhs
# File lib/bud/collections.rb, line 505 def <<(item) insert(item) end
instantaneously merge items from the given collection into this collection's current-tick contents
# Instantaneous merge (<=): fold the items of +collection+ into this
# collection within the current timestep. Only legal inside a Bloom block
# during a tick.
def <=(collection)
  in_tick = bud_instance.toplevel.inside_tick
  raise Bud::CompileError, "illegal use of <= outside of bloom block, use <+ instead" unless in_tick
  merge(collection)
end
return item with key k
# Return the tuple stored under key +k+, consulting stable storage first
# and then this tick's delta. A key is assumed to live in at most one of
# the two buffers (presumably enforced by do_insert — TODO confirm).
def [](k)
  check_enumerable(k)
  hit = @storage[k]
  hit.nil? ? @delta[k] : hit
end
# File lib/bud/collections.rb, line 681 def add_rescan_invalidate(rescan, invalidate) # No change. Most collections don't need to rescan on every tick (only do # so on negate). Also, there's no cache to invalidate by default. # Scratches and PushElements override this method. end
a generalization of argmin/argmax to arbitrary exemplary aggregates. for each distinct value of the grouping key columns, return the items in that group that have the value of the exemplary aggregate aggname
# File lib/bud/collections.rb, line 746 def argagg(aggname, gbkey_cols, collection, &blk) elem = to_push_elem gbkey_cols = gbkey_cols.map{|k| canonicalize_col(k)} unless gbkey_cols.nil? retval = elem.argagg(aggname, gbkey_cols, canonicalize_col(collection), &blk) # PushElement inherits the schema accessors from this Collection retval.extend @cols_access retval end
for each distinct value of the grouping key columns, return the items in that group that have the maximum value of the attribute col
. Note that multiple tuples might be returned.
# File lib/bud/collections.rb, line 767 def argmax(gbkey_cols, col, &blk) argagg(:max, gbkey_cols, col, &blk) end
for each distinct value of the grouping key columns, return the items in that group that have the minimum value of the attribute col
. Note that multiple tuples might be returned.
# File lib/bud/collections.rb, line 759 def argmin(gbkey_cols, col, &blk) argagg(:min, gbkey_cols, col, &blk) end
# Promote tuples deferred during bootstrap (via @pending) into the delta
# buffer so they become visible in the first tick.
def bootstrap
  return if @pending.empty?
  @delta = @pending
  @pending = {}
end
# Map a Symbol column name to its schema-accessor value (via send); any
# other value is passed through untouched.
def canonicalize_col(col)
  return col unless col.is_a? Symbol
  self.send(col)
end
# File lib/bud/collections.rb, line 440 def do_insert(t, store) if $BUD_DEBUG storetype = case store.object_id when @storage.object_id; "storage" when @pending.object_id; "pending" when @delta.object_id; "delta" when @new_delta.object_id; "new_delta" end puts "#{qualified_tabname}.#{storetype} ==> #{t.inspect}" end return if t.nil? # silently ignore nils resulting from map predicates failing t = prep_tuple(t) key = get_key_vals(t) merge_to_buf(store, key, t, store[key]) end
# File lib/bud/collections.rb, line 281 def each_delta(&block) each_from([@delta], &block) end
# File lib/bud/collections.rb, line 276 def each_raw(&block) each_from([@storage], &block) end
# File lib/bud/collections.rb, line 286 def each_tick_delta(&block) @tick_delta.each(&block) end
XXX: Although we support each_with_index
over Bud
collections, using it is probably not a great idea: the index assigned to a given collection member is not defined by the language semantics.
# File lib/bud/collections.rb, line 214 def each_with_index(the_name=tabname, the_schema=schema, &blk) if @bud_instance.wiring? pusher = to_push_elem(the_name, the_schema) pusher.each_with_index(&blk) else super(&blk) end end
# True when the collection holds no tuples (storage plus delta).
def empty?
  length.zero?
end
checks for an item for which block
produces a match
# With no block: true iff the collection is non-empty. With a block: true
# iff some tuple satisfies the block.
def exists?(&block)
  return false if length == 0
  return true unless block_given?
  not detect {|t| yield t}.nil?
end
ruby 1.9 defines flat_map
to return “a new array with the concatenated results of running block once for every element”. So we wire the input to a pro(&blk), and wire the output of that pro to a group that does accum.
# File lib/bud/collections.rb, line 228 def flat_map(&blk) if @bud_instance.wiring? pusher = self.pro(&blk) toplevel = @bud_instance.toplevel elem = Bud::PushElement.new(tabname, toplevel.this_rule_context, tabname) pusher.wire_to(elem) f = Proc.new do |t| t.each do |i| elem.push_out(i, false) end nil end elem.set_block(&f) toplevel.push_elems[[self.object_id, :flatten]] = elem elem else @storage.flat_map(&blk) end end
# File lib/bud/collections.rb, line 621 def flush ; end
# File lib/bud/collections.rb, line 695 def flush_deltas if $BUD_DEBUG puts "#{qualified_tabname}.flush delta --> storage" unless @delta.empty? puts "#{qualified_tabname}.flush new_delta --> storage" unless @new_delta.empty? end unless @delta.empty? @storage.merge!(@delta) @tick_delta.concat(@delta.values) if accumulate_tick_deltas @delta.clear end unless @new_delta.empty? @storage.merge!(@new_delta) @new_delta.clear end # @tick_delta kept around for higher strata. end
# File lib/bud/collections.rb, line 789 def group(key_cols, *aggpairs, &blk) key_cols = key_cols.map{|k| canonicalize_col(k)} unless key_cols.nil? aggpairs = prep_aggpairs(aggpairs) return to_push_elem.group(key_cols, *aggpairs, &blk) end
checks for key k
in the key columns
# True iff some tuple in the collection has key +k+ (checked against the
# key columns).
def has_key?(k)
  check_enumerable(k)
  return false if k.nil?
  not self[k].nil?
end
checks for item
in the collection
# True iff +item+ is present in the collection (looked up by its key
# columns and compared for full equality).
def include?(item)
  return true if key_cols.nil?
  return false if item.nil?
  self[get_key_vals(item)] == item
end
# File lib/bud/collections.rb, line 45 def init_schema(given_schema) given_schema ||= {[:key]=>[:val]} @given_schema = given_schema @cols, @key_cols = BudCollection.parse_schema(given_schema) # Check that no location specifiers appear in the schema. In the case of # channels, the location specifier has already been stripped from the # user-specified schema. @cols.each do |s| if s.to_s.start_with? "@" raise Bud::CompileError, "illegal use of location specifier (@) in column #{s} of non-channel collection #{tabname}" end end @key_colnums = @key_cols.map {|k| @cols.index(k)} @val_colnums = val_cols.map {|k| @cols.index(k)} if @cols.empty? @cols = nil else @struct = Bud::TupleStruct.new_struct(@cols) @structlen = @struct.members.length end setup_accessors end
# Compact one-line description: class, hex object id, qualified table name.
def inspect
  format("%s:%s [%s]", self.class, self.object_id.to_s(16), qualified_tabname)
end
map each item in the collection into a string, suitable for placement in stdio
# File lib/bud/collections.rb, line 189 def inspected self.pro{|t| [t.inspect]} end
# File lib/bud/collections.rb, line 291 def invalidate_at_tick true # being conservative here as a default. end
project the collection to its key attributes
# File lib/bud/collections.rb, line 177 def keys self.pro{|t| get_key_vals(t)} end
# Number of tuples currently visible: stable storage plus this tick's delta.
def length
  @storage.size + @delta.size
end
# File lib/bud/collections.rb, line 296 def non_temporal_predecessors @wired_by.select {|e| e.outputs.include? self} end
# File lib/bud/collections.rb, line 795 def notin(collection, *preds, &blk) elem1 = to_push_elem elem2 = collection.to_push_elem return elem1.notin(elem2, *preds, &blk) end
generate a tuple with the schema of this collection and nil values in each attribute
# File lib/bud/collections.rb, line 171 def null_tuple @struct.new end
# File lib/bud/collections.rb, line 301 def positive_predecessors @wired_by.select {|e| e.outputs.include?(self) || e.pendings.include?(self)} end
# Normalize aggregate pairs: canonicalize every column argument that
# follows the aggregate function itself.
def prep_aggpairs(aggpairs)
  aggpairs.map do |pair|
    agg = pair.first
    cols = pair[1..-1]
    next [agg] if cols.empty?
    [agg] + cols.map {|c| canonicalize_col(c)}
  end
end
projection
# File lib/bud/collections.rb, line 195 def pro(the_name=tabname, the_schema=schema, &blk) if @bud_instance.wiring? pusher = to_push_elem(the_name, the_schema) # If there is no code block evaluate, use the scanner directly pusher = pusher.pro(&blk) unless blk.nil? pusher else rv = [] self.each do |t| t = blk.call(t) rv << t unless t.nil? end rv end end
# File lib/bud/collections.rb, line 71 def qualified_tabname @qualified_tabname ||= @bud_instance.toplevel? ? tabname : "#{@bud_instance.qualified_name}.#{tabname}".to_sym end
# File lib/bud/collections.rb, line 805 def reduce(initial, &blk) return to_push_elem.reduce(initial, &blk) end
# File lib/bud/collections.rb, line 592 def register_coll_expr(expr) coll_name = "expr_#{expr.object_id}" cols = (1..@cols.length).map{|i| "c#{i}".to_sym} unless @cols.nil? @bud_instance.coll_expr(coll_name.to_sym, expr, cols) @bud_instance.send(coll_name) end
# File lib/bud/collections.rb, line 258 def rename(the_name, the_schema=nil, &blk) raise Bud::Error unless @bud_instance.wiring? # a scratch with this name should have been defined during rewriting unless @bud_instance.respond_to? the_name raise Bud::Error, "rename failed to define a scratch named #{the_name}" end pro(the_name, the_schema, &blk) end
produces the schema in a format that is useful as the schema specification for another table
# Produce the schema in a format usable as the schema specification for
# another collection: nil when schemaless, a plain Array when there are no
# value columns, otherwise a {key_cols => val_cols} Hash.
def schema
  return nil if @cols.nil?
  if val_cols.empty?
    key_cols
  else
    { key_cols => val_cols }
  end
end
# File lib/bud/collections.rb, line 249 def sort(&blk) if @bud_instance.wiring? pusher = self.pro pusher.sort("sort#{object_id}".to_sym, @bud_instance, @cols, &blk) else @storage.values.sort(&blk) end end
# File lib/bud/collections.rb, line 651 def tick raise Bud::Error, "tick must be overriden in #{self.class}" end
# File lib/bud/collections.rb, line 306 def tick_metrics strat_num = bud_instance.this_stratum addr = bud_instance.ip_port unless bud_instance.port.nil? key = { :addr=>addr, :tabname=>qualified_tabname, :strat_num=>strat_num} bud_instance.metrics[:collections] ||= {} bud_instance.metrics[:collections][key] ||= 0 bud_instance.metrics[:collections][key] += 1 end
# File lib/bud/collections.rb, line 713 def to_push_elem(the_name=tabname, the_schema=schema) # if no push source yet, set one up toplevel = @bud_instance.toplevel this_stratum = toplevel.this_stratum oid = self.object_id unless toplevel.scanners[this_stratum][[oid, the_name]] scanner = Bud::ScannerElement.new(the_name, @bud_instance, self, the_schema) toplevel.scanners[this_stratum][[oid, the_name]] = scanner toplevel.push_sources[this_stratum][[oid, the_name]] = scanner @scanner_cnt += 1 end return toplevel.scanners[this_stratum][[oid, the_name]] end
project the collection to its non-key attributes
# File lib/bud/collections.rb, line 183 def values self.pro{|t| (self.key_cols.length..self.cols.length-1).map{|i| t[i]}} end
Protected Instance Methods
# File lib/bud/collections.rb, line 550 def add_merge_target toplevel = @bud_instance.toplevel if toplevel.done_bootstrap toplevel.merge_targets[toplevel.this_stratum] << self end end
Private Instance Methods
we only do grouping during first iteration of stratum. group and argagg should never deal with deltas. This assumes that stratification is done right, and it will be sensitive to bugs in the stratification!
# File lib/bud/collections.rb, line 734 def agg_in if not respond_to?(:bud_instance) or bud_instance.nil? return self else return [] end end
# Validate that +o+ is usable as a key/tuple value: nil, an Enumerable, or
# a Proc are accepted; anything else raises Bud::TypeError.
def check_enumerable(o)
  return if o.nil?
  return if o.class < Enumerable or o.class <= Proc
  raise Bud::TypeError, "collection #{qualified_tabname} expected Enumerable value, not #{o.inspect} (class = #{o.class})"
end
Copy over the schema from o
if available
# File lib/bud/collections.rb, line 534 def deduce_schema(o) if @cols.nil? and o.class <= Bud::BudCollection and not o.cols.nil? # must have been initialized with defer_schema==true. take schema from rhs init_schema(o.cols) end # if nothing available, leave @cols unchanged end
Assign self a schema, by hook or by crook. If o
is schemaless and empty, will leave @cols as is.
# File lib/bud/collections.rb, line 519 def establish_schema(o) # use o's schema if available deduce_schema(o) # else use arity of first non-nil tuple of o if @cols.nil? o.each do |t| next if t.nil? fit_schema(t.size) break end end end
manufacture schema of the form [:c0, :c1, …] with width = arity
# Manufacture a schema of the form [:c0, :c1, ...] with width = arity
# (used when the rhs is schemaless; derived from the first merged tuple).
def fit_schema(arity)
  init_schema(Array.new(arity) {|idx| "c#{idx}".to_sym})
end
# Extract the key-column values of tuple +t+, in key-column order.
def get_key_vals(t)
  @key_colnums.map {|i| t[i]}
end
# File lib/bud/collections.rb, line 38 def init_buffers init_storage init_pending init_deltas end
# File lib/bud/collections.rb, line 339 def init_deltas @delta = {} @new_delta = {} @tick_delta = [] end
# File lib/bud/collections.rb, line 334 def init_pending @pending = {} end
# File lib/bud/collections.rb, line 329 def init_storage @storage = {} end
# File lib/bud/collections.rb, line 403 def is_lattice_val(v) v.kind_of? Bud::Lattice end
Merge “tup” with key values “key” into “buf”. “old” is an existing tuple with the same key columns as “tup” (if any such tuple exists). If “old” exists and “tup” is not a duplicate, check whether the two tuples disagree on a non-key, non-lattice value; if so, raise a PK error. Otherwise, construct and return a merged tuple by using lattice merge functions.
# File lib/bud/collections.rb, line 462 def merge_to_buf(buf, key, tup, old) if old.nil? # no matching tuple found buf[key] = tup return end return if tup == old # ignore duplicates # Check for PK violation @val_colnums.each do |i| old_v = old[i] new_v = tup[i] unless old_v == new_v || (is_lattice_val(old_v) && is_lattice_val(new_v)) raise_pk_error(tup, old) end end # Construct new tuple version. We discard the newly-constructed tuple if # merging every lattice field doesn't yield a new value. new_t = null_tuple saw_change = false @val_colnums.each do |i| if old[i] == tup[i] new_t[i] = old[i] else new_t[i] = old[i].merge(tup[i]) saw_change = true if new_t[i].reveal != old[i].reveal end end if saw_change @key_colnums.each {|k| new_t[k] = old[k]} buf[key] = new_t end end
# File lib/bud/collections.rb, line 154 def name_reserved?(colname) reserved = eval "defined?(#{colname})" return false if reserved.nil? if reserved == "method" and (method(colname).arity == 0 or method(colname).arity == -1) begin ret = eval("#{colname}") if ret.kind_of? Array and ret.size == 3 and ret[0] == tabname return false # schema redefinition (see tupaccess above), so name is not considered reserved end rescue # in case calling method throws an error end end return true end
# File lib/bud/collections.rb, line 408 def prep_tuple(o) return o if o.class == @struct if o.kind_of? Array if @struct.nil? sch = (1 .. o.length).map{|i| "c#{i}".to_sym} init_schema(sch) end elsif o.kind_of? Struct init_schema(o.members.map{|m| m.to_sym}) if @struct.nil? else raise Bud::TypeError, "array or struct type expected in \"#{qualified_tabname}\": #{o.inspect}" end @key_colnums.each do |i| next if i >= o.length if is_lattice_val(o[i]) raise Bud::TypeError, "lattice value cannot be a key for #{qualified_tabname}: #{o[i].inspect}" end end if o.length > @structlen raise Bud::TypeError, "too many columns for \"#{qualified_tabname}\": #{o.inspect}" end return @struct.new(*o) end
# File lib/bud/collections.rb, line 397 def raise_pk_error(new, old) key = get_key_vals(old) raise Bud::KeyConstraintError, "key conflict inserting #{new.inspect} into \"#{qualified_tabname}\": existing tuple #{old.inspect}, key = #{key.inspect}" end
define methods to turn 'table.col' into a [table,col] pair e.g. to support something like
j = join link, path, {link.to => path.from}
# File lib/bud/collections.rb, line 125 def setup_accessors sc = @cols return if sc.nil? sc.each do |colname| if name_reserved? colname raise Bud::Error, "symbol :#{colname} reserved, cannot be used as column name for #{tabname}" end end # Setup schema accessors, which are class methods. Note that the same # table/column name might appear multiple times on the LHS of a single # join (e.g., (foo * bar).combos(foo.x => bar.y, foo.x => bar.z)). Because # the join predicates are represented as a hash, we need the two instances # of foo.x to be distinct values (otherwise the resulting hash will only # have a single key). Hence, we add a unique ID to the value returned by # schema accessors. @cols_access = Module.new do sc.each_with_index do |c, i| define_method c do @counter ||= 0 @counter += 1 [qualified_tabname, i, c, @counter] end end end self.extend @cols_access end