class Avro::IPC::SocketTransport

Attributes

protocol[RW]
remote_name[R]

A simple socket-based Transport implementation.

sock[R]

A simple socket-based Transport implementation.

Public Class Methods

new(sock) click to toggle source
    # File lib/avro/ipc.rb
380 def initialize(sock)
381   @sock = sock
382   @protocol = nil
383 end

Public Instance Methods

close() click to toggle source
    # File lib/avro/ipc.rb
457 def close
458   sock.close
459 end
is_connected?() click to toggle source
    # File lib/avro/ipc.rb
385 def is_connected?()
386   !!@protocol
387 end
read_buffer_length() click to toggle source
    # File lib/avro/ipc.rb
449 def read_buffer_length
450   read = sock.read(BUFFER_HEADER_LENGTH)
451   if read == '' || read == nil
452     raise ConnectionClosedException.new("Socket read 0 bytes.")
453   end
454   read.unpack('N')[0]
455 end
read_framed_message() click to toggle source
    # File lib/avro/ipc.rb
394 def read_framed_message
395   message = []
396   loop do
397     buffer = StringIO.new(''.force_encoding('BINARY'))
398     buffer_length = read_buffer_length
399     if buffer_length == 0
400       return message.join
401     end
402     while buffer.tell < buffer_length
403       chunk = sock.read(buffer_length - buffer.tell)
404       if chunk == ''
405         raise ConnectionClosedException.new("Socket read 0 bytes.")
406       end
407       buffer.write(chunk)
408     end
409     message << buffer.string
410   end
411 end
transceive(request) click to toggle source
    # File lib/avro/ipc.rb
389 def transceive(request)
390   write_framed_message(request)
391   read_framed_message
392 end
write_buffer(chunk) click to toggle source
    # File lib/avro/ipc.rb
429 def write_buffer(chunk)
430   buffer_length = chunk.bytesize
431   write_buffer_length(buffer_length)
432   total_bytes_sent = 0
433   while total_bytes_sent < buffer_length
434     bytes_sent = self.sock.write(chunk[total_bytes_sent..-1])
435     if bytes_sent == 0
436       raise ConnectionClosedException.new("Socket sent 0 bytes.")
437     end
438     total_bytes_sent += bytes_sent
439   end
440 end
write_buffer_length(n) click to toggle source
    # File lib/avro/ipc.rb
442 def write_buffer_length(n)
443   bytes_sent = sock.write([n].pack('N'))
444   if bytes_sent == 0
445     raise ConnectionClosedException.new("socket sent 0 bytes")
446   end
447 end
write_framed_message(message) click to toggle source
    # File lib/avro/ipc.rb
413 def write_framed_message(message)
414   message_length = message.bytesize
415   total_bytes_sent = 0
416   while message_length - total_bytes_sent > 0
417     if message_length - total_bytes_sent > BUFFER_SIZE
418       buffer_length = BUFFER_SIZE
419     else
420       buffer_length = message_length - total_bytes_sent
421     end
422     write_buffer(message[total_bytes_sent,buffer_length])
423     total_bytes_sent += buffer_length
424   end
425   # A message is always terminated by a zero-length buffer.
426   write_buffer_length(0)
427 end