class Bud::PushNotIn
Consider “u <= s.notin(t, s.a => t.b)”. notin is a non-monotonic operator, where u depends positively on s, but negatively on t. Stratification ensures that t is fully computed in a lower stratum, which means that we can expect multiple iterators on s's side only. If t's scanner were to push its elements down first, every insert of s merely needs to be cross-checked against the cached elements of t, and pushed down to the next element if s is not in t.

However, if s's scanner were to fire first, we have to wait until the
first flush, at which point we are sure to have seen all the t-side tuples in this tick.
Public Instance Methods
do_insert(item, offset)
click to toggle source
# File lib/bud/executor/join.rb, line 578
#
# Caches +item+ in the hash table for the given side and, once the rhs is
# known to be complete, immediately anti-joins a newly arrived lhs item.
#
# item::   the tuple being inserted
# offset:: 0 for the lhs table, 1 for the rhs table
#
# Returns whatever process_match returns when a match check is performed,
# otherwise nil.
def do_insert(item, offset)
  join_key = get_key(item, offset)
  bucket = (@hash_tables[offset][join_key] ||= Set.new)
  bucket.add item

  # Only lhs tuples are checked eagerly, and only after flush has marked the
  # rhs as received; earlier arrivals are handled in bulk by flush.
  return unless @rhs_rcvd && offset == 0

  process_match(item, @hash_tables[1][join_key])
end
find_col(colspec, rel)
click to toggle source
# File lib/bud/executor/join.rb, line 543
#
# Resolves a column specification to its column number within +rel+.
#
# colspec:: either a Symbol naming an attribute of +rel+, or an Array that is
#           already a column descriptor
# rel::     the collection the column belongs to
#
# Returns element 1 of the column descriptor.
# Raises Bud::Error if a Symbol does not name an attribute of +rel+, or if
# +colspec+ is neither a Symbol nor an Array.
def find_col(colspec, rel)
  descriptor =
    case colspec
    when Symbol
      unless rel.respond_to? colspec
        raise Bud::Error, "attribute :#{colspec} not found in #{rel.qualified_tabname}"
      end
      rel.send(colspec)
    when Array
      colspec
    else
      raise Bud::Error, "symbol or column spec expected. Got #{colspec}"
    end

  # A descriptor is of the form [tabname, colnum, colname, seqno].
  descriptor[1]
end
flush()
click to toggle source
# File lib/bud/executor/join.rb, line 587
#
# On the first flush after a reset, marks the rhs as received and anti-joins
# every cached lhs tuple against the cached rhs tuples. Later calls are
# no-ops until @rhs_rcvd is cleared again.
def flush
  # By the first flush both the lhs and rhs scanners have been invoked, and
  # stratification guarantees the rhs will not grow again until the next tick.
  return if @rhs_rcvd

  @rhs_rcvd = true
  lhs_table, rhs_table = @hash_tables
  lhs_table.each do |key, lhs_items|
    rhs_matches = rhs_table[key]
    lhs_items.each { |lhs_item| process_match(lhs_item, rhs_matches) }
  end
end
get_key(item, offset)
click to toggle source
# File lib/bud/executor/join.rb, line 558
#
# Extracts the join key of +item+ for the given side.
#
# item::   the tuple to extract key values from
# offset:: 0 selects the lhs key columns; any other value selects the rhs
#          key columns
#
# Returns an Array of the item's values at the key columns, or an empty
# Array when no key columns are set for that side.
def get_key(item, offset)
  key_columns = offset == 0 ? @lhs_keycols : @rhs_keycols
  return [] if key_columns.nil?

  item.values_at(*key_columns)
