module Bud

The root Bud module. To cause an instance of Bud to begin executing, there are three main options:

  1. Synchronously. To do this, instantiate your program and then call tick() one or more times; each call evaluates a single Bud timestep. In this mode, any network messages or timer events that occur will be buffered until the next call to tick(). This is mostly intended for “one-shot” programs that compute a single result and then terminate, or for interactively “single-stepping” through the execution of an event-driven system.

  2. In a separate thread in the foreground. To do this, instantiate your program and then call run_fg(). The Bud interpreter will then run, handling network events and evaluating new timesteps as appropriate. The run_fg() method will not return unless an error occurs.

  3. In a separate thread in the background. To do this, instantiate your program and then call run_bg(). The Bud interpreter will run asynchronously. To interact with Bud (e.g., insert additional data or inspect the state of a Bud collection), use the sync_do and async_do methods.

Most programs should use method #3. Note that in all three cases, the stop() method should be used to shut down a Bud instance and release any resources it is using.
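The buffering rule of option 1 (events that arrive between calls to tick() wait for the next one) can be pictured with a toy model. `ToyRuntime`, `deliver`, and `seen` are hypothetical names made up for this sketch; it is not Bud's API or implementation.

```ruby
# Toy model (not Bud's implementation): events that arrive between ticks are
# buffered and only become visible when the next tick runs.
class ToyRuntime
  attr_reader :budtime, :seen

  def initialize
    @budtime = 0
    @buffer = []   # events received since the last tick
    @seen = []     # [budtime, events] for every tick evaluated
  end

  # Stand-in for a network message or timer event arriving.
  def deliver(event)
    @buffer << event
  end

  # Evaluate one timestep, consuming everything buffered so far.
  def tick
    inbound, @buffer = @buffer, []
    @seen << [@budtime, inbound]
    @budtime += 1
  end
end

rt = ToyRuntime.new
rt.deliver(:ping)
rt.deliver(:pong)
rt.tick   # timestep 0 sees [:ping, :pong]
rt.tick   # timestep 1 sees [] because nothing arrived in between
```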

Constants

HAVE_ZOOKEEPER
VERSION

Attributes

app_tables[R]
budtime[R]
builtin_tables[R]
channels[R]
dbm_tables[R]
done_bootstrap[R]
dsock[R]
inbound[R]
inside_tick[R]
lattices[R]
merge_targets[R]
meta_parser[R]
metrics[RW]
options[R]
periodics[RW]
push_elems[R]
push_joins[R]
push_sources[R]
qualified_name[RW]
rtracer[R]
running_async[R]
scanners[R]
stratified_rules[RW]
tables[R]
this_rule_context[R]
this_stratum[R]
viz[R]
zk_tables[R]

Public Class Methods

new(options={})

Options to the Bud runtime are passed in a hash with the following keys:

  • network configuration

    • :ip IP address string for this instance

    • :port port number for this instance

    • :ext_ip IP address at which external nodes can contact this instance

    • :ext_port port number to go with :ext_ip

  • operating system interaction

    • :stdin if non-nil, reading from the stdio collection results in reading from this IO handle

    • :stdout writing to the stdio collection results in writing to this IO handle; defaults to $stdout

    • :signal_handling how to handle SIGINT and SIGTERM. If :none, Bud does not install handlers for these signals; otherwise, either signal shuts down all Bud instances in the process.

  • tracing and output

    • :quiet if true, suppress certain messages

    • :trace if true, generate budvis outputs

    • :rtrace if true, generate budplot outputs

    • :dump_rewrite if true, dump results of internal rewriting of Bloom code to a file

    • :print_wiring if true, print the wiring diagram of the program to stdout

    • :metrics if true, dumps a hash of internal performance metrics

  • controlling execution

    • :tag a name for this instance, suitable for display during tracing and visualization

    • :channel_filter a code block that can be used to filter the network messages delivered to this Bud instance. At the start of each tick, the code block is invoked for every channel with any incoming messages; the code block is passed the name of the channel and an array containing the inbound messages. It should return a two-element array containing “accepted” and “postponed” messages, respectively. Accepted messages are delivered during this tick, and postponed messages are buffered and passed to the filter in subsequent ticks. Any messages that aren't in either array are dropped.

  • storage configuration

    • :dbm_dir filesystem directory to hold DBM-backed collections

    • :dbm_truncate if true, DBM-backed collections are opened with OTRUNC
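The :channel_filter contract (return accepted and postponed messages; drop everything else) can be modeled in plain Ruby. This is a sketch under assumptions: the filter is any callable taking (channel_name, messages), and postponed messages are assumed to be offered again ahead of newly arrived ones. `filter_inbound` and the `pending` hash are hypothetical helpers, not Bud internals.

```ruby
# Hypothetical model of how a channel filter partitions inbound messages.
# pending: channel name => messages the filter postponed on earlier ticks
def filter_inbound(filter, pending, arrivals)
  delivered = {}
  (pending.keys | arrivals.keys).each do |chan|
    msgs = pending.fetch(chan, []) + arrivals.fetch(chan, [])
    next if msgs.empty?
    accepted, postponed = filter.call(chan, msgs)
    delivered[chan] = accepted   # visible during this tick
    pending[chan] = postponed    # offered to the filter again next tick
    # anything in msgs that is in neither array is dropped
  end
  delivered
end

# Example filter: deliver at most two messages per tick on :chat,
# postpone the rest, and drop everything sent on :noise.
limit_two = lambda do |chan, msgs|
  chan == :noise ? [[], []] : [msgs.first(2), msgs.drop(2)]
end

pending = {}
tick1 = filter_inbound(limit_two, pending, {chat: [1, 2, 3], noise: [:x]})
tick2 = filter_inbound(limit_two, pending, {})
```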

# File lib/bud.rb, line 111
def initialize(options={})
  options[:dump_rewrite] ||= ENV["BUD_DUMP_REWRITE"].to_i > 0
  options[:dump_ast]     ||= ENV["BUD_DUMP_AST"].to_i > 0
  options[:print_wiring] ||= ENV["BUD_PRINT_WIRING"].to_i > 0
  @qualified_name = ""
  @tables = {}
  @lattices = {}
  @channels = {}
  @dbm_tables = {}
  @zk_tables = {}
  @stratified_rules = []
  @push_elems = {}
  @callbacks = {}
  @callback_id = 0
  @shutdown_callbacks = {}
  @shutdown_callback_id = 0
  @post_shutdown_callbacks = []
  @timers = []
  @app_tables = []
  @inside_tick = false
  @tick_clock_time = nil
  @budtime = 0
  @inbound = {}
  @done_bootstrap = false
  @done_wiring = false
  @instance_id = ILLEGAL_INSTANCE_ID # Assigned when we start running
  @metrics = {}
  @endtime = nil
  @this_stratum = -1
  @this_rule_id = -1
  @push_sorted_elems = nil
  @running_async = false
  @bud_started = false

  # Setup options (named arguments), along with default values
  @options = options.clone
  @options[:ip] ||= "127.0.0.1"
  @ip = @options[:ip]
  @options[:port] ||= 0
  @options[:port] = @options[:port].to_i
  # NB: If using an ephemeral port (specified by port = 0), the actual port
  # number won't be known until we start EM

  load_lattice_defs
  builtin_state
  resolve_imports
  call_state_methods

  @viz = VizOnline.new(self) if @options[:trace]
  @rtracer = RTrace.new(self) if @options[:rtrace]

  do_rewrite
  if toplevel == self
    # initialize per-stratum state
    @num_strata = @stratified_rules.length
    @scanners = @num_strata.times.map{{}}
    @push_sources = @num_strata.times.map{{}}
    @push_joins = @num_strata.times.map{[]}
    @merge_targets = @num_strata.times.map{Set.new}
  end
end
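The option defaulting at the top of initialize can be isolated for illustration. `normalize_options` is a hypothetical helper that copies those lines; passing the environment as a Hash instead of reading ENV is an assumption made so the sketch is testable. An unset variable yields nil, and nil.to_i is 0, so the corresponding flag stays false.

```ruby
# Hypothetical helper mirroring the defaulting in Bud#initialize.
# env is a Hash standing in for ENV. Unlike the code above, this sketch
# clones first, so it never modifies the caller's hash.
def normalize_options(options, env = {})
  options = options.clone
  options[:dump_rewrite] ||= env["BUD_DUMP_REWRITE"].to_i > 0
  options[:dump_ast]     ||= env["BUD_DUMP_AST"].to_i > 0
  options[:print_wiring] ||= env["BUD_PRINT_WIRING"].to_i > 0
  options[:ip]   ||= "127.0.0.1"
  options[:port] ||= 0                 # 0 = ephemeral port, known once EM starts
  options[:port] = options[:port].to_i
  options
end

opts = normalize_options({port: "12345"}, {"BUD_DUMP_AST" => "1"})
# opts[:port] is the Integer 12345 and opts[:ip] is "127.0.0.1";
# only :dump_ast is true among the three debugging flags.
```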

Private Class Methods

do_fork() { || ... }

Fork a new process. This is identical to Kernel#fork, except that it also cleans up Bud and EventMachine-related state. As with Kernel#fork, the caller supplies a code block that is run in the child process; the PID of the child is returned by this method.

# File lib/bud.rb, line 1290
def self.do_fork
  Kernel.fork do
    srand
    # This is somewhat grotty: we basically clone what EM::fork_reactor does,
    # except that we don't want the user-supplied block to be invoked by the
    # reactor thread.
    if EventMachine::reactor_running?
      EventMachine::stop_event_loop
      EventMachine::release_machine
      EventMachine::instance_variable_set('@reactor_running', false)
    end

    # Shutdown all the Bud instances inherited from the parent process, but
    # don't invoke their shutdown callbacks
    Bud.shutdown_all_instances(false)
    $got_shutdown_signal = false
    $signal_handler_setup = false

    yield
  end
end
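The core of do_fork (fork, reset process-global state inherited from the parent, then run the caller's block in the child) can be sketched without EventMachine. `fork_with_reset` is a hypothetical name; the global mirrors `$got_shutdown_signal` above, and the pipe exists only so the parent can observe what the child saw. This assumes a platform with Kernel#fork (not Windows).

```ruby
# Sketch of the do_fork pattern: the child resets inherited state before
# running the user's block. Requires Kernel#fork (not available on Windows).
$got_shutdown_signal = true   # pretend the parent already saw a signal

def fork_with_reset
  Kernel.fork do
    srand                        # don't share the parent's random seed
    $got_shutdown_signal = false # child starts with a clean flag
    yield
  end
end

reader, writer = IO.pipe
pid = fork_with_reset do
  reader.close
  writer.write($got_shutdown_signal.inspect)
  writer.close
  exit!(0)   # skip at_exit handlers inherited from the parent
end
writer.close
child_view = reader.read   # "false": the child saw the reset flag
reader.close
_, status = Process.wait2(pid)
# In the parent, $got_shutdown_signal is still true.
```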
init_signal_handlers(b)

Signal handling. If multiple Bud instances are running inside a single process, we want a SIGINT or SIGTERM signal to cleanly shut down all of them. Note that we don't try to do any significant work in the signal handlers themselves: we just set a flag that is checked by a periodic timer.

# File lib/bud.rb, line 1327
def self.init_signal_handlers(b)
  $signal_lock.synchronize {
    # Initialize or re-initialize signal handlers if necessary.
    unless b.options[:signal_handling] == :none || $signal_handler_setup
      EventMachine::PeriodicTimer.new(SIGNAL_CHECK_PERIOD) do
        if $got_shutdown_signal
          Bud.shutdown_all_instances
          $got_shutdown_signal = false
        end
      end

      ["INT", "TERM"].each do |signal|
        Signal.trap(signal) {
          $got_shutdown_signal = true
        }
      end
      $signal_handler_setup = true
    end

    $instance_id += 1
    $bud_instances[$instance_id] = b
    return $instance_id
  }
end
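The "set a flag in the handler, act on it from a timer" approach can be tried in isolation. This sketch substitutes a plain polling thread for EventMachine::PeriodicTimer and traps USR1 (assumed available, as on Linux and macOS) instead of INT/TERM, so it does not interfere with the interpreter's own Ctrl-C handling. `SignalFlagPoller` is hypothetical, and the counter stands in for Bud.shutdown_all_instances.

```ruby
# Sketch: the trap handler only sets a flag; a periodic poller (standing in
# for EventMachine::PeriodicTimer) does the real work outside trap context.
class SignalFlagPoller
  attr_reader :shutdowns

  def initialize(signals, period = 0.01)
    @got_signal = false
    @shutdowns = 0
    signals.each do |sig|
      Signal.trap(sig) { @got_signal = true }   # keep the handler trivial
    end
    @poller = Thread.new do
      loop do
        if @got_signal
          @shutdowns += 1      # stand-in for Bud.shutdown_all_instances
          @got_signal = false
        end
        sleep period
      end
    end
  end

  def stop
    @poller.kill
    @poller.join
  end
end

poller = SignalFlagPoller.new(["USR1"])
Process.kill("USR1", Process.pid)
deadline = Time.now + 2
sleep 0.01 until poller.shutdowns == 1 || Time.now > deadline
poller.stop
```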
shutdown_all_instances(do_shutdown_cb=true)
# File lib/bud.rb, line 1352
def self.shutdown_all_instances(do_shutdown_cb=true)
  instances = nil
  $signal_lock.synchronize {
    instances = $bud_instances.clone
  }

  instances.each_value {|b| b.stop(false, do_shutdown_cb) }
end
stop_em_loop()

Stop the EventMachine event loop. Note that this affects anyone else in the same process who happens to be using EventMachine! This is also a non-blocking call; to block until EM has completely shut down, join on EM::reactor_thread.

# File lib/bud.rb, line 1315
def self.stop_em_loop
  EventMachine::stop_event_loop

  # If another instance of Bud is started later, we'll need to reinitialize
  # the signal handlers (since they depend on EM).
  $signal_handler_setup = false
end

Public Instance Methods

accum(x)

aggregate method to be used in Bud::BudCollection.group. accumulates all x inputs into a set.

# File lib/bud/aggs.rb, line 207
def accum(x)
  [Accum.new, x]
end
accum_pair(x, y)

aggregate method to be used in Bud::BudCollection.group. accumulates x, y inputs into a set of pairs (two element arrays).

# File lib/bud/aggs.rb, line 223
def accum_pair(x, y)
  [AccumPair.new, x, y]
end
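For intuition about the set semantics of accum and accum_pair, the sketch below groups plain Ruby arrays by their first column and collects values into a Set, so duplicate inputs collapse. It reproduces only the documented behavior; it does not use Bud's Accum/AccumPair classes or the group API, and the row layout is made up.

```ruby
require "set"

# Plain-Ruby model of grouping with accum-style aggregates (not Bud's API).
# Each row is [group_key, x, y].
rows = [[:a, 1, :x], [:a, 1, :x], [:a, 2, :y], [:b, 3, :z]]
groups = rows.group_by(&:first)

# accum(x): all x values per key, as a set (the duplicate row collapses)
accum = groups.map { |k, rs| [k, rs.map { |r| r[1] }.to_set] }.to_h

# accum_pair(x, y): all [x, y] pairs per key, also as a set
accum_pair = groups.map { |k, rs| [k, rs.map { |r| [r[1], r[2]] }.to_set] }.to_h
```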
async_do() { || ... }

Like sync_do, but does not block the caller's thread: the given callback will be invoked at some future time. Note that calls to async_do respect FIFO order.

# File lib/bud.rb, line 818
def async_do
  EventMachine::schedule do
    yield if block_given?
    # Do another tick, in case the user-supplied block inserted any data
    tick_internal
  end
end
avg(x)

aggregate method to be used in Bud::BudCollection.group. computes average of a multiset of x values

# File lib/bud/aggs.rb, line 191
def avg(x)
  [Avg.new, x]
end
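Because avg is defined over a multiset, every duplicate value counts. The hypothetical `RunningAvg` below carries [sum, count] through init/trans/final steps and divides at the end. Whether Bud's Avg class keeps exactly this state, or uses these method names, is an assumption of the sketch.

```ruby
# Sketch of an average aggregate as init/transition/final steps over a
# multiset; RunningAvg and aggregate() are hypothetical, not Bud's Avg.
class RunningAvg
  def init(x)
    [x, 1]                        # state: [sum, count]
  end

  def trans(state, x)
    [state[0] + x, state[1] + 1]
  end

  def final(state)
    state[0].to_f / state[1]
  end
end

def aggregate(agg, values)
  state = agg.init(values.first)
  values.drop(1).each { |v| state = agg.trans(state, v) }
  agg.final(state)
end

aggregate(RunningAvg.new, [1, 1, 4])   # => 2.0 (the duplicate 1 counts twice)
```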
bool_and(x)
# File lib/bud/aggs.rb, line 81
def bool_and(x)
  [BooleanAnd.new, x]
end
bool_or(x)
# File lib/bud/aggs.rb, line 95
def bool_or(x)
  [BooleanOr.new, x]
end
bud_clock()

Returns the wallclock time associated with the current Bud tick. That is, this value is guaranteed to remain the same for the duration of a single tick, but will likely change between ticks.

# File lib/bud.rb, line 1181
def bud_clock
  raise Bud::Error, "bud_clock undefined outside tick" unless @inside_tick
  @tick_clock_time ||= Time.now
  @tick_clock_time
end
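The per-tick stability of bud_clock comes from memoizing Time.now on first use and clearing it when the tick ends (compare the `ensure` clause of tick_internal). `ClockedTicker` is a hypothetical stand-in that keeps only that logic.

```ruby
# Model of bud_clock: wallclock time is captured lazily on first use within
# a tick and cleared when the tick ends. ClockedTicker is hypothetical.
class ClockedTicker
  class Error < StandardError; end

  def initialize
    @inside_tick = false
    @tick_clock_time = nil
  end

  def bud_clock
    raise Error, "bud_clock undefined outside tick" unless @inside_tick
    @tick_clock_time ||= Time.now
  end

  # Runs the given block as one "tick", yielding self.
  def tick
    @inside_tick = true
    yield self
  ensure
    @inside_tick = false
    @tick_clock_time = nil   # the next tick gets a fresh timestamp
  end
end

t = ClockedTicker.new
first = second = nil
t.tick do |bud|
  first = bud.bud_clock
  sleep 0.01
  second = bud.bud_clock   # same object as first, despite the sleep
end
```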
cancel_shutdown_cb(id)
# File lib/bud.rb, line 778
def cancel_shutdown_cb(id)
  schedule_and_wait do
    raise Bud::Error unless @shutdown_callbacks.has_key? id
    @shutdown_callbacks.delete(id)
  end
end
channel(name, schema=nil, loopback=false)

declare a transient network collection. default schema [:address, :val] => []

# File lib/bud/state.rb, line 115
def channel(name, schema=nil, loopback=false)
  define_collection(name)
  @tables[name] = Bud::BudChannel.new(name, self, schema, loopback)
  @channels[name] = @tables[name]
end
choose(x)

exemplary aggregate method to be used in Bud::BudCollection.group. arbitrarily but deterministically chooses among x entries being aggregated.

# File lib/bud/aggs.rb, line 114
def choose(x)
  [Choose.new, x]
end
choose_rand(x=nil)

exemplary aggregate method to be used in Bud::BudCollection.group. randomly chooses among x entries being aggregated.

# File lib/bud/aggs.rb, line 144
def choose_rand(x=nil)
  [ChooseOneRand.new, x]
end
coll_expr(name, expr, schema=nil)

declare a collection-generating expression. default schema [:key] => [:val].

# File lib/bud/state.rb, line 64
def coll_expr(name, expr, schema=nil)
  define_collection(name)
  @tables[name] = Bud::BudCollExpr.new(name, self, expr, schema)
end
collection_stratum(collection)

Return the stratum number of the given collection. NB: if a collection does not appear on the lhs or rhs of any rules, it is not currently assigned to a stratum.

# File lib/bud.rb, line 1190
def collection_stratum(collection)
  t_stratum.each do |t|
    return t.stratum if t.predicate == collection
  end

  return nil
end
count(x=nil)

aggregate method to be used in Bud::BudCollection.group. counts number of entries aggregated. argument is ignored.

# File lib/bud/aggs.rb, line 171
def count(x=nil)
  [Count.new]
end
delta(out_tbl)

A common special case for sync_callback: block on a delta to a table.

# File lib/bud.rb, line 908
def delta(out_tbl)
  sync_callback(nil, nil, out_tbl)
end
do_bootstrap()

Evaluate all bootstrap blocks and tick deltas

# File lib/bud.rb, line 1052
def do_bootstrap
  # Evaluate bootstrap for imported modules
  @this_rule_context = self
  imported = import_defs.keys
  imported.each do |mod_alias|
    wrapper = import_instance mod_alias
    wrapper.do_bootstrap
  end
  self.class.ancestors.reverse.each do |anc|
    anc.instance_methods(false).each do |m|
      if /^__bootstrap__/.match m
        self.method(m.to_sym).call
      end
    end
  end
  bootstrap

  if toplevel == self
    @tables.each_value {|t| t.bootstrap}
    @lattices.each_value {|l| l.bootstrap}
  end
  @done_bootstrap = true
end
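The ancestor walk in do_bootstrap can be reproduced with ordinary modules. In this sketch the module, class, and `boot_log` names are made up; methods whose names start with `__bootstrap__` run from the most basic ancestor to the class itself, because `ancestors` lists the class first and the code reverses it. (The sketch omits the imported-module pass and the final `bootstrap` call.)

```ruby
# Reproduces the __bootstrap__ method walk from do_bootstrap with toy names.
module BootBase
  def __bootstrap__base
    boot_log << :base
  end
end

class BootApp
  include BootBase

  def __bootstrap__app
    boot_log << :app
  end

  def boot_log
    @boot_log ||= []
  end

  # Same walk as do_bootstrap: most basic ancestor first, the class last.
  def run_bootstraps
    self.class.ancestors.reverse.each do |anc|
      anc.instance_methods(false).each do |m|
        send(m) if m.to_s.start_with?("__bootstrap__")
      end
    end
    boot_log
  end
end

BootApp.new.run_bootstraps   # => [:base, :app]
```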
do_invalidate_rescan()
# File lib/bud.rb, line 1076
def do_invalidate_rescan
  @default_rescan.each {|elem| elem.rescan = true}
  @default_invalidate.each {|elem|
    elem.invalidated = true
    # Invalidate table caches right away; PushElement caches are invalidated in the final loop below.
    elem.invalidate_cache unless elem.class <= PushElement
  }

  # The following loop invalidates additional (non-default) elements and
  # tables that depend on the run-time invalidation state of a table.  Loop
  # once to set the flags.
  each_scanner do |scanner, stratum|
    if scanner.rescan
      scanner.rescan_set.each {|e| e.rescan = true}
      scanner.invalidate_set.each {|e|
        e.invalidated = true
        e.invalidate_cache unless e.class <= PushElement
      }
    end
  end

  # Loop a second time to actually call invalidate_cache.  We can't merge this
  # with the loops above because some versions of invalidate_cache (e.g.,
  # join) depend on the rescan state of other elements.
  @num_strata.times do |stratum|
    @push_sorted_elems[stratum].each {|e| e.invalidate_cache if e.invalidated}
  end
end
do_register_callback(tbl_name, &blk)
# File lib/bud.rb, line 848
def do_register_callback(tbl_name, &blk)
  unless @tables.has_key? tbl_name
    raise Bud::Error, "no such collection: #{tbl_name}"
  end

  raise Bud::Error if @callbacks.has_key? @callback_id
  @callbacks[@callback_id] = [tbl_name, blk]
  cb_id = @callback_id
  @callback_id += 1
  return cb_id
end
file_reader(name, filename)

declare a collection to be read from filename. rhs of statements only

# File lib/bud/state.rb, line 131
def file_reader(name, filename)
  define_collection(name)
  @tables[name] = Bud::BudFileReader.new(name, filename, self)
end
import_instance(name)
# File lib/bud.rb, line 198
def import_instance(name)
  name = "@" + name.to_s
  instance_variable_get(name) if instance_variable_defined? name
end
inspect()

XXX make tag specific

# File lib/bud.rb, line 913
def inspect
  "#{self.class}:#{self.object_id.to_s(16)}"
end
int_ip_port()

Returns the internal IP and port. See ip_port.

# File lib/bud.rb, line 1041
def int_ip_port
  raise Bud::Error, "int_ip_port called before port defined" if @port.nil? and @options[:port] == 0
  @port.nil? ? "#{@ip}:#{@options[:port]}" : "#{@ip}:#{@port}"
end
interface(mode, name, schema=nil)

declare a transient collection to be an input or output interface

# File lib/bud/state.rb, line 51
def interface(mode, name, schema=nil)
  define_collection(name)
  t_provides << [name.to_s, mode]
  @tables[name] = (mode ? BudInputInterface : BudOutputInterface).new(name, self, schema)
end
interfaces(direction, collections)

an alternative approach to declaring interfaces

# File lib/bud/state.rb, line 157
def interfaces(direction, collections)
  mode = case direction
    when :input then true
    when :output then false
  else
    raise Bud::CompileError, "unrecognized interface type #{direction}"
  end
  collections.each do |tab|
    t_provides << [tab.to_s, mode]
  end 
end
ip()
# File lib/bud.rb, line 1030
def ip
  options[:ext_ip] ? "#{@options[:ext_ip]}" : "#{@ip}"
end
ip_port()

Returns the IP and port of the Bud instance as a string. In addition to the local IP and port, the user may define an external IP and/or port. The external version of each is returned if available. If not, the local version is returned. There are use cases for mixing and matching local and external. local_ip:external_port would be if you have local port forwarding, and external_ip:local_port would be if you're in a DMZ, for example.

# File lib/bud.rb, line 1025
def ip_port
  raise Bud::Error, "ip_port called before port defined" if port.nil?
  ip.to_s + ":" + port.to_s
end
load_lattice_defs()

Define methods to implement the state declarations for every registered kind of lattice.

# File lib/bud/state.rb, line 171
def load_lattice_defs
  Bud::Lattice.global_mfuncs.each do |m|
    next if RuleRewriter::MONOTONE_WHITELIST.include? m
    if Bud::BudCollection.instance_methods.include? m.to_s
      puts "monotone method #{m} conflicts with non-monotonic method in BudCollection"
    end
  end

  Bud::Lattice.global_morphs.each do |m|
    next if RuleRewriter::MONOTONE_WHITELIST.include? m
    if Bud::BudCollection.instance_methods.include? m.to_s
      puts "morphism #{m} conflicts with non-monotonic method in BudCollection"
    end
  end

  # Sanity-check lattice definitions
  # XXX: We should do this only once per lattice
  Bud::Lattice.lattice_kinds.each do |wrap_name, klass|
    unless klass.method_defined? :merge
      raise Bud::CompileError, "lattice #{wrap_name} does not define a merge function"
    end

    # If a method is marked as monotone in any lattice, every lattice that
    # declares a method of that name must also mark it as monotone.
    meth_list = klass.instance_methods(false).to_set
    Bud::Lattice.global_mfuncs.each do |m|
      next unless meth_list.include? m.to_s
      unless klass.mfuncs.include? m
        raise Bud::CompileError, "method #{m} in #{wrap_name} must be monotone"
      end
    end

    # Apply a similar check for morphs
    Bud::Lattice.global_morphs.each do |m|
      next unless meth_list.include? m.to_s
      unless klass.morphs.include? m
        raise Bud::CompileError, "method #{m} in #{wrap_name} must be a morph"
      end
    end

    # Similarly, check for non-monotone lattice methods that are found in the
    # builtin list of monotone operators. The "merge" method is implicitly
    # monotone (XXX: should it be declared as a morph or monotone function?)
    meth_list.each do |m_str|
      m = m_str.to_sym
      next unless RuleRewriter::MONOTONE_WHITELIST.include? m
      # XXX: ugly hack. We want to allow lattice class implementations to
      # define their own equality semantics.
      next if m == :==
      unless klass.mfuncs.include?(m) || klass.morphs.include?(m) || m == :merge
        raise Bud::CompileError, "method #{m} in #{wrap_name} must be monotone"
      end
    end

    # XXX: replace "self" with toplevel?
    self.singleton_class.send(:define_method, wrap_name) do |lat_name|
      define_lattice(lat_name)
      @lattices[lat_name] = Bud::LatticeWrapper.new(lat_name, klass, self)
    end
  end
end
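The last step of load_lattice_defs, checking that each lattice class defines `merge` and then defining a declaration method on the instance's singleton class, can be shown with toy classes. `MaxLattice`, `BrokenLattice`, and `ToyProgram` are hypothetical; Bud raises Bud::CompileError and wraps each declaration in a Bud::LatticeWrapper, while this sketch raises ArgumentError and simply records the class.

```ruby
# Sketch of per-lattice declaration methods defined on the singleton class,
# with the "must define merge" check. All names here are hypothetical.
class MaxLattice
  attr_reader :value

  def initialize(value = nil)
    @value = value
  end

  # Least upper bound: the larger of the two values.
  def merge(other)
    MaxLattice.new([value, other.value].compact.max)
  end
end

class BrokenLattice; end   # deliberately lacks merge

class ToyProgram
  attr_reader :lattices

  def initialize(kinds)
    @lattices = {}
    kinds.each do |wrap_name, klass|
      unless klass.method_defined?(:merge)
        raise ArgumentError, "lattice #{wrap_name} does not define a merge function"
      end
      # Define e.g. lmax(name) on this object only, as load_lattice_defs does.
      singleton_class.send(:define_method, wrap_name) do |lat_name|
        @lattices[lat_name] = klass
      end
    end
  end
end

prog = ToyProgram.new({lmax: MaxLattice})
prog.lmax(:highest)   # records :highest as a MaxLattice declaration
```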
loopback(name, schema=nil)

declare a transient network collection that delivers facts back to the current Bud instance. This is syntax sugar for a channel that always delivers to the IP/port of the current Bud instance. Default schema [:key] => [:val]

# File lib/bud/state.rb, line 125
def loopback(name, schema=nil)
  schema ||= {[:key] => [:val]}
  channel(name, schema, true)
end
max(x)

exemplary aggregate method to be used in Bud::BudCollection.group. computes maximum of x entries aggregated.

# File lib/bud/aggs.rb, line 67
def max(x)
  [Max.new, x]
end
min(x)

exemplary aggregate method to be used in Bud::BudCollection.group. computes minimum of x entries aggregated.

# File lib/bud/aggs.rb, line 50
def min(x)
  [Min.new, x]
end
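As we read the term, an "exemplary" aggregate such as min or max yields one of its inputs (an exemplar) rather than a newly computed value, so the other columns of the chosen tuple can travel with it. The plain-Ruby sketch below illustrates that reading for a per-group minimum using Enumerable#min_by; the rows are made up and this is not Bud's Min implementation.

```ruby
# Plain-Ruby approximation of an exemplary min per group: the whole row with
# the smallest value is kept, so its other columns come along with it.
rows = [
  {host: "a", latency: 30, path: "/x"},
  {host: "a", latency: 12, path: "/y"},
  {host: "b", latency: 50, path: "/z"},
]
fastest = rows.group_by { |r| r[:host] }
              .transform_values { |rs| rs.min_by { |r| r[:latency] } }
# fastest["a"] is the full row {host: "a", latency: 12, path: "/y"}
```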
module_wrapper_class(mod)
# File lib/bud.rb, line 173
def module_wrapper_class(mod)
  class_name = "#{mod}__wrap"
  begin
    klass = Module.const_get(class_name.to_sym)
    unless klass.is_a? Class
      raise Bud::Error, "internal error: #{class_name} is in use"
    end
  rescue NameError # exception if class class_name doesn't exist
  end
  klass ||= eval "class #{class_name}; include Bud; include #{mod}; end"
  klass
end
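module_wrapper_class follows a lookup-or-create pattern: reuse the "#{mod}__wrap" class if it already exists, otherwise create it. The sketch below swaps `eval` for Class.new plus Object.const_set and omits the `include Bud`; `Greeter` and `wrapper_class_for` are hypothetical, and it assumes a top-level (non-namespaced) module.

```ruby
# Sketch of the lookup-or-create pattern in module_wrapper_class.
# Uses Class.new + const_set instead of eval; names are hypothetical.
module Greeter
  def greet
    "hi"
  end
end

def wrapper_class_for(mod)
  class_name = "#{mod}__wrap"    # assumes mod is a top-level module
  if Object.const_defined?(class_name)
    klass = Object.const_get(class_name)
    raise "internal error: #{class_name} is in use" unless klass.is_a?(Class)
    return klass
  end
  Object.const_set(class_name, Class.new { include mod })
end

w1 = wrapper_class_for(Greeter)
w2 = wrapper_class_for(Greeter)   # same class object, not a new one
```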
on_shutdown(&blk)

Register a callback that will be invoked when this instance of Bud is shutting down. XXX: The naming of this method (and cancel_shutdown_cb) is inconsistent with the naming of register_callback and friends.

# File lib/bud.rb, line 766
def on_shutdown(&blk)
  # Start EM if not yet started
  start_reactor
  rv = nil
  schedule_and_wait do
    rv = @shutdown_callback_id
    @shutdown_callbacks[@shutdown_callback_id] = blk
    @shutdown_callback_id += 1
  end
  return rv
end
pause()

Pause an instance of Bud that is running asynchronously. That is, this method allows a Bud instance operating in run_bg or run_fg mode to be switched to “single-stepped” mode; timesteps can be manually invoked via tick(). To switch back to running Bud asynchronously, call run_bg().

# File lib/bud.rb, line 698
def pause
  schedule_and_wait do
    @running_async = false
  end
end
periodic(name, period=1)

declare a collection to be auto-populated every period seconds. schema [:key] => [:val]. rhs of statements only.

# File lib/bud/state.rb, line 138
def periodic(name, period=1)
  define_collection(name)
  raise Bud::Error if @periodics.has_key? [name]
  @periodics << [name, period]
  @tables[name] = Bud::BudPeriodic.new(name, self)
end
port()
# File lib/bud.rb, line 1034
def port
  return nil if @port.nil? and @options[:port] == 0 and not @options[:ext_port]
  return @options[:ext_port] ? @options[:ext_port] :
    (@port.nil? ? @options[:port] : @port)
end
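Taken together, ip, port, and ip_port prefer external values when present and otherwise fall back to the bound port or the configured local one. The hypothetical pure function below mirrors that precedence so the local/external combinations described for ip_port can be checked directly; its keyword names are made up.

```ruby
# Hypothetical pure function mirroring ip, port and ip_port above:
# external values win; otherwise the bound port, then a non-zero configured port.
def resolve_ip_port(ip:, port_opt:, bound_port: nil, ext_ip: nil, ext_port: nil)
  host = ext_ip || ip
  port = if ext_port then ext_port
         elsif bound_port then bound_port
         elsif port_opt != 0 then port_opt
         end
  raise "ip_port called before port defined" if port.nil?
  "#{host}:#{port}"
end

resolve_ip_port(ip: "127.0.0.1", port_opt: 0, bound_port: 54321)         # ephemeral port, now bound
resolve_ip_port(ip: "10.0.0.5", port_opt: 8000, ext_ip: "203.0.113.7")   # external IP, local port
resolve_ip_port(ip: "10.0.0.5", port_opt: 8000, ext_port: 9000)          # local IP, forwarded port
```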
post_shutdown(&blk)

Register a callback that will be invoked after this instance of Bud has been shut down.

# File lib/bud.rb, line 787
def post_shutdown(&blk)
  # Start EM if not yet started
  start_reactor
  schedule_and_wait do
    @post_shutdown_callbacks << blk
  end
end
readonly(name, schema=nil)
# File lib/bud/state.rb, line 102
def readonly(name, schema=nil)
  define_collection(name)
  @tables[name] = Bud::BudReadOnly.new(name, self, schema)
end
register_callback(tbl_name, &blk)

Register a new callback. Given the name of a Bud collection, this method arranges for the given block to be invoked at the end of any tick in which any tuples have been inserted into the specified collection. The code block is passed the collection as an argument; this provides a convenient way to examine the tuples inserted during that fixpoint. (Note that because the Bud runtime is blocked while the callback is invoked, it can also examine any other Bud state freely.)

Note that registering callbacks on persistent collections (e.g., tables, syncs and stores) is probably not wise: as long as any tuples are stored in the collection, the callback will be invoked at the end of every tick.

# File lib/bud.rb, line 837
def register_callback(tbl_name, &blk)
  # We allow callbacks to be added before or after EM has been started. To
  # simplify matters, we start EM if it hasn't been started yet.
  start_reactor
  cb_id = nil
  schedule_and_wait do
    cb_id = do_register_callback(tbl_name, &blk)
  end
  return cb_id
end
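The callback semantics above, including the warning about persistent collections, can be modeled with a small registry. `CallbackRegistry` is hypothetical: ids are increasing integers as in do_register_callback, but it passes each callback an array of tuples rather than the collection object, and "fires" whenever the collection is non-empty at the end of a tick.

```ruby
# Model of register_callback semantics (not Bud's implementation): callbacks
# are keyed by increasing integer ids and fire at the end of any tick in
# which their collection is non-empty.
class CallbackRegistry
  def initialize
    @callbacks = {}
    @next_id = 0
  end

  def register(coll_name, &blk)
    id = @next_id
    @callbacks[id] = [coll_name, blk]
    @next_id += 1
    id
  end

  def unregister(id)
    raise KeyError, "no callback #{id}" unless @callbacks.delete(id)
  end

  # collections: name => array of tuples present at the end of the tick
  def end_of_tick(collections)
    @callbacks.each_value do |name, blk|
      tuples = collections.fetch(name, [])
      blk.call(tuples) unless tuples.empty?
    end
  end
end

reg = CallbackRegistry.new
fired = []
id = reg.register(:events) { |tuples| fired << tuples.size }
reg.end_of_tick({events: [[1], [2]]})   # tuples present: fires
reg.end_of_tick({events: []})           # nothing present: skipped
reg.end_of_tick({events: [[1], [2]]})   # a table still holding tuples fires again
reg.unregister(id)
reg.end_of_tick({events: [[3]]})        # no longer fires
```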
run_bg()

Run Bud in the background (in a different thread). This means that the Bud interpreter will run asynchronously from the caller, so care must be used when interacting with it. For example, it is not safe to directly examine Bud collections from the caller's thread (see async_do and sync_do).

This instance of Bud will run until stop() is called.

# File lib/bud.rb, line 637
def run_bg
  start

  schedule_and_wait do
    if @running_async
      raise Bud::Error, "run_bg called on already-running Bud instance"
    end
    @running_async = true

    # Consume any events received while we weren't running async
    tick_internal
  end

  @rtracer.sleep if options[:rtrace]
end
run_fg()

Run Bud in the “foreground”: the caller's thread blocks while the Bud interpreter runs (in the EventMachine reactor thread started by run_bg). This means this method won't return unless an error occurs or Bud is halted. It is often more useful to run Bud in a different thread: see run_bg.

# File lib/bud.rb, line 708
def run_fg
  # If we're called from the EventMachine thread (and EM is running), blocking
  # the current thread would imply deadlocking ourselves.
  if Thread.current == EventMachine::reactor_thread and EventMachine::reactor_running?
    raise Bud::Error, "cannot invoke run_fg from inside EventMachine"
  end

  q = Queue.new
  # Note that this must be a post-shutdown callback: if this is the only
  # thread, then the program might exit after run_fg() returns. If run_fg()
  # blocked on a normal shutdown callback, the program might exit before the
  # other shutdown callbacks have a chance to run.
  post_shutdown do
    q.push(true)
  end

  run_bg
  # Block caller's thread until Bud has shutdown
  q.pop
  report_metrics if options[:metrics]
end
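run_fg's blocking strategy (start in the background, register a post-shutdown hook that pushes onto a Queue, then block on Queue#pop) can be sketched without EventMachine. `ToyService` and its methods are hypothetical stand-ins for the Bud calls of the same names.

```ruby
require "thread"

# Sketch of run_fg's blocking strategy with a hypothetical ToyService.
class ToyService
  def initialize
    @post_shutdown = []
    @stop = Queue.new
  end

  def post_shutdown(&blk)
    @post_shutdown << blk
  end

  def run_bg
    @thread = Thread.new do
      @stop.pop                      # "serve" until stop is requested
      @post_shutdown.each(&:call)    # runs only after shutdown completes
    end
  end

  def stop
    @stop.push(true)
  end

  def run_fg
    done = Queue.new
    post_shutdown { done.push(true) }
    run_bg
    done.pop                         # caller blocks here until shutdown
  end
end

svc = ToyService.new
Thread.new { sleep 0.05; svc.stop }  # something else stops the service later
svc.run_fg                           # returns only after stop
```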
scratch(name, schema=nil)

declare a transient collection. default schema [:key] => [:val]

# File lib/bud/state.rb, line 97
def scratch(name, schema=nil)
  define_collection(name)
  @tables[name] = Bud::BudScratch.new(name, self, schema)
end
start(do_tick=false)

Start up a Bud instance if it is not currently started. This starts EventMachine (if needed) and binds to a UDP server socket. If do_tick is true, we also execute a single Bloom timestep. Regardless, calling this method does NOT cause Bud to begin running asynchronously (see run_bg).

# File lib/bud.rb, line 657
def start(do_tick=false)
  start_reactor
  schedule_and_wait do
    do_startup unless @bud_started
    tick_internal if do_tick
  end
end
stop(stop_em=false, do_shutdown_cb=true)

Shut down a Bud instance and release any resources that it was using. This method blocks until Bud has been shut down. If stop_em is true, the EventMachine event loop is also shut down; this will interfere with the execution of any other Bud instances in the same process (as well as anything else that happens to use EventMachine). We always shut down the EM loop if there are no more running Bud instances (this does interfere with other EM-using apps, but it is necessary).

# File lib/bud.rb, line 737
def stop(stop_em=false, do_shutdown_cb=true)
  schedule_and_wait do
    do_shutdown(do_shutdown_cb)
  end

  # If we're shutting down the last active Bud instance, shutdown the EM event
  # loop as well. This is probably good practice in general, but it also
  # prevents weird EM behavior -- it seems as though EM::ConnectionNotBound
  # exceptions can be raised if the EM event loop is left running and
  # subsequent events arrive.
  $signal_lock.synchronize {
    stop_em = true if $bud_instances.empty? and EventMachine::reactor_running?
  }

  if stop_em
    Bud.stop_em_loop
    unless Thread.current == EventMachine::reactor_thread
      EventMachine::reactor_thread.join
    end
  end

  report_metrics if options[:metrics]
end
Also aliased as: stop_bg
stop_bg(stop_em=false, do_shutdown_cb=true)
Alias for: stop
store(name, storage, schema=nil)
# File lib/bud/state.rb, line 81
def store(name, storage, schema=nil)
  define_collection(name)
  case storage
  when :zookeeper
    # treat "schema" as a hash of options
    options = schema
    raise Bud::Error, "Zookeeper tables require a :path option" if options[:path].nil?
    options[:addr] ||= "localhost:2181"
    @tables[name] = Bud::BudZkTable.new(name, options[:path], options[:addr], self)
    @zk_tables[name] = @tables[name]
  else
    raise Bud::Error, "unknown async storage engine #{storage.to_s}"
  end
end
sum(x)

aggregate method to be used in Bud::BudCollection.group. computes sum of x entries aggregated.

# File lib/bud/aggs.rb, line 156
def sum(x)
  [Sum.new, x]
end
sync(name, storage, schema=nil)

declare a synchronously-flushed persistent collection. default schema [:key] => [:val].

# File lib/bud/state.rb, line 70
def sync(name, storage, schema=nil)
  define_collection(name)
  case storage
  when :dbm
    @tables[name] = Bud::BudDbmTable.new(name, self, schema)
    @dbm_tables[name] = @tables[name]
  else
    raise Bud::Error, "unknown synchronous storage engine #{storage.to_s}"
  end
end
sync_callback(in_tbl, tupleset, out_tbl)

sync_callback supports synchronous interaction with Bud modules. The caller supplies the name of an input collection, a set of tuples to insert, and an output collection on which to 'listen.' The call blocks until tuples are inserted into the output collection: these are returned to the caller.

# File lib/bud.rb, line 872
def sync_callback(in_tbl, tupleset, out_tbl)
  q = Queue.new

  # If the runtime shuts down before we see anything in the output collection,
  # make sure we hear about it so we can raise an error
  # XXX: Using two separate callbacks here is ugly.
  shutdown_cb = on_shutdown do
    q.push :callback
  end

  cb = nil
  sync_do {
    if in_tbl
      t = @tables[in_tbl]
      if t.class <= Bud::BudChannel or t.class <= Bud::BudZkTable
        t <~ tupleset
      else
        t <+ tupleset
      end
    end

    cb = do_register_callback(out_tbl) do |c|
      q.push c.to_a
    end
  }
  result = q.pop
  if result == :callback
    # Don't try to unregister the callbacks first: runtime is already shutdown
    raise Bud::ShutdownWithCallbacksError, "Bud instance shutdown before sync_callback completed"
  end
  unregister_callback(cb)
  cancel_shutdown_cb(shutdown_cb)
  return result
end
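The heart of sync_callback is a single Queue with two producers: the output callback pushes tuples, and the shutdown callback pushes a sentinel, and the caller raises if the sentinel arrives first. In this sketch the producers are plain threads, the sentinel is named :shutdown (Bud uses :callback), and `wait_for_output` and `ShutdownBeforeResult` are hypothetical names.

```ruby
require "thread"

# Sketch of sync_callback's wait: one Queue receives either output tuples or
# a :shutdown sentinel, whichever comes first. Names are hypothetical.
class ShutdownBeforeResult < StandardError; end

def wait_for_output
  q = Queue.new
  on_result   = ->(tuples) { q.push(tuples) }
  on_shutdown = -> { q.push(:shutdown) }
  yield on_result, on_shutdown   # caller wires these to its event sources
  result = q.pop                 # block until either producer fires
  raise ShutdownBeforeResult, "shutdown before output arrived" if result == :shutdown
  result
end

out = wait_for_output do |on_result, _on_shutdown|
  Thread.new { on_result.call([[:k, 1]]) }   # output collection got a tuple
end
```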
sync_do() { || ... }

Given a block, evaluate that block inside the background Ruby thread at some time in the future, and then perform a Bloom tick. Because the block is evaluated inside the background Ruby thread, the block can safely examine Bud state. Note that calling sync_do blocks the caller until the block has been evaluated; for a non-blocking version, see async_do.

Note that the block is invoked after one Bud timestep has ended but before the next timestep begins. Hence, synchronous accumulation (<=) into a Bud scratch collection in a callback is typically not a useful thing to do: when the next tick begins, the content of any scratch collections will be emptied, which includes anything inserted by a sync_do block using <=. To avoid this behavior, insert into scratches using <+.

# File lib/bud.rb, line 807
def sync_do
  schedule_and_wait do
    yield if block_given?
    # Do another tick, in case the user-supplied block inserted any data
    tick_internal
  end
end
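The sync_do/async_do split can be modeled with one worker thread standing in for the EventMachine reactor: jobs run in FIFO order and each is followed by a "tick". `MiniReactor` is hypothetical and deliberately minimal; for instance, an exception raised inside a block would kill its worker rather than being reported to the caller.

```ruby
require "thread"

# Sketch of sync_do / async_do: one worker thread runs jobs in FIFO order
# and "ticks" after each. MiniReactor and its methods are hypothetical.
class MiniReactor
  attr_reader :ticks

  def initialize
    @jobs = Queue.new
    @ticks = 0
    @worker = Thread.new do
      while (job = @jobs.pop) != :stop
        job.call
      end
    end
  end

  # Non-blocking: enqueue and return immediately.
  def async_do(&blk)
    @jobs.push(lambda { blk.call if blk; @ticks += 1 })
    nil
  end

  # Blocking: enqueue, then wait until the worker has run the block and ticked.
  def sync_do(&blk)
    done = Queue.new
    job = lambda do
      blk.call if blk
      @ticks += 1
      done.push(true)
    end
    @jobs.push(job)
    done.pop
    nil
  end

  def stop
    @jobs.push(:stop)
    @worker.join
  end
end

r = MiniReactor.new
log = []
r.async_do { log << :first }
r.async_do { log << :second }
r.sync_do  { log << :third }   # returns after all three ran, in FIFO order
r.stop
```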
table(name, schema=nil)

declare an in-memory, non-transient collection. default schema [:key] => [:val].

# File lib/bud/state.rb, line 58
def table(name, schema=nil)
  define_collection(name)
  @tables[name] = Bud::BudTable.new(name, self, schema)
end
temp(name)

declare a scratch in a bloom statement lhs. schema inferred from rhs.

# File lib/bud/state.rb, line 108
def temp(name)
  define_collection(name)
  # defer schema definition until merge
  @tables[name] = Bud::BudTemp.new(name, self, nil, true)
end
tick()

From client code, manually trigger a timestep of Bloom execution.

# File lib/bud.rb, line 1047
def tick
  start(true)
end
tick_internal()

One timestep of Bloom execution. This MUST be invoked from the EventMachine thread; it is not intended to be called directly by client code.

# File lib/bud.rb, line 1107
def tick_internal
  puts "#{object_id}/#{port} : ============================================= (#{@budtime})" if $BUD_DEBUG
  begin
    starttime = Time.now if options[:metrics]
    if options[:metrics] and not @endtime.nil?
      @metrics[:betweentickstats] ||= initialize_stats
      @metrics[:betweentickstats] = running_stats(@metrics[:betweentickstats],
                                                  starttime - @endtime)
    end
    @inside_tick = true

    unless @done_bootstrap
      do_bootstrap
      do_wiring
    else
      # inform tables and elements about beginning of tick.
      @app_tables.each {|t| t.tick}
      do_invalidate_rescan
    end

    receive_inbound
    # compute fixpoint for each stratum in order
    @stratified_rules.each_with_index do |rules,stratum|
      fixpoint = false
      first_iter = true
      until fixpoint
        @scanners[stratum].each_value {|s| s.scan(first_iter)}
        fixpoint = true
        first_iter = false
        # flush any tuples in the pipes
        @push_sorted_elems[stratum].each {|p| p.flush}
        # tick deltas on any merge targets and look for more deltas
        # check to see if any joins saw a delta
        @push_joins[stratum].each do |p|
          if p.found_delta
            fixpoint = false
            p.tick_deltas
          end
        end
        @merge_targets[stratum].each do |t|
          fixpoint = false if t.tick_deltas
        end
      end
      # push end-of-fixpoint
      @push_sorted_elems[stratum].each do |p|
        p.stratum_end
      end
      @merge_targets[stratum].each do |t|
        t.flush_deltas
      end
    end
    @viz.do_cards if @options[:trace]
    do_flush

    invoke_callbacks
    @budtime += 1
    @inbound.clear
    @reset_list.each {|e| e.invalidated = false; e.rescan = false}

  ensure
    @inside_tick = false
    @tick_clock_time = nil
  end

  if options[:metrics]
    @endtime = Time.now
    @metrics[:tickstats] ||= initialize_stats
    @metrics[:tickstats] = running_stats(@metrics[:tickstats], @endtime - starttime)
  end
end
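The core of tick_internal is a per-stratum loop that keeps scanning and flushing until no join or merge target reports a new delta. The same delta-driven fixpoint idea can be seen on a small transitive-closure computation. This is a plain-Ruby sketch of semi-naive evaluation, not Bud's push-element machinery; transitive_closure is a hypothetical helper.

```ruby
require 'set'

# Semi-naive fixpoint: derive path(x, z) from path(x, y) and edge(y, z)
# until an iteration produces no new tuples (an empty delta).
def transitive_closure(edges)
  paths = Set.new(edges)
  delta = Set.new(edges)
  until delta.empty?                 # fixpoint: no new deltas
    new_delta = Set.new
    delta.each do |(x, y)|
      edges.each do |(y2, z)|
        next unless y == y2
        fact = [x, z]
        new_delta << fact unless paths.include?(fact)
      end
    end
    paths.merge(new_delta)
    delta = new_delta                # only new facts drive the next round
  end
  paths
end

result = transitive_closure([[1, 2], [2, 3], [3, 4]])
p result.size   # => 6
```

Only the previous round's new facts are joined against the edges, which is why the loop terminates as soon as a round adds nothing.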
toplevel()
# File lib/bud.rb, line 186
def toplevel
  @toplevel = (@options[:toplevel] || self)
end
toplevel?()
# File lib/bud.rb, line 194
def toplevel?
  toplevel.object_id == self.object_id
end
unregister_callback(id)

Unregister the callback that has the given ID.

# File lib/bud.rb, line 861
def unregister_callback(id)
  schedule_and_wait do
    raise Bud::Error, "missing callback: #{id.inspect}" unless @callbacks.has_key? id
    @callbacks.delete(id)
  end
end
wiring?()

Are we currently in the process of wiring together the dataflow?

# File lib/bud.rb, line 208
def wiring?
  toplevel? ? (@done_bootstrap && !@done_wiring) : toplevel.wiring?
end

Private Instance Methods

builtin_state()

Builtin Bud state (predefined collections). We could define this using the standard state block syntax, but we want to ensure that builtin state is initialized before user-defined state.

# File lib/bud.rb, line 1203
def builtin_state
  # We expect there to be no previously-defined tables
  raise Bud::Error unless @tables.empty?

  loopback  :localtick, [:col1]
  @stdio = terminal :stdio
  scratch :halt, [:key]
  @periodics = table :periodics_tbl, [:pername] => [:period]

  # for BUD reflection
  table :t_cycle, [:predicate, :via, :neg, :temporal]
  table :t_depends, [:bud_obj, :rule_id, :lhs, :op, :body] => [:nm, :in_body]
  table :t_provides, [:interface] => [:input]
  table :t_rule_stratum, [:bud_obj, :rule_id] => [:stratum]
  table :t_rules, [:bud_obj, :rule_id] => [:lhs, :op, :src, :orig_src, :unsafe_funcs_called]
  table :t_stratum, [:predicate] => [:stratum]
  table :t_table_info, [:tab_name, :tab_type]
  table :t_table_schema, [:tab_name, :col_name, :ord, :loc]
  table :t_underspecified, t_provides.schema

  # Identify builtin tables as such
  @builtin_tables = @tables.clone if toplevel
end
call_state_methods()

If module Y is a parent module of X, X's state block might reference state defined in Y. Hence, we want to invoke Y's state block first. However, when “import” and “include” are combined, we can't use the inheritance hierarchy to do this. When a module Z is imported, the import process inlines all the modules Z includes into a single module. Hence, we can no longer rely on the inheritance hierarchy to respect dependencies between modules. To fix this, we add an increasing ID to each state block's method name (assigned according to the order in which the state blocks are defined); we then sort by this order before invoking the state blocks.

# File lib/bud.rb, line 303
def call_state_methods
  meth_map = {} # map from ID => [Method]
  self.class.instance_methods.each do |m|
    next unless m =~ /^__state(\d+)__/
    id = Regexp.last_match.captures.first.to_i
    meth_map[id] ||= []
    meth_map[id] << self.method(m)
  end

  meth_map.keys.sort.each do |i|
    meth_map[i].each {|m| m.call}
  end
end
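The numeric sort is what keeps this ordering correct once there are ten or more state blocks, since the string "__state10__" sorts before "__state2__". A self-contained sketch of the same collect, group, and sort pattern; StateDemo and its method names are hypothetical, and methods that share an ID run in unspecified order.

```ruby
# Hypothetical class whose "state blocks" carry an ordering ID in their names.
class StateDemo
  attr_reader :calls

  def initialize
    @calls = []
  end

  def __state10__later;  @calls << :later;   end
  def __state2__parent;  @calls << :parent;  end
  def __state2__sibling; @calls << :sibling; end

  # Same pattern as call_state_methods: group by numeric ID, then
  # invoke the groups in ascending ID order.
  def call_state_methods
    meth_map = Hash.new { |h, k| h[k] = [] }
    self.class.instance_methods.each do |m|
      md = /^__state(\d+)__/.match(m.to_s)
      next unless md
      meth_map[md[1].to_i] << method(m)
    end
    meth_map.keys.sort.each { |id| meth_map[id].each(&:call) }
  end
end

d = StateDemo.new
d.call_state_methods
p d.calls.first(2).sort   # => [:parent, :sibling]
p d.calls.last            # => :later
```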
check_collection_name(name)

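Methods for registering collection types. check_collection_name raises Bud::CompileError if the name is already used by a table or lattice, or if it is a reserved word or an already-defined method name.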

# File lib/bud/state.rb, line 4
def check_collection_name(name)
  if @tables.has_key? name or @lattices.has_key? name
    raise Bud::CompileError, "collection already exists: #{name}"
  end

  # Rule out collection names that use reserved words, including
  # previously-defined method names.
  reserved = eval "defined?(#{name})"
  unless reserved.nil?
    raise Bud::CompileError, "symbol :#{name} reserved, cannot be used as collection name"
  end
end
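The reserved-name check relies on Ruby's defined? operator, which returns a description such as "method" when a name already resolves in the instance's scope and nil otherwise. A minimal sketch of the same two checks; NameChecker is hypothetical, and names are assumed to be plain identifiers.

```ruby
# Hypothetical stand-in for Bud's collection-name validation.
class NameChecker
  class CompileError < StandardError; end

  def initialize
    @tables = {}
  end

  def check_collection_name(coll_name)
    if @tables.key?(coll_name)
      raise CompileError, "collection already exists: #{coll_name}"
    end
    # defined? yields e.g. "method" for an existing method, nil for an unknown name.
    reserved = eval("defined?(#{coll_name})")
    unless reserved.nil?
      raise CompileError, "symbol :#{coll_name} reserved, cannot be used as collection name"
    end
  end

  def add(coll_name)
    check_collection_name(coll_name)
    @tables[coll_name] = []
  end
end

c = NameChecker.new
c.add(:my_table)                 # accepted: unused and not a method name
[:my_table, :puts].each do |bad|
  begin
    c.add(bad)
  rescue NameChecker::CompileError => e
    puts e.message
  end
end
```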
define_collection(name)
# File lib/bud/state.rb, line 17
def define_collection(name)
  check_collection_name(name)

  self.singleton_class.send(:define_method, name) do |*args, &blk|
    if blk.nil?
      return @tables[name]
    else
      return @tables[name].pro(&blk)
    end
  end
end
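define_collection installs a per-instance accessor with define_method on the singleton class: called without a block it returns the collection, and called with a block it returns a projection via pro. A stripped-down sketch; ToyColl and ToyProgram are hypothetical, and pro is modeled as a plain map.

```ruby
# Hypothetical collection whose `pro` (projection) is modeled as map.
class ToyColl
  def initialize(rows)
    @rows = rows
  end

  def to_a
    @rows.dup
  end

  def pro(&blk)
    @rows.map(&blk)
  end
end

class ToyProgram
  def initialize
    @tables = {}
  end

  def define_collection(name, rows)
    @tables[name] = ToyColl.new(rows)
    # The accessor exists only on this instance, as in define_collection.
    singleton_class.send(:define_method, name) do |*_args, &blk|
      blk.nil? ? @tables[name] : @tables[name].pro(&blk)
    end
  end
end

prog = ToyProgram.new
prog.define_collection(:nums, [[1], [2], [3]])
p prog.nums.to_a                      # => [[1], [2], [3]]
tens = prog.nums { |t| t.first * 10 }
p tens                                # => [10, 20, 30]
p ToyProgram.new.respond_to?(:nums)   # => false
```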
define_lattice(name)
# File lib/bud/state.rb, line 29
def define_lattice(name)
  check_collection_name(name)

  self.singleton_class.send(:define_method, name) do |*args, &blk|
    if blk.nil?
      return @lattices[name]
    else
      return @lattices[name].pro(&blk)
    end
  end
end
do_flush()

“Flush” any tuples that need to be flushed. This does two things:

  1. Emit outgoing tuples in channels and ZK tables.

  2. Commit to disk any changes made to on-disk tables.

# File lib/bud.rb, line 1242
def do_flush
  @channels.each_value { |c| c.flush }
  @zk_tables.each_value { |t| t.flush }
  @dbm_tables.each_value { |t| t.flush }
end
do_rewrite()
# File lib/bud.rb, line 612
def do_rewrite
  @meta_parser = BudMeta.new(self)
  @stratified_rules = @meta_parser.meta_rewrite
end
do_shutdown(do_shutdown_cb=true)
# File lib/bud.rb, line 983
def do_shutdown(do_shutdown_cb=true)
  # Silently ignore duplicate shutdown requests or attempts to shutdown an
  # instance that hasn't been started yet.
  return if @instance_id == ILLEGAL_INSTANCE_ID
  $signal_lock.synchronize {
    raise Bud::Error unless $bud_instances.has_key? @instance_id
    $bud_instances.delete @instance_id
    @instance_id = ILLEGAL_INSTANCE_ID
  }

  unregister_callback(@halt_cb)
  if do_shutdown_cb
    @shutdown_callbacks.each_value {|cb| cb.call}
  end
  @timers.each {|t| t.cancel}
  @tables.each_value {|t| t.close}
  if EventMachine::reactor_running? and @bud_started
    @dsock.close_connection
  end
  @bud_started = false
  @running_async = false
  if do_shutdown_cb
    @post_shutdown_callbacks.each {|cb| cb.call}
  end
end
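The first lines of do_shutdown make it idempotent: the instance ID is swapped for a sentinel under a global lock, so a duplicate or premature shutdown request returns early. A minimal sketch of that guard; ILLEGAL_ID, LOCK, REGISTRY, and ToyInstance are hypothetical stand-ins for ILLEGAL_INSTANCE_ID, $signal_lock, $bud_instances, and a Bud instance.

```ruby
# Hypothetical stand-ins for ILLEGAL_INSTANCE_ID, $signal_lock and $bud_instances.
ILLEGAL_ID = -1
LOCK = Mutex.new
REGISTRY = {}

class ToyInstance
  attr_reader :shutdown_count

  def initialize
    @instance_id = ILLEGAL_ID
    @shutdown_count = 0
  end

  def start
    LOCK.synchronize do
      @instance_id = object_id
      REGISTRY[@instance_id] = self
    end
  end

  def shutdown
    # Silently ignore duplicate requests and never-started instances.
    return if @instance_id == ILLEGAL_ID
    LOCK.synchronize do
      raise "unknown instance" unless REGISTRY.key?(@instance_id)
      REGISTRY.delete(@instance_id)
      @instance_id = ILLEGAL_ID
    end
    @shutdown_count += 1  # stands in for callbacks, timers, closing tables
  end
end

never_started = ToyInstance.new
never_started.shutdown    # ignored

inst = ToyInstance.new
inst.start
inst.shutdown
inst.shutdown             # duplicate request: no effect
p inst.shutdown_count     # => 1
p REGISTRY.empty?         # => true
```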
do_start_server()
# File lib/bud.rb, line 1009
def do_start_server
  @dsock = EventMachine::open_datagram_socket(@ip, @options[:port],
                                              BudServer, self,
                                              @options[:channel_filter])
  @port = Socket.unpack_sockaddr_in(@dsock.get_sockname)[0]
end
do_startup()
# File lib/bud.rb, line 666
def do_startup
  raise Bud::Error, "EventMachine not started" unless EventMachine::reactor_running?
  raise Bud::Error unless EventMachine::reactor_thread?

  @instance_id = Bud.init_signal_handlers(self)
  do_start_server
  @bud_started = true

  # Initialize periodics
  @periodics.each do |p|
    @timers << make_periodic_timer(p.pername, p.period)
  end

  # Arrange for Bud to read from stdin if enabled. Note that we can't do this
  # earlier because we need to wait for EventMachine startup.
  @stdio.start_stdin_reader if @options[:stdin]
  @zk_tables.each_value {|t| t.start_watchers}

  @halt_cb = register_callback(:halt) do |t|
    stop
    if t.first.key == :kill
      Bud.shutdown_all_instances
      Bud.stop_em_loop
    end
  end
end
do_wiring()
# File lib/bud.rb, line 317
def do_wiring
  @stratified_rules.each_with_index { |rules, stratum| eval_rules(rules, stratum) }

  # Prepare list of tables that will be actively used at run time. First, all
  # the user-defined tables and lattices.  We start @app_tables off as a set,
  # then convert to an array later.
  @app_tables = (@tables.keys - @builtin_tables.keys).map {|t| @tables[t]}.to_set
  @app_tables.merge(@lattices.values)

  # Check scan and merge_targets to see if any builtin_tables need to be added as well.
  @scanners.each do |scs|
    @app_tables.merge(scs.values.map {|s| s.collection})
  end
  @merge_targets.each do |mts| #mts == merge_targets at stratum
    @app_tables.merge(mts)
  end
  @app_tables = @app_tables.to_a

  # for each stratum create a sorted list of push elements in topological order
  @push_sorted_elems = []
  @scanners.each do |scs|  # scs's values constitute scanners at a stratum
    # start with scanners and transitively add all reachable elements in a
    # breadth-first order
    working = scs.values
    seen = Set.new(working)
    sorted_elems = [] # sorted elements in this stratum
    while not working.empty?
      sorted_elems.concat(working)
      wired_to = []
      working.each do |e|
        e.wirings.each do |out|
          if ((out.class <= PushElement || out.class <= LatticePushElement) and not seen.member?(out))
            seen << out
            wired_to << out
          end
        end
      end
      working = wired_to
    end
    @push_sorted_elems << sorted_elems
  end

  @merge_targets.each_with_index do |stratum_targets, stratum|
    @scanners[stratum].each_value do |s|
      stratum_targets << s.collection
    end
  end

  # We create "orphan" scanners for collections that don't appear on the RHS
  # of any rules, but do appear on the LHS of at least one rule. These
  # scanners aren't needed to compute the fixpoint, but they are used as part
  # of rescan/invalidation (e.g., if an orphaned collection receives a manual
  # deletion operation, we need to arrange for the collection to be
  # re-filled).
  @orphan_scanners = []       # Pairs of [scanner, stratum]
  @app_tables.each do |t|
    next unless t.class <= Bud::BudCollection         # skip lattice wrappers
    next if t.scanner_cnt > 0

    stratum = collection_stratum(t.qualified_tabname.to_s)
    # if the collection also doesn't appear on any LHSs, skip it
    next if stratum.nil?
    @orphan_scanners << [Bud::ScannerElement.new(t.tabname, self, t, t.schema),
                         stratum]
  end

  # Sanity check
  @push_sorted_elems.each do |stratum_elems|
    stratum_elems.each {|se| se.check_wiring}
  end

  # Create sets of elements and collections to invalidate or rescan at the
  # beginning of each tick
  prepare_invalidation_scheme

  # For all tables that are accessed (scanned) in a stratum higher than the
  # one they are updated in, set a flag to track deltas accumulated in that
  # tick (see: collection.tick_delta)
  stratum_accessed = {}
  (@num_strata-1).downto(0) do |stratum|
    @scanners[stratum].each_value do |s|
      stratum_accessed[s.collection] ||= stratum
    end
  end
  @merge_targets.each_with_index do |stratum_targets, stratum|
    stratum_targets.each {|tab|
      tab.accumulate_tick_deltas = true if stratum_accessed[tab] and stratum_accessed[tab] > stratum
    }
  end

  @done_wiring = true
  if @options[:print_wiring]
    @push_sources.each do |strat|
      strat.each_value {|src| src.print_wiring}
    end
  end
end
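The loop that builds @push_sorted_elems is a breadth-first traversal: it starts from a stratum's scanners, appends each frontier in turn, and follows wirings only to push elements not yet seen. Here it is on a plain adjacency hash; bfs_order, the symbol graph, and the push_element predicate are hypothetical simplifications of the PushElement/LatticePushElement class check.

```ruby
require 'set'

# Breadth-first ordering of elements reachable from the scanners,
# mirroring the @push_sorted_elems loop in do_wiring.
def bfs_order(scanners, wirings, push_element)
  working = scanners.dup
  seen = Set.new(working)
  sorted = []
  until working.empty?
    sorted.concat(working)
    wired_to = []
    working.each do |e|
      wirings.fetch(e, []).each do |out|
        # Follow only push elements; collections are not added to the list.
        next unless push_element.call(out) && !seen.include?(out)
        seen << out
        wired_to << out
      end
    end
    working = wired_to
  end
  sorted
end

wirings = {
  scan_a:  [:join, :project],
  scan_b:  [:join],
  join:    [:table_out],   # :table_out is a collection, not a push element
  project: [:table_out]
}
is_push = ->(e) { e != :table_out }
order = bfs_order([:scan_a, :scan_b], wirings, is_push)
p order   # => [:scan_a, :scan_b, :join, :project]
```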
each_scanner() { |scanner, stratum| ... }
# File lib/bud.rb, line 600
def each_scanner
  @num_strata.times do |stratum|
    @scanners[stratum].each_value do |scanner|
      yield scanner, stratum
    end
  end

  @orphan_scanners.each do |scanner,stratum|
    yield scanner, stratum
  end
end
eval_rule(__obj__, __src__)
# File lib/bud.rb, line 1248
def eval_rule(__obj__, __src__)
  __obj__.instance_eval __src__  # ensure that the only local variables are __obj__ and __src__
end
eval_rules(rules, strat_num)
# File lib/bud.rb, line 1252
def eval_rules(rules, strat_num)
  # This routine evals the rules in a given stratum, which results in a wiring
  # of PushElements
  @this_stratum = strat_num
  rules.each_with_index do |rule, i|
    # user-supplied code blocks will be evaluated in this context at run-time
    @this_rule_context = rule.bud_obj
    begin
      eval_rule(rule.bud_obj, rule.src)
    rescue Exception => e
      err_msg = "** Exception while wiring rule: #{rule.orig_src}\n ****** #{e}"
      # Create a new exception that carries err_msg, but reuse the original backtrace
      new_e = (e.class <= Bud::Error) ? e.class.new(err_msg) : Bud::Error.new(err_msg)
      new_e.set_backtrace(e.backtrace)
      raise new_e
    end
  end
  @this_rule_context = nil
  @this_stratum = -1
end
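When wiring a rule fails, eval_rules raises a new exception whose message names the offending rule but which keeps the original backtrace, so the reported location still points at the real failure. The same pattern in isolation; WiringError and wire_rule are hypothetical, and instance_eval on a string stands in for eval_rule.

```ruby
# Hypothetical error class standing in for Bud::Error.
class WiringError < StandardError; end

def wire_rule(obj, src)
  obj.instance_eval(src)
rescue Exception => e
  err_msg = "** Exception while wiring rule: #{src}\n ****** #{e}"
  # Keep the class if it is already one of ours; otherwise wrap it.
  new_e = e.is_a?(WiringError) ? e.class.new(err_msg) : WiringError.new(err_msg)
  new_e.set_backtrace(e.backtrace)
  raise new_e
end

begin
  wire_rule(Object.new, "no_such_method_here")
rescue WiringError => err
  puts err.message.lines.first
end
```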
gen_id()

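IDs and timers. gen_id builds an ID string by concatenating the current Unix time (in seconds) with a random number.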

# File lib/bud.rb, line 1274
def gen_id
  Time.new.to_i.to_s << rand.to_s
end
import_defs()
# File lib/bud.rb, line 213
def import_defs
  @imported_defs = {} # mod name -> Module map
  self.class.ancestors.each do |anc|
    if anc.respond_to? :bud_import_table
      anc_imp_tbl = anc.bud_import_table
      anc_imp_tbl.each do |nm, mod|
        # Ensure that two modules have not been imported under one alias.
        if @imported_defs.member? nm and not @imported_defs[nm] == anc_imp_tbl[nm]
          raise Bud::CompileError, "conflicting imports: modules #{@imported_defs[nm]} and #{anc_imp_tbl[nm]} both are imported as '#{nm}'"
        end
        @imported_defs[nm] = mod
      end
    end
  end
  @imported_defs ||= self.class.ancestors.inject({}) {|tbl, e| tbl.merge(e.bud_import_table)}
end
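import_defs walks the ancestor chain and merges each bud_import_table, refusing to bind one alias to two different modules. A sketch of that merge over plain hashes; merge_imports and the module names are hypothetical, and ArgumentError stands in for Bud::CompileError.

```ruby
# Merge per-ancestor import tables (alias => module), rejecting an alias
# that is bound to two different modules. Hypothetical helper.
def merge_imports(import_tables)
  merged = {}
  import_tables.each do |tbl|
    tbl.each do |nm, mod|
      if merged.key?(nm) && merged[nm] != mod
        raise ArgumentError,
              "conflicting imports: modules #{merged[nm]} and #{mod} both are imported as '#{nm}'"
      end
      merged[nm] = mod
    end
  end
  merged
end

ok = merge_imports([{ kv: "KVSProtocol" }, { kv: "KVSProtocol", log: "Logger" }])
p ok.keys   # => [:kv, :log]
```

Importing the same module under the same alias from two ancestors is accepted; only a genuine clash raises.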
invoke_callbacks()
# File lib/bud.rb, line 919
def invoke_callbacks
  @callbacks.each_value do |cb|
    tbl_name, block = cb
    tbl = @tables[tbl_name]
    unless tbl.empty?
      block.call(tbl)
    end
  end
end
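Callbacks are stored as [table name, block] pairs and fired at the end of a tick only for tables that are non-empty. A hypothetical CallbackRegistry showing that bookkeeping, including the missing-ID check from unregister_callback; KeyError stands in for Bud::Error, and nothing here is scheduled on EventMachine.

```ruby
# Hypothetical registry mirroring @callbacks, invoke_callbacks and
# unregister_callback.
class CallbackRegistry
  def initialize(tables)
    @tables = tables   # table name => array of tuples
    @callbacks = {}
    @next_id = 0
  end

  def register(tbl_name, &block)
    id = (@next_id += 1)
    @callbacks[id] = [tbl_name, block]
    id
  end

  def unregister(id)
    raise KeyError, "missing callback: #{id.inspect}" unless @callbacks.key?(id)
    @callbacks.delete(id)
  end

  # End of tick: fire only callbacks whose table is non-empty.
  def invoke
    @callbacks.each_value do |tbl_name, block|
      tbl = @tables[tbl_name]
      block.call(tbl) unless tbl.empty?
    end
  end
end

tables = { alerts: [[:disk_full]], halt: [] }
fired = []
reg = CallbackRegistry.new(tables)
reg.register(:alerts) { |t| fired << t.size }
id = reg.register(:halt) { |_t| fired << :halt }
reg.invoke
reg.unregister(id)
p fired   # => [1]
```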
make_periodic_timer(name, period)
# File lib/bud.rb, line 1278
def make_periodic_timer(name, period)
  EventMachine::PeriodicTimer.new(period) do
    @inbound[name.to_sym] ||= []
    @inbound[name.to_sym] << [gen_id, Time.now]
    tick_internal if @running_async
  end
end
prepare_invalidation_scheme()

All collections (elements included) are semantically required to erase any cached information at the start of a tick and start from a clean slate. prepare_invalidation_scheme prepares a just-in-time invalidation scheme that permits us to preserve data from one tick to the next, and to keep things in incremental mode unless there's a negation.

This scheme solves the following constraints.

  1. A full scan of an element's contents results in downstream elements getting full scans themselves (i.e., no deltas). This effect is transitive.

  2. Invalidation of an element's cache results in rebuilding of the cache and a consequent fullscan. See next.

  3. Invalidation of an element requires upstream elements to rescan their contents, or to transitively pass the request on further upstream. Any element that has a cache can rescan without passing on the request to higher levels.

This set of constraints is solved once during wiring, resulting in four data structures:

  • @default_invalidate: set of elements and tables to always invalidate at every tick.

  • @default_rescan: set of elements and tables to always scan fully in the first iteration of every tick.

  • scanner.invalidate_set: set of elements to additionally invalidate if the scanner's table is invalidated at run-time.

  • scanner.rescan_set: similar to the above, for elements to additionally rescan.

# File lib/bud.rb, line 447
def prepare_invalidation_scheme
  if $BUD_SAFE
    @app_tables = @tables.values + @lattices.values # No collections excluded

    rescan = Set.new
    invalidate = @app_tables.select {|t| t.class <= BudScratch}.to_set
    @num_strata.times do |stratum|
      @push_sorted_elems[stratum].each do |elem|
        invalidate << elem
        rescan << elem
      end
    end
    @default_rescan = rescan.to_a
    @default_invalidate = invalidate.to_a
    @reset_list = [] # Nothing to reset at end of tick. It'll be overwritten anyway
    return
  end

  # By default, all tables are considered sources unless they appear on the
  # lhs.  We only consider non-temporal rules because invalidation is only
  # about this tick.  Also, we track (in unsafe_targets) those tables that are
  # the targets of user-defined code blocks that call "unsafe" functions that
  # produce a different value in every tick (e.g., budtime). Elements that
  # feed these tables are forced to rescan their contents, and thus forced to
  # re-execute these code blocks.
  unsafe_targets = Set.new
  t_rules.each do |rule|
    lhs = rule.lhs.to_sym
    if rule.op == "<="
      # Note that lattices cannot be sources
      @tables[lhs].is_source = false if @tables.has_key? lhs
    end
    unsafe_targets << lhs if rule.unsafe_funcs_called
  end

  # Compute a set of tables and elements that should be explicitly told to
  # invalidate or rescan.  Start with a set of tables that always invalidate
  # and elements that always rescan.
  invalidate = @app_tables.select {|t| t.invalidate_at_tick}.to_set
  rescan = Set.new

  @num_strata.times do |stratum|
    @push_sorted_elems[stratum].each do |elem|
      rescan << elem if elem.rescan_at_tick

      if elem.outputs.any?{|tab| not(tab.class <= PushElement) and not(tab.class <= LatticePushElement) and unsafe_targets.member? tab.qualified_tabname.to_sym }
        rescan.merge(elem.wired_by)
      end
    end
    rescan_invalidate_tc(stratum, rescan, invalidate)
  end

  @default_rescan = rescan.to_a
  @default_invalidate = invalidate.to_a

  if $BUD_DEBUG
    puts "Default rescan: #{rescan.inspect}"
    puts "Default inval: #{invalidate.inspect}"
    puts "Unsafe targets: #{unsafe_targets.inspect}"
  end

  # For each collection that is to be scanned, compute the set of dependent
  # tables and elements that will need invalidation and/or rescan if that
  # table were to be invalidated at runtime.
  dflt_rescan = rescan
  dflt_invalidate = invalidate
  to_reset = rescan + invalidate
  each_scanner do |scanner, stratum|
    # If it is going to be always invalidated, it doesn't need further
    # examination. Lattice scanners also don't get invalidated.
    next if dflt_rescan.member? scanner
    next if scanner.class <= LatticeScanner

    rescan = dflt_rescan.clone
    invalidate = dflt_invalidate + [scanner.collection]
    rescan_invalidate_tc(stratum, rescan, invalidate)
    prune_rescan_set(rescan)

    # Make sure we reset the rescan/invalidate flag for this scanner at
    # end-of-tick, but we can remove the scanner from its own
    # rescan_set/inval_set.
    to_reset.merge(rescan)
    to_reset.merge(invalidate)
    rescan.delete(scanner)
    invalidate.delete(scanner.collection)

    # Give the diffs (from default) to scanner; these are elements that are
    # dependent on this scanner
    diffscan = (rescan - dflt_rescan).find_all {|elem| elem.class <= PushElement}
    scanner.invalidate_at_tick(diffscan, (invalidate - dflt_invalidate).to_a)
  end
  @reset_list = to_reset.to_a

  # For each lattice, find the collections that should be rescanned when there
  # is a new delta for the lattice. That is, if we have a rule like:
  # "t2 <= t1 {|t| [t.key, lat_foo]}", whenever there is a delta on lat_foo we
  # should rescan t1 (to produce tuples with the updated lat_foo value).
  #
  # TODO:
  # (1) if t1 is fed by rules r1 and r2 but only r1 references lattice x,
  #     don't trigger rescan of r2 on deltas for x (hard)
  t_depends.each do |dep|
    src, target_name = dep.body.to_sym, dep.lhs.to_sym
    if @lattices.has_key? src and dep.in_body
      src_lat = @lattices[src]
      if @tables.has_key? target_name
        target = @tables[target_name]
      else
        target = @lattices[target_name]
      end

      # Conservatively, we rescan all the elements that feed the lhs (target)
      # collection via positive (non-deletion) rules; we then also need to
      # potentially rescan ancestors of those elements as well (e.g., setting
      # a stateless PushElement to rescan does nothing; we want to tell its
      # ancestor ScannerElement to rescan).
      #
      # XXX: do we need to consider all transitively reachable nodes for
      # rescan?
      lat_rescan = target.positive_predecessors.to_set
      lat_inval = Set.new
      target.positive_predecessors.each do |e|
        e.add_rescan_invalidate(lat_rescan, lat_inval)
      end
      src_lat.rescan_on_delta.merge(lat_rescan)
    end
  end
end
prune_rescan_set(rescan)
# File lib/bud.rb, line 596
def prune_rescan_set(rescan)
  rescan.delete_if {|e| e.rescan_at_tick}
end
receive_inbound()

Handle external inputs: channels, terminals, and periodics. Received messages are placed directly into the storage of the appropriate local collection. The inbound queue is cleared at the end of the tick.

# File lib/bud.rb, line 1230
def receive_inbound
  @inbound.each do |tbl_name, msg_buf|
    puts "recv via #{tbl_name}: #{msg_buf}" if $BUD_DEBUG
    msg_buf.each do |b|
      tables[tbl_name] << b
    end
  end
end
rescan_invalidate_tc(stratum, rescan, invalidate)

Given the rescan and invalidate sets, compute their transitive closure within the given stratum.

# File lib/bud.rb, line 577
def rescan_invalidate_tc(stratum, rescan, invalidate)
  # XXX: hack. If there's nothing in the given stratum, don't do
  # anything. This can arise if we have an orphan scanner whose input is a
  # non-monotonic operator; the stratum(LHS) = stratum(RHS) + 1, but there's
  # nothing else in stratum(LHS).
  return if @push_sorted_elems[stratum].nil?

  rescan_len = rescan.size
  invalidate_len = invalidate.size
  while true
    # Ask each element if it wants to add itself to either set, depending on
    # who else is in those sets already.
    @push_sorted_elems[stratum].each {|t| t.add_rescan_invalidate(rescan, invalidate)}
    break if rescan_len == rescan.size and invalidate_len == invalidate.size
    rescan_len = rescan.size
    invalidate_len = invalidate.size
  end
end
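The closure is computed by re-running every element's add_rescan_invalidate until neither set grows, so elements may appear in any order. A sketch under a simplified, hypothetical rule (an element asks to be rescanned if any of its inputs is being rescanned, which models constraint 1 of prepare_invalidation_scheme); Elem is a hypothetical Struct, not a Bud PushElement.

```ruby
require 'set'

# Hypothetical element with a simplified add_rescan_invalidate rule.
Elem = Struct.new(:name, :inputs) do
  def add_rescan_invalidate(rescan, _invalidate)
    rescan << name if inputs.any? { |i| rescan.include?(i) }
  end
end

# Repeat the per-element pass until neither set grows.
def rescan_invalidate_tc(elems, rescan, invalidate)
  loop do
    before = [rescan.size, invalidate.size]
    elems.each { |e| e.add_rescan_invalidate(rescan, invalidate) }
    break if before == [rescan.size, invalidate.size]
  end
end

# Listed so that a single pass is not enough: :c depends on :b.
elems = [Elem.new(:c, [:b]), Elem.new(:b, [:a]), Elem.new(:d, [:x])]
rescan = Set[:a]
rescan_invalidate_tc(elems, rescan, Set.new)
p rescan.to_a.sort   # => [:a, :b, :c]
```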
resolve_imports()

Absorb rules and dependencies from imported modules. The corresponding module instantiations resolve their own imports when they are created.

# File lib/bud.rb, line 232
def resolve_imports
  import_tbl = import_defs

  import_tbl.each_pair do |local_name, mod_name|
    # corresponding to "import <mod_name> => :<local_name>"
    mod_inst = send(local_name)
    if mod_inst.nil?
      # create wrapper instances
      #puts "=== resolving #{self}.#{mod_name} => #{local_name}"
      klass = module_wrapper_class(mod_name)
      qlocal_name = toplevel? ? local_name.to_s : "#{self.qualified_name}.#{local_name}"
      # this instantiation will resolve the imported module's own imports
      mod_inst = klass.new(:toplevel => toplevel, :qualified_name => qlocal_name)
      instance_variable_set("@#{local_name}", mod_inst)
    end
    # Absorb the module wrapper's user-defined state.
    mod_inst.tables.each_pair do |name, t|
      # Don't try to import module definitions for builtin Bud state. Note
      # that @tables only includes the builtin tables, because resolve_imports
      # is called before user-defined state blocks are run.
      unless @tables.has_key? t.tabname
        qname = "#{local_name}.#{name}"
        tables[qname.to_sym] = t
      end
    end
    mod_inst.lattices.each_pair do |name, t|
      qname = "#{local_name}.#{name}".to_sym
      raise Bud::Error if lattices.has_key? qname
      lattices[qname] = t
    end
    mod_inst.t_rules.each do |imp_rule|
      qname = "#{local_name}.#{imp_rule.lhs}"
      self.t_rules << [imp_rule.bud_obj, imp_rule.rule_id, qname, imp_rule.op,
                       imp_rule.src, imp_rule.orig_src, imp_rule.unsafe_funcs_called]
    end
    mod_inst.t_depends.each do |imp_dep|
      qlname = "#{local_name}.#{imp_dep.lhs}"
      qrname = "#{local_name}.#{imp_dep.body}"
      self.t_depends << [imp_dep.bud_obj, imp_dep.rule_id, qlname,
                         imp_dep.op, qrname, imp_dep.nm, imp_dep.in_body]
    end
    mod_inst.t_provides.each do |imp_pro|
      qintname = "#{local_name}.#{imp_pro.interface}"
      self.t_provides << [qintname, imp_pro.input]
    end
    mod_inst.channels.each do |name, ch|
      qname = "#{local_name}.#{name}"
      @channels[qname.to_sym] = ch
    end
    mod_inst.dbm_tables.each do |name, t|
      qname = "#{local_name}.#{name}"
      @dbm_tables[qname.to_sym] = t
    end
    mod_inst.periodics.each do |p|
      qname = "#{local_name}.#{p.pername}"
      @periodics << [qname.to_sym, p.period]
    end
  end

  nil
end
schedule_and_wait() { || ... }

Schedule a block to be evaluated by EventMachine in the future, and block until this has happened.

# File lib/bud.rb, line 960
def schedule_and_wait
  # If EM isn't running, just run the user's block immediately
  # XXX: not clear that this is the right behavior
  unless EventMachine::reactor_running?
    yield
    return
  end

  q = Queue.new
  EventMachine::schedule do
    ret = false
    begin
      yield
    rescue Exception
      ret = $!
    end
    q.push(ret)
  end

  resp = q.pop
  raise resp if resp
end
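schedule_and_wait hands a block to the reactor thread and blocks on a Queue until that block finishes, capturing any exception so it can be re-raised in the caller. The sketch reproduces the handshake with a plain worker thread standing in for EventMachine; ToyReactor and its schedule method are hypothetical, and unlike the real method it does not fall back to running the block inline when no reactor is running.

```ruby
# A worker thread draining a job queue stands in for the EventMachine
# reactor; `schedule` plays the role of EventMachine::schedule.
class ToyReactor
  def initialize
    @jobs = Queue.new
    @thread = Thread.new do
      while (job = @jobs.pop) != :stop
        job.call
      end
    end
  end

  def schedule(&blk)
    @jobs.push(blk)
  end

  # Run blk on the reactor thread, wait for it, and re-raise any error.
  def schedule_and_wait(&blk)
    q = Queue.new
    schedule do
      ret = false
      begin
        blk.call
      rescue Exception
        ret = $!
      end
      q.push(ret)
    end
    resp = q.pop
    raise resp if resp
  end

  def stop
    @jobs.push(:stop)
    @thread.join
  end
end

reactor = ToyReactor.new
ran_on = nil
reactor.schedule_and_wait { ran_on = Thread.current }
begin
  reactor.schedule_and_wait { raise ArgumentError, "bad input" }
rescue ArgumentError => e
  puts "caller saw: #{e.message}"
end
reactor.stop
```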
start_reactor()
# File lib/bud.rb, line 929
def start_reactor
  return if EventMachine::reactor_running?

  EventMachine::error_handler do |e|
    # Only print a backtrace if a non-Bud::Error is raised (this presumably
    # indicates an unexpected failure).
    if e.class <= Bud::Error
      puts "#{e.class}: #{e}"
    else
      puts "Unexpected Bud error: #{e.inspect}"
      puts e.backtrace.join("\n")
    end
    Bud.shutdown_all_instances
    raise e
  end

  # Block until EM has successfully started up.
  q = Queue.new
  # This thread helps us avoid race conditions on the start and stop of
  # EventMachine's event loop.
  Thread.new do
    EventMachine.run do
      q.push(true)
    end
  end
  # Block waiting for EM's event loop to start up.
  q.pop
end