module Bud
The root Bud module. To cause an instance of Bud to begin executing, there are three main options:
1. Synchronously. To do this, instantiate your program and then call tick() one or more times; each call evaluates a single Bud timestep. In this mode, any network messages or timer events that occur are buffered until the next call to tick(). This is mostly intended for “one-shot” programs that compute a single result and then terminate, or for interactively “single-stepping” through the execution of an event-driven system.
2. In the foreground. To do this, instantiate your program and then call run_fg(). The Bud interpreter will then run, handling network events and evaluating new timesteps as appropriate. The run_fg() method blocks the calling thread and does not return until the instance is shut down or an error occurs.
3. In a separate thread in the background. To do this, instantiate your program and then call run_bg(). The Bud interpreter will run asynchronously. To interact with Bud (e.g., insert additional data or inspect the state of a Bud collection), use the sync_do and async_do methods.
Most programs should use option 3 (run_bg). Note that in all three cases, the stop() method should be used to shut down a Bud instance and release any resources it is using.
Constants
- HAVE_ZOOKEEPER
- VERSION
Attributes
Public Class Methods
Options to the Bud runtime are passed in a hash, with the following keys:
- network configuration
  - :ip: IP address string for this instance
  - :port: port number for this instance
  - :ext_ip: IP address at which external nodes can contact this instance
  - :ext_port: port number to go with :ext_ip
- operating system interaction
  - :stdin: if non-nil, reading from the stdio collection results in reading from this IO handle
  - :stdout: writing to the stdio collection results in writing to this IO handle; defaults to $stdout
  - :signal_handling: how to handle SIGINT and SIGTERM. If :none, Bud does not install handlers for these signals; otherwise, receiving either signal shuts down all Bud instances in the process.
- tracing and output
  - :quiet: if true, suppress certain messages
  - :trace: if true, generate budvis outputs
  - :rtrace: if true, generate budplot outputs
  - :dump_rewrite: if true, dump results of internal rewriting of Bloom code to a file (also enabled if the BUD_DUMP_REWRITE environment variable is set to a positive integer)
  - :print_wiring: if true, print the wiring diagram of the program to stdout (also enabled if BUD_PRINT_WIRING is set to a positive integer)
  - :metrics: if true, dump a hash of internal performance metrics
- controlling execution
  - :tag: a name for this instance, suitable for display during tracing and visualization
  - :channel_filter: a code block that can be used to filter the network messages delivered to this Bud instance. At the start of each tick, the code block is invoked for every channel with any incoming messages; the code block is passed the name of the channel and an array containing the inbound messages. It should return a two-element array containing “accepted” and “postponed” messages, respectively. Accepted messages are delivered during this tick, and postponed messages are buffered and passed to the filter in subsequent ticks. Any messages that aren't in either array are dropped.
- storage configuration
  - :dbm_dir: filesystem directory to hold DBM-backed collections
  - :dbm_truncate: if true, DBM-backed collections are opened with OTRUNC
    # File lib/bud.rb, line 111
    def initialize(options={})
      options[:dump_rewrite] ||= ENV["BUD_DUMP_REWRITE"].to_i > 0
      options[:dump_ast] ||= ENV["BUD_DUMP_AST"].to_i > 0
      options[:print_wiring] ||= ENV["BUD_PRINT_WIRING"].to_i > 0
      @qualified_name = ""
      @tables = {}
      @lattices = {}
      @channels = {}
      @dbm_tables = {}
      @zk_tables = {}
      @stratified_rules = []
      @push_elems = {}
      @callbacks = {}
      @callback_id = 0
      @shutdown_callbacks = {}
      @shutdown_callback_id = 0
      @post_shutdown_callbacks = []
      @timers = []
      @app_tables = []
      @inside_tick = false
      @tick_clock_time = nil
      @budtime = 0
      @inbound = {}
      @done_bootstrap = false
      @done_wiring = false
      @instance_id = ILLEGAL_INSTANCE_ID # Assigned when we start running
      @metrics = {}
      @endtime = nil
      @this_stratum = -1
      @this_rule_id = -1
      @push_sorted_elems = nil
      @running_async = false
      @bud_started = false

      # Setup options (named arguments), along with default values
      @options = options.clone
      @options[:ip] ||= "127.0.0.1"
      @ip = @options[:ip]
      @options[:port] ||= 0
      @options[:port] = @options[:port].to_i
      # NB: If using an ephemeral port (specified by port = 0), the actual port
      # number won't be known until we start EM

      load_lattice_defs
      builtin_state
      resolve_imports
      call_state_methods

      @viz = VizOnline.new(self) if @options[:trace]
      @rtracer = RTrace.new(self) if @options[:rtrace]

      do_rewrite
      if toplevel == self
        # initialize per-stratum state
        @num_strata = @stratified_rules.length
        @scanners = @num_strata.times.map{{}}
        @push_sources = @num_strata.times.map{{}}
        @push_joins = @num_strata.times.map{[]}
        @merge_targets = @num_strata.times.map{Set.new}
      end
    end
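The :channel_filter contract described in the options list (accepted messages delivered now, postponed messages re-offered next tick, everything else dropped) can be modelled in plain Ruby. The sketch below is not Bud code: FilteredInbox, receive, and start_tick are hypothetical names, and the model only illustrates how messages move between ticks.

```ruby
# Hypothetical model of the :channel_filter contract (not part of Bud).
class FilteredInbox
  def initialize(&filter)
    @filter = filter
    @pending = Hash.new { |h, k| h[k] = [] } # channel name => buffered messages
  end

  # Buffer a message that arrived over the network for the given channel.
  def receive(channel, msg)
    @pending[channel] << msg
  end

  # Called at the start of a tick. Returns {channel => accepted messages};
  # postponed messages are kept for the next tick, anything else is dropped.
  def start_tick
    delivered = {}
    @pending.keys.each do |channel|
      msgs = @pending.delete(channel)
      next if msgs.empty?
      accepted, postponed = @filter.call(channel, msgs)
      delivered[channel] = accepted
      @pending[channel].concat(postponed) unless postponed.empty?
    end
    delivered
  end
end

# Example filter: accept even numbers, postpone odd numbers below 10,
# implicitly drop everything else.
inbox = FilteredInbox.new do |_channel, msgs|
  [msgs.select(&:even?), msgs.select { |m| m.odd? && m < 10 }]
end
[2, 3, 4, 11].each { |m| inbox.receive(:nums, m) }
first = inbox.start_tick  # accepted: 2 and 4; 3 postponed; 11 dropped
second = inbox.start_tick # 3 is offered again and postponed again
```

Because the filter sees postponed messages again on every later tick, a filter that never accepts a message keeps it buffered indefinitely. That is worth keeping in mind when writing a real filter.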
Private Class Methods
Fork a new process. This is identical to Kernel#fork, except that it also cleans up Bud and EventMachine-related state. As with Kernel#fork, the caller supplies a code block that is run in the child process; the PID of the child is returned by this method.
    # File lib/bud.rb, line 1290
    def self.do_fork
      Kernel.fork do
        srand
        # This is somewhat grotty: we basically clone what EM::fork_reactor does,
        # except that we don't want the user-supplied block to be invoked by the
        # reactor thread.
        if EventMachine::reactor_running?
          EventMachine::stop_event_loop
          EventMachine::release_machine
          EventMachine::instance_variable_set('@reactor_running', false)
        end
        # Shutdown all the Bud instances inherited from the parent process, but
        # don't invoke their shutdown callbacks
        Bud.shutdown_all_instances(false)

        $got_shutdown_signal = false
        $signal_handler_setup = false

        yield
      end
    end
Signal handling. If multiple Bud instances are running inside a single process, we want a SIGINT or SIGTERM signal to cleanly shut down all of them. Note that we don't try to do any significant work in the signal handlers themselves: we just set a flag that is checked by a periodic timer.
    # File lib/bud.rb, line 1327
    def self.init_signal_handlers(b)
      $signal_lock.synchronize {
        # Initialize or re-initialize signal handlers if necessary.
        unless b.options[:signal_handling] == :none || $signal_handler_setup
          EventMachine::PeriodicTimer.new(SIGNAL_CHECK_PERIOD) do
            if $got_shutdown_signal
              Bud.shutdown_all_instances
              $got_shutdown_signal = false
            end
          end

          ["INT", "TERM"].each do |signal|
            Signal.trap(signal) {
              $got_shutdown_signal = true
            }
          end
          $signal_handler_setup = true
        end

        $instance_id += 1
        $bud_instances[$instance_id] = b
        return $instance_id
      }
    end
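The “set a flag in the trap, do the work in a timer” pattern used by init_signal_handlers can be shown without EventMachine. In the sketch below, ShutdownCoordinator is a hypothetical class, and a plain polling thread stands in for EventMachine::PeriodicTimer. One practical reason to keep trap handlers this small is that in MRI, locking a Mutex inside a trap handler raises ThreadError.

```ruby
# Hypothetical sketch of the pattern described above: the trap handler only
# sets a flag, and a periodic poller does the real shutdown work.
class ShutdownCoordinator
  def initialize(check_period = 0.05)
    @check_period = check_period
    @lock = Mutex.new
    @instances = []
    @got_signal = false
  end

  def register(instance)
    @lock.synchronize { @instances << instance }
  end

  # Safe to call from a trap handler: it only flips a flag.
  def request_shutdown
    @got_signal = true
  end

  # Not called in the usage below, so the example leaves the process's real
  # INT/TERM handlers untouched.
  def install_traps
    %w[INT TERM].each { |sig| Signal.trap(sig) { request_shutdown } }
  end

  # Plays the role of the periodic timer: wait for the flag, then stop every
  # registered instance. Returns after one shutdown so the example terminates.
  def start_poller
    Thread.new do
      sleep @check_period until @got_signal
      @got_signal = false
      @lock.synchronize { @instances.dup }.each(&:stop)
    end
  end
end

Node = Struct.new(:name, :stopped) do
  def stop
    self.stopped = true
  end
end

coord = ShutdownCoordinator.new
nodes = [Node.new("a", false), Node.new("b", false)]
nodes.each { |n| coord.register(n) }
poller = coord.start_poller
coord.request_shutdown # what the INT/TERM trap would do
poller.join
```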
    # File lib/bud.rb, line 1352
    def self.shutdown_all_instances(do_shutdown_cb=true)
      instances = nil
      $signal_lock.synchronize {
        instances = $bud_instances.clone
      }

      instances.each_value {|b| b.stop(false, do_shutdown_cb) }
    end
Stop the EventMachine event loop. Note that this affects anyone else in the same process who happens to be using EventMachine! This is also a non-blocking call; to block until EM has completely shut down, join on EM::reactor_thread.
    # File lib/bud.rb, line 1315
    def self.stop_em_loop
      EventMachine::stop_event_loop

      # If another instance of Bud is started later, we'll need to reinitialize
      # the signal handlers (since they depend on EM).
      $signal_handler_setup = false
    end
Public Instance Methods
Aggregate method to be used in Bud::BudCollection.group. Accumulates all x inputs into a set.
    # File lib/bud/aggs.rb, line 207
    def accum(x)
      [Accum.new, x]
    end
Aggregate method to be used in Bud::BudCollection.group. Accumulates x, y inputs into a set of pairs (two-element arrays).
    # File lib/bud/aggs.rb, line 223
    def accum_pair(x, y)
      [AccumPair.new, x, y]
    end
Like sync_do, but does not block the caller's thread: the given block will be invoked at some future time. Note that calls to async_do respect FIFO order.
    # File lib/bud.rb, line 818
    def async_do
      EventMachine::schedule do
        yield if block_given?
        # Do another tick, in case the user-supplied block inserted any data
        tick_internal
      end
    end
Aggregate method to be used in Bud::BudCollection.group. Computes the average of a multiset of x values.
    # File lib/bud/aggs.rb, line 191
    def avg(x)
      [Avg.new, x]
    end
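Each aggregate method above returns a pair of an aggregate object and the column it applies to (count returns only the object). The plain-Ruby model below shows how such pairs can drive a group-by. The init/trans/final method names and the group helper are assumptions made for illustration; Bud's own aggregate classes define their own protocol, which is not reproduced here.

```ruby
# Hypothetical aggregate objects in an init/trans/final style (names assumed).
class AvgAgg
  def init(v); [v, 1]; end                             # state: [sum, count]
  def trans(state, v); [state[0] + v, state[1] + 1]; end
  def final(state); state[0].to_f / state[1]; end
end

class CountAgg
  def init(_v); 1; end                                 # argument ignored
  def trans(state, _v); state + 1; end
  def final(state); state; end
end

class MaxAgg
  def init(v); v; end
  def trans(state, v); v > state ? v : state; end
  def final(state); state; end
end

# Group rows (hashes) by key columns, applying each [agg, column] spec.
# A spec without a column (like [CountAgg.new]) receives nil values.
def group(rows, key_cols, *specs)
  groups = {}
  rows.each do |row|
    key = key_cols.map { |c| row[c] }
    states = groups[key]
    if states.nil?
      groups[key] = specs.map { |agg, col| agg.init(col && row[col]) }
    else
      specs.each_with_index do |(agg, col), i|
        states[i] = agg.trans(states[i], col && row[col])
      end
    end
  end
  groups.map do |key, states|
    key + specs.each_with_index.map { |(agg, _col), i| agg.final(states[i]) }
  end
end

rows = [
  { dept: "eng", salary: 100 },
  { dept: "eng", salary: 200 },
  { dept: "ops", salary: 90 }
]
result = group(rows, [:dept], [AvgAgg.new, :salary], [CountAgg.new], [MaxAgg.new, :salary])
# result: [["eng", 150.0, 2, 200], ["ops", 90.0, 1, 90]]
```

Averaging needs a (sum, count) state rather than a running average, because only the final step can divide correctly once all inputs have been seen.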
    # File lib/bud/aggs.rb, line 81
    def bool_and(x)
      [BooleanAnd.new, x]
    end
    # File lib/bud/aggs.rb, line 95
    def bool_or(x)
      [BooleanOr.new, x]
    end
Returns the wallclock time associated with the current Bud tick. That is, this value is guaranteed to remain the same for the duration of a single tick, but will likely change between ticks. Calling it outside a tick raises Bud::Error.
    # File lib/bud.rb, line 1181
    def bud_clock
      raise Bud::Error, "bud_clock undefined outside tick" unless @inside_tick
      @tick_clock_time ||= Time.now
      @tick_clock_time
    end
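The guarantee that bud_clock is stable within a tick comes from memoizing the first reading and clearing it in tick_internal's ensure clause. The hypothetical TickClock class below isolates that mechanism. It injects a fake clock (the now: parameter, an assumption for testability) so the behaviour is deterministic.

```ruby
# Hypothetical model of a per-tick memoized clock (not Bud's implementation).
class TickClock
  class OutsideTick < StandardError; end

  def initialize(now: -> { Time.now })
    @now = now
    @inside_tick = false
    @tick_time = nil
  end

  # First call in a tick samples the clock; later calls reuse that value.
  def clock
    raise OutsideTick, "clock undefined outside tick" unless @inside_tick
    @tick_time ||= @now.call
  end

  def tick
    @inside_tick = true
    yield self
  ensure
    @inside_tick = false
    @tick_time = nil # the next tick samples a fresh time
  end
end

calls = 0
clock = TickClock.new(now: -> { calls += 1 }) # fake clock returning 1, 2, 3, ...
first = second = later = nil
clock.tick { |c| first = c.clock; second = c.clock } # both see the same value
clock.tick { |c| later = c.clock }                   # a new tick, a new value
```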
    # File lib/bud.rb, line 778
    def cancel_shutdown_cb(id)
      schedule_and_wait do
        raise Bud::Error unless @shutdown_callbacks.has_key? id
        @shutdown_callbacks.delete(id)
      end
    end
Declare a transient network collection. Default schema: [:address, :val] => []
    # File lib/bud/state.rb, line 115
    def channel(name, schema=nil, loopback=false)
      define_collection(name)
      @tables[name] = Bud::BudChannel.new(name, self, schema, loopback)
      @channels[name] = @tables[name]
    end
Exemplary aggregate method to be used in Bud::BudCollection.group. Arbitrarily but deterministically chooses among the x entries being aggregated.
    # File lib/bud/aggs.rb, line 114
    def choose(x)
      [Choose.new, x]
    end
Exemplary aggregate method to be used in Bud::BudCollection.group. Randomly chooses among the x entries being aggregated.
    # File lib/bud/aggs.rb, line 144
    def choose_rand(x=nil)
      [ChooseOneRand.new, x]
    end
Declare a collection-generating expression. Default schema: [:key] => [:val].
    # File lib/bud/state.rb, line 64
    def coll_expr(name, expr, schema=nil)
      define_collection(name)
      @tables[name] = Bud::BudCollExpr.new(name, self, expr, schema)
    end
Return the stratum number of the given collection. NB: if a collection does not appear on the lhs or rhs of any rules, it is not currently assigned to a stratum, and nil is returned.
    # File lib/bud.rb, line 1190
    def collection_stratum(collection)
      t_stratum.each do |t|
        return t.stratum if t.predicate == collection
      end

      return nil
    end
Aggregate method to be used in Bud::BudCollection.group. Counts the number of entries aggregated. The argument is ignored.
    # File lib/bud/aggs.rb, line 171
    def count(x=nil)
      [Count.new]
    end
A common special case for sync_callback: block on a delta to a table.
    # File lib/bud.rb, line 908
    def delta(out_tbl)
      sync_callback(nil, nil, out_tbl)
    end
Evaluate all bootstrap blocks and tick deltas
    # File lib/bud.rb, line 1052
    def do_bootstrap
      # Evaluate bootstrap for imported modules
      @this_rule_context = self
      imported = import_defs.keys
      imported.each do |mod_alias|
        wrapper = import_instance mod_alias
        wrapper.do_bootstrap
      end
      self.class.ancestors.reverse.each do |anc|
        anc.instance_methods(false).each do |m|
          if /^__bootstrap__/.match m
            self.method(m.to_sym).call
          end
        end
      end
      bootstrap

      if toplevel == self
        @tables.each_value {|t| t.bootstrap}
        @lattices.each_value {|l| l.bootstrap}
      end
      @done_bootstrap = true
    end
    # File lib/bud.rb, line 1076
    def do_invalidate_rescan
      @default_rescan.each {|elem| elem.rescan = true}
      @default_invalidate.each {|elem|
        elem.invalidated = true
        # Call tick on tables here itself. The rest below
        elem.invalidate_cache unless elem.class <= PushElement
      }

      # The following loop invalidates additional (non-default) elements and
      # tables that depend on the run-time invalidation state of a table. Loop
      # once to set the flags.
      each_scanner do |scanner, stratum|
        if scanner.rescan
          scanner.rescan_set.each {|e| e.rescan = true}
          scanner.invalidate_set.each {|e|
            e.invalidated = true
            e.invalidate_cache unless e.class <= PushElement
          }
        end
      end

      # Loop a second time to actually call invalidate_cache. We can't merge this
      # with the loops above because some versions of invalidate_cache (e.g.,
      # join) depend on the rescan state of other elements.
      @num_strata.times do |stratum|
        @push_sorted_elems[stratum].each {|e| e.invalidate_cache if e.invalidated}
      end
    end
    # File lib/bud.rb, line 848
    def do_register_callback(tbl_name, &blk)
      unless @tables.has_key? tbl_name
        raise Bud::Error, "no such collection: #{tbl_name}"
      end

      raise Bud::Error if @callbacks.has_key? @callback_id
      @callbacks[@callback_id] = [tbl_name, blk]
      cb_id = @callback_id
      @callback_id += 1
      return cb_id
    end
Declare a collection to be read from filename. Usable on the rhs of statements only.
    # File lib/bud/state.rb, line 131
    def file_reader(name, filename)
      define_collection(name)
      @tables[name] = Bud::BudFileReader.new(name, filename, self)
    end
    # File lib/bud.rb, line 198
    def import_instance(name)
      name = "@" + name.to_s
      instance_variable_get(name) if instance_variable_defined? name
    end
XXX make tag specific
    # File lib/bud.rb, line 913
    def inspect
      "#{self.class}:#{self.object_id.to_s(16)}"
    end
Returns the internal IP and port. See ip_port.
    # File lib/bud.rb, line 1041
    def int_ip_port
      raise Bud::Error, "int_ip_port called before port defined" if @port.nil? and @options[:port] == 0
      @port.nil? ? "#{@ip}:#{@options[:port]}" : "#{@ip}:#{@port}"
    end
Declare a transient collection to be an input or output interface.
    # File lib/bud/state.rb, line 51
    def interface(mode, name, schema=nil)
      define_collection(name)
      t_provides << [name.to_s, mode]
      @tables[name] = (mode ? BudInputInterface : BudOutputInterface).new(name, self, schema)
    end
An alternative approach to declaring interfaces.
    # File lib/bud/state.rb, line 157
    def interfaces(direction, collections)
      mode = case direction
             when :input then true
             when :output then false
             else
               raise Bud::CompileError, "unrecognized interface type #{direction}"
             end
      collections.each do |tab|
        t_provides << [tab.to_s, mode]
      end
    end
    # File lib/bud.rb, line 1030
    def ip
      options[:ext_ip] ? "#{@options[:ext_ip]}" : "#{@ip}"
    end
Returns the IP and port of the Bud instance as a string. In addition to the local IP and port, the user may define an external IP and/or port. The external version of each is returned if available; if not, the local version is returned. There are use cases for mixing and matching local and external values: local_ip:external_port fits a setup with local port forwarding, and external_ip:local_port fits a host in a DMZ, for example.
    # File lib/bud.rb, line 1025
    def ip_port
      raise Bud::Error, "ip_port called before port defined" if port.nil?
      ip.to_s + ":" + port.to_s
    end
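The selection rules spread across ip, port, and ip_port can be collapsed into one pure function, which makes the local/external mixing easy to check. advertised_address and its keyword names are hypothetical. bound_port stands for the port learned after the UDP socket is bound (the @port value set in do_start_server), and port 0 means “ephemeral, not yet known”.

```ruby
# Hypothetical helper mirroring the ip / port / ip_port logic shown above.
def advertised_address(ip:, port:, ext_ip: nil, ext_port: nil, bound_port: nil)
  host = ext_ip || ip
  # An external port wins; otherwise use the bound port, else the configured
  # one. A configured port of 0 is ephemeral and unknown until bound.
  prt = ext_port || bound_port || (port.zero? ? nil : port)
  raise ArgumentError, "port not yet known" if prt.nil?
  "#{host}:#{prt}"
end

# Ephemeral port, after binding
advertised_address(ip: "10.0.0.5", port: 0, bound_port: 54321)      # "10.0.0.5:54321"
# local_ip:external_port (local port forwarding)
advertised_address(ip: "10.0.0.5", port: 5000, ext_port: 8080)      # "10.0.0.5:8080"
# external_ip:local_port (host in a DMZ)
advertised_address(ip: "10.0.0.5", port: 5000, ext_ip: "203.0.113.7") # "203.0.113.7:5000"
```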
Define methods to implement the state declarations for every registered kind of lattice.
    # File lib/bud/state.rb, line 171
    def load_lattice_defs
      Bud::Lattice.global_mfuncs.each do |m|
        next if RuleRewriter::MONOTONE_WHITELIST.include? m
        if Bud::BudCollection.instance_methods.include? m.to_s
          puts "monotone method #{m} conflicts with non-monotonic method in BudCollection"
        end
      end

      Bud::Lattice.global_morphs.each do |m|
        next if RuleRewriter::MONOTONE_WHITELIST.include? m
        if Bud::BudCollection.instance_methods.include? m.to_s
          puts "morphism #{m} conflicts with non-monotonic method in BudCollection"
        end
      end

      # Sanity-check lattice definitions
      # XXX: We should do this only once per lattice
      Bud::Lattice.lattice_kinds.each do |wrap_name, klass|
        unless klass.method_defined? :merge
          raise Bud::CompileError, "lattice #{wrap_name} does not define a merge function"
        end

        # If a method is marked as monotone in any lattice, every lattice that
        # declares a method of that name must also mark it as monotone.
        meth_list = klass.instance_methods(false).to_set
        Bud::Lattice.global_mfuncs.each do |m|
          next unless meth_list.include? m.to_s
          unless klass.mfuncs.include? m
            raise Bud::CompileError, "method #{m} in #{wrap_name} must be monotone"
          end
        end

        # Apply a similar check for morphs
        Bud::Lattice.global_morphs.each do |m|
          next unless meth_list.include? m.to_s
          unless klass.morphs.include? m
            raise Bud::CompileError, "method #{m} in #{wrap_name} must be a morph"
          end
        end

        # Similarly, check for non-monotone lattice methods that are found in the
        # builtin list of monotone operators. The "merge" method is implicitly
        # monotone (XXX: should it be declared as a morph or monotone function?)
        meth_list.each do |m_str|
          m = m_str.to_sym
          next unless RuleRewriter::MONOTONE_WHITELIST.include? m
          # XXX: ugly hack. We want to allow lattice class implementations to
          # define their own equality semantics.
          next if m == :==
          unless klass.mfuncs.include?(m) || klass.morphs.include?(m) || m == :merge
            raise Bud::CompileError, "method #{m} in #{wrap_name} must be monotone"
          end
        end

        # XXX: replace "self" with toplevel?
        self.singleton_class.send(:define_method, wrap_name) do |lat_name|
          define_lattice(lat_name)
          @lattices[lat_name] = Bud::LatticeWrapper.new(lat_name, klass, self)
        end
      end
    end
Declare a transient network collection that delivers facts back to the current Bud instance. This is syntactic sugar for a channel that always delivers to the IP/port of the current Bud instance. Default schema: [:key] => [:val]
    # File lib/bud/state.rb, line 125
    def loopback(name, schema=nil)
      schema ||= {[:key] => [:val]}
      channel(name, schema, true)
    end
Exemplary aggregate method to be used in Bud::BudCollection.group. Computes the maximum of the x entries aggregated.
    # File lib/bud/aggs.rb, line 67
    def max(x)
      [Max.new, x]
    end
Exemplary aggregate method to be used in Bud::BudCollection.group. Computes the minimum of the x entries aggregated.
    # File lib/bud/aggs.rb, line 50
    def min(x)
      [Min.new, x]
    end
    # File lib/bud.rb, line 173
    def module_wrapper_class(mod)
      class_name = "#{mod}__wrap"
      begin
        klass = Module.const_get(class_name.to_sym)
        unless klass.is_a? Class
          raise Bud::Error, "internal error: #{class_name} is in use"
        end
      rescue NameError # exception if class class_name doesn't exist
      end
      klass ||= eval "class #{class_name}; include Bud; include #{mod}; end"
      klass
    end
Register a callback that will be invoked when this instance of Bud is shutting down. Returns an ID that can be passed to cancel_shutdown_cb. XXX: The naming of this method (and cancel_shutdown_cb) is inconsistent with the naming of register_callback and friends.
    # File lib/bud.rb, line 766
    def on_shutdown(&blk)
      # Start EM if not yet started
      start_reactor
      rv = nil
      schedule_and_wait do
        rv = @shutdown_callback_id
        @shutdown_callbacks[@shutdown_callback_id] = blk
        @shutdown_callback_id += 1
      end
      return rv
    end
Pause an instance of Bud that is running asynchronously. That is, this method allows a Bud instance operating in run_bg or run_fg mode to be switched to “single-stepped” mode; timesteps can be manually invoked via tick(). To switch back to running Bud asynchronously, call run_bg().
    # File lib/bud.rb, line 698
    def pause
      schedule_and_wait do
        @running_async = false
      end
    end
Declare a collection to be auto-populated every period seconds. Schema: [:key] => [:val]. Usable on the rhs of statements only.
    # File lib/bud/state.rb, line 138
    def periodic(name, period=1)
      define_collection(name)
      raise Bud::Error if @periodics.has_key? [name]
      @periodics << [name, period]
      @tables[name] = Bud::BudPeriodic.new(name, self)
    end
    # File lib/bud.rb, line 1034
    def port
      return nil if @port.nil? and @options[:port] == 0 and not @options[:ext_port]
      return @options[:ext_port] ? @options[:ext_port] :
        (@port.nil? ? @options[:port] : @port)
    end
Register a callback that will be invoked after this instance of Bud has been shut down.
    # File lib/bud.rb, line 787
    def post_shutdown(&blk)
      # Start EM if not yet started
      start_reactor
      schedule_and_wait do
        @post_shutdown_callbacks << blk
      end
    end
    # File lib/bud/state.rb, line 102
    def readonly(name, schema=nil)
      define_collection(name)
      @tables[name] = Bud::BudReadOnly.new(name, self, schema)
    end
Register a new callback. Given the name of a Bud collection, this method arranges for the given block to be invoked at the end of any tick in which any tuples have been inserted into the specified collection. The code block is passed the collection as an argument; this provides a convenient way to examine the tuples inserted during that fixpoint. (Note that because the Bud runtime is blocked while the callback is invoked, it can also examine any other Bud state freely.) Returns an ID that can be passed to unregister_callback.
Note that registering callbacks on persistent collections (e.g., tables, syncs and stores) is probably not wise: as long as any tuples are stored in the collection, the callback will be invoked at the end of every tick.
    # File lib/bud.rb, line 837
    def register_callback(tbl_name, &blk)
      # We allow callbacks to be added before or after EM has been started. To
      # simplify matters, we start EM if it hasn't been started yet.
      start_reactor
      cb_id = nil
      schedule_and_wait do
        cb_id = do_register_callback(tbl_name, &blk)
      end
      return cb_id
    end
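The warning about callbacks on persistent collections is easiest to see with a toy runtime. MiniRuntime below is a hypothetical model, not Bud. It keeps one transient collection (:events, scratch-like) and one persistent one (:log, table-like), and at the end of each tick it fires callbacks for every non-empty collection. Under that assumption, a callback on :log keeps firing while the tuple remains stored.

```ruby
# Hypothetical model of end-of-tick callbacks (not Bud's implementation).
class MiniRuntime
  def initialize
    @collections = { events: [], log: [] } # :events is transient, :log persists
    @persistent = [:log]
    @callbacks = {}
    @next_id = 0
  end

  # Returns an ID usable with unregister_callback.
  def register_callback(name, &blk)
    raise ArgumentError, "no such collection: #{name}" unless @collections.key?(name)
    id = @next_id
    @next_id += 1
    @callbacks[id] = [name, blk]
    id
  end

  def unregister_callback(id)
    raise ArgumentError, "missing callback: #{id}" unless @callbacks.key?(id)
    @callbacks.delete(id)
  end

  # One timestep: empty transient collections, apply inserts, then invoke the
  # callbacks of every collection that is non-empty at the end of the tick.
  def tick(inserts = {})
    (@collections.keys - @persistent).each { |n| @collections[n] = [] }
    inserts.each { |n, tuples| @collections[n].concat(tuples) }
    @callbacks.each_value do |name, blk|
      coll = @collections[name]
      blk.call(coll.dup) unless coll.empty?
    end
  end
end

rt = MiniRuntime.new
seen_events = []
seen_log = []
rt.register_callback(:events) { |tuples| seen_events << tuples }
rt.register_callback(:log) { |tuples| seen_log << tuples }
rt.tick(events: [[1]], log: [["a"]])
rt.tick # nothing inserted, but :log still holds ["a"]
```

After the second tick, the :events callback has fired once, but the :log callback has fired twice.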
Run Bud in the background (in a different thread). This means that the Bud interpreter will run asynchronously from the caller, so care must be used when interacting with it. For example, it is not safe to directly examine Bud collections from the caller's thread (see async_do and sync_do).

This instance of Bud will run until stop() is called.
    # File lib/bud.rb, line 637
    def run_bg
      start

      schedule_and_wait do
        if @running_async
          raise Bud::Error, "run_bg called on already-running Bud instance"
        end
        @running_async = true

        # Consume any events received while we weren't running async
        tick_internal
      end

      @rtracer.sleep if options[:rtrace]
    end
Run Bud in the “foreground”: the caller's thread blocks while the Bud interpreter runs. This means this method won't return unless an error occurs or Bud is halted. It is often more useful to run Bud in a different thread: see run_bg.
    # File lib/bud.rb, line 708
    def run_fg
      # If we're called from the EventMachine thread (and EM is running), blocking
      # the current thread would imply deadlocking ourselves.
      if Thread.current == EventMachine::reactor_thread and EventMachine::reactor_running?
        raise Bud::Error, "cannot invoke run_fg from inside EventMachine"
      end

      q = Queue.new
      # Note that this must be a post-shutdown callback: if this is the only
      # thread, then the program might exit after run_fg() returns. If run_fg()
      # blocked on a normal shutdown callback, the program might exit before the
      # other shutdown callbacks have a chance to run.
      post_shutdown do
        q.push(true)
      end

      run_bg
      # Block caller's thread until Bud has shutdown
      q.pop
      report_metrics if options[:metrics]
    end
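run_fg is built from run_bg plus a Queue that a post-shutdown hook fills. The hypothetical MiniRunner below reproduces that structure with a plain worker thread standing in for the EventMachine reactor. It also includes the same guard against calling run_fg from the worker thread, which would deadlock.

```ruby
# Hypothetical model: "foreground" = start in the background, then block the
# caller on a queue that a post-shutdown hook fills.
class MiniRunner
  def initialize
    @jobs = Queue.new
    @post_shutdown = []
    @thread = nil
  end

  def schedule(&blk)
    @jobs.push(blk)
  end

  def stop
    @jobs.push(:stop)
  end

  def post_shutdown(&blk)
    @post_shutdown << blk
  end

  # Start the worker thread and return immediately.
  def run_bg
    @thread = Thread.new do
      loop do
        job = @jobs.pop
        break if job == :stop
        job.call
      end
      @post_shutdown.each(&:call) # runs after the loop has fully stopped
    end
  end

  # Start in the background, then block until the post-shutdown hook fires.
  def run_fg
    raise "cannot call run_fg from the worker thread" if Thread.current == @thread
    done = Queue.new
    post_shutdown { done.push(true) }
    run_bg
    done.pop
  end
end

runner = MiniRunner.new
log = []
runner.schedule { log << :work }
runner.schedule { runner.stop } # request shutdown from inside the loop
runner.run_fg                   # returns once the worker has shut down
```

Hooking the wake-up into post-shutdown rather than an ordinary shutdown callback mirrors the comment in run_fg: the caller only resumes after all other shutdown work is done.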
Declare a transient collection. Default schema: [:key] => [:val]
    # File lib/bud/state.rb, line 97
    def scratch(name, schema=nil)
      define_collection(name)
      @tables[name] = Bud::BudScratch.new(name, self, schema)
    end
Start up a Bud instance if it is not currently started. This starts EventMachine (if needed) and binds to a UDP server socket. If do_tick is true, we also execute a single Bloom timestep. Regardless, calling this method does NOT cause Bud to begin running asynchronously (see run_bg).
    # File lib/bud.rb, line 657
    def start(do_tick=false)
      start_reactor
      schedule_and_wait do
        do_startup unless @bud_started
        tick_internal if do_tick
      end
    end
Shut down a Bud instance and release any resources that it was using. This method blocks until Bud has been shut down. If stop_em is true, the EventMachine event loop is also shut down; this will interfere with the execution of any other Bud instances in the same process (as well as anything else that happens to use EventMachine). We always shut down the EM loop if there are no more running Bud instances (this does interfere with other EM-using apps, but it is necessary).
    # File lib/bud.rb, line 737
    def stop(stop_em=false, do_shutdown_cb=true)
      schedule_and_wait do
        do_shutdown(do_shutdown_cb)
      end

      # If we're shutting down the last active Bud instance, shutdown the EM event
      # loop as well. This is probably good practice in general, but it also
      # prevents weird EM behavior -- it seems as though EM::ConnectionNotBound
      # exceptions can be raised if the EM event loop is left running and
      # subsequent events arrive.
      $signal_lock.synchronize {
        stop_em = true if $bud_instances.empty? and EventMachine::reactor_running?
      }

      if stop_em
        Bud.stop_em_loop
        unless Thread.current == EventMachine::reactor_thread
          EventMachine::reactor_thread.join
        end
      end
      report_metrics if options[:metrics]
    end
    # File lib/bud/state.rb, line 81
    def store(name, storage, schema=nil)
      define_collection(name)
      case storage
      when :zookeeper
        # treat "schema" as a hash of options
        options = schema
        raise Bud::Error, "Zookeeper tables require a :path option" if options[:path].nil?
        options[:addr] ||= "localhost:2181"
        @tables[name] = Bud::BudZkTable.new(name, options[:path], options[:addr], self)
        @zk_tables[name] = @tables[name]
      else
        raise Bud::Error, "unknown async storage engine #{storage.to_s}"
      end
    end
Aggregate method to be used in Bud::BudCollection.group. Computes the sum of the x entries aggregated.
    # File lib/bud/aggs.rb, line 156
    def sum(x)
      [Sum.new, x]
    end
Declare a synchronously-flushed persistent collection. Default schema: [:key] => [:val].
    # File lib/bud/state.rb, line 70
    def sync(name, storage, schema=nil)
      define_collection(name)
      case storage
      when :dbm
        @tables[name] = Bud::BudDbmTable.new(name, self, schema)
        @dbm_tables[name] = @tables[name]
      else
        raise Bud::Error, "unknown synchronous storage engine #{storage.to_s}"
      end
    end
sync_callback supports synchronous interaction with Bud modules. The caller supplies the name of an input collection, a set of tuples to insert, and an output collection on which to “listen.” The call blocks until tuples are inserted into the output collection; these are returned to the caller. If the instance shuts down before any output arrives, Bud::ShutdownWithCallbacksError is raised.
    # File lib/bud.rb, line 872
    def sync_callback(in_tbl, tupleset, out_tbl)
      q = Queue.new

      # If the runtime shuts down before we see anything in the output collection,
      # make sure we hear about it so we can raise an error
      # XXX: Using two separate callbacks here is ugly.
      shutdown_cb = on_shutdown do
        q.push :callback
      end

      cb = nil
      sync_do {
        if in_tbl
          t = @tables[in_tbl]
          if t.class <= Bud::BudChannel or t.class <= Bud::BudZkTable
            t <~ tupleset
          else
            t <+ tupleset
          end
        end

        cb = do_register_callback(out_tbl) do |c|
          q.push c.to_a
        end
      }

      result = q.pop
      if result == :callback
        # Don't try to unregister the callbacks first: runtime is already shutdown
        raise Bud::ShutdownWithCallbacksError, "Bud instance shutdown before sync_callback completed"
      end
      unregister_callback(cb)
      cancel_shutdown_cb(shutdown_cb)
      return result
    end
Given a block, evaluate that block inside the background Ruby thread at some time in the future, and then perform a Bloom tick. Because the block is evaluated inside the background Ruby thread, the block can safely examine Bud state. Note that calling sync_do blocks the caller until the block has been evaluated; for a non-blocking version, see async_do.
Note that the block is invoked after one Bud timestep has ended but before the next timestep begins. Hence, synchronous accumulation (<=) into a Bud scratch collection in a callback is typically not a useful thing to do: when the next tick begins, the content of any scratch collections will be emptied, which includes anything inserted by a sync_do block using <=. To avoid this behavior, insert into scratches using <+.
    # File lib/bud.rb, line 807
    def sync_do
      schedule_and_wait do
        yield if block_given?
        # Do another tick, in case the user-supplied block inserted any data
        tick_internal
      end
    end
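sync_do and async_do both hand a block to the single event-loop thread and follow it with a tick. They differ only in whether the caller waits. In the hypothetical MiniLoop below, a Queue plus a Thread stands in for EventMachine.schedule and a counter stands in for tick_internal. Because there is one consumer of a FIFO queue, a sync_do issued after several async_do calls returns only once all of them have run.

```ruby
# Hypothetical model of sync_do / async_do on a single event-loop thread.
class MiniLoop
  attr_reader :ticks

  def initialize
    @jobs = Queue.new
    @ticks = 0
    @thread = Thread.new do
      loop do
        job = @jobs.pop
        break if job == :shutdown
        job.call # errors are not handled in this sketch
      end
    end
  end

  # Non-blocking: enqueue the block followed by a tick, return immediately.
  def async_do(&blk)
    @jobs.push(lambda { blk&.call; tick })
    nil
  end

  # Blocking: same as async_do, but wait until the block and tick have run.
  def sync_do(&blk)
    done = Queue.new
    @jobs.push(lambda { blk&.call; tick; done.push(true) })
    done.pop
    nil
  end

  def shutdown
    @jobs.push(:shutdown)
    @thread.join
  end

  private

  def tick
    @ticks += 1 # stand-in for tick_internal
  end
end

ev = MiniLoop.new
order = []
ev.async_do { order << :first }
ev.async_do { order << :second }
ev.sync_do { order << :third } # returns after all three blocks (FIFO)
ev.shutdown
```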
Declare an in-memory, non-transient collection. Default schema: [:key] => [:val].
    # File lib/bud/state.rb, line 58
    def table(name, schema=nil)
      define_collection(name)
      @tables[name] = Bud::BudTable.new(name, self, schema)
    end
Declare a scratch in a Bloom statement lhs. The schema is inferred from the rhs.
    # File lib/bud/state.rb, line 108
    def temp(name)
      define_collection(name)
      # defer schema definition until merge
      @tables[name] = Bud::BudTemp.new(name, self, nil, true)
    end
From client code, manually trigger a timestep of Bloom execution.
    # File lib/bud.rb, line 1047
    def tick
      start(true)
    end
One timestep of Bloom execution. This MUST be invoked from the EventMachine thread; it is not intended to be called directly by client code.
    # File lib/bud.rb, line 1107
    def tick_internal
      puts "#{object_id}/#{port} : ============================================= (#{@budtime})" if $BUD_DEBUG
      begin
        starttime = Time.now if options[:metrics]
        if options[:metrics] and not @endtime.nil?
          @metrics[:betweentickstats] ||= initialize_stats
          @metrics[:betweentickstats] = running_stats(@metrics[:betweentickstats], starttime - @endtime)
        end
        @inside_tick = true

        unless @done_bootstrap
          do_bootstrap
          do_wiring
        else
          # inform tables and elements about beginning of tick.
          @app_tables.each {|t| t.tick}
          do_invalidate_rescan
        end

        receive_inbound
        # compute fixpoint for each stratum in order
        @stratified_rules.each_with_index do |rules,stratum|
          fixpoint = false
          first_iter = true
          until fixpoint
            @scanners[stratum].each_value {|s| s.scan(first_iter)}
            fixpoint = true
            first_iter = false
            # flush any tuples in the pipes
            @push_sorted_elems[stratum].each {|p| p.flush}
            # tick deltas on any merge targets and look for more deltas
            # check to see if any joins saw a delta
            @push_joins[stratum].each do |p|
              if p.found_delta
                fixpoint = false
                p.tick_deltas
              end
            end
            @merge_targets[stratum].each do |t|
              fixpoint = false if t.tick_deltas
            end
          end
          # push end-of-fixpoint
          @push_sorted_elems[stratum].each do |p|
            p.stratum_end
          end
          @merge_targets[stratum].each do |t|
            t.flush_deltas
          end
        end
        @viz.do_cards if @options[:trace]
        do_flush

        invoke_callbacks
        @budtime += 1
        @inbound.clear
        @reset_list.each {|e| e.invalidated = false; e.rescan = false}

      ensure
        @inside_tick = false
        @tick_clock_time = nil
      end

      if options[:metrics]
        @endtime = Time.now
        @metrics[:tickstats] ||= initialize_stats
        @metrics[:tickstats] = running_stats(@metrics[:tickstats], @endtime - starttime)
      end
    end
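The until fixpoint loop in tick_internal keeps re-scanning a stratum until no join or merge target reports a new delta. The sketch below shows the same idea stripped of Bud's push-based dataflow: semi-naive evaluation of transitive closure in plain Ruby. The transitive_closure function and the path/link rules are illustrative assumptions, not Bud code. Each iteration joins only the previous iteration's new tuples (the delta) against the base relation, and the loop ends when an iteration derives nothing new.

```ruby
require "set"

# Hypothetical illustration of fixpoint evaluation within one stratum:
#   path(x, z) <= link(x, z)
#   path(x, z) <= path(x, y) * link(y, z)
def transitive_closure(links)
  path = Set.new(links)
  delta = Set.new(links) # tuples first derived in the previous iteration
  until delta.empty?     # fixpoint: the last iteration produced nothing new
    new_tuples = Set.new
    delta.each do |(x, y)|
      links.each do |(y2, z)|
        new_tuples << [x, z] if y == y2
      end
    end
    delta = new_tuples - path
    path.merge(delta)
  end
  path
end

closure = transitive_closure([[1, 2], [2, 3], [3, 4]])
# closure contains [1, 2], [2, 3], [3, 4], [1, 3], [2, 4], [1, 4]
```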
    # File lib/bud.rb, line 186
    def toplevel
      @toplevel = (@options[:toplevel] || self)
    end
    # File lib/bud.rb, line 194
    def toplevel?
      toplevel.object_id == self.object_id
    end
Unregister the callback that has the given ID.
    # File lib/bud.rb, line 861
    def unregister_callback(id)
      schedule_and_wait do
        raise Bud::Error, "missing callback: #{id.inspect}" unless @callbacks.has_key? id
        @callbacks.delete(id)
      end
    end
Are we currently in the process of wiring together the dataflow?
    # File lib/bud.rb, line 208
    def wiring?
      toplevel? ? (@done_bootstrap && !@done_wiring) : toplevel.wiring?
    end
Private Instance Methods
Builtin Bud state (predefined collections). We could define this using the standard state block syntax, but we want to ensure that builtin state is initialized before user-defined state.
    # File lib/bud.rb, line 1203
    def builtin_state
      # We expect there to be no previously-defined tables
      raise Bud::Error unless @tables.empty?

      loopback :localtick, [:col1]
      @stdio = terminal :stdio
      scratch :halt, [:key]
      @periodics = table :periodics_tbl, [:pername] => [:period]

      # for BUD reflection
      table :t_cycle, [:predicate, :via, :neg, :temporal]
      table :t_depends, [:bud_obj, :rule_id, :lhs, :op, :body] => [:nm, :in_body]
      table :t_provides, [:interface] => [:input]
      table :t_rule_stratum, [:bud_obj, :rule_id] => [:stratum]
      table :t_rules, [:bud_obj, :rule_id] => [:lhs, :op, :src, :orig_src, :unsafe_funcs_called]
      table :t_stratum, [:predicate] => [:stratum]
      table :t_table_info, [:tab_name, :tab_type]
      table :t_table_schema, [:tab_name, :col_name, :ord, :loc]
      table :t_underspecified, t_provides.schema

      # Identify builtin tables as such
      @builtin_tables = @tables.clone if toplevel
    end
If module Y is a parent module of X, X's state block might reference state defined in Y. Hence, we want to invoke Y's state block first. However, when “import” and “include” are combined, we can't use the inheritance hierarchy to do this. When a module Z is imported, the import process inlines all the modules Z includes into a single module. Hence, we can no longer rely on the inheritance hierarchy to respect dependencies between modules. To fix this, we add an increasing ID to each state block's method name (assigned according to the order in which the state blocks are defined); we then sort by this order before invoking the state blocks.
    # File lib/bud.rb, line 303
    def call_state_methods
      meth_map = {} # map from ID => [Method]
      self.class.instance_methods.each do |m|
        next unless m =~ /^__state(\d+)__/
        id = Regexp.last_match.captures.first.to_i
        meth_map[id] ||= []
        meth_map[id] << self.method(m)
      end

      meth_map.keys.sort.each do |i|
        meth_map[i].each {|m| m.call}
      end
    end
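The numeric sort in call_state_methods matters because a plain string sort of method names would put __state10__ before __state2__. The hypothetical Demo class below defines three state-style methods and orders them the same way: it extracts the integer ID and sorts numerically. The method names and the calls log are illustrative only.

```ruby
# Hypothetical demo of ordering state blocks by a numeric ID embedded in the
# method name, as call_state_methods does with __stateN__ methods.
class Demo
  attr_reader :calls

  def initialize
    @calls = []
  end

  def __state10__late;  @calls << :late;   end
  def __state2__middle; @calls << :middle; end
  def __state1__early;  @calls << :early;  end

  def call_state_methods
    meth_map = Hash.new { |h, k| h[k] = [] } # numeric ID => [Method]
    self.class.instance_methods.each do |m|
      match = /\A__state(\d+)__/.match(m.to_s)
      next unless match
      meth_map[match[1].to_i] << method(m)
    end
    meth_map.keys.sort.each { |id| meth_map[id].each(&:call) }
  end
end

d = Demo.new
d.call_state_methods # runs early, middle, late, in ID order 1, 2, 10
```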
Methods for registering collection types.
    # File lib/bud/state.rb, line 4
    def check_collection_name(name)
      if @tables.has_key? name or @lattices.has_key? name
        raise Bud::CompileError, "collection already exists: #{name}"
      end

      # Rule out collection names that use reserved words, including
      # previously-defined method names.
      reserved = eval "defined?(#{name})"
      unless reserved.nil?
        raise Bud::CompileError, "symbol :#{name} reserved, cannot be used as collection name"
      end
    end
# File lib/bud/state.rb, line 17
def define_collection(name)
  check_collection_name(name)
  self.singleton_class.send(:define_method, name) do |*args, &blk|
    if blk.nil?
      return @tables[name]
    else
      return @tables[name].pro(&blk)
    end
  end
end
# File lib/bud/state.rb, line 29
def define_lattice(name)
  check_collection_name(name)
  self.singleton_class.send(:define_method, name) do |*args, &blk|
    if blk.nil?
      return @lattices[name]
    else
      return @lattices[name].pro(&blk)
    end
  end
end
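Both `define_collection` and `define_lattice` install a per-instance accessor on the singleton class. The sketch below shows that pattern in isolation; `MiniTable`, `MiniRuntime`, `add_table` and the row-mapping `pro` are hypothetical stand-ins, not Bud's classes.

```ruby
# Minimal sketch with hypothetical classes: each registered name becomes a
# per-instance accessor that returns the collection, or a projection of it
# when a block is given (mirroring define_collection).
class MiniTable
  def initialize(rows)
    @rows = rows
  end

  def to_a
    @rows.dup
  end

  # Stand-in for Bud's pro: map every row through the block.
  def pro(&blk)
    @rows.map(&blk)
  end
end

class MiniRuntime
  def initialize
    @tables = {}
  end

  def add_table(name, rows)
    raise ArgumentError, "collection already exists: #{name}" if @tables.key?(name)
    @tables[name] = MiniTable.new(rows)
    define_collection(name)
  end

  private

  def define_collection(name)
    # Defining on the singleton class keeps the accessor local to this instance.
    singleton_class.send(:define_method, name) do |*_args, &blk|
      blk.nil? ? @tables[name] : @tables[name].pro(&blk)
    end
  end
end

rt = MiniRuntime.new
rt.add_table(:nums, [1, 2, 3])
all_rows = rt.nums.to_a
scaled = rt.nums { |x| x * 10 }
```

Because the accessor lives on `rt`'s singleton class, a second `MiniRuntime` instance does not respond to `nums`.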
“Flush” any tuples that need to be flushed. This does two things:
- Emit outgoing tuples in channels and ZK tables.
- Commit to disk any changes made to on-disk tables.
# File lib/bud.rb, line 1242
def do_flush
  @channels.each_value { |c| c.flush }
  @zk_tables.each_value { |t| t.flush }
  @dbm_tables.each_value { |t| t.flush }
end
# File lib/bud.rb, line 612
def do_rewrite
  @meta_parser = BudMeta.new(self)
  @stratified_rules = @meta_parser.meta_rewrite
end
# File lib/bud.rb, line 983
def do_shutdown(do_shutdown_cb=true)
  # Silently ignore duplicate shutdown requests or attempts to shutdown an
  # instance that hasn't been started yet.
  return if @instance_id == ILLEGAL_INSTANCE_ID
  $signal_lock.synchronize {
    raise Bud::Error unless $bud_instances.has_key? @instance_id
    $bud_instances.delete @instance_id
    @instance_id = ILLEGAL_INSTANCE_ID
  }
  unregister_callback(@halt_cb)
  if do_shutdown_cb
    @shutdown_callbacks.each_value {|cb| cb.call}
  end
  @timers.each {|t| t.cancel}
  @tables.each_value {|t| t.close}
  if EventMachine::reactor_running? and @bud_started
    @dsock.close_connection
  end
  @bud_started = false
  @running_async = false
  if do_shutdown_cb
    @post_shutdown_callbacks.each {|cb| cb.call}
  end
end
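The idempotence guard in `do_shutdown` (a sentinel instance ID plus a registry protected by a lock) can be sketched without EventMachine. `ILLEGAL_ID`, `REGISTRY_LOCK`, `REGISTRY` and `Instance` are hypothetical stand-ins for `ILLEGAL_INSTANCE_ID`, `$signal_lock`, `$bud_instances` and a Bud instance.

```ruby
# Sketch of idempotent shutdown (hypothetical names standing in for Bud's
# ILLEGAL_INSTANCE_ID, $signal_lock and $bud_instances).
ILLEGAL_ID = -1
REGISTRY_LOCK = Mutex.new
REGISTRY = {}

class Instance
  attr_reader :id, :shutdown_runs

  def initialize
    @id = ILLEGAL_ID
    @shutdown_runs = 0
  end

  def start
    REGISTRY_LOCK.synchronize do
      @id = (REGISTRY.keys.max || 0) + 1
      REGISTRY[@id] = self
    end
  end

  def shutdown
    # Check and clear the ID under the lock, so two concurrent callers cannot
    # both proceed; duplicates and never-started instances are no-ops.
    stopped = REGISTRY_LOCK.synchronize do
      next false if @id == ILLEGAL_ID
      REGISTRY.delete(@id)
      @id = ILLEGAL_ID
      true
    end
    return unless stopped
    @shutdown_runs += 1 # placeholder for cancelling timers, closing tables, ...
  end
end

inst = Instance.new
inst.shutdown # never started: ignored
inst.start
inst.shutdown
inst.shutdown # duplicate: ignored
```

One design difference: `do_shutdown` tests the sentinel before taking the lock, whereas this sketch tests it inside the critical section, which closes the window between the check and the registry update.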
# File lib/bud.rb, line 1009
def do_start_server
  @dsock = EventMachine::open_datagram_socket(@ip, @options[:port],
                                              BudServer, self,
                                              @options[:channel_filter])
  @port = Socket.unpack_sockaddr_in(@dsock.get_sockname)[0]
end
# File lib/bud.rb, line 666
def do_startup
  raise Bud::Error, "EventMachine not started" unless EventMachine::reactor_running?
  raise Bud::Error unless EventMachine::reactor_thread?
  @instance_id = Bud.init_signal_handlers(self)
  do_start_server
  @bud_started = true
  # Initialize periodics
  @periodics.each do |p|
    @timers << make_periodic_timer(p.pername, p.period)
  end
  # Arrange for Bud to read from stdin if enabled. Note that we can't do this
  # earlier because we need to wait for EventMachine startup.
  @stdio.start_stdin_reader if @options[:stdin]
  @zk_tables.each_value {|t| t.start_watchers}
  @halt_cb = register_callback(:halt) do |t|
    stop
    if t.first.key == :kill
      Bud.shutdown_all_instances
      Bud.stop_em_loop
    end
  end
end
# File lib/bud.rb, line 317
def do_wiring
  @stratified_rules.each_with_index { |rules, stratum| eval_rules(rules, stratum) }
  # Prepare list of tables that will be actively used at run time. First, all
  # the user-defined tables and lattices. We start @app_tables off as a set,
  # then convert to an array later.
  @app_tables = (@tables.keys - @builtin_tables.keys).map {|t| @tables[t]}.to_set
  @app_tables.merge(@lattices.values)
  # Check scan and merge_targets to see if any builtin_tables need to be added as well.
  @scanners.each do |scs|
    @app_tables.merge(scs.values.map {|s| s.collection})
  end
  @merge_targets.each do |mts| #mts == merge_targets at stratum
    @app_tables.merge(mts)
  end
  @app_tables = @app_tables.to_a
  # for each stratum create a sorted list of push elements in topological order
  @push_sorted_elems = []
  @scanners.each do |scs| # scs's values constitute scanners at a stratum
    # start with scanners and transitively add all reachable elements in a
    # breadth-first order
    working = scs.values
    seen = Set.new(working)
    sorted_elems = [] # sorted elements in this stratum
    while not working.empty?
      sorted_elems.concat(working)
      wired_to = []
      working.each do |e|
        e.wirings.each do |out|
          if ((out.class <= PushElement || out.class <= LatticePushElement) and not seen.member?(out))
            seen << out
            wired_to << out
          end
        end
      end
      working = wired_to
    end
    @push_sorted_elems << sorted_elems
  end
  @merge_targets.each_with_index do |stratum_targets, stratum|
    @scanners[stratum].each_value do |s|
      stratum_targets << s.collection
    end
  end
  # We create "orphan" scanners for collections that don't appear on the RHS
  # of any rules, but do appear on the LHS of at least one rule. These
  # scanners aren't needed to compute the fixpoint, but they are used as part
  # of rescan/invalidation (e.g., if an orphaned collection receives a manual
  # deletion operation, we need to arrange for the collection to be
  # re-filled).
  @orphan_scanners = [] # Pairs of [scanner, stratum]
  @app_tables.each do |t|
    next unless t.class <= Bud::BudCollection # skip lattice wrappers
    next if t.scanner_cnt > 0
    stratum = collection_stratum(t.qualified_tabname.to_s)
    # if the collection also doesn't appear on any LHSs, skip it
    next if stratum.nil?
    @orphan_scanners << [Bud::ScannerElement.new(t.tabname, self, t, t.schema),
                         stratum]
  end
  # Sanity check
  @push_sorted_elems.each do |stratum_elems|
    stratum_elems.each {|se| se.check_wiring}
  end
  # Create sets of elements and collections to invalidate or rescan at the
  # beginning of each tick
  prepare_invalidation_scheme
  # For all tables that are accessed (scanned) in a stratum higher than the
  # one they are updated in, set a flag to track deltas accumulated in that
  # tick (see: collection.tick_delta)
  stratum_accessed = {}
  (@num_strata-1).downto(0) do |stratum|
    @scanners[stratum].each_value do |s|
      stratum_accessed[s.collection] ||= stratum
    end
  end
  @merge_targets.each_with_index do |stratum_targets, stratum|
    stratum_targets.each {|tab|
      tab.accumulate_tick_deltas = true if stratum_accessed[tab] and stratum_accessed[tab] > stratum
    }
  end
  @done_wiring = true
  if @options[:print_wiring]
    @push_sources.each do |strat|
      strat.each_value {|src| src.print_wiring}
    end
  end
end
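The per-stratum ordering step in `do_wiring` is a breadth-first walk from the scanners along each element's `wirings`. The sketch below isolates it; `Element` and `bfs_order` are hypothetical stand-ins, and identity-based hashing (the `Object` default) ensures each element is visited once.

```ruby
require "set"

# Sketch of the ordering step: start from the scanners and walk outgoing
# wirings breadth-first, listing each reachable element exactly once.
# Element is a hypothetical stand-in for Bud's PushElement.
class Element
  attr_reader :name, :wirings

  def initialize(name, wirings = [])
    @name = name
    @wirings = wirings
  end
end

def bfs_order(scanners)
  working = scanners
  seen = Set.new(working)
  sorted = []
  until working.empty?
    sorted.concat(working)
    wired_to = []
    working.each do |e|
      e.wirings.each do |out|
        next if seen.include?(out)
        seen << out
        wired_to << out
      end
    end
    working = wired_to
  end
  sorted
end

# Diamond: scan -> (a, b) -> join
join = Element.new(:join)
a = Element.new(:a, [join])
b = Element.new(:b, [join])
scan = Element.new(:scan, [a, b])
order = bfs_order([scan]).map(&:name)
```

Note that breadth-first discovery order coincides with a topological order only when every element is reachable along paths of a single length; an element reachable by both a short and a long path is listed at the shorter distance.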
# File lib/bud.rb, line 600
def each_scanner
  @num_strata.times do |stratum|
    @scanners[stratum].each_value do |scanner|
      yield scanner, stratum
    end
  end
  @orphan_scanners.each do |scanner,stratum|
    yield scanner, stratum
  end
end
# File lib/bud.rb, line 1248
def eval_rule(__obj__, __src__)
  __obj__.instance_eval __src__ # ensure that the only local variables are __obj__ and __src__
end
# File lib/bud.rb, line 1252
def eval_rules(rules, strat_num)
  # This routine evals the rules in a given stratum, which results in a wiring
  # of PushElements
  @this_stratum = strat_num
  rules.each_with_index do |rule, i|
    # user-supplied code blocks will be evaluated in this context at run-time
    @this_rule_context = rule.bud_obj
    begin
      eval_rule(rule.bud_obj, rule.src)
    rescue Exception => e
      err_msg = "** Exception while wiring rule: #{rule.orig_src}\n ****** #{e}"
      # Create a new exception for accommodating err_msg, but reuse original backtrace
      new_e = (e.class <= Bud::Error) ? e.class.new(err_msg) : Bud::Error.new(err_msg)
      new_e.set_backtrace(e.backtrace)
      raise new_e
    end
  end
  @this_rule_context = nil
  @this_stratum = -1
end
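The error handling in `eval_rules` attaches the rule's source to the message while keeping the original backtrace. A standalone sketch of that pattern follows; `WiringError` and `wire_rule` are hypothetical, and plain `eval` replaces Bud's `instance_eval` on a rule object.

```ruby
# Sketch: re-raise a wiring failure with extra context but keep the original
# backtrace. WiringError is a hypothetical stand-in for Bud::Error.
class WiringError < StandardError; end

def wire_rule(src)
  eval(src)
rescue Exception => e # Exception, not StandardError, so eval's SyntaxError is caught too
  msg = "** Exception while wiring rule: #{src}\n ****** #{e}"
  # Preserve the class of our own errors; wrap everything else.
  new_e = e.is_a?(WiringError) ? e.class.new(msg) : WiringError.new(msg)
  new_e.set_backtrace(e.backtrace)
  raise new_e
end

caught = begin
  wire_rule("1 / 0")
  nil
rescue WiringError => err
  err
end
puts caught.message
```

Rescuing `Exception` matters here because a malformed rule raises `SyntaxError`, which is a `ScriptError` rather than a `StandardError`.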
IDs and timers.
# File lib/bud.rb, line 1274
def gen_id
  Time.new.to_i.to_s << rand.to_s
end
# File lib/bud.rb, line 213
def import_defs
  @imported_defs = {} # mod name -> Module map
  self.class.ancestors.each do |anc|
    if anc.respond_to? :bud_import_table
      anc_imp_tbl = anc.bud_import_table
      anc_imp_tbl.each do |nm, mod|
        # Ensure that two modules have not been imported under one alias.
        if @imported_defs.member? nm and not @imported_defs[nm] == anc_imp_tbl[nm]
          raise Bud::CompileError, "conflicting imports: modules #{@imported_defs[nm]} and #{anc_imp_tbl[nm]} both are imported as '#{nm}'"
        end
        @imported_defs[nm] = mod
      end
    end
  end
  @imported_defs ||= self.class.ancestors.inject({}) {|tbl, e| tbl.merge(e.bud_import_table)}
end
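The conflict check in `import_defs` merges alias-to-module tables from every ancestor and rejects an alias bound to two different modules. A sketch of just that merge, with hypothetical names (`merge_imports`, `ImportConflict`, `Alpha`, `Beta`):

```ruby
# Sketch: merge alias => module tables from several sources (e.g., a class
# and its ancestors), failing if one alias names two different modules.
# ImportConflict is a hypothetical stand-in for Bud::CompileError.
class ImportConflict < StandardError; end

def merge_imports(tables)
  merged = {}
  tables.each do |tbl|
    tbl.each do |alias_name, mod|
      if merged.key?(alias_name) && merged[alias_name] != mod
        raise ImportConflict,
              "conflicting imports: modules #{merged[alias_name]} and #{mod} both are imported as '#{alias_name}'"
      end
      merged[alias_name] = mod
    end
  end
  merged
end

module Alpha; end
module Beta; end

combined = merge_imports([{ a: Alpha }, { a: Alpha, b: Beta }])
```

Importing the same module under the same alias from two ancestors is allowed; only a rebinding to a different module raises.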
# File lib/bud.rb, line 919
def invoke_callbacks
  @callbacks.each_value do |cb|
    tbl_name, block = cb
    tbl = @tables[tbl_name]
    unless tbl.empty?
      block.call(tbl)
    end
  end
end
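`invoke_callbacks` walks a map of callback ID to `[table name, block]` and fires a callback only when its table is non-empty. The sketch below is modeled loosely on the `register_callback`/`unregister_callback` calls that appear in `do_startup` and `do_shutdown`; `CallbackRegistry`, its signatures, and the array-backed tables are assumptions for illustration.

```ruby
# Sketch: callbacks keyed by an ID, each bound to a table name; a callback
# fires only when its table is non-empty. The hash of arrays used as
# "tables" is a hypothetical stand-in for Bud collections.
class CallbackRegistry
  def initialize(tables)
    @tables = tables
    @callbacks = {}
    @next_id = 0
  end

  def register_callback(tbl_name, &block)
    id = (@next_id += 1)
    @callbacks[id] = [tbl_name, block]
    id
  end

  def unregister_callback(id)
    @callbacks.delete(id)
  end

  def invoke_callbacks
    @callbacks.each_value do |tbl_name, block|
      tbl = @tables[tbl_name]
      block.call(tbl) unless tbl.empty?
    end
  end
end

tables = { halt: [], out: [[1, "hello"]] }
seen = []
reg = CallbackRegistry.new(tables)
halt_id = reg.register_callback(:halt) { |t| seen << [:halt, t.size] }
reg.register_callback(:out) { |t| seen << [:out, t.size] }
reg.invoke_callbacks
```

Returning an ID from registration is what lets a caller remove exactly one callback later, as `do_shutdown` does with `@halt_cb`.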
# File lib/bud.rb, line 1278
def make_periodic_timer(name, period)
  EventMachine::PeriodicTimer.new(period) do
    @inbound[name.to_sym] ||= []
    @inbound[name.to_sym] << [gen_id, Time.now]
    tick_internal if @running_async
  end
end
All collections (elements included) are semantically required to erase any cached information at the start of a tick and start from a clean slate. prepare_invalidation_scheme prepares a just-in-time invalidation scheme that permits us to preserve data from one tick to the next, and to keep things in incremental mode unless there's a negation.
This scheme satisfies the following constraints:
- A full scan of an element's contents results in downstream elements getting full scans themselves (i.e., no deltas). This effect is transitive.
- Invalidation of an element's cache results in rebuilding of the cache and a consequent full scan. See the next constraint.
- Invalidation of an element requires upstream elements to rescan their contents, or to transitively pass the request on further upstream. Any element that has a cache can rescan without passing on the request to higher levels.
This set of constraints is solved once during wiring, resulting in four data structures:
- @default_invalidate: the set of elements and tables to always invalidate at every tick.
- @default_rescan: the set of elements and tables to always scan fully in the first iteration of every tick.
- scanner.invalidate_set: the set of elements to additionally invalidate if the scanner's table is invalidated at run time.
- scanner.rescan_set: the set of elements to additionally rescan if the scanner's table is invalidated at run time.
# File lib/bud.rb, line 447
def prepare_invalidation_scheme
  if $BUD_SAFE
    @app_tables = @tables.values + @lattices.values # No collections excluded
    rescan = Set.new
    invalidate = @app_tables.select {|t| t.class <= BudScratch}.to_set
    @num_strata.times do |stratum|
      @push_sorted_elems[stratum].each do |elem|
        invalidate << elem
        rescan << elem
      end
    end
    @default_rescan = rescan.to_a
    @default_invalidate = invalidate.to_a
    @reset_list = [] # Nothing to reset at end of tick. It'll be overwritten anyway
    return
  end
  # By default, all tables are considered sources unless they appear on the
  # lhs. We only consider non-temporal rules because invalidation is only
  # about this tick. Also, we track (in unsafe_targets) those tables that are
  # the targets of user-defined code blocks that call "unsafe" functions that
  # produce a different value in every tick (e.g., budtime). Elements that
  # feed these tables are forced to rescan their contents, and thus forced to
  # re-execute these code blocks.
  unsafe_targets = Set.new
  t_rules.each do |rule|
    lhs = rule.lhs.to_sym
    if rule.op == "<="
      # Note that lattices cannot be sources
      @tables[lhs].is_source = false if @tables.has_key? lhs
    end
    unsafe_targets << lhs if rule.unsafe_funcs_called
  end
  # Compute a set of tables and elements that should be explicitly told to
  # invalidate or rescan. Start with a set of tables that always invalidate
  # and elements that always rescan.
  invalidate = @app_tables.select {|t| t.invalidate_at_tick}.to_set
  rescan = Set.new
  @num_strata.times do |stratum|
    @push_sorted_elems[stratum].each do |elem|
      rescan << elem if elem.rescan_at_tick
      if elem.outputs.any?{|tab| not(tab.class <= PushElement) and not(tab.class <= LatticePushElement) and unsafe_targets.member? tab.qualified_tabname.to_sym }
        rescan.merge(elem.wired_by)
      end
    end
    rescan_invalidate_tc(stratum, rescan, invalidate)
  end
  @default_rescan = rescan.to_a
  @default_invalidate = invalidate.to_a
  if $BUD_DEBUG
    puts "Default rescan: #{rescan.inspect}"
    puts "Default inval: #{invalidate.inspect}"
    puts "Unsafe targets: #{unsafe_targets.inspect}"
  end
  # For each collection that is to be scanned, compute the set of dependent
  # tables and elements that will need invalidation and/or rescan if that
  # table were to be invalidated at runtime.
  dflt_rescan = rescan
  dflt_invalidate = invalidate
  to_reset = rescan + invalidate
  each_scanner do |scanner, stratum|
    # If it is going to be always invalidated, it doesn't need further
    # examination. Lattice scanners also don't get invalidated.
    next if dflt_rescan.member? scanner
    next if scanner.class <= LatticeScanner
    rescan = dflt_rescan.clone
    invalidate = dflt_invalidate + [scanner.collection]
    rescan_invalidate_tc(stratum, rescan, invalidate)
    prune_rescan_set(rescan)
    # Make sure we reset the rescan/invalidate flag for this scanner at
    # end-of-tick, but we can remove the scanner from its own
    # rescan_set/inval_set.
    to_reset.merge(rescan)
    to_reset.merge(invalidate)
    rescan.delete(scanner)
    invalidate.delete(scanner.collection)
    # Give the diffs (from default) to scanner; these are elements that are
    # dependent on this scanner
    diffscan = (rescan - dflt_rescan).find_all {|elem| elem.class <= PushElement}
    scanner.invalidate_at_tick(diffscan, (invalidate - dflt_invalidate).to_a)
  end
  @reset_list = to_reset.to_a
  # For each lattice, find the collections that should be rescanned when there
  # is a new delta for the lattice. That is, if we have a rule like:
  # "t2 <= t1 {|t| [t.key, lat_foo]}", whenever there is a delta on lat_foo we
  # should rescan t1 (to produce tuples with the updated lat_foo value).
  #
  # TODO:
  # (1) if t1 is fed by rules r1 and r2 but only r1 references lattice x,
  # don't trigger rescan of r2 on deltas for x (hard)
  t_depends.each do |dep|
    src, target_name = dep.body.to_sym, dep.lhs.to_sym
    if @lattices.has_key? src and dep.in_body
      src_lat = @lattices[src]
      if @tables.has_key? target_name
        target = @tables[target_name]
      else
        target = @lattices[target_name]
      end
      # Conservatively, we rescan all the elements that feed the lhs (target)
      # collection via positive (non-deletion) rules; we then also need to
      # potentially rescan ancestors of those elements as well (e.g., setting
      # a stateless PushElement to rescan does nothing; we want to tell its
      # ancestor ScannerElement to rescan).
      #
      # XXX: do we need to consider all transitively reachable nodes for
      # rescan?
      lat_rescan = target.positive_predecessors.to_set
      lat_inval = Set.new
      target.positive_predecessors.each do |e|
        e.add_rescan_invalidate(lat_rescan, lat_inval)
      end
      src_lat.rescan_on_delta.merge(lat_rescan)
    end
  end
end
# File lib/bud.rb, line 596
def prune_rescan_set(rescan)
  rescan.delete_if {|e| e.rescan_at_tick}
end
Handle external inputs: channels, terminals, and periodics. Received messages are placed directly into the storage of the appropriate local collection. The inbound queue is cleared at the end of the tick.
# File lib/bud.rb, line 1230
def receive_inbound
  @inbound.each do |tbl_name, msg_buf|
    puts "recv via #{tbl_name}: #{msg_buf}" if $BUD_DEBUG
    msg_buf.each do |b|
      tables[tbl_name] << b
    end
  end
end
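Together with `make_periodic_timer`, which appends `[gen_id, Time.now]` to `@inbound[name]`, `receive_inbound` forms a simple per-collection buffer: events accumulate between ticks, are copied into collections at the start of a tick, and are cleared at its end. The sketch below assumes a hypothetical `MiniBud` class with array-backed tables and an `enqueue` helper.

```ruby
# Sketch of the inbound buffer: external events (channel messages, periodic
# timer firings) are queued per collection between ticks, copied into the
# collections at the start of the tick, and cleared at its end.
# MiniBud, enqueue and the array-backed tables are hypothetical.
class MiniBud
  attr_reader :tables

  def initialize
    @tables = Hash.new { |h, k| h[k] = [] }
    @inbound = {}
  end

  # What a timer firing or a network receive would do between ticks.
  def enqueue(tbl_name, tuple)
    (@inbound[tbl_name.to_sym] ||= []) << tuple
  end

  def tick
    receive_inbound
    # ... rule evaluation would happen here ...
    @inbound.clear # the buffer is emptied at the end of the tick
  end

  private

  def receive_inbound
    @inbound.each do |tbl_name, msg_buf|
      msg_buf.each { |b| tables[tbl_name] << b }
    end
  end
end

bud = MiniBud.new
bud.enqueue(:chan, ["localhost:1234", "ping"])
bud.enqueue("timer", [42, Time.now])
bud.tick
```

Clearing the buffer after each tick is what prevents the same message from being delivered again on the next tick.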
Given the rescan and invalidate sets, extend them in place to their transitive closure within the given stratum.
# File lib/bud.rb, line 577
def rescan_invalidate_tc(stratum, rescan, invalidate)
  # XXX: hack. If there's nothing in the given stratum, don't do
  # anything. This can arise if we have an orphan scanner whose input is a
  # non-monotonic operator; the stratum(LHS) = stratum(RHS) + 1, but there's
  # nothing else in stratum(LHS).
  return if @push_sorted_elems[stratum].nil?
  rescan_len = rescan.size
  invalidate_len = invalidate.size
  while true
    # Ask each element if it wants to add itself to either set, depending on
    # who else is in those sets already.
    @push_sorted_elems[stratum].each {|t| t.add_rescan_invalidate(rescan, invalidate)}
    break if rescan_len == rescan.size and invalidate_len == invalidate.size
    rescan_len = rescan.size
    invalidate_len = invalidate.size
  end
end
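The loop in `rescan_invalidate_tc` is a fixpoint iteration: every element may add itself to either set based on the current contents, and the loop stops once a full pass changes neither set's size. Since the sets only grow and are finite, it terminates. The sketch below implements only the first constraint (a rescanned input forces a rescan downstream); `Elem` and its rule are hypothetical, not Bud's `add_rescan_invalidate`.

```ruby
require "set"

# Sketch of the fixpoint loop. Elem and its single propagation rule are
# hypothetical; Bud's elements implement richer rules.
class Elem
  attr_reader :name

  def initialize(name, inputs = [])
    @name = name
    @inputs = inputs
  end

  # Constraint 1 only: if any input is fully rescanned, so is this element.
  def add_rescan_invalidate(rescan, _invalidate)
    rescan << self if @inputs.any? { |i| rescan.include?(i) }
  end
end

def rescan_invalidate_tc(elems, rescan, invalidate)
  loop do
    before = [rescan.size, invalidate.size]
    elems.each { |e| e.add_rescan_invalidate(rescan, invalidate) }
    break if before == [rescan.size, invalidate.size] # no change: fixpoint
  end
end

# Elements are listed in reverse dependency order, so a single pass is not
# enough and the loop has to iterate.
s = Elem.new(:scan)
m = Elem.new(:map, [s])
f = Elem.new(:filter, [m])
rescan = Set[s]
invalidate = Set.new
rescan_invalidate_tc([f, m, s], rescan, invalidate)
```

Comparing set sizes, as the original does, is sufficient because elements are only ever added, never removed, during the loop.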
Absorb rules and dependencies from imported modules. The corresponding module instantiations have already resolved their own imports.
# File lib/bud.rb, line 232
def resolve_imports
  import_tbl = import_defs
  import_tbl.each_pair do |local_name, mod_name|
    # corresponding to "import <mod_name> => :<local_name>"
    mod_inst = send(local_name)
    if mod_inst.nil?
      # create wrapper instances
      #puts "=== resolving #{self}.#{mod_name} => #{local_name}"
      klass = module_wrapper_class(mod_name)
      qlocal_name = toplevel? ? local_name.to_s : "#{self.qualified_name}.#{local_name}"
      # this instantiation will resolve the imported module's own imports
      mod_inst = klass.new(:toplevel => toplevel, :qualified_name => qlocal_name)
      instance_variable_set("@#{local_name}", mod_inst)
    end
    # Absorb the module wrapper's user-defined state.
    mod_inst.tables.each_pair do |name, t|
      # Don't try to import module definitions for builtin Bud state. Note
      # that @tables only includes the builtin tables, because resolve_imports
      # is called before user-defined state blocks are run.
      unless @tables.has_key? t.tabname
        qname = "#{local_name}.#{name}"
        tables[qname.to_sym] = t
      end
    end
    mod_inst.lattices.each_pair do |name, t|
      qname = "#{local_name}.#{name}".to_sym
      raise Bud::Error if lattices.has_key? qname
      lattices[qname] = t
    end
    mod_inst.t_rules.each do |imp_rule|
      qname = "#{local_name}.#{imp_rule.lhs}"
      self.t_rules << [imp_rule.bud_obj, imp_rule.rule_id, qname, imp_rule.op,
                       imp_rule.src, imp_rule.orig_src, imp_rule.unsafe_funcs_called]
    end
    mod_inst.t_depends.each do |imp_dep|
      qlname = "#{local_name}.#{imp_dep.lhs}"
      qrname = "#{local_name}.#{imp_dep.body}"
      self.t_depends << [imp_dep.bud_obj, imp_dep.rule_id, qlname, imp_dep.op,
                         qrname, imp_dep.nm, imp_dep.in_body]
    end
    mod_inst.t_provides.each do |imp_pro|
      qintname = "#{local_name}.#{imp_pro.interface}"
      self.t_provides << [qintname, imp_pro.input]
    end
    mod_inst.channels.each do |name, ch|
      qname = "#{local_name}.#{name}"
      @channels[qname.to_sym] = ch
    end
    mod_inst.dbm_tables.each do |name, t|
      qname = "#{local_name}.#{name}"
      @dbm_tables[qname.to_sym] = t
    end
    mod_inst.periodics.each do |p|
      qname = "#{local_name}.#{p.pername}"
      @periodics << [qname.to_sym, p.period]
    end
  end
  nil
end
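The core of `resolve_imports` is re-keying everything an imported module defines under `"<local_name>.<name>"`. The sketch below applies that to a plain hash of tables. The name `absorb_tables` and the explicit `builtin_names` list are assumptions; the original instead skips any table whose `tabname` is already in `@tables`, which at that point holds only builtins.

```ruby
# Sketch of the renaming step: each name an imported module defines is
# re-keyed as "<local_name>.<name>" in the importer, skipping builtin names
# so shared builtin state is not duplicated. All names are hypothetical.
def absorb_tables(importer_tables, module_tables, local_name, builtin_names)
  module_tables.each_pair do |name, tbl|
    next if builtin_names.include?(name)
    importer_tables[:"#{local_name}.#{name}"] = tbl
  end
  importer_tables
end

mine = { stdio: :builtin_stdio }
theirs = { stdio: :builtin_stdio, kvstate: :kv_table, kvput: :kv_input }
absorb_tables(mine, theirs, :store, [:stdio])
```

After the call, the importer sees `store.kvstate` and `store.kvput` alongside its own `stdio`, and no `store.stdio` entry is created.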
Schedule a block to be evaluated by EventMachine in the future, and block until this has happened.
# File lib/bud.rb, line 960
def schedule_and_wait
  # If EM isn't running, just run the user's block immediately
  # XXX: not clear that this is the right behavior
  unless EventMachine::reactor_running?
    yield
    return
  end
  q = Queue.new
  EventMachine::schedule do
    ret = false
    begin
      yield
    rescue Exception
      ret = $!
    end
    q.push(ret)
  end
  resp = q.pop
  raise resp if resp
end
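The handoff in `schedule_and_wait` can be sketched without EventMachine: a hypothetical `MiniLoop` thread stands in for the reactor, and `Queue` carries both the job and its outcome. Any exception raised by the block is captured on the loop thread and re-raised in the caller, as in the original.

```ruby
# Sketch of schedule-and-wait without EventMachine: a hypothetical event-loop
# thread runs jobs from a queue; the caller blocks on a reply queue, and any
# exception raised by the job is shipped back and re-raised in the caller.
class MiniLoop
  def initialize
    @jobs = Queue.new
    @thread = Thread.new do
      while (job = @jobs.pop) # a nil job stops the loop
        job.call
      end
    end
  end

  def schedule_and_wait(&blk)
    reply = Queue.new
    job = lambda do
      err = nil
      begin
        blk.call
      rescue Exception => e # mirror the original: forward any exception
        err = e
      end
      reply.push(err)
    end
    @jobs.push(job)
    err = reply.pop # block until the loop thread has run the job
    raise err if err
  end

  def stop
    @jobs.push(nil)
    @thread.join
  end
end

ev = MiniLoop.new
ran_on = nil
ev.schedule_and_wait { ran_on = Thread.current }
ev.stop
```

Pushing `nil` (rather than `false`) as the "no error" reply would also work here; the original uses `false` so that `raise resp if resp` is skipped on success.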
# File lib/bud.rb, line 929
def start_reactor
  return if EventMachine::reactor_running?
  EventMachine::error_handler do |e|
    # Only print a backtrace if a non-Bud::Error is raised (this presumably
    # indicates an unexpected failure).
    if e.class <= Bud::Error
      puts "#{e.class}: #{e}"
    else
      puts "Unexpected Bud error: #{e.inspect}"
      puts e.backtrace.join("\n")
    end
    Bud.shutdown_all_instances
    raise e
  end
  # Block until EM has successfully started up.
  q = Queue.new
  # This thread helps us avoid race conditions on the start and stop of
  # EventMachine's event loop.
  Thread.new do
    EventMachine.run do
      q.push(true)
    end
  end
  # Block waiting for EM's event loop to start up.
  q.pop
end
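The startup handshake in `start_reactor` spawns the event loop on its own thread and blocks the caller on a `Queue` until the loop signals that it is running. In the sketch below, a hypothetical job-processing loop (`start_loop`) stands in for `EventMachine.run`.

```ruby
# Sketch of the startup handshake: spawn the event-loop thread and block on a
# Queue until the loop signals that it is running. The job loop here is a
# hypothetical stand-in for EventMachine.run.
def start_loop(jobs)
  ready = Queue.new
  thread = Thread.new do
    ready.push(true) # signal: the loop is up
    while (job = jobs.pop) # a nil job stops the loop
      job.call
    end
  end
  ready.pop # returns only after the loop thread has started
  thread
end

jobs = Queue.new
loop_thread = start_loop(jobs)
done = Queue.new
jobs.push(-> { done.push(:ran) })
result = done.pop
jobs.push(nil) # shut the loop down
loop_thread.join
```

Blocking on `ready.pop` guarantees that code scheduled right after startup finds the loop already running, which is the race the original comment refers to.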