class Avro::IPC::Responder

Base class for the server side of a protocol interaction.

Attributes

local_hash[R]
local_protocol[R]
protocol_cache[R]

Public Class Methods

new(local_protocol) click to toggle source
    # File lib/avro/ipc.rb
236 def initialize(local_protocol)
237   @local_protocol = local_protocol
238   @local_hash = self.local_protocol.md5
239   @protocol_cache = {}
240   protocol_cache[local_hash] = local_protocol
241 end

Public Instance Methods

call(local_message, request) click to toggle source
    # File lib/avro/ipc.rb
353 def call(local_message, request)
354   # Actual work done by server: cf. handler in thrift.
355   raise NotImplementedError
356 end
process_handshake(decoder, encoder, connection=nil) click to toggle source
    # File lib/avro/ipc.rb
306 def process_handshake(decoder, encoder, connection=nil)
307   if connection && connection.is_connected?
308     return connection.protocol
309   end
310   handshake_request = HANDSHAKE_RESPONDER_READER.read(decoder)
311   handshake_response = {}
312 
313   # determine the remote protocol
314   client_hash = handshake_request['clientHash']
315   client_protocol = handshake_request['clientProtocol']
316   remote_protocol = protocol_cache[client_hash]
317 
318   if !remote_protocol && client_protocol
319     remote_protocol = Avro::Protocol.parse(client_protocol)
320     protocol_cache[client_hash] = remote_protocol
321   end
322 
323   # evaluate remote's guess of the local protocol
324   server_hash = handshake_request['serverHash']
325   if local_hash == server_hash
326     if !remote_protocol
327       handshake_response['match'] = 'NONE'
328     else
329       handshake_response['match'] = 'BOTH'
330     end
331   else
332     if !remote_protocol
333       handshake_response['match'] = 'NONE'
334     else
335       handshake_response['match'] = 'CLIENT'
336     end
337   end
338 
339   if handshake_response['match'] != 'BOTH'
340     handshake_response['serverProtocol'] = local_protocol.to_s
341     handshake_response['serverHash'] = local_hash
342   end
343 
344   HANDSHAKE_RESPONDER_WRITER.write(handshake_response, encoder)
345 
346   if connection && handshake_response['match'] != 'NONE'
347     connection.protocol = remote_protocol
348   end
349 
350   remote_protocol
351 end
read_request(writers_schema, readers_schema, decoder) click to toggle source
    # File lib/avro/ipc.rb
358 def read_request(writers_schema, readers_schema, decoder)
359   datum_reader = Avro::IO::DatumReader.new(writers_schema, readers_schema)
360   datum_reader.read(decoder)
361 end
respond(call_request, transport=nil) click to toggle source

Called by a server to deserialize a request, compute and serialize a response or error. Compare to ‘handle()’ in Thrift.

    # File lib/avro/ipc.rb
245 def respond(call_request, transport=nil)
246   buffer_decoder = Avro::IO::BinaryDecoder.new(StringIO.new(call_request))
247   buffer_writer = StringIO.new(''.force_encoding('BINARY'))
248   buffer_encoder = Avro::IO::BinaryEncoder.new(buffer_writer)
249   error = nil
250   response_metadata = {}
251 
252   begin
253     remote_protocol = process_handshake(buffer_decoder, buffer_encoder, transport)
254     # handshake failure
255     unless remote_protocol
256       return buffer_writer.string
257     end
258 
259     # read request using remote protocol
260     request_metadata = META_READER.read(buffer_decoder)
261     remote_message_name = buffer_decoder.read_string
262 
263     # get remote and local request schemas so we can do
264     # schema resolution (one fine day)
265     remote_message = remote_protocol.messages[remote_message_name]
266     unless remote_message
267       raise AvroError.new("Unknown remote message: #{remote_message_name}")
268     end
269     local_message = local_protocol.messages[remote_message_name]
270     unless local_message
271       raise AvroError.new("Unknown local message: #{remote_message_name}")
272     end
273     writers_schema = remote_message.request
274     readers_schema = local_message.request
275     request = read_request(writers_schema, readers_schema, buffer_decoder)
276     # perform server logic
277     begin
278       response = call(local_message, request)
279     rescue AvroRemoteError => e
280       error = e
281     rescue Exception => e
282       error = AvroRemoteError.new(e.to_s)
283     end
284 
285     # write response using local protocol
286     META_WRITER.write(response_metadata, buffer_encoder)
287     buffer_encoder.write_boolean(!!error)
288     if error.nil?
289       writers_schema = local_message.response
290       write_response(writers_schema, response, buffer_encoder)
291     else
292       writers_schema = local_message.errors || SYSTEM_ERROR_SCHEMA
293       write_error(writers_schema, error, buffer_encoder)
294     end
295   rescue Avro::AvroError => e
296     error = AvroRemoteException.new(e.to_s)
297     # TODO does the stuff written here ever get used?
298     buffer_encoder = Avro::IO::BinaryEncoder.new(StringIO.new)
299     META_WRITER.write(response_metadata, buffer_encoder)
300     buffer_encoder.write_boolean(true)
301     self.write_error(SYSTEM_ERROR_SCHEMA, error, buffer_encoder)
302   end
303   buffer_writer.string
304 end
write_error(writers_schema, error_exception, encoder) click to toggle source
    # File lib/avro/ipc.rb
368 def write_error(writers_schema, error_exception, encoder)
369   datum_writer = Avro::IO::DatumWriter.new(writers_schema)
370   datum_writer.write(error_exception.to_s, encoder)
371 end
write_response(writers_schema, response_datum, encoder) click to toggle source
    # File lib/avro/ipc.rb
363 def write_response(writers_schema, response_datum, encoder)
364   datum_writer = Avro::IO::DatumWriter.new(writers_schema)
365   datum_writer.write(response_datum, encoder)
366 end