end
insert(item, source)
click to toggle source
# File lib/bud/executor/join.rb, line 568
#
# Routes an incoming tuple to the hash table of the side it came from.
#
# item::   the tuple being inserted
# source:: the element that produced the tuple; compared against @lhs and
#          @rhs to pick the side
#
# A source equal to both @lhs and @rhs (self join) is inserted on both sides,
# lhs first. A source that is not @lhs is treated as the rhs.
def insert(item, source)
  from_lhs = source == @lhs
  if from_lhs && source == @rhs
    do_insert(item, 0)
    do_insert(item, 1)
  else
    do_insert(item, from_lhs ? 0 : 1)
  end
end
invalidate_cache()
click to toggle source
# File lib/bud/executor/join.rb, line 616
#
# Drops the cached tuples of each side whose input is flagged for rescan.
#
# Raises Bud::Error if @rhs_rcvd is still set. That flag is cleared by
# stratum_end, so reaching this method with it set indicates a sequencing
# problem; the error message names this element to make that diagnosable.
def invalidate_cache
  if @rhs_rcvd
    # Sanity check; the flag should already have been reset.
    raise Bud::Error, "#{tabname}: invalidate_cache called while @rhs_rcvd is still set"
  end

  if @lhs.rescan
    puts "#{tabname} rel:#{@lhs.qualified_tabname} invalidated" if $BUD_DEBUG
    @hash_tables[0] = {}
  end
  if @rhs.rescan
    puts "#{tabname} rel:#{@rhs.qualified_tabname} invalidated" if $BUD_DEBUG
    @hash_tables[1] = {}
  end
end
process_match(lhs_item, rhs_values)
click to toggle source
# File lib/bud/executor/join.rb, line 601
#
# Decides whether an lhs tuple survives the anti-join and pushes it out if so.
#
# lhs_item::   the lhs tuple under consideration
# rhs_values:: the rhs tuples sharing its join key, or nil if there are none
#
# The tuple is pushed when there are no rhs tuples for its key, or when a
# block was given and it returns a truthy value for none of the
# (lhs, rhs) pairs. Without a block, any rhs tuple with the same key
# excludes the lhs tuple.
def process_match(lhs_item, rhs_values)
  excluded =
    if rhs_values.nil?
      # Nothing on the rhs shares this key, so the lhs tuple is kept.
      false
    elsif @blk.nil?
      # Key equality alone counts as a match.
      true
    else
      # The block is the final arbiter: one truthy pair is enough to exclude.
      rhs_values.any? { |rhs_item| @blk.call(lhs_item, rhs_item) }
    end

  push_out(lhs_item, false) unless excluded
end
rescan_at_tick()
click to toggle source
# File lib/bud/executor/join.rb, line 564
#
# Always answers true.
# NOTE(review): presumably tells the runtime that this element must be
# rescanned at every tick -- confirm against the caller.
def rescan_at_tick
  true
end
setup_preds(preds)
click to toggle source
# File lib/bud/executor/join.rb, line 528
#
# Translates the join predicates into parallel lists of lhs and rhs key
# column numbers, stored in @lhs_keycols and @rhs_keycols.
#
# preds:: an Array of Hashes, each mapping an lhs column spec to an rhs
#         column spec
#
# This is simpler than PushSHJoin's setup_preds because notin is a binary
# operator where both lhs and rhs are collections. For now the attributes are
# assumed to be in the same order as the tables.
def setup_preds(preds)
  lhs_cols = []
  rhs_cols = []
  preds.each do |pred|
    # Each pred is a hash of lhs spec => rhs spec pairs.
    pred.each_pair do |lhs_spec, rhs_spec|
      lhs_cols << find_col(lhs_spec, @lhs)
      rhs_cols << find_col(rhs_spec, @rhs)
    end
  end
  @lhs_keycols, @rhs_keycols = lhs_cols, rhs_cols
end
stratum_end()
click to toggle source
# File lib/bud/executor/join.rb, line 629
#
# Clears the "rhs received" flag, so the next flush performs the bulk
# anti-join again and lhs inserts go back to being cached without an
# immediate match check.
def stratum_end
  @rhs_rcvd = false
end