class RFlow::Component
Parent class for all RFlow
components.
Attributes
The name of the component. @return [String]
Collection of the component's input and output ports. @return [PortCollection]
The UUID of the component. @return [String]
Reference to the worker process in which this instance of the component is running. @return [Shard::Worker]
Public Class Methods
Attempt to instantiate a component described by the config specification. This assumes that the specification of a component is a fully qualified Ruby class that has already been loaded. It will first attempt to find subclasses of {RFlow::Component} (in {Configuration#available_components}) and then attempt to constantize the specification into a different class. Future releases will support external (i.e. non-managed components), but the current stuff only supports Ruby classes.
@param worker [Shard::Worker] the worker process for the component to run in @param config [Configuration::Component] the component configuration @return [RFlow::Component] an instance of the component class
# File lib/rflow/component.rb, line 58 def build(worker, config) raise NotImplementedError, "Non-managed components not yet implemented for component '#{config.name}' as '#{config.specification}' (#{config.uuid})" unless config.managed? RFlow.logger.debug "Instantiating component '#{config.name}' as '#{config.specification}' (#{config.uuid})" begin component_class = RFlow.configuration.available_components[config.specification] if component_class RFlow.logger.debug "Component found in configuration.available_components['#{config.specification}']" else RFlow.logger.debug "Component not found in configuration.available_components, constantizing component '#{config.specification}'" component_class = config.specification.constantize end component_class.new(worker: worker, uuid: config.uuid, name: config.name).tap do |component| config.input_ports.each {|p| component.configure_input_port! p.name, uuid: p.uuid } config.output_ports.each {|p| component.configure_output_port! p.name, uuid: p.uuid } config.input_ports.each do |p| p.input_connections.each do |c| component.send(p.name.to_sym).add_connection c.input_port_key, Connection.build(c) end end config.output_ports.each do |p| p.output_connections.each do |c| component.send(p.name.to_sym).add_connection c.output_port_key, Connection.build(c) end end end rescue NameError => e raise RuntimeError, "Could not instantiate component '#{config.name}' as '#{config.specification}' (#{config.uuid}): the class '#{config.specification}' could not be loaded (#{e.message})" rescue Exception => e raise RuntimeError, "Could not instantiate component '#{config.name}' as '#{config.specification}' (#{config.uuid}): #{e.class} #{e.message}, because: #{e.backtrace.inspect}" end end
@!visibility private
# File lib/rflow/component.rb, line 37 def define_port(collection, name) collection[name.to_s] = true # Create the port accessor method based on the port name define_method name.to_s.to_sym do ports.by_name[name.to_s] end end
@!visibility private
# File lib/rflow/component.rb, line 32 def defined_input_ports; @defined_input_ports ||= {}; end
@!visibility private
# File lib/rflow/component.rb, line 34 def defined_output_ports; @defined_output_ports ||= {}; end
Keep track of available component subclasses. @!visibility private
# File lib/rflow/component.rb, line 11 def inherited(subclass) RFlow::Configuration.add_available_component(subclass) end
When declaring your component class, defines an input port with a given name. Will also define a port accessor method named after the port for retrieving it.
@param name [String] @return [void]
# File lib/rflow/component.rb, line 21 def input_port(name); define_port(defined_input_ports, name); end
@param args [Hash] supported args are :name
, :uuid
, :worker
# File lib/rflow/component.rb, line 110 def initialize(args = {}) @name = args[:name] @uuid = args[:uuid] @worker = args[:worker] @ports = PortCollection.new self.class.defined_input_ports.each {|name, _| ports << InputPort.new(self, name: name) } self.class.defined_output_ports.each {|name, _| ports << OutputPort.new(self, name: name) } end
When declaring your component class, defines an output port with a given name. Will also define a port accessor method named after the port for retrieving it.
@param name [String] @return [void]
# File lib/rflow/component.rb, line 29 def output_port(name); define_port(defined_output_ports, name); end
Public Instance Methods
Method called after all components have been shutdown! and just before the global RFlow
exit. Sublcasses should implement to cleanup any leftover state, e.g. flush file handles, etc. @return [void]
# File lib/rflow/component.rb, line 217 def cleanup!; end
Method that should be overridden by a subclass to provide for component-specific configuration. The subclass should use the {configuration} attribute (+@configuration+) to store its particular configuration. @param deserialized_configuration [Hash] from the RFlow
configuration database; most likely a Hash. Don't assume that the keys are symbols! @return [void]
# File lib/rflow/component.rb, line 190 def configure!(deserialized_configuration); end
@!visibility private
# File lib/rflow/component.rb, line 136 def configure_input_port!(port_name, options = {}) RFlow.logger.debug "Configuring component '#{name}' (#{uuid}) input port '#{port_name}' (#{options[:uuid]})" unless self.class.defined_input_ports.include? port_name raise ArgumentError, "Input port '#{port_name}' not defined on component '#{self.class}'" end ports.by_name[port_name].uuid = options[:uuid] end
@!visibility private
# File lib/rflow/component.rb, line 145 def configure_output_port!(port_name, options = {}) RFlow.logger.debug "Configuring component '#{name}' (#{uuid}) output port '#{port_name}' (#{options[:uuid]})" unless self.class.defined_output_ports.include? port_name raise ArgumentError, "Output port '#{port_name}' not defined on component '#{self.class}'" end ports.by_name[port_name].uuid = options[:uuid] end
@!visibility private Tell the component to establish its ports' connections, i.e. make the connection. Uses the underlying connection object. Also establishes the callbacks for each of the input ports
# File lib/rflow/component.rb, line 157 def connect_inputs! input_ports.each {|port| port.recv_callback = method(:process_message) } input_ports.each(&:connect!) end
@!visibility private Tell the component to establish its ports' connections, i.e. make the connection. Uses the underlying connection object. @!visibility private
# File lib/rflow/component.rb, line 166 def connect_outputs! output_ports.each(&:connect!) end
Returns a list of connected input ports. Each port will have one or more keys associated with a particular connection. @return [Array<InputPort>]
# File lib/rflow/component.rb, line 128 def input_ports; ports.by_type['RFlow::Component::InputPort']; end
Returns a list of connected output ports. Each port will have one or more keys associated with the particular connection. @return [Array<OutputPort>]
# File lib/rflow/component.rb, line 133 def output_ports; ports.by_type['RFlow::Component::OutputPort']; end
Method called when a message is received on an input port. Subclasses should implement if they want to receive messages. @param input_port
[RFlow::Component::InputPort] the input port the message was received on @param input_port_key [String] if the message was received on a keyed subport, this is the key @param connection [RFlow::Connection] the connection the message was received on @param message [RFlow::Message] the message itself @return [void]
# File lib/rflow/component.rb, line 205 def process_message(input_port, input_port_key, connection, message); end
Main component running method. Subclasses should implement if they want to set up any EventMachine stuffs (servers, clients, etc.). @return [void]
# File lib/rflow/component.rb, line 196 def run!; end
@!attribute shard [r] Reference to the component's worker process's {Shard}. @return [Shard]
# File lib/rflow/component.rb, line 123 def shard; worker.shard if worker; end
Method called when RFlow
is shutting down. Subclasses should implement to terminate any servers/clients (or let them finish) and stop sending new data through the flow. @return [void]
# File lib/rflow/component.rb, line 211 def shutdown!; end
Pretty-printed version of the component, its ports, their keys, and their connections. @return [String]
# File lib/rflow/component.rb, line 172 def to_s string = "Component '#{name}' (#{uuid})\n" ports.each do |port| port.keys.each do |key| port[key].each do |connection| string << "\t#{port.class.to_s} '#{port.name}' (#{port.uuid}) key '#{key}' connection '#{connection.name}' (#{connection.uuid})\n" end end end string end