class Tros::IPC::Responder
Base class for the server side of a protocol interaction.
Attributes
local_hash[R]
local_protocol[R]
protocol_cache[R]
Public Class Methods
new(local_protocol)
click to toggle source
# File lib/tros/ipc.rb 236 def initialize(local_protocol) 237 @local_protocol = local_protocol 238 @local_hash = self.local_protocol.md5 239 @protocol_cache = {} 240 protocol_cache[local_hash] = local_protocol 241 end
Public Instance Methods
call(local_message, request)
click to toggle source
# File lib/tros/ipc.rb 352 def call(local_message, request) 353 # Actual work done by server: cf. handler in thrift. 354 raise NotImplementedError 355 end
process_handshake(decoder, encoder, connection=nil)
click to toggle source
# File lib/tros/ipc.rb 305 def process_handshake(decoder, encoder, connection=nil) 306 if connection && connection.is_connected? 307 return connection.protocol 308 end 309 handshake_request = HANDSHAKE_RESPONDER_READER.read(decoder) 310 handshake_response = {} 311 312 # determine the remote protocol 313 client_hash = handshake_request['clientHash'] 314 client_protocol = handshake_request['clientProtocol'] 315 remote_protocol = protocol_cache[client_hash] 316 317 if !remote_protocol && client_protocol 318 remote_protocol = Tros::Protocol.parse(client_protocol) 319 protocol_cache[client_hash] = remote_protocol 320 end 321 322 # evaluate remote's guess of the local protocol 323 server_hash = handshake_request['serverHash'] 324 if local_hash == server_hash 325 if !remote_protocol 326 handshake_response['match'] = 'NONE' 327 else 328 handshake_response['match'] = 'BOTH' 329 end 330 else 331 if !remote_protocol 332 handshake_response['match'] = 'NONE' 333 else 334 handshake_response['match'] = 'CLIENT' 335 end 336 end 337 338 if handshake_response['match'] != 'BOTH' 339 handshake_response['serverProtocol'] = local_protocol.to_s 340 handshake_response['serverHash'] = local_hash 341 end 342 343 HANDSHAKE_RESPONDER_WRITER.write(handshake_response, encoder) 344 345 if connection && handshake_response['match'] != 'NONE' 346 connection.protocol = remote_protocol 347 end 348 349 remote_protocol 350 end
read_request(writers_schema, readers_schema, decoder)
click to toggle source
# File lib/tros/ipc.rb 357 def read_request(writers_schema, readers_schema, decoder) 358 datum_reader = Tros::IO::DatumReader.new(writers_schema, readers_schema) 359 datum_reader.read(decoder) 360 end
respond(call_request, transport=nil)
click to toggle source
Called by a server to deserialize a request, compute and serialize a response or error. Compare to 'handle()' in Thrift.
# File lib/tros/ipc.rb 245 def respond(call_request, transport=nil) 246 buffer_decoder = Tros::IO::BinaryDecoder.new(StringIO.new(call_request)) 247 buffer_writer = StringIO.new('', 'w+') 248 buffer_encoder = Tros::IO::BinaryEncoder.new(buffer_writer) 249 error = nil 250 response_metadata = {} 251 252 begin 253 remote_protocol = process_handshake(buffer_decoder, buffer_encoder, transport) 254 # handshake failure 255 unless remote_protocol 256 return buffer_writer.string 257 end 258 259 # read request using remote protocol 260 request_metadata = META_READER.read(buffer_decoder) 261 remote_message_name = buffer_decoder.read_string 262 263 # get remote and local request schemas so we can do 264 # schema resolution (one fine day) 265 remote_message = remote_protocol.messages[remote_message_name] 266 unless remote_message 267 raise AvroError.new("Unknown remote message: #{remote_message_name}") 268 end 269 local_message = local_protocol.messages[remote_message_name] 270 unless local_message 271 raise AvroError.new("Unknown local message: #{remote_message_name}") 272 end 273 writers_schema = remote_message.request 274 readers_schema = local_message.request 275 request = read_request(writers_schema, readers_schema, buffer_decoder) 276 # perform server logic 277 begin 278 response = call(local_message, request) 279 rescue AvroRemoteError => e 280 error = e 281 rescue Exception => e 282 error = AvroRemoteError.new(e.to_s) 283 end 284 285 # write response using local protocol 286 META_WRITER.write(response_metadata, buffer_encoder) 287 buffer_encoder.write_boolean(!!error) 288 if error.nil? 289 writers_schema = local_message.response 290 write_response(writers_schema, response, buffer_encoder) 291 else 292 writers_schema = local_message.errors || SYSTEM_ERROR_SCHEMA 293 write_error(writers_schema, error, buffer_encoder) 294 end 295 rescue Tros::AvroError => e 296 error = AvroRemoteException.new(e.to_s) 297 buffer_encoder = Tros::IO::BinaryEncoder.new(StringIO.new) 298 META_WRITER.write(response_metadata, buffer_encoder) 299 buffer_encoder.write_boolean(true) 300 self.write_error(SYSTEM_ERROR_SCHEMA, error, buffer_encoder) 301 end 302 buffer_writer.string 303 end
write_error(writers_schema, error_exception, encoder)
click to toggle source
# File lib/tros/ipc.rb 367 def write_error(writers_schema, error_exception, encoder) 368 datum_writer = Tros::IO::DatumWriter.new(writers_schema) 369 datum_writer.write(error_exception.to_s, encoder) 370 end
write_response(writers_schema, response_datum, encoder)
click to toggle source
# File lib/tros/ipc.rb 362 def write_response(writers_schema, response_datum, encoder) 363 datum_writer = Tros::IO::DatumWriter.new(writers_schema) 364 datum_writer.write(response_datum, encoder) 365 end