class Tros::IPC::Requestor

Attributes

local_protocol[R]
remote_hash[RW]
remote_protocol[RW]
send_protocol[RW]
transport[R]

Public Class Methods

new(local_protocol, transport) click to toggle source
   # File lib/tros/ipc.rb
82 def initialize(local_protocol, transport)
83   @local_protocol = local_protocol
84   @transport = transport
85   @remote_protocol = nil
86   @remote_hash = nil
87   @send_protocol = nil
88 end

Public Instance Methods

read_call_response(message_name, decoder) click to toggle source
    # File lib/tros/ipc.rb
190 def read_call_response(message_name, decoder)
191   # The format of a call response is:
192   #   * response metadata, a map with values of type bytes
193   #   * a one-byte error flag boolean, followed by either:
194   #     * if the error flag is false,
195   #       the message response, serialized per the message's response schema.
196   #     * if the error flag is true,
197   #       the error, serialized per the message's error union schema.
198   response_metadata = META_READER.read(decoder)
199 
200   # remote response schema
201   remote_message_schema = remote_protocol.messages[message_name]
202   raise AvroError.new("Unknown remote message: #{message_name}") unless remote_message_schema
203 
204   # local response schema
205   local_message_schema = local_protocol.messages[message_name]
206   unless local_message_schema
207     raise AvroError.new("Unknown local message: #{message_name}")
208   end
209 
210   # error flag
211   if !decoder.read_boolean
212     writers_schema = remote_message_schema.response
213     readers_schema = local_message_schema.response
214     read_response(writers_schema, readers_schema, decoder)
215   else
216     writers_schema = remote_message_schema.errors || SYSTEM_ERROR_SCHEMA
217     readers_schema = local_message_schema.errors || SYSTEM_ERROR_SCHEMA
218     raise read_error(writers_schema, readers_schema, decoder)
219   end
220 end
read_error(writers_schema, readers_schema, decoder) click to toggle source
    # File lib/tros/ipc.rb
227 def read_error(writers_schema, readers_schema, decoder)
228   datum_reader = Tros::IO::DatumReader.new(writers_schema, readers_schema)
229   AvroRemoteError.new(datum_reader.read(decoder))
230 end
read_handshake_response(decoder) click to toggle source
    # File lib/tros/ipc.rb
164 def read_handshake_response(decoder)
165   handshake_response = HANDSHAKE_REQUESTOR_READER.read(decoder)
166   we_have_matching_schema = false
167 
168   case handshake_response['match']
169   when 'BOTH'
170     self.send_protocol = false
171     we_have_matching_schema = true
172   when 'CLIENT'
173     raise AvroError.new('Handshake failure. match == CLIENT') if send_protocol
174     self.remote_protocol = Tros::Protocol.parse(handshake_response['serverProtocol'])
175     self.remote_hash = handshake_response['serverHash']
176     self.send_protocol = false
177     we_have_matching_schema = true
178   when 'NONE'
179     raise AvroError.new('Handshake failure. match == NONE') if send_protocol
180     self.remote_protocol = Tros::Protocol.parse(handshake_response['serverProtocol'])
181     self.remote_hash = handshake_response['serverHash']
182     self.send_protocol = true
183   else
184     raise AvroError.new("Unexpected match: #{match}")
185   end
186 
187   return we_have_matching_schema
188 end
read_response(writers_schema, readers_schema, decoder) click to toggle source
    # File lib/tros/ipc.rb
222 def read_response(writers_schema, readers_schema, decoder)
223   datum_reader = Tros::IO::DatumReader.new(writers_schema, readers_schema)
224   datum_reader.read(decoder)
225 end
remote_hash=(new_remote_hash) click to toggle source
   # File lib/tros/ipc.rb
95 def remote_hash=(new_remote_hash)
96   @remote_hash = new_remote_hash
97   REMOTE_HASHES[transport.remote_name] = remote_hash
98 end
remote_protocol=(new_remote_protocol) click to toggle source
   # File lib/tros/ipc.rb
90 def remote_protocol=(new_remote_protocol)
91   @remote_protocol = new_remote_protocol
92   REMOTE_PROTOCOLS[transport.remote_name] = remote_protocol
93 end
request(message_name, request_datum) click to toggle source
    # File lib/tros/ipc.rb
100 def request(message_name, request_datum)
101   # Writes a request message and reads a response or error message.
102   # build handshake and call request
103   buffer_writer = StringIO.new('', 'w+')
104   buffer_encoder = Tros::IO::BinaryEncoder.new(buffer_writer)
105   write_handshake_request(buffer_encoder)
106   write_call_request(message_name, request_datum, buffer_encoder)
107 
108   # send the handshake and call request;  block until call response
109   call_request = buffer_writer.string
110   call_response = transport.transceive(call_request)
111 
112   # process the handshake and call response
113   buffer_decoder = Tros::IO::BinaryDecoder.new(StringIO.new(call_response))
114   if read_handshake_response(buffer_decoder)
115     read_call_response(message_name, buffer_decoder)
116   else
117     request(message_name, request_datum)
118   end
119 end
write_call_request(message_name, request_datum, encoder) click to toggle source
    # File lib/tros/ipc.rb
139 def write_call_request(message_name, request_datum, encoder)
140   # The format of a call request is:
141   #   * request metadata, a map with values of type bytes
142   #   * the message name, an Tros string, followed by
143   #   * the message parameters. Parameters are serialized according to
144   #     the message's request declaration.
145 
146   # TODO request metadata (not yet implemented)
147   request_metadata = {}
148   META_WRITER.write(request_metadata, encoder)
149 
150   message = local_protocol.messages[message_name]
151   unless message
152     raise AvroError, "Unknown message: #{message_name}"
153   end
154   encoder.write_string(message.name)
155 
156   write_request(message.request, request_datum, encoder)
157 end
write_handshake_request(encoder) click to toggle source
    # File lib/tros/ipc.rb
121 def write_handshake_request(encoder)
122   local_hash = local_protocol.md5
123   remote_name = transport.remote_name
124   remote_hash = REMOTE_HASHES[remote_name]
125   unless remote_hash
126     remote_hash = local_hash
127     self.remote_protocol = local_protocol
128   end
129   request_datum = {
130     'clientHash' => local_hash,
131     'serverHash' => remote_hash
132   }
133   if send_protocol
134     request_datum['clientProtocol'] = local_protocol.to_s
135   end
136   HANDSHAKE_REQUESTOR_WRITER.write(request_datum, encoder)
137 end
write_request(request_schema, request_datum, encoder) click to toggle source
    # File lib/tros/ipc.rb
159 def write_request(request_schema, request_datum, encoder)
160   datum_writer = Tros::IO::DatumWriter.new(request_schema)
161   datum_writer.write(request_datum, encoder)
162 end