class RFlow::Configuration::RubyDSL

Ruby DSL config file controller. TODO: more docs and examples

Constants

COMPONENT_PORT_STRING_REGEX

Splits the connection string into component/port parts

Attributes

connection_specs[RW]
default_shard[RW]
setting_specs[RW]
shard_specs[RW]

Public Class Methods

configure() { |config_file| ... } click to toggle source

Method called within the config file itself

# File lib/rflow/configuration/ruby_dsl.rb, line 97
def self.configure
  config_file = self.new
  yield config_file
  config_file.process_objects
end
new() click to toggle source
# File lib/rflow/configuration/ruby_dsl.rb, line 12
def initialize
  @default_shard = {:name => 'DEFAULT', :type => :process, :count => 1, :components => []}
  @current_shard = default_shard

  @setting_specs = []
  @shard_specs = [default_shard]
  @connection_specs = []
end

Public Instance Methods

component(name, specification, options = {}) click to toggle source

DSL method to specify a component. Expects a name, specification, and set of component specific options, that must be marshallable into the database (i.e. should all be strings)

# File lib/rflow/configuration/ruby_dsl.rb, line 58
def component(name, specification, options = {})
  @current_shard[:components] << {
    :name => name,
    :specification => specification.to_s, :options => options,
    :config_line => get_config_line(caller)
  }
end
connect(hash) click to toggle source

DSL method to specify a connection between a component/output_port and another component/input_port. The component/port specification is a string where the names of the two elements are separated by '#', and the 'connection' is specified by a Ruby Hash, i.e.:

connect 'componentA#output' => 'componentB#input'

Array ports are specified with an key suffix in standard progamming syntax, i.e.

connect 'componentA#arrayport[2]' => 'componentB#in[1]'

Uses the model to assign random UUIDs

# File lib/rflow/configuration/ruby_dsl.rb, line 76
def connect(hash)
  delivery = hash[:delivery] || 'round-robin'
  hash.except(:delivery).each do |output_string, input_string|
    output_component_name, output_port_name, output_port_key = parse_connection_string(output_string)
    input_component_name, input_port_name, input_port_key = parse_connection_string(input_string)

    connection_specs << {
      :name => output_string + '=>' + input_string,
      :delivery => delivery,
      :output_component_name => output_component_name,
      :output_port_name => output_port_name, :output_port_key => output_port_key,
      :output_string => output_string,
      :input_component_name => input_component_name,
      :input_port_name => input_port_name, :input_port_key => input_port_key,
      :input_string => input_string,
      :config_line => get_config_line(caller)
    }
  end
end
process(name, options = {}, &block) click to toggle source

shortcut

# File lib/rflow/configuration/ruby_dsl.rb, line 46
def process(name, options = {}, &block)
  shard(name, options.merge(:type => :process), &block)
end
process_objects() click to toggle source

Method to process the 'DSL' objects into the config database via ActiveRecord

# File lib/rflow/configuration/ruby_dsl.rb, line 105
def process_objects
  process_setting_specs
  process_shard_specs
  process_connection_specs
end
setting(name, value) click to toggle source

DSL method to specify a name/value pair. RFlow core uses the 'rflow.' prefix on all of its settings. Custom settings should use a custom (unique) prefix

# File lib/rflow/configuration/ruby_dsl.rb, line 24
def setting(name, value)
  setting_specs << {:name => name.to_s, :value => value.to_s, :config_line => get_config_line(caller)}
end
shard(name, options = {}) { |self| ... } click to toggle source

DSL method to specify a shard block for either a process or thread

# File lib/rflow/configuration/ruby_dsl.rb, line 29
def shard(name, options = {})
  raise ArgumentError, 'Cannot use DEFAULT as a shard name' if name == 'DEFAULT'
  raise ArgumentError, 'Cannot nest shards' if @current_shard != default_shard

  type = if options[:thread] || options[:type] == :thread; :thread
         else :process
         end

  count = options[type] || options[:count] || 1

  @current_shard = {:name => name, :type => type, :count => count, :components => [], :config_line => get_config_line(caller)}
  shard_specs << @current_shard
  yield self
  @current_shard = default_shard
end
thread(name, options = {}, &block) click to toggle source

shortcut

# File lib/rflow/configuration/ruby_dsl.rb, line 51
def thread(name, options = {}, &block)
  shard(name, options.merge(:type => :thread), &block)
end

Private Instance Methods

get_config_line(call_history) click to toggle source

Helper function to extract the line of the config that specified the operation. Useful in printing helpful error messages

# File lib/rflow/configuration/ruby_dsl.rb, line 114
def get_config_line(call_history)
  call_history.first.split(':in').first
end
parse_connection_string(string) click to toggle source
# File lib/rflow/configuration/ruby_dsl.rb, line 121
def parse_connection_string(string)
  matched = COMPONENT_PORT_STRING_REGEX.match(string)
  raise ArgumentError, "Invalid component/port string specification: #{string}" unless matched
  component_name, port_name, port_key = matched.captures
  [component_name, port_name, port_key]
end
process_connection_specs() click to toggle source

For each given connection, break up each input/output component/port specification, ensure that the component already exists in the database (by name). Chooses the best connection type for any pair of components.

