class RFlow::Connections::ZMQConnection
Represents a ZeroMQ connection.
Attributes
zmq_context[RW]
The ZeroMQ context object. @return [EM::ZeroMQ::Context]
input_socket[RW]
output_socket[RW]
Public Class Methods
create_zmq_context()
click to toggle source
@!visibility private
# File lib/rflow/connections/zmq_connection.rb, line 22 def create_zmq_context version = LibZMQ::version RFlow.logger.debug { "Creating a new ZeroMQ context; ZeroMQ version is #{version[:major]}.#{version[:minor]}.#{version[:patch]}" } if EM.reactor_running? raise RuntimeError, 'EventMachine reactor is running when attempting to create a ZeroMQ context' end EM::ZeroMQ::Context.new(1) end
new(config)
click to toggle source
Calls superclass method
RFlow::Connection::new
# File lib/rflow/connections/zmq_connection.rb, line 44 def initialize(config) super validate_options! zmq_context # cause the ZMQ context to be created before the reactor is running end
Public Instance Methods
connect_input!()
click to toggle source
Hook up the input to the real ZeroMQ sockets. @return [void]
# File lib/rflow/connections/zmq_connection.rb, line 52 def connect_input! RFlow.logger.debug "Connecting input #{uuid} with #{options.find_all {|k, v| k.to_s =~ /input/}}" check_address(options['input_address']) self.input_socket = zmq_context.socket(ZMQ.const_get(options['input_socket_type'])) input_socket.send(options['input_responsibility'].to_sym, options['input_address']) if config.delivery == 'broadcast' input_socket.setsockopt(ZMQ::SUBSCRIBE, '') # request all messages end input_socket.on(:message) do |*message_parts| begin message = RFlow::Message.from_avro(message_parts.last.copy_out_string) RFlow.logger.debug "#{name}: Received message of type '#{message_parts.first.copy_out_string}'" message_parts.each(&:close) # avoid memory leaks recv_callback.call(message) rescue Exception => e RFlow.logger.error "#{name}: Exception processing message of type '#{message.data_type_name}': #{e.message}, because: #{e.backtrace}" end end input_socket end
connect_output!()
click to toggle source
Hook up the output to the real ZeroMQ sockets. @return [void]
# File lib/rflow/connections/zmq_connection.rb, line 78 def connect_output! RFlow.logger.debug "Connecting output #{uuid} with #{options.find_all {|k, v| k.to_s =~ /output/}}" check_address(options['output_address']) self.output_socket = zmq_context.socket(ZMQ.const_get(options['output_socket_type'])) output_socket.send(options['output_responsibility'].to_sym, options['output_address'].to_s) output_socket end
send_message(message)
click to toggle source
Send a message along the connection into ZeroMQ. @return [void]
# File lib/rflow/connections/zmq_connection.rb, line 89 def send_message(message) RFlow.logger.debug "#{name}: Sending message of type '#{message.data_type_name.to_s}'" begin output_socket.send_msg(message.data_type_name.to_s, message.to_avro) rescue Exception => e RFlow.logger.error "Exception #{e.class}: #{e.message}, because: #{e.backtrace}" end end
zmq_context()
click to toggle source
The ZeroMQ context object. @return [EM::ZeroMQ::Context]
# File lib/rflow/connections/zmq_connection.rb, line 38 def zmq_context; ZMQConnection.zmq_context; end
Private Instance Methods
check_address(address)
click to toggle source
# File lib/rflow/connections/zmq_connection.rb, line 120 def check_address(address) # make sure we're not trying to create IPC sockets in an NFS share # because that works poorly if address.start_with?('ipc://') filename = address[6..-1] mount_point = Sys::Filesystem.mount_point(File.dirname(filename)) return unless mount_point mount_type = Sys::Filesystem.mounts.find {|m| m.mount_point == mount_point }.mount_type return unless mount_type case mount_type when 'vmhgfs', 'vboxsf', 'nfs' # vmware, virtualbox, nfs raise ArgumentError, "Cannot safely create IPC sockets in network filesystem '#{mount_point}' of type #{mount_type}" end end end
validate_options!()
click to toggle source
# File lib/rflow/connections/zmq_connection.rb, line 100 def validate_options! # TODO: Normalize/validate configuration missing_options = [] ['input', 'output'].each do |direction_prefix| ['_socket_type', '_address', '_responsibility'].each do |option_suffix| option_name = "#{direction_prefix}#{option_suffix}" unless options.include? option_name missing_options << option_name end end end unless missing_options.empty? raise ArgumentError, "#{self.class.to_s}: configuration missing options: #{missing_options.join ', '}" end true end