class Avro::IPC::Responder
Base class for the server side of a protocol interaction.
Attributes
local_hash[R]
local_protocol[R]
protocol_cache[R]
Public Class Methods
new(local_protocol)
click to toggle source
# File lib/avro/ipc.rb, line 240
#
# Sets up a responder for +local_protocol+.
#
# Stores the protocol, remembers its +md5+ value as the local hash, and
# starts the protocol cache with a single entry mapping that hash to the
# local protocol.
def initialize(local_protocol)
  @local_protocol = local_protocol
  @local_hash = @local_protocol.md5
  @protocol_cache = { @local_hash => @local_protocol }
end
Public Instance Methods
call(_local_message, _request)
click to toggle source
# File lib/avro/ipc.rb, line 357
#
# Actual work done by the server (cf. the handler in Thrift).
#
# This base implementation always raises; #respond invokes it with the
# local message and the decoded request and serializes whatever it returns.
#
# Raises NotImplementedError with a message naming the receiver's class, so
# a missing override is easy to trace back to the offending responder.
def call(_local_message, _request)
  raise NotImplementedError, "#{self.class.name}#call must be implemented by a subclass"
end
process_handshake(decoder, encoder, connection=nil)
click to toggle source
# File lib/avro/ipc.rb, line 310
#
# Reads a handshake request from +decoder+, writes the handshake response to
# +encoder+, and returns the remote protocol (nil when it cannot be
# determined).
#
# When +connection+ is given and already reports itself connected, nothing
# is read or written and the protocol stored on the connection is returned.
#
# The response's 'match' field is:
# * 'NONE'   - the remote protocol is unknown;
# * 'BOTH'   - the remote protocol is known and the client's 'serverHash'
#   equals the local hash;
# * 'CLIENT' - the remote protocol is known but the client's 'serverHash'
#   differs from the local hash.
#
# For anything other than 'BOTH' the response also carries the local
# protocol text and hash.
def process_handshake(decoder, encoder, connection=nil)
  return connection.protocol if connection && connection.is_connected?

  request = HANDSHAKE_RESPONDER_READER.read(decoder)

  # Determine the remote protocol: prefer a cached entry for the client's
  # hash, otherwise parse (and cache) the protocol text the client sent.
  client_hash = request['clientHash']
  client_protocol = request['clientProtocol']
  remote_protocol = protocol_cache[client_hash]
  if client_protocol && !remote_protocol
    remote_protocol = Avro::Protocol.parse(client_protocol)
    protocol_cache[client_hash] = remote_protocol
  end

  # Evaluate the remote's guess of the local protocol.
  server_hash = request['serverHash']
  match =
    if !remote_protocol
      'NONE'
    elsif local_hash == server_hash
      'BOTH'
    else
      'CLIENT'
    end

  response = { 'match' => match }
  unless match == 'BOTH'
    response['serverProtocol'] = local_protocol.to_s
    response['serverHash'] = local_hash
  end

  HANDSHAKE_RESPONDER_WRITER.write(response, encoder)

  # Remember the negotiated protocol on the connection, if there is one.
  connection.protocol = remote_protocol if connection && match != 'NONE'

  remote_protocol
end
read_request(writers_schema, readers_schema, decoder)
click to toggle source
# File lib/avro/ipc.rb, line 362
#
# Decodes one request datum from +decoder+ using a datum reader built from
# +writers_schema+ and +readers_schema+, and returns the decoded value.
def read_request(writers_schema, readers_schema, decoder)
  Avro::IO::DatumReader.new(writers_schema, readers_schema).read(decoder)
