class Avro::IPC::Requestor

Base class for the client side of a protocol interaction.

Attributes

local_protocol[R]
remote_hash[R]
remote_protocol[R]
send_protocol[RW]
transport[R]

Public Class Methods

new(local_protocol, transport) click to toggle source
   # File lib/avro/ipc.rb
# Builds a requestor around a local protocol and a transport.
#
# local_protocol:: stored as this requestor's own protocol
# transport::      stored as the transport used by this requestor
#
# The remote protocol, the remote hash and the send-protocol flag all
# start out as nil.
def initialize(local_protocol, transport)
  @local_protocol, @transport = local_protocol, transport
  @remote_protocol = @remote_hash = @send_protocol = nil
end

Public Instance Methods

read_call_response(message_name, decoder) click to toggle source
    # File lib/avro/ipc.rb
# Reads the call-response part of a reply from +decoder+.
#
# The format of a call response is:
#   * response metadata, a map with values of type bytes
#   * a one-byte error flag boolean, followed by either:
#     * if the error flag is false,
#       the message response, serialized per the message's response schema.
#     * if the error flag is true,
#       the error, serialized per the message's error union schema.
#
# Returns whatever read_response decodes when the error flag is false.
# Raises AvroError when +message_name+ is missing from the remote or the
# local protocol, and raises the object built by read_error when the
# error flag is true.
def read_call_response(message_name, decoder)
  # The metadata is read only to advance the decoder; its value is unused.
  META_READER.read(decoder)

  # Message definition as declared by the remote side (writer).
  remote_message = remote_protocol.messages[message_name]
  unless remote_message
    raise AvroError, "Unknown remote message: #{message_name}"
  end

  # Message definition as declared locally (reader).
  local_message = local_protocol.messages[message_name]
  raise AvroError, "Unknown local message: #{message_name}" unless local_message

  # Error flag: true means an error payload follows instead of a response.
  if decoder.read_boolean
    writer_errors = remote_message.errors || SYSTEM_ERROR_SCHEMA
    reader_errors = local_message.errors || SYSTEM_ERROR_SCHEMA
    raise read_error(writer_errors, reader_errors, decoder)
  end

  read_response(remote_message.response, local_message.response, decoder)
end
read_error(writers_schema, readers_schema, decoder) click to toggle source
    # File lib/avro/ipc.rb
# Decodes an error datum from +decoder+ using the given writer's and
# reader's schemas, and wraps it in an AvroRemoteError.
#
# The error object is returned, not raised; raising is left to the caller.
def read_error(writers_schema, readers_schema, decoder)
  error_datum = Avro::IO::DatumReader.new(writers_schema, readers_schema).read(decoder)
  AvroRemoteError.new(error_datum)
end
read_handshake_response(decoder) click to toggle source
    # File lib/avro/ipc.rb
# Reads the handshake response from +decoder+ and updates this requestor's
# handshake state according to its 'match' field.
#
# * 'BOTH'   - clears send_protocol; the call response can be read.
# * 'CLIENT' - stores the server's protocol and hash, clears send_protocol;
#              the call response can be read.
# * 'NONE'   - stores the server's protocol and hash and sets send_protocol
#              so that the next handshake request includes the client
#              protocol; the request has to be sent again.
#
# Returns true when the call response can be read from the same decoder,
# false when the request must be retried.
#
# Raises AvroError when 'CLIENT' or 'NONE' is received while send_protocol
# is already set, or when the match value is none of the three above.
def read_handshake_response(decoder)
  handshake_response = HANDSHAKE_REQUESTOR_READER.read(decoder)
  # Kept in a local so the fallback branch below can report the value.
  match = handshake_response['match']
  we_have_matching_schema = false

  case match
  when 'BOTH'
    self.send_protocol = false
    we_have_matching_schema = true
  when 'CLIENT'
    raise AvroError, 'Handshake failure. match == CLIENT' if send_protocol
    self.remote_protocol = Avro::Protocol.parse(handshake_response['serverProtocol'])
    self.remote_hash = handshake_response['serverHash']
    self.send_protocol = false
    we_have_matching_schema = true
  when 'NONE'
    raise AvroError, 'Handshake failure. match == NONE' if send_protocol
    self.remote_protocol = Avro::Protocol.parse(handshake_response['serverProtocol'])
    self.remote_hash = handshake_response['serverHash']
    self.send_protocol = true
  else
    raise AvroError, "Unexpected match: #{match}"
  end

  we_have_matching_schema
