class Bud::PushNotIn

Consider “u <= s.notin(t, s.a => t.b)”. notin is a non-monotonic operator, where u depends positively on s, but negatively on t. Stratification ensures that t is fully computed in a lower stratum, which means that we can expect multiple iterators on s's side only. If t's scanner were to push its elements down first, every insert of s merely needs to be cross-checked with the cached elements of t, and pushed down to the next element if s notin t. However, if s's scanner were to fire first, we have to wait until the first flush, at which point we are sure to have seen all the t-side tuples in this tick.

Public Instance Methods

do_insert(item, offset) click to toggle source
# File lib/bud/executor/join.rb, line 578
# Stores +item+ in the hash table for its side of the anti-join, keyed by
# get_key. If @rhs_rcvd is already set and the item arrived on the lhs
# (offset 0), the item is checked against the cached rhs bucket right away.
#
# item::   the tuple being inserted
# offset:: 0 for the lhs input, 1 for the rhs input
#
# Returns the result of process_match when an immediate check happens,
# otherwise nil.
def do_insert(item, offset)
  key = get_key(item, offset)
  bucket = (@hash_tables[offset][key] ||= Set.new)
  bucket.add(item)

  # rhs items, and lhs items seen before the first flush, are only cached.
  return unless @rhs_rcvd && offset == 0

  process_match(item, @hash_tables[1][key])
end
find_col(colspec, rel) click to toggle source
# File lib/bud/executor/join.rb, line 543
# Resolves a column specification to the value at index 1 of its column
# descriptor (the column number).
#
# colspec:: either a Symbol naming a method on +rel+ that returns a column
#           descriptor, or an Array that already is a column descriptor of
#           the form [tabname, colnum, colname, seqno]
# rel::     the collection the column belongs to
#
# Raises Bud::Error if +colspec+ is a Symbol that +rel+ does not respond to,
# or if +colspec+ is neither a Symbol nor an Array.
def find_col(colspec, rel)
  col_desc = case colspec
             when Array
               colspec
             when Symbol
               if rel.respond_to?(colspec)
                 rel.send(colspec)
               else
                 raise Bud::Error, "attribute :#{colspec} not found in #{rel.qualified_tabname}"
               end
             else
               raise Bud::Error, "symbol or column spec expected. Got #{colspec}"
             end
  col_desc[1]
end
flush() click to toggle source
# File lib/bud/executor/join.rb, line 587
# On the first call after @rhs_rcvd was cleared, marks the rhs as received and
# runs process_match for every cached lhs item against the rhs bucket with
# the same key. Later calls are no-ops until @rhs_rcvd is cleared again.
#
# The first flush happens after both the lhs and rhs scanners have been
# invoked; because of stratification the rhs does not grow any further until
# the next tick, so the cached rhs contents can be trusted from here on.
def flush
  return if @rhs_rcvd

  @rhs_rcvd = true
  rhs_table = @hash_tables[1]
  @hash_tables[0].each_pair do |key, lhs_items|
    candidates = rhs_table[key]
    lhs_items.each { |lhs_item| process_match(lhs_item, candidates) }
  end
end
get_key(item, offset) click to toggle source
# File lib/bud/executor/join.rb, line 558
# Builds the hash-table key for +item+ by picking out its join-key columns.
#
# item::   the tuple to extract the key from
# offset:: 0 selects @lhs_keycols; any other value selects @rhs_keycols
#
# Returns an Array of the key column values, or an empty Array when the
# selected key-column list is nil.
def get_key(item, offset)
  columns = if offset == 0
              @lhs_keycols
            else
              @rhs_keycols
            end
  return [] if columns.nil?

  item.values_at(*columns)
