class Bud::PushArgAgg

Public Class Methods

new(elem_name, bud_instance, collection_name, keys_in, aggpairs_in, schema_in, &blk) click to toggle source
Calls superclass method Bud::PushGroup::new
# File lib/bud/executor/group.rb, line 92
def initialize(elem_name, bud_instance, collection_name, keys_in, aggpairs_in, schema_in, &blk)
  unless aggpairs_in.length == 1
    raise Bud::Error, "multiple aggpairs #{aggpairs_in.map{|a| a.class.name}} in ArgAgg; only one allowed"
  end
  super(elem_name, bud_instance, collection_name, keys_in, aggpairs_in, schema_in, &blk)
  @winners = {}
end

Public Instance Methods

flush() click to toggle source
# File lib/bud/executor/group.rb, line 139
def flush
  # Don't emit fresh output unless a rescan is needed
  return unless @rescan
  @rescan = false

  @groups.each_key do |g|
    @winners[g].each do |t|
      push_out(t)
    end
  end
end
insert(item, source) click to toggle source
# File lib/bud/executor/group.rb, line 106
def insert(item, source)
  key = @keys.map{|k| item[k]}
  group_state = @groups[key]
  if group_state.nil?
    @groups[key] = @aggpairs.map do |ap|
      @winners[key] = [item]
      input_vals = item.values_at(*ap[1])
      ap[0].init(*input_vals)
    end
  else
    @aggpairs.each_with_index do |ap, agg_ix|
      input_vals = item.values_at(*ap[1])
      state_val, flag, *rest = ap[0].trans(group_state[agg_ix], *input_vals)
      group_state[agg_ix] = state_val

      case flag
      when :ignore
        # do nothing
      when :replace
        @winners[key] = [item]
      when :keep
        @winners[key] << item
      when :delete
        rest.each do |t|
          @winners[key].delete t
        end
      else
        raise Bud::Error, "strange result from argagg transition func: #{flag}"
      end
    end
  end
end
invalidate_cache() click to toggle source
Calls superclass method Bud::PushGroup#invalidate_cache
# File lib/bud/executor/group.rb, line 101
def invalidate_cache
  super
  @winners.clear
end