class Avro::IPC::Responder
Base class for the server side of a protocol interaction.
Attributes
local_hash[R]
local_protocol[R]
protocol_cache[R]
Public Class Methods
new(local_protocol)
click to toggle source
# File lib/avro/ipc.rb, line 236
# Builds a responder for +local_protocol+.
#
# Remembers the protocol and the value of its +md5+, and starts the
# protocol cache with a single entry mapping that hash to the local
# protocol.
def initialize(local_protocol)
  @local_protocol = local_protocol
  @local_hash = local_protocol.md5
  @protocol_cache = { @local_hash => local_protocol }
end
Public Instance Methods
call(local_message, request)
click to toggle source
# File lib/avro/ipc.rb, line 353
# Hook where the server does its actual work (cf. the handler in Thrift).
#
# This base implementation does nothing useful: it always raises
# NotImplementedError. +respond+ invokes it with the local message and
# the decoded request.
def call(local_message, request)
  raise(NotImplementedError)
end
process_handshake(decoder, encoder, connection=nil)
click to toggle source
# File lib/avro/ipc.rb, line 306
# Reads a handshake request from +decoder+, writes the handshake response
# to +encoder+ and returns the remote protocol (nil when it is unknown).
#
# If +connection+ is given and reports +is_connected?+, nothing is read or
# written and the connection's stored protocol is returned instead.
#
# The response's 'match' field is:
# * 'NONE'   - the remote protocol is unknown (not cached, not supplied)
# * 'BOTH'   - remote protocol known and the client's 'serverHash' equals
#              our local hash
# * 'CLIENT' - remote protocol known but the client's 'serverHash' differs
#
# Unless the match is 'BOTH', the local protocol text and hash are included
# in the response. Unless the match is 'NONE', the remote protocol is
# stored on +connection+ (when one was given).
def process_handshake(decoder, encoder, connection=nil)
  return connection.protocol if connection && connection.is_connected?

  request = HANDSHAKE_RESPONDER_READER.read(decoder)

  # Resolve the remote protocol: cache lookup first, then the protocol
  # text supplied in the request (which is parsed and cached).
  client_hash = request['clientHash']
  client_protocol = request['clientProtocol']
  remote_protocol = protocol_cache[client_hash]
  if client_protocol && !remote_protocol
    remote_protocol = Avro::Protocol.parse(client_protocol)
    protocol_cache[client_hash] = remote_protocol
  end

  # Evaluate the remote's guess of the local protocol.
  hashes_agree = (local_hash == request['serverHash'])
  match = if !remote_protocol
            'NONE'
          elsif hashes_agree
            'BOTH'
          else
            'CLIENT'
          end

  response = { 'match' => match }
  if match != 'BOTH'
    response['serverProtocol'] = local_protocol.to_s
    response['serverHash'] = local_hash
  end

  HANDSHAKE_RESPONDER_WRITER.write(response, encoder)

  connection.protocol = remote_protocol if connection && match != 'NONE'

  remote_protocol
end
read_request(writers_schema, readers_schema, decoder)
click to toggle source
# File lib/avro/ipc.rb, line 358
# Decodes one request datum from +decoder+ using a DatumReader built from
# +writers_schema+ and +readers_schema+, and returns what the reader
# produced.
def read_request(writers_schema, readers_schema, decoder)
  Avro::IO::DatumReader.new(writers_schema, readers_schema).read(decoder)
