class RFlow::Connections::ZMQStreamer
The broker process responsible for shuttling messages back and forth on a many-to-many pipeline link. (Solutions without a broker only allow a 1-to-many or many-to-1 connection.)
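The role of the broker can be pictured with a toy, in-memory model. The sketch below is not RFlow code and does not use ZeroMQ: it substitutes Ruby's `Thread::Queue` for sockets, and the method name `run_toy_broker` and its parameters are hypothetical. It only illustrates how a single intermediary lets several producers feed several consumers.

```ruby
# Toy stand-in for a broker: Thread::Queue replaces ZeroMQ sockets.
# All names here are illustrative assumptions, not part of RFlow.
def run_toy_broker(producers:, consumers:, messages_each:)
  front = Queue.new    # producers write here (plays the role of the front socket)
  back = Queue.new     # consumers read here (plays the role of the back socket)
  received = Queue.new # collects [consumer_index, message] pairs

  # The broker forwards every message from front to back.
  # Queue#pop returns nil once the queue is closed and drained.
  broker = Thread.new do
    while (msg = front.pop)
      back << msg
    end
    back.close
  end

  producer_threads = producers.times.map do |p|
    Thread.new { messages_each.times { |i| front << "producer-#{p}-msg-#{i}" } }
  end

  consumer_threads = consumers.times.map do |c|
    Thread.new do
      while (msg = back.pop)
        received << [c, msg]
      end
    end
  end

  producer_threads.each(&:join)
  front.close # no more input: lets the broker loop finish
  broker.join
  consumer_threads.each(&:join)

  delivered = []
  delivered << received.pop until received.empty?
  delivered
end
```

Calling `run_toy_broker(producers: 3, consumers: 2, messages_each: 2)` delivers each of the six messages exactly once. Which consumer receives which message depends on thread scheduling.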
Attributes
back[R]
connection[R]
context[R]
front[R]
Public Class Methods
new(config)
Calls superclass method
# File lib/rflow/connections/zmq_connection.rb, line 152
def initialize(config)
  @connection = config.connection
  super("broker-#{connection.name}", 'Broker')
end
Public Instance Methods
run_process()
Starts the broker process. Returns only when the broker is shutting down; the return value is not meaningful (@return [void]).
# File lib/rflow/connections/zmq_connection.rb, line 159
def run_process
  version = LibZMQ::version
  RFlow.logger.debug { "Creating a new ZeroMQ context; ZeroMQ version is #{version[:major]}.#{version[:minor]}.#{version[:patch]}" }
  @context = ZMQ::Context.new
  RFlow.logger.debug { "Connecting message broker to route from #{connection.options['output_address']} to #{connection.options['input_address']}" }

  @front = case connection.options['output_socket_type']
           when 'PUSH'; context.socket(ZMQ::PULL)
           when 'PUB'; context.socket(ZMQ::XSUB)
           else raise ArgumentError, "Unknown output socket type #{connection.options['output_socket_type']}"
           end
  @back = case connection.options['input_socket_type']
          when 'PULL'; context.socket(ZMQ::PUSH)
          when 'SUB'; context.socket(ZMQ::XPUB)
          else raise ArgumentError, "Unknown input socket type #{connection.options['input_socket_type']}"
          end

  front.bind(connection.options['output_address'])
  back.bind(connection.options['input_address'])

  while true
    ZMQ::Proxy.new(front, back)
  end
rescue Exception => e
  RFlow.logger.error "Error running message broker: #{e.class}: #{e.message}, because: #{e.backtrace.inspect}"
ensure
  back.close if back
  front.close if front
  context.terminate if context
end
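The socket selection in `run_process` pairs each component-side socket type with its complement on the broker: a `PUSH` output is met by a `PULL` front socket, a `PUB` output by `XSUB`, a `PULL` input by a `PUSH` back socket, and a `SUB` input by `XPUB`. The following sketch restates that mapping as plain lookup tables so it can be inspected without ZeroMQ installed. The constants and the `broker_socket_types` helper are hypothetical and use symbols in place of the `ZMQ::` constants.

```ruby
# Hypothetical lookup tables mirroring the case statements in run_process.
# Symbols stand in for the ZMQ::PULL, ZMQ::XSUB, ZMQ::PUSH and ZMQ::XPUB constants.
FRONT_SOCKET_FOR_OUTPUT = { 'PUSH' => :PULL, 'PUB' => :XSUB }.freeze
BACK_SOCKET_FOR_INPUT = { 'PULL' => :PUSH, 'SUB' => :XPUB }.freeze

# Given connection options shaped like those read by run_process, return the
# socket types the broker would create on its front and back sides.
def broker_socket_types(options)
  front = FRONT_SOCKET_FOR_OUTPUT.fetch(options['output_socket_type']) do |type|
    raise ArgumentError, "Unknown output socket type #{type}"
  end
  back = BACK_SOCKET_FOR_INPUT.fetch(options['input_socket_type']) do |type|
    raise ArgumentError, "Unknown input socket type #{type}"
  end
  { front: front, back: back }
end
```

For example, `broker_socket_types({ 'output_socket_type' => 'PUB', 'input_socket_type' => 'SUB' })` yields an XSUB front and an XPUB back, while an unrecognized type raises `ArgumentError`, as in the original method.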