class Avro::IPC::Requestor
Attributes
local_protocol[R]
remote_hash[RW]
remote_protocol[RW]
send_protocol[RW]
transport[R]
Public Class Methods
new(local_protocol, transport)
click to toggle source
# File lib/avro/ipc.rb 82 def initialize(local_protocol, transport) 83 @local_protocol = local_protocol 84 @transport = transport 85 @remote_protocol = nil 86 @remote_hash = nil 87 @send_protocol = nil 88 end
Public Instance Methods
read_call_response(message_name, decoder)
click to toggle source
# File lib/avro/ipc.rb 190 def read_call_response(message_name, decoder) 191 # The format of a call response is: 192 # * response metadata, a map with values of type bytes 193 # * a one-byte error flag boolean, followed by either: 194 # * if the error flag is false, 195 # the message response, serialized per the message's response schema. 196 # * if the error flag is true, 197 # the error, serialized per the message's error union schema. 198 response_metadata = META_READER.read(decoder) 199 200 # remote response schema 201 remote_message_schema = remote_protocol.messages[message_name] 202 raise AvroError.new("Unknown remote message: #{message_name}") unless remote_message_schema 203 204 # local response schema 205 local_message_schema = local_protocol.messages[message_name] 206 unless local_message_schema 207 raise AvroError.new("Unknown local message: #{message_name}") 208 end 209 210 # error flag 211 if !decoder.read_boolean 212 writers_schema = remote_message_schema.response 213 readers_schema = local_message_schema.response 214 read_response(writers_schema, readers_schema, decoder) 215 else 216 writers_schema = remote_message_schema.errors || SYSTEM_ERROR_SCHEMA 217 readers_schema = local_message_schema.errors || SYSTEM_ERROR_SCHEMA 218 raise read_error(writers_schema, readers_schema, decoder) 219 end 220 end
read_error(writers_schema, readers_schema, decoder)
click to toggle source
# File lib/avro/ipc.rb 227 def read_error(writers_schema, readers_schema, decoder) 228 datum_reader = Avro::IO::DatumReader.new(writers_schema, readers_schema) 229 AvroRemoteError.new(datum_reader.read(decoder)) 230 end
read_handshake_response(decoder)
click to toggle source
# File lib/avro/ipc.rb 164 def read_handshake_response(decoder) 165 handshake_response = HANDSHAKE_REQUESTOR_READER.read(decoder) 166 we_have_matching_schema = false 167 168 case handshake_response['match'] 169 when 'BOTH' 170 self.send_protocol = false 171 we_have_matching_schema = true 172 when 'CLIENT' 173 raise AvroError.new('Handshake failure. match == CLIENT') if send_protocol 174 self.remote_protocol = Avro::Protocol.parse(handshake_response['serverProtocol']) 175 self.remote_hash = handshake_response['serverHash'] 176 self.send_protocol = false 177 we_have_matching_schema = true 178 when 'NONE' 179 raise AvroError.new('Handshake failure. match == NONE') if send_protocol 180 self.remote_protocol = Avro::Protocol.parse(handshake_response['serverProtocol']) 181 self.remote_hash = handshake_response['serverHash'] 182 self.send_protocol = true 183 else 184 raise AvroError.new("Unexpected match: #{match}") 185 end 186 187 return we_have_matching_schema 188 end
read_response(writers_schema, readers_schema, decoder)
click to toggle source
# File lib/avro/ipc.rb 222 def read_response(writers_schema, readers_schema, decoder) 223 datum_reader = Avro::IO::DatumReader.new(writers_schema, readers_schema) 224 datum_reader.read(decoder) 225 end
remote_hash=(new_remote_hash)
click to toggle source
# File lib/avro/ipc.rb 95 def remote_hash=(new_remote_hash) 96 @remote_hash = new_remote_hash 97 REMOTE_HASHES[transport.remote_name] = remote_hash 98 end
remote_protocol=(new_remote_protocol)
click to toggle source
# File lib/avro/ipc.rb 90 def remote_protocol=(new_remote_protocol) 91 @remote_protocol = new_remote_protocol 92 REMOTE_PROTOCOLS[transport.remote_name] = remote_protocol 93 end
request(message_name, request_datum)
click to toggle source
# File lib/avro/ipc.rb 100 def request(message_name, request_datum) 101 # Writes a request message and reads a response or error message. 102 # build handshake and call request 103 buffer_writer = StringIO.new(''.force_encoding('BINARY')) 104 buffer_encoder = Avro::IO::BinaryEncoder.new(buffer_writer) 105 write_handshake_request(buffer_encoder) 106 write_call_request(message_name, request_datum, buffer_encoder) 107 108 # send the handshake and call request; block until call response 109 call_request = buffer_writer.string 110 call_response = transport.transceive(call_request) 111 112 # process the handshake and call response 113 buffer_decoder = Avro::IO::BinaryDecoder.new(StringIO.new(call_response)) 114 if read_handshake_response(buffer_decoder) 115 read_call_response(message_name, buffer_decoder) 116 else 117 request(message_name, request_datum) 118 end 119 end
write_call_request(message_name, request_datum, encoder)
click to toggle source
# File lib/avro/ipc.rb 139 def write_call_request(message_name, request_datum, encoder) 140 # The format of a call request is: 141 # * request metadata, a map with values of type bytes 142 # * the message name, an Avro string, followed by 143 # * the message parameters. Parameters are serialized according to 144 # the message's request declaration. 145 146 # TODO request metadata (not yet implemented) 147 request_metadata = {} 148 META_WRITER.write(request_metadata, encoder) 149 150 message = local_protocol.messages[message_name] 151 unless message 152 raise AvroError, "Unknown message: #{message_name}" 153 end 154 encoder.write_string(message.name) 155 156 write_request(message.request, request_datum, encoder) 157 end
write_handshake_request(encoder)
click to toggle source
# File lib/avro/ipc.rb 121 def write_handshake_request(encoder) 122 local_hash = local_protocol.md5 123 remote_name = transport.remote_name 124 remote_hash = REMOTE_HASHES[remote_name] 125 unless remote_hash 126 remote_hash = local_hash 127 self.remote_protocol = local_protocol 128 end 129 request_datum = { 130 'clientHash' => local_hash, 131 'serverHash' => remote_hash 132 } 133 if send_protocol 134 request_datum['clientProtocol'] = local_protocol.to_s 135 end 136 HANDSHAKE_REQUESTOR_WRITER.write(request_datum, encoder) 137 end
write_request(request_schema, request_datum, encoder)
click to toggle source
# File lib/avro/ipc.rb 159 def write_request(request_schema, request_datum, encoder) 160 datum_writer = Avro::IO::DatumWriter.new(request_schema) 161 datum_writer.write(request_datum, encoder) 162 end