class Bud::PushGroup
Public Class Methods
new(elem_name, bud_instance, collection_name, keys_in, aggpairs_in, schema_in, &blk)
click to toggle source
Calls superclass method
Bud::PushElement::new
# File lib/bud/executor/group.rb, line 5 def initialize(elem_name, bud_instance, collection_name, keys_in, aggpairs_in, schema_in, &blk) if keys_in.nil? @keys = [] else @keys = keys_in.map{|k| k[1]} end # An aggpair is an array: [agg class instance, array of indexes of input # agg input columns]. The second field is nil for Count. @aggpairs = aggpairs_in.map do |ap| agg, *rest = ap if rest.empty? [agg, nil] else [agg, rest.map {|r| r[1]}] end end @groups = {} # Check whether we need to eliminate duplicates from our input (we might # see duplicates because of the rescan/invalidation logic, as well as # because we don't do duplicate elimination on the output of a projection # operator). We don't need to dupelim if all the args are exemplary. @elim_dups = @aggpairs.any? {|ap| not ap[0].kind_of? ArgExemplary} @input_cache = Set.new if @elim_dups super(elem_name, bud_instance, collection_name, schema_in, &blk) end
Public Instance Methods
add_rescan_invalidate(rescan, invalidate)
click to toggle source
Calls superclass method
Bud::PushStatefulElement#add_rescan_invalidate
# File lib/bud/executor/group.rb, line 63 def add_rescan_invalidate(rescan, invalidate) # XXX: need to understand why this is necessary; it is dissimilar to the # way other stateful non-monotonic operators are handled. rescan << self super end
flush()
click to toggle source
# File lib/bud/executor/group.rb, line 76 def flush # Don't emit fresh output unless a rescan is needed return unless @rescan @rescan = false @groups.each do |key, group_state| rv = key.clone @aggpairs.each_with_index do |ap, agg_ix| rv << ap[0].final(group_state[agg_ix]) end push_out(rv) end end
insert(item, source)
click to toggle source
# File lib/bud/executor/group.rb, line 34 def insert(item, source) if @elim_dups return if @input_cache.include? item @input_cache << item end key = item.values_at(*@keys) group_state = @groups[key] if group_state.nil? @groups[key] = @aggpairs.map do |ap| if ap[1].nil? ap[0].init(item) else ap[0].init(*item.values_at(*ap[1])) end end else @aggpairs.each_with_index do |ap, agg_ix| state_val = group_state[agg_ix] if ap[1].nil? trans_rv = ap[0].trans(state_val, item) else trans_rv = ap[0].trans(state_val, *item.values_at(*ap[1])) end group_state[agg_ix] = trans_rv[0] end end end
invalidate_cache()
click to toggle source
# File lib/bud/executor/group.rb, line 70 def invalidate_cache puts "#{self.class}/#{self.tabname} invalidated" if $BUD_DEBUG @groups.clear @input_cache.clear if @elim_dups end