class Floss::RPC::ZMQ::Client

Attributes

address[RW]
latch[RW]

@return [Celluloid::IO::Stream::Latch]

socket[RW]

@return [Celluloid::ZMQ::ReqSocket]

Public Class Methods

new(address) click to toggle source
# File lib/floss/rpc/zmq.rb, line 20
# Sets up a client for +address+: records the address, stores a fresh
# Celluloid::IO::Stream::Latch on the instance, and then calls #connect.
#
# @param address the endpoint later passed to the socket's +connect+
def initialize(address)
  self.address = address
  self.latch = Celluloid::IO::Stream::Latch.new
  connect
end

Public Instance Methods

call(command, payload) click to toggle source
# File lib/floss/rpc/zmq.rb, line 35
# Performs one RPC round trip.
#
# The request is encoded with #encode_request, sent through #request while
# holding the latch, and the raw reply is passed to #decode_response.
# NOTE(review): the latch presumably keeps concurrent callers from
# interleaving on the shared socket -- confirm against the latch semantics.
#
# @param command the command identifier placed in front of the payload
# @param payload the object serialized into the request
# @return whatever #decode_response yields for the raw reply
def call(command, payload)
  wire_request = encode_request(command, payload)
  raw_reply = latch.synchronize do
    request(wire_request)
  end
  decode_response(raw_reply)
end
connect() click to toggle source
# File lib/floss/rpc/zmq.rb, line 26
# Creates a new Celluloid::ZMQ::ReqSocket, stores it as this client's
# socket, and connects it to the configured address.
#
# @return the result of the socket's +connect+ call
def connect
  fresh_socket = Celluloid::ZMQ::ReqSocket.new
  self.socket = fresh_socket
  fresh_socket.connect(address)
end
decode_response(response) click to toggle source
# File lib/floss/rpc/zmq.rb, line 56
# Turns a raw reply back into a Ruby object using Marshal.
#
# NOTE(review): Marshal.load can instantiate arbitrary objects, so this is
# only safe when the remote peer is trusted -- confirm the deployment model.
#
# @param response [String] Marshal-serialized bytes
# @return the deserialized object
def decode_response(response)
  reply = Marshal.load(response)
  reply
end
disconnect() click to toggle source
# File lib/floss/rpc/zmq.rb, line 31
# Closes the current socket when one is set; does nothing otherwise.
#
# @return the result of the socket's +close+, or nil when no socket is set
def disconnect
  return unless socket

  socket.close
end
encode_request(command, payload) click to toggle source
# File lib/floss/rpc/zmq.rb, line 52
# Builds the wire form of a request: the command, a colon, and the
# Marshal dump of the payload.
#
# @param command interpolated into the string ahead of the colon
# @param payload any object Marshal can dump
# @return [String] "<command>:<marshalled payload>"
def encode_request(command, payload)
  serialized = Marshal.dump(payload)
  "#{command}:#{serialized}"
end
request(message) click to toggle source
# File lib/floss/rpc/zmq.rb, line 41
# Sends +message+ on the socket and reads the reply, both inside a
# timeout of Floss::RPC::TIMEOUT.
#
# When Celluloid::Task::TimeoutError is raised, the socket is closed and a
# new one is connected before the failure is reported through +abort+ as a
# Floss::TimeoutError.
# NOTE(review): the reset is presumably needed because a request socket
# that never received its reply cannot be reused -- confirm.
#
# @param message the encoded request to send
# @return the value returned by the socket's +read+
def request(message)
  begin
    timeout(Floss::RPC::TIMEOUT) do
      socket.send(message)
      socket.read
    end
  rescue Celluloid::Task::TimeoutError
    # Replace the socket before reporting the timeout.
    disconnect
    connect
    timeout_error = Floss::TimeoutError.new("RPC timed out (#{address}).")
    abort timeout_error
  end
end