class Bud::PushArgAgg
Public Class Methods
new(elem_name, bud_instance, collection_name, keys_in, aggpairs_in, schema_in, &blk)
click to toggle source
Calls superclass method
Bud::PushGroup::new
# File lib/bud/executor/group.rb, line 92 def initialize(elem_name, bud_instance, collection_name, keys_in, aggpairs_in, schema_in, &blk) unless aggpairs_in.length == 1 raise Bud::Error, "multiple aggpairs #{aggpairs_in.map{|a| a.class.name}} in ArgAgg; only one allowed" end super(elem_name, bud_instance, collection_name, keys_in, aggpairs_in, schema_in, &blk) @winners = {} end
Public Instance Methods
flush()
click to toggle source
# File lib/bud/executor/group.rb, line 139 def flush # Don't emit fresh output unless a rescan is needed return unless @rescan @rescan = false @groups.each_key do |g| @winners[g].each do |t| push_out(t) end end end
insert(item, source)
click to toggle source
# File lib/bud/executor/group.rb, line 106 def insert(item, source) key = @keys.map{|k| item[k]} group_state = @groups[key] if group_state.nil? @groups[key] = @aggpairs.map do |ap| @winners[key] = [item] input_vals = item.values_at(*ap[1]) ap[0].init(*input_vals) end else @aggpairs.each_with_index do |ap, agg_ix| input_vals = item.values_at(*ap[1]) state_val, flag, *rest = ap[0].trans(group_state[agg_ix], *input_vals) group_state[agg_ix] = state_val case flag when :ignore # do nothing when :replace @winners[key] = [item] when :keep @winners[key] << item when :delete rest.each do |t| @winners[key].delete t end else raise Bud::Error, "strange result from argagg transition func: #{flag}" end end end end
invalidate_cache()
click to toggle source
Calls superclass method
Bud::PushGroup#invalidate_cache
# File lib/bud/executor/group.rb, line 101 def invalidate_cache super @winners.clear end