end
read_response(writers_schema, readers_schema, decoder) click to toggle source
    # File lib/avro/ipc.rb
# Decodes and returns a response datum from +decoder+, resolving the
# writer's schema against the reader's schema via Avro::IO::DatumReader.
def read_response(writers_schema, readers_schema, decoder)
  Avro::IO::DatumReader.new(writers_schema, readers_schema).read(decoder)
end
remote_hash=(new_remote_hash) click to toggle source
    # File lib/avro/ipc.rb
 # Stores the new remote hash on this requestor and records it in
 # REMOTE_HASHES under the transport's remote name.
 def remote_hash=(new_remote_hash)
   @remote_hash = new_remote_hash
   peer_name = transport.remote_name
   REMOTE_HASHES[peer_name] = remote_hash
 end
remote_protocol=(new_remote_protocol) click to toggle source
   # File lib/avro/ipc.rb
# Stores the new remote protocol on this requestor and records it in
# REMOTE_PROTOCOLS under the transport's remote name.
def remote_protocol=(new_remote_protocol)
  @remote_protocol = new_remote_protocol
  peer_name = transport.remote_name
  REMOTE_PROTOCOLS[peer_name] = remote_protocol
end
request(message_name, request_datum) click to toggle source
    # File lib/avro/ipc.rb
# Writes a request message and reads a response or error message.
#
# The handshake request and the call request are encoded into a single
# binary buffer and handed to the transport; the reply is then decoded.
# When the handshake response reports that the schemas do not match yet,
# the whole request is issued again.
#
# Returns the value produced by read_call_response.
def request(message_name, request_datum)
  # Build handshake and call request in one binary-encoded buffer.
  outgoing = StringIO.new(String.new('', encoding: 'BINARY'))
  encoder = Avro::IO::BinaryEncoder.new(outgoing)
  write_handshake_request(encoder)
  write_call_request(message_name, request_datum, encoder)

  # Send both parts; transceive returns the reply payload.
  reply = transport.transceive(outgoing.string)
  decoder = Avro::IO::BinaryDecoder.new(StringIO.new(reply))

  # A false handshake result means the request has to be sent again.
  return request(message_name, request_datum) unless read_handshake_response(decoder)

  read_call_response(message_name, decoder)
end
write_call_request(message_name, request_datum, encoder) click to toggle source
    # File lib/avro/ipc.rb
# Writes a call request to +encoder+.
#
# The format of a call request is:
#   * request metadata, a map with values of type bytes
#   * the message name, an Avro string, followed by
#   * the message parameters. Parameters are serialized according to
#     the message's request declaration.
#
# Raises AvroError when +message_name+ is not a message of the local
# protocol.
def write_call_request(message_name, request_datum, encoder)
  # TODO request metadata (not yet implemented); an empty map is written.
  META_WRITER.write({}, encoder)

  message = local_protocol.messages[message_name]
  raise AvroError, "Unknown message: #{message_name}" unless message

  encoder.write_string(message.name)
  write_request(message.request, request_datum, encoder)
end
write_handshake_request(encoder) click to toggle source
    # File lib/avro/ipc.rb
# Writes the handshake request to +encoder+.
#
# The request carries the local protocol's md5 as 'clientHash' and, as
# 'serverHash', the hash recorded in REMOTE_HASHES for the transport's
# remote name. When no hash is recorded, the local hash is sent instead
# and the local protocol is adopted as the remote protocol. The local
# protocol text is included as 'clientProtocol' only when send_protocol
# is set.
def write_handshake_request(encoder)
  client_hash = local_protocol.md5
  server_hash = REMOTE_HASHES[transport.remote_name]

  unless server_hash
    # Nothing recorded for this remote: use the local protocol for both sides.
    server_hash = client_hash
    self.remote_protocol = local_protocol
  end

  handshake = { 'clientHash' => client_hash, 'serverHash' => server_hash }
  handshake['clientProtocol'] = local_protocol.to_s if send_protocol

  HANDSHAKE_REQUESTOR_WRITER.write(handshake, encoder)
end
write_request(request_schema, request_datum, encoder) click to toggle source
    # File lib/avro/ipc.rb
# Serializes +request_datum+ to +encoder+ according to +request_schema+
# using Avro::IO::DatumWriter.
def write_request(request_schema, request_datum, encoder)
  Avro::IO::DatumWriter.new(request_schema).write(request_datum, encoder)
end