end
respond(call_request, transport=nil)
click to toggle source
Called by a server to deserialize a request, compute and serialize a response or error. Compare to ‘handle()’ in Thrift.
# File lib/avro/ipc.rb, line 249
#
# Deserializes +call_request+, dispatches it to #call, and returns the
# serialized reply as a binary string.
#
# call_request:: String holding the encoded handshake (if any) and request.
# transport::    optional connection object handed to #process_handshake.
#
# The reply consists of whatever #process_handshake wrote, followed by the
# response metadata, an error flag, and either the response or the error.
# If the handshake yields no remote protocol, only the handshake response is
# returned.
#
# Exceptions raised by #call are reported to the caller as an error reply.
# An Avro::AvroError raised anywhere else after the handshake (unknown
# message, decoding or encoding failure) discards any partially written
# reply and is reported as a system error instead.
def respond(call_request, transport=nil)
  buffer_decoder = Avro::IO::BinaryDecoder.new(StringIO.new(call_request))
  buffer_writer = StringIO.new(String.new('', encoding: 'BINARY'))
  buffer_encoder = Avro::IO::BinaryEncoder.new(buffer_writer)
  error = nil
  response_metadata = {}
  # Offset in buffer_writer just past the handshake response; stays nil
  # until the handshake has completed.
  handshake_end = nil

  begin
    remote_protocol = process_handshake(buffer_decoder, buffer_encoder, transport)
    # handshake failure
    return buffer_writer.string unless remote_protocol

    handshake_end = buffer_writer.pos

    # read request using remote protocol
    _request_metadata = META_READER.read(buffer_decoder)
    remote_message_name = buffer_decoder.read_string

    # get remote and local request schemas so we can do
    # schema resolution (one fine day)
    remote_message = remote_protocol.messages[remote_message_name]
    raise AvroError, "Unknown remote message: #{remote_message_name}" unless remote_message

    local_message = local_protocol.messages[remote_message_name]
    raise AvroError, "Unknown local message: #{remote_message_name}" unless local_message

    writers_schema = remote_message.request
    readers_schema = local_message.request
    request = read_request(writers_schema, readers_schema, buffer_decoder)

    # perform server logic
    begin
      response = call(local_message, request)
    rescue AvroRemoteError => e
      error = e
    rescue Exception => e # rubocop:disable Lint/RescueException
      error = AvroRemoteError.new(e.to_s)
    end

    # write response using local protocol
    META_WRITER.write(response_metadata, buffer_encoder)
    buffer_encoder.write_boolean(!!error)
    if error.nil?
      writers_schema = local_message.response
      write_response(writers_schema, response, buffer_encoder)
    else
      writers_schema = local_message.errors || SYSTEM_ERROR_SCHEMA
      write_error(writers_schema, error, buffer_encoder)
    end
  rescue Avro::AvroError => e
    error = AvroRemoteError.new(e.to_s)
    # Without a completed handshake there is nothing valid to append a
    # system error to, so the buffer is returned as it stands.
    if handshake_end
      # Drop anything written after the handshake response, then encode the
      # system error into the buffer that is actually returned.
      buffer_writer.truncate(handshake_end)
      buffer_writer.pos = handshake_end
      META_WRITER.write(response_metadata, buffer_encoder)
      buffer_encoder.write_boolean(true)
      write_error(SYSTEM_ERROR_SCHEMA, error, buffer_encoder)
    end
  end
  buffer_writer.string
end
write_error(writers_schema, error_exception, encoder)
click to toggle source
# File lib/avro/ipc.rb, line 372
#
# Encodes the string form (+to_s+) of +error_exception+ to +encoder+ with a
# datum writer built from +writers_schema+.
def write_error(writers_schema, error_exception, encoder)
  Avro::IO::DatumWriter.new(writers_schema).write(error_exception.to_s, encoder)
end
write_response(writers_schema, response_datum, encoder)
click to toggle source
# File lib/avro/ipc.rb, line 367
#
# Encodes +response_datum+ to +encoder+ with a datum writer built from
# +writers_schema+.
def write_response(writers_schema, response_datum, encoder)
  Avro::IO::DatumWriter.new(writers_schema).write(response_datum, encoder)
end