class Floss::RPC::ZMQ::Client
Attributes
address[RW]
latch[RW]
@return [Celluloid::IO::Stream::Latch]
socket[RW]
@return [Celluloid::ZMQ::ReqSocket]
Public Class Methods
new(address)
click to toggle source
# File lib/floss/rpc/zmq.rb, line 20
# Creates a client for +address+ and connects right away.
#
# @param address the endpoint later handed to the socket's +connect+
def initialize(address)
  self.address = address
  self.latch = Celluloid::IO::Stream::Latch.new
  # Connect last: it reads the address stored above.
  connect
end
Public Instance Methods
call(command, payload)
click to toggle source
# File lib/floss/rpc/zmq.rb, line 35
# Performs one RPC round trip: encodes the request, sends it while holding
# the latch, and decodes whatever comes back.
#
# @param command identifier placed in front of the encoded payload
# @param payload [Object] data serialized into the request
# @return [Object] the decoded response
def call(command, payload)
  wire_request = encode_request(command, payload)
  # Only the send/read exchange runs inside the latch.
  raw_reply = latch.synchronize do
    request(wire_request)
  end
  decode_response(raw_reply)
end
connect()
click to toggle source
# File lib/floss/rpc/zmq.rb, line 26
# Replaces the current socket with a new request socket and connects it
# to the stored address.
def connect
  req_socket = Celluloid::ZMQ::ReqSocket.new
  self.socket = req_socket
  req_socket.connect(address)
end
decode_response(response)
click to toggle source
# File lib/floss/rpc/zmq.rb, line 56
# Deserializes a raw response string with Marshal.
#
# SECURITY: Marshal.load can instantiate arbitrary objects. The bytes passed
# in here are whatever was read from the socket, so this is only safe when
# the peer at the configured address is trusted.
# NOTE(review): confirm that trust assumption, or move to a data-only format.
#
# @param response [String] Marshal-encoded bytes
# @return [Object] the deserialized object
# @raise [TypeError] if the data is not a valid Marshal stream
def decode_response(response)
  Marshal.load(response)
end
disconnect()
click to toggle source
# File lib/floss/rpc/zmq.rb, line 31
# Closes the current socket; does nothing when no socket is set.
def disconnect
  return unless socket

  socket.close
end
encode_request(command, payload)
click to toggle source
# File lib/floss/rpc/zmq.rb, line 52
# Builds the wire message: the command, a ":" separator, then the
# Marshal-dumped payload.
#
# The "command:" prefix is converted to a binary string before the dump is
# appended. Marshal.dump returns binary data, and interpolating it next to a
# non-ASCII command raised Encoding::CompatibilityError. The resulting bytes
# are unchanged for ASCII commands.
#
# @param command [#to_s] command identifier
# @param payload [Object] any object Marshal.dump accepts
# @return [String] binary-encoded request message
def encode_request(command, payload)
  "#{command}:".b + Marshal.dump(payload)
end
request(message)
click to toggle source
# File lib/floss/rpc/zmq.rb, line 41
# Sends +message+ over the socket and returns what is read back, all within
# Floss::RPC::TIMEOUT.
#
# On a timeout the socket is closed and a new one is connected before the
# failure is reported as a Floss::TimeoutError.
# NOTE(review): +timeout+ and +abort+ are presumably the Celluloid actor
# helpers, not Kernel's; confirm against the class definition.
#
# @param message [String] encoded request
# @return the data read from the socket
def request(message)
  timeout(Floss::RPC::TIMEOUT) do
    socket.send(message)
    socket.read
  end
rescue Celluloid::Task::TimeoutError
  # Swap in a fresh socket before reporting the failure.
  disconnect
  connect
  timeout_error = Floss::TimeoutError.new("RPC timed out (#{address}).")
  abort timeout_error
end