class RFlow::Connections::ZMQConnection

Represents a ZeroMQ connection.

Attributes

zmq_context[RW]

The ZeroMQ context object. @return [EM::ZeroMQ::Context]

input_socket[RW]
output_socket[RW]

Public Class Methods

create_zmq_context() click to toggle source

@!visibility private

# File lib/rflow/connections/zmq_connection.rb, line 22
def create_zmq_context
  version = LibZMQ::version
  RFlow.logger.debug { "Creating a new ZeroMQ context; ZeroMQ version is #{version[:major]}.#{version[:minor]}.#{version[:patch]}" }
  if EM.reactor_running?
    raise RuntimeError, 'EventMachine reactor is running when attempting to create a ZeroMQ context'
  end
  EM::ZeroMQ::Context.new(1)
end
new(config) click to toggle source
Calls superclass method RFlow::Connection::new
# File lib/rflow/connections/zmq_connection.rb, line 44
def initialize(config)
  super
  validate_options!
  zmq_context # cause the ZMQ context to be created before the reactor is running
end

Public Instance Methods

connect_input!() click to toggle source

Hook up the input to the real ZeroMQ sockets. @return [void]

# File lib/rflow/connections/zmq_connection.rb, line 52
def connect_input!
  RFlow.logger.debug "Connecting input #{uuid} with #{options.find_all {|k, v| k.to_s =~ /input/}}"
  check_address(options['input_address'])

  self.input_socket = zmq_context.socket(ZMQ.const_get(options['input_socket_type']))
  input_socket.send(options['input_responsibility'].to_sym, options['input_address'])
  if config.delivery == 'broadcast'
    input_socket.setsockopt(ZMQ::SUBSCRIBE, '') # request all messages
  end

  input_socket.on(:message) do |*message_parts|
    begin
      message = RFlow::Message.from_avro(message_parts.last.copy_out_string)
      RFlow.logger.debug "#{name}: Received message of type '#{message_parts.first.copy_out_string}'"
      message_parts.each(&:close) # avoid memory leaks
      recv_callback.call(message)
    rescue Exception => e
      RFlow.logger.error "#{name}: Exception processing message of type '#{message.data_type_name}': #{e.message}, because: #{e.backtrace}"
    end
  end

  input_socket
end
connect_output!() click to toggle source

Hook up the output to the real ZeroMQ sockets. @return [void]

# File lib/rflow/connections/zmq_connection.rb, line 78
def connect_output!
  RFlow.logger.debug "Connecting output #{uuid} with #{options.find_all {|k, v| k.to_s =~ /output/}}"
  check_address(options['output_address'])

  self.output_socket = zmq_context.socket(ZMQ.const_get(options['output_socket_type']))
  output_socket.send(options['output_responsibility'].to_sym, options['output_address'].to_s)
  output_socket
end
send_message(message) click to toggle source

Send a message along the connection into ZeroMQ. @return [void]

# File lib/rflow/connections/zmq_connection.rb, line 89
def send_message(message)
  RFlow.logger.debug "#{name}: Sending message of type '#{message.data_type_name.to_s}'"

  begin
    output_socket.send_msg(message.data_type_name.to_s, message.to_avro)
  rescue Exception => e
    RFlow.logger.error "Exception #{e.class}: #{e.message}, because: #{e.backtrace}"
  end
end
zmq_context() click to toggle source

The ZeroMQ context object. @return [EM::ZeroMQ::Context]

# File lib/rflow/connections/zmq_connection.rb, line 38
def zmq_context; ZMQConnection.zmq_context; end

Private Instance Methods

check_address(address) click to toggle source
# File lib/rflow/connections/zmq_connection.rb, line 120
def check_address(address)
  # make sure we're not trying to create IPC sockets in an NFS share
  # because that works poorly
  if address.start_with?('ipc://')
    filename = address[6..-1]
    mount_point = Sys::Filesystem.mount_point(File.dirname(filename))
    return unless mount_point
    mount_type = Sys::Filesystem.mounts.find {|m| m.mount_point == mount_point }.mount_type
    return unless mount_type

    case mount_type
    when 'vmhgfs', 'vboxsf', 'nfs' # vmware, virtualbox, nfs
      raise ArgumentError, "Cannot safely create IPC sockets in network filesystem '#{mount_point}' of type #{mount_type}"
    end
  end
end
validate_options!() click to toggle source
# File lib/rflow/connections/zmq_connection.rb, line 100
def validate_options!
  # TODO: Normalize/validate configuration
  missing_options = []

  ['input', 'output'].each do |direction_prefix|
    ['_socket_type', '_address', '_responsibility'].each do |option_suffix|
      option_name = "#{direction_prefix}#{option_suffix}"
      unless options.include? option_name
        missing_options << option_name
      end
    end
  end

  unless missing_options.empty?
    raise ArgumentError, "#{self.class.to_s}: configuration missing options: #{missing_options.join ', '}"
  end

  true
end