class Pantry::Communication::ReadingSocket

Base class for all sockets that read messages from ZMQ. Not meant for direct use; use one of the subclasses for specific functionality.
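As a rough illustration of how a subclass fills in the two hooks this class leaves unimplemented, here is a minimal sketch. `SketchReadingSocket` and `SketchSubscribeSocket` are hypothetical stand-ins, not Pantry classes; the sketch leaves out ZMQ, security configuration and the asynchronous message loop, and uses a plain Hash where a real socket object would be.

```ruby
# Stand-in for the documented base class, reduced to the two hooks that
# subclasses are expected to override. This is NOT Pantry's implementation.
class SketchReadingSocket
  attr_reader :host, :port

  def initialize(host, port)
    @host = host
    @port = port
  end

  # Mirrors the documented order: build the socket, then connect or bind it.
  def open
    @socket = build_socket
    open_socket(@socket)
    @socket
  end

  def build_socket
    raise "Implement the socket setup."
  end

  def open_socket(socket)
    raise "Connect / Bind the socket built in #build_socket"
  end
end

# Hypothetical subclass: records an endpoint instead of talking to ZMQ.
class SketchSubscribeSocket < SketchReadingSocket
  def build_socket
    { endpoint: nil } # placeholder for a real ZMQ socket object
  end

  def open_socket(socket)
    socket[:endpoint] = "tcp://#{host}:#{port}"
  end
end

socket = SketchSubscribeSocket.new("127.0.0.1", 5555).open
puts socket[:endpoint]
```

Calling `open` on the stand-in base class itself raises, which is the same signal the documented `build_socket` and `open_socket` give when they have not been overridden.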

Attributes

host[R]
port[R]

Public Class Methods

new(host, port, security)
# File lib/pantry/communication/reading_socket.rb, line 13
def initialize(host, port, security)
  @host     = host
  @port     = port
  @listener = nil
  @security = security
end

Public Instance Methods

add_listener(listener)
# File lib/pantry/communication/reading_socket.rb, line 20
def add_listener(listener)
  @listener = listener
end
build_socket()
# File lib/pantry/communication/reading_socket.rb, line 34
def build_socket
  raise "Implement the socket setup."
end
has_source_header?()

Some ZMQ socket types include the source as the first packet of a message. We need to know whether the socket in question does this so that the incoming Message can be built correctly. The default is false.

# File lib/pantry/communication/reading_socket.rb, line 49
def has_source_header?
  false
end
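The effect of this predicate can be sketched without ZMQ. The classes below are hypothetical and work on plain arrays of strings; they only illustrate that a reader which reports a source header discards the first packet, so both kinds of reader end up with the same payload.

```ruby
# Hypothetical reader whose socket type adds no source packet.
class PlainReader
  def has_source_header?
    false
  end

  # Returns the packets that make up the message proper.
  def payload(packets)
    has_source_header? ? packets.drop(1) : packets
  end
end

# Hypothetical reader whose socket type prepends the sender's identity.
class IdentityPrefixedReader < PlainReader
  def has_source_header?
    true
  end
end

p PlainReader.new.payload(["metadata", "body"])
p IdentityPrefixedReader.new.payload(["client-42", "metadata", "body"])
```

Both calls print `["metadata", "body"]`.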
open()
# File lib/pantry/communication/reading_socket.rb, line 24
def open
  @socket = build_socket
  Communication.configure_socket(@socket)
  @security.configure_socket(@socket)
  open_socket(@socket)

  @running = true
  self.async.process_messages
end
open_socket(socket)
# File lib/pantry/communication/reading_socket.rb, line 38
def open_socket(socket)
  raise "Connect / Bind the socket built in #build_socket"
end
shutdown()
# File lib/pantry/communication/reading_socket.rb, line 42
def shutdown
  @running = false
end

Protected Instance Methods

handle_message(message)
# File lib/pantry/communication/reading_socket.rb, line 82
def handle_message(message)
  @listener.handle_message(message) if @listener
end
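Judging from the listing above, a listener only needs to respond to `handle_message`, and messages are dropped silently when no listener has been registered. A minimal sketch of that contract, with a hypothetical `RecordingListener` and a free-standing `forward` helper in place of the socket:

```ruby
# Hypothetical listener: anything that responds to handle_message will do.
class RecordingListener
  attr_reader :received

  def initialize
    @received = []
  end

  def handle_message(message)
    @received << message
  end
end

# Same guard as the documented handle_message: do nothing without a listener.
def forward(listener, message)
  listener.handle_message(message) if listener
end

listener = RecordingListener.new
forward(listener, "ping")
forward(nil, "dropped") # no listener registered, silently ignored
p listener.received     # prints ["ping"]
```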
process_messages()
# File lib/pantry/communication/reading_socket.rb, line 55
def process_messages
  while @running
    process_next_message
  end

  @socket.close
end
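The loop above checks `@running` only between messages, so a call to `shutdown` takes effect once the current `process_next_message` call returns. The sketch below shows that behaviour with a hypothetical `SketchLoop` class that reads from an in-memory array and sets a `closed` flag where the real class closes its socket.

```ruby
# Hypothetical stand-in for the run loop; no ZMQ or actors involved.
class SketchLoop
  attr_reader :handled, :closed

  def initialize(messages)
    @messages = messages.dup
    @handled  = []
    @closed   = false
    @running  = true
  end

  def shutdown
    @running = false
  end

  def process_messages
    while @running
      process_next_message
    end

    @closed = true # stands in for @socket.close
  end

  private

  def process_next_message
    message = @messages.shift
    @handled << message
    # Stop on the :stop marker, or when the sample input runs out.
    shutdown if message == :stop || @messages.empty?
  end
end

loop_sketch = SketchLoop.new([:a, :stop, :never_read])
loop_sketch.process_messages
p loop_sketch.handled # prints [:a, :stop]
p loop_sketch.closed  # prints true
```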
process_next_message()
# File lib/pantry/communication/reading_socket.rb, line 63
def process_next_message
  next_message = []

  # Drop the ZMQ given source packet, it's extraneous for our purposes
  if has_source_header?
    @socket.read
  end

  next_message << @socket.read

  while @socket.more_parts?
    next_message << @socket.read
  end

  async.handle_message(
    SerializeMessage.from_zeromq(next_message)
  )
end
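The read sequence in `process_next_message` can be tried out against an in-memory stand-in. `FakeSocket` and `read_multipart` below are hypothetical; the only assumption carried over from the listing is that the socket responds to `read` and `more_parts?`. Deserialization and asynchronous dispatch are omitted.

```ruby
# Hypothetical socket that serves packets from an array.
class FakeSocket
  def initialize(packets)
    @packets = packets.dup
  end

  def read
    @packets.shift
  end

  def more_parts?
    !@packets.empty?
  end
end

# Collects one multipart message, optionally discarding a leading source packet.
def read_multipart(socket, has_source_header: false)
  socket.read if has_source_header

  parts = [socket.read]
  parts << socket.read while socket.more_parts?
  parts
end

socket = FakeSocket.new(["client-42", "metadata", "body"])
p read_multipart(socket, has_source_header: true) # prints ["metadata", "body"]
```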