end
insert(item, source) click to toggle source
# File lib/bud/executor/join.rb, line 568
# Routes an incoming +item+ to the hash table(s) of the side it came from.
#
# item::   the tuple being inserted
# source:: the element that produced the tuple. It is compared against @lhs
#          and @rhs; a source equal to neither is treated as the rhs.
#
# When @lhs and @rhs are the same element (self join) the item is inserted on
# both sides, lhs first.
def insert(item, source)
  from_lhs = (source == @lhs)

  # Ordinary case: the source is exactly one of the two inputs.
  return do_insert(item, from_lhs ? 0 : 1) unless from_lhs && source == @rhs

  # Self join
  do_insert(item, 0)
  do_insert(item, 1)
end
invalidate_cache() click to toggle source
# File lib/bud/executor/join.rb, line 616
# Empties the cached hash table of each input whose +rescan+ is truthy:
# table 0 for @lhs, table 1 for @rhs. When $BUD_DEBUG is set, a line is
# printed for every table that gets cleared.
#
# Raises Bud::Error if @rhs_rcvd is still set, since the flag should already
# have been reset by the time this is called.
def invalidate_cache
  # sanity check; should already be reset
  raise Bud::Error if @rhs_rcvd

  if @lhs.rescan
    if $BUD_DEBUG
      puts "#{tabname} rel:#{@lhs.qualified_tabname} invalidated"
    end
    @hash_tables[0] = Hash.new
  end

  if @rhs.rescan
    if $BUD_DEBUG
      puts "#{tabname} rel:#{@rhs.qualified_tabname} invalidated"
    end
    @hash_tables[1] = Hash.new
  end
end
process_match(lhs_item, rhs_values) click to toggle source
# File lib/bud/executor/join.rb, line 601
# Decides whether +lhs_item+ survives the anti-join and pushes it downstream
# via push_out if it does.
#
# lhs_item::   the lhs tuple under test
# rhs_values:: the cached rhs tuples sharing the lhs item's key, or nil if
#              there are none
#
# The item is excluded when rhs tuples with the same key exist and either no
# block was given, or the block (@blk) returns true for at least one
# lhs/rhs pair.
def process_match(lhs_item, rhs_values)
  exclude =
    if rhs_values.nil?
      # no corresponding rhs. Include in output
      false
    elsif @blk.nil?
      # key match alone is enough to drop the lhs item
      true
    else
      # anti-join: a single pair accepted by the block drops the lhs item
      rhs_values.any? { |rhs_item| @blk.call(lhs_item, rhs_item) }
    end

  push_out(lhs_item, false) unless exclude
end
rescan_at_tick() click to toggle source
# File lib/bud/executor/join.rb, line 564
# Always returns true.
# NOTE(review): presumably this tells the executor that the element must be
# rescanned at every tick -- confirm against the caller.
def rescan_at_tick
  true
end
setup_preds(preds) click to toggle source
# File lib/bud/executor/join.rb, line 528
# Translates the join predicates into two parallel lists of column numbers,
# stored in @lhs_keycols and @rhs_keycols.
#
# preds:: an array of hashes; each key/value pair maps an lhs column spec to
#         an rhs column spec (see find_col for the accepted spec forms)
#
# This is simpler than PushSHJoin's setup_preds, because notin is a binary
# operator where both lhs and rhs are collections. For now the attributes
# are assumed to be in the same order as the tables.
def setup_preds(preds)
  lhs_cols = []
  rhs_cols = []

  preds.each do |pred|
    # each pred is a hash of lhs spec => rhs spec
    pred.each_pair do |lhs_spec, rhs_spec|
      lhs_cols << find_col(lhs_spec, @lhs)
      rhs_cols << find_col(rhs_spec, @rhs)
    end
  end

  @lhs_keycols, @rhs_keycols = [lhs_cols, rhs_cols]
end
stratum_end() click to toggle source
# File lib/bud/executor/join.rb, line 629
# Clears the @rhs_rcvd flag. With the flag cleared, do_insert only caches
# incoming items, and the next call to flush runs the matching pass over the
# cached lhs items again.
# NOTE(review): the name suggests this is invoked when a stratum finishes
# evaluating -- confirm against the caller.
def stratum_end
  @rhs_rcvd = false
end