class Tros::IPC::Responder

Base class for the server side of a protocol interaction.

Attributes

local_hash[R]
local_protocol[R]
protocol_cache[R]

Public Class Methods

new(local_protocol) click to toggle source
    # File lib/tros/ipc.rb
# Sets up a responder that serves +local_protocol+.
#
# Stores the protocol, remembers its MD5 fingerprint (as returned by
# +local_protocol.md5+), and seeds the protocol cache with the local
# protocol keyed by that fingerprint.
def initialize(local_protocol)
  @local_protocol = local_protocol
  @local_hash = local_protocol.md5
  @protocol_cache = { @local_hash => local_protocol }
end

Public Instance Methods

call(local_message, request) click to toggle source
    # File lib/tros/ipc.rb
# Hook where a concrete server performs the actual work for one request
# (the counterpart of a Thrift handler).
#
# This base implementation has no behaviour of its own and always raises
# NotImplementedError.
def call(local_message, request)
  raise(NotImplementedError)
end
process_handshake(decoder, encoder, connection=nil) click to toggle source
    # File lib/tros/ipc.rb
# Reads a handshake request from +decoder+, writes the handshake response to
# +encoder+, and returns the protocol to use for the remote side (nil when
# the remote protocol could not be determined).
#
# When +connection+ is given and reports is_connected?, no handshake is
# read or written and the connection's stored protocol is returned as-is.
# Otherwise, if the handshake did not end in 'NONE', the resolved remote
# protocol is stored on the connection.
def process_handshake(decoder, encoder, connection=nil)
  return connection.protocol if connection && connection.is_connected?

  request = HANDSHAKE_RESPONDER_READER.read(decoder)

  # Work out the remote protocol: cached by hash, else parsed from the
  # protocol text the client sent (and cached for next time).
  client_hash = request['clientHash']
  client_protocol = request['clientProtocol']
  remote_protocol = protocol_cache[client_hash]
  if !remote_protocol && client_protocol
    remote_protocol = Tros::Protocol.parse(client_protocol)
    protocol_cache[client_hash] = remote_protocol
  end

  # Judge the client's guess of our protocol hash.
  hash_matches = (local_hash == request['serverHash'])
  match =
    if !remote_protocol
      'NONE'
    elsif hash_matches
      'BOTH'
    else
      'CLIENT'
    end

  # Anything short of a full match gets our protocol and hash sent back.
  response = { 'match' => match }
  unless match == 'BOTH'
    response['serverProtocol'] = local_protocol.to_s
    response['serverHash'] = local_hash
  end

  HANDSHAKE_RESPONDER_WRITER.write(response, encoder)

  connection.protocol = remote_protocol if connection && match != 'NONE'

  remote_protocol
end
read_request(writers_schema, readers_schema, decoder) click to toggle source
    # File lib/tros/ipc.rb
# Decodes one request datum from +decoder+ using a
# Tros::IO::DatumReader built from +writers_schema+ and +readers_schema+,
# and returns whatever that reader produces.
def read_request(writers_schema, readers_schema, decoder)
  Tros::IO::DatumReader.new(writers_schema, readers_schema).read(decoder)
end
respond(call_request, transport=nil) click to toggle source

Called by a server to deserialize a request, compute and serialize a response or error. Compare to 'handle()' in Thrift.

    # File lib/tros/ipc.rb
245 def respond(call_request, transport=nil)
246   buffer_decoder = Tros::IO::BinaryDecoder.new(StringIO.new(call_request))
247   buffer_writer = StringIO.new('', 'w+')
248   buffer_encoder = Tros::IO::BinaryEncoder.new(buffer_writer)
249   error = nil
250   response_metadata = {}
251 
252   begin
253     remote_protocol = process_handshake(buffer_decoder, buffer_encoder, transport)
254     # handshake failure
255     unless remote_protocol
256       return buffer_writer.string
257     end
258 
259     # read request using remote protocol
260     request_metadata = META_READER.read(buffer_decoder)
261     remote_message_name = buffer_decoder.read_string
262 
263     # get remote and local request schemas so we can do
264     # schema resolution (one fine day)
265     remote_message = remote_protocol.messages[remote_message_name]
266     unless remote_message
267       raise AvroError.new("Unknown remote message: #{remote_message_name}")
268     end
269     local_message = local_protocol.messages[remote_message_name]
270     unless local_message
271       raise AvroError.new("Unknown local message: #{remote_message_name}")
272     end
273     writers_schema = remote_message.request
274     readers_schema = local_message.request
275     request = read_request(writers_schema, readers_schema, buffer_decoder)
276     # perform server logic
277     begin
278       response = call(local_message, request)
279     rescue AvroRemoteError => e
280       error = e
281     rescue Exception => e
282       error = AvroRemoteError.new(e.to_s)
283     end
284 
285     # write response using local protocol
286     META_WRITER.write(response_metadata, buffer_encoder)
287     buffer_encoder.write_boolean(!!error)
288     if error.nil?
289       writers_schema = local_message.response
290       write_response(writers_schema, response, buffer_encoder)
291     else
292       writers_schema = local_message.errors || SYSTEM_ERROR_SCHEMA
293       write_error(writers_schema, error, buffer_encoder)
294     end
295   rescue Tros::AvroError => e
296     error = AvroRemoteException.new(e.to_s)
297     buffer_encoder = Tros::IO::BinaryEncoder.new(StringIO.new)
298     META_WRITER.write(response_metadata, buffer_encoder)
299     buffer_encoder.write_boolean(true)
300     self.write_error(SYSTEM_ERROR_SCHEMA, error, buffer_encoder)
301   end
302   buffer_writer.string
303 end
write_error(writers_schema, error_exception, encoder) click to toggle source
    # File lib/tros/ipc.rb
# Serializes an error to +encoder+: the string form of +error_exception+
# (its +to_s+) is written with a Tros::IO::DatumWriter built for
# +writers_schema+.
def write_error(writers_schema, error_exception, encoder)
  Tros::IO::DatumWriter.new(writers_schema).write(error_exception.to_s, encoder)
end
write_response(writers_schema, response_datum, encoder) click to toggle source
    # File lib/tros/ipc.rb
# Serializes +response_datum+ to +encoder+ with a Tros::IO::DatumWriter
# built for +writers_schema+; the datum is passed through unchanged.
def write_response(writers_schema, response_datum, encoder)
  Tros::IO::DatumWriter.new(writers_schema).write(response_datum, encoder)
end