# File lib/rflow/configuration/ruby_dsl.rb, line 171
def process_connection_specs
  connection_specs.each do |spec|
    begin
      RFlow.logger.debug "Found connection from '#{spec[:output_string]}' to '#{spec[:input_string]}', creating"

      # an input port can be associated with multiple outputs, but
      # an output port can only be associated with one input
      output_component = RFlow::Configuration::Component.find_by_name spec[:output_component_name]
      raise RFlow::Configuration::Connection::ConnectionInvalid,
        "Component '#{spec[:output_component_name]}' not found at #{spec[:config_line]}" unless output_component
      output_port = output_component.output_ports.find_or_initialize_by :name => spec[:output_port_name]
      output_port.save!

      input_component = RFlow::Configuration::Component.find_by_name spec[:input_component_name]
      raise RFlow::Configuration::Connection::ConnectionInvalid,
        "Component '#{spec[:input_component_name]}' not found at #{spec[:config_line]}" unless input_component
      input_port = input_component.input_ports.find_or_initialize_by :name => spec[:input_port_name]
      input_port.save!

      output_shards = output_component.shard.count
      input_shards = input_component.shard.count

      broadcast_connection = spec[:delivery] == 'broadcast'
      in_shard_connection = output_component.shard == input_component.shard
      one_to_one = output_shards == 1 && input_shards == 1
      one_to_many = output_shards == 1 && input_shards > 1
      many_to_one = output_shards > 1 && input_shards == 1
      many_to_many = output_shards > 1 && input_shards > 1

      use_broker = many_to_many && (broadcast_connection || !in_shard_connection)
      connection_type = use_broker ? RFlow::Configuration::BrokeredZMQConnection : RFlow::Configuration::ZMQConnection

      conn = connection_type.create!(:name => spec[:name],
                                     :delivery => spec[:delivery],
                                     :output_port_key => spec[:output_port_key],
                                     :input_port_key => spec[:input_port_key],
                                     :output_port => output_port,
                                     :input_port => input_port)

      # bind on the cardinality-1 side, connect on the cardinality-n side
      if in_shard_connection && !use_broker
        conn.options['output_responsibility'] = 'connect'
        conn.options['input_responsibility'] = 'bind'
        conn.options['output_address'] = "inproc://rflow.#{conn.uuid}"
        conn.options['input_address'] = "inproc://rflow.#{conn.uuid}"
      elsif many_to_one
        conn.options['output_responsibility'] = 'connect'
        conn.options['input_responsibility'] = 'bind'
      elsif one_to_many
        conn.options['output_responsibility'] = 'bind'
        conn.options['input_responsibility'] = 'connect'
      end

      case spec[:delivery]
      when 'broadcast'
        conn.options['output_socket_type'] = 'PUB'
        conn.options['input_socket_type'] = 'SUB'
      when 'round-robin'
        conn.options['output_socket_type'] = 'PUSH'
        conn.options['input_socket_type'] = 'PULL'
      else
        raise RFlow::Configuration::Connection::ConnectionInvalid,
          "Delivery type '#{spec[:delivery]}' unknown at #{spec[:config_line]}"
      end

      conn.save!
      conn
    rescue Exception => e
      # TODO: Figure out why an ArgumentError doesn't put the
      # offending message into e.message, even though it is printed
      # out if not caught
      raise RFlow::Configuration::Connection::ConnectionInvalid, "#{e.class}: #{e.message} at config '#{spec[:config_line]}'"
    end
  end
end
process_setting_specs() click to toggle source

Iterates through each setting specified in the DSL and creates rows in the database corresponding to the setting

# File lib/rflow/configuration/ruby_dsl.rb, line 130
def process_setting_specs
  setting_specs.each do |spec|
    RFlow.logger.debug "Found config file setting '#{spec[:name]}' = (#{Dir.getwd}) '#{spec[:value]}'"
    RFlow::Configuration::Setting.create! :name => spec[:name], :value => spec[:value]
  end
end
process_shard_specs() click to toggle source

Iterates through each shard specified in the DSL and creates rows in the database corresponding to the shard and included components

# File lib/rflow/configuration/ruby_dsl.rb, line 140
def process_shard_specs
  shard_specs.each do |spec|
    RFlow.logger.debug "Found #{spec[:type]} shard '#{spec[:name]}', creating"

    if spec[:components].empty?
      RFlow.logger.warn "Skipping shard '#{spec[:name]}' because it has no components"
      next
    end

    clazz = case spec[:type]
            when :process; RFlow::Configuration::ProcessShard
            when :thread; RFlow::Configuration::ThreadShard
            else raise RFlow::Configuration::Shard::ShardInvalid, "Invalid shard: #{spec.inspect}"
            end

    shard = clazz.create! :name => spec[:name], :count => spec[:count]

    spec[:components].each do |component_spec|
      RFlow.logger.debug "Shard '#{spec[:name]}' found component '#{component_spec[:name]}', creating"
      RFlow::Configuration::Component.create!(:shard => shard,
                                              :name => component_spec[:name],
                                              :specification => component_spec[:specification],
                                              :options => component_spec[:options])
    end
  end
end