end
respond(call_request, transport=nil)
click to toggle source
Called by a server to deserialize a request, compute and serialize a response or error. Compare to ‘handle()’ in Thrift.
# File lib/avro/ipc.rb, line 245
# Deserializes +call_request+, dispatches it to +call+ and returns the
# serialized reply as a binary string.
#
# call_request:: the raw request bytes (wrapped in a StringIO for decoding)
# transport::    optional connection object handed to +process_handshake+
#
# The reply is the handshake output (whatever +process_handshake+ wrote)
# followed by the call payload: response metadata, an error flag, and then
# either the response datum or the error datum.
#
# * If the handshake yields no remote protocol, only the handshake output
#   is returned.
# * An exception raised by +call+ is reported to the client using the
#   message's declared errors schema (or SYSTEM_ERROR_SCHEMA when the
#   message declares none).
# * An Avro::AvroError raised anywhere else in the processing is reported
#   as a system error written with SYSTEM_ERROR_SCHEMA. Any partially
#   written payload is discarded first so the client never sees a truncated
#   response followed by an error.
def respond(call_request, transport=nil)
  buffer_decoder = Avro::IO::BinaryDecoder.new(StringIO.new(call_request))

  # Handshake and call payload are kept in separate buffers so the payload
  # can be thrown away and rebuilt if something fails half-way through.
  handshake_writer = StringIO.new(''.force_encoding('BINARY'))
  handshake_encoder = Avro::IO::BinaryEncoder.new(handshake_writer)
  payload_writer = StringIO.new(''.force_encoding('BINARY'))
  payload_encoder = Avro::IO::BinaryEncoder.new(payload_writer)

  error = nil
  response_metadata = {}

  begin
    remote_protocol = process_handshake(buffer_decoder, handshake_encoder, transport)
    # handshake failure: reply with the handshake response only
    return handshake_writer.string unless remote_protocol

    # read request using remote protocol
    # (the request metadata must be consumed from the stream even though
    # it is not used afterwards)
    META_READER.read(buffer_decoder)
    remote_message_name = buffer_decoder.read_string

    # get remote and local request schemas so we can do
    # schema resolution (one fine day)
    remote_message = remote_protocol.messages[remote_message_name]
    unless remote_message
      raise AvroError.new("Unknown remote message: #{remote_message_name}")
    end
    local_message = local_protocol.messages[remote_message_name]
    unless local_message
      raise AvroError.new("Unknown local message: #{remote_message_name}")
    end
    writers_schema = remote_message.request
    readers_schema = local_message.request
    request = read_request(writers_schema, readers_schema, buffer_decoder)

    # perform server logic
    begin
      response = call(local_message, request)
    rescue AvroRemoteError => e
      error = e
    rescue Exception => e
      # Deliberately broad: the default #call raises NotImplementedError,
      # which is a ScriptError rather than a StandardError, and it must
      # still be turned into an error response.
      error = AvroRemoteError.new(e.to_s)
    end

    # write response using local protocol
    META_WRITER.write(response_metadata, payload_encoder)
    payload_encoder.write_boolean(!!error)
    if error.nil?
      writers_schema = local_message.response
      write_response(writers_schema, response, payload_encoder)
    else
      writers_schema = local_message.errors || SYSTEM_ERROR_SCHEMA
      write_error(writers_schema, error, payload_encoder)
    end
  rescue Avro::AvroError => e
    error = AvroRemoteError.new(e.to_s)
    # Start the payload over: anything already written to it may be
    # incomplete. The system error is what gets returned to the caller.
    payload_writer = StringIO.new(''.force_encoding('BINARY'))
    payload_encoder = Avro::IO::BinaryEncoder.new(payload_writer)
    META_WRITER.write(response_metadata, payload_encoder)
    payload_encoder.write_boolean(true)
    write_error(SYSTEM_ERROR_SCHEMA, error, payload_encoder)
  end

  handshake_writer.string + payload_writer.string
end
write_error(writers_schema, error_exception, encoder)
click to toggle source
# File lib/avro/ipc.rb, line 368
# Serializes an error to +encoder+.
#
# The datum written is the string form (+to_s+) of +error_exception+,
# encoded by a DatumWriter built from +writers_schema+.
def write_error(writers_schema, error_exception, encoder)
  Avro::IO::DatumWriter.new(writers_schema).write(error_exception.to_s, encoder)
end
write_response(writers_schema, response_datum, encoder)
click to toggle source
# File lib/avro/ipc.rb, line 363
# Serializes +response_datum+ to +encoder+ with a DatumWriter built from
# +writers_schema+.
def write_response(writers_schema, response_datum, encoder)
  Avro::IO::DatumWriter.new(writers_schema).write(response_datum, encoder)
end