class Avro::IPC::SocketTransport

Attributes

protocol[RW]
remote_name[R]

A simple socket-based Transport implementation.

sock[R]

A simple socket-based Transport implementation.

Public Class Methods

new(sock) click to toggle source
    # File lib/avro/ipc.rb
384 def initialize(sock)
385   @sock = sock
386   @protocol = nil
387 end

Public Instance Methods

close() click to toggle source
    # File lib/avro/ipc.rb
461 def close
462   sock.close
463 end
is_connected?() click to toggle source
    # File lib/avro/ipc.rb
389 def is_connected?()
390   !!@protocol
391 end
read_buffer_length() click to toggle source
    # File lib/avro/ipc.rb
453 def read_buffer_length
454   read = sock.read(BUFFER_HEADER_LENGTH)
455   if read == '' || read == nil
456     raise ConnectionClosedException.new("Socket read 0 bytes.")
457   end
458   read.unpack('N')[0]
459 end
read_framed_message() click to toggle source
    # File lib/avro/ipc.rb
398 def read_framed_message
399   message = []
400   loop do
401     buffer = StringIO.new(String.new('', encoding: 'BINARY'))
402     buffer_length = read_buffer_length
403     if buffer_length == 0
404       return message.join
405     end
406     while buffer.tell < buffer_length
407       chunk = sock.read(buffer_length - buffer.tell)
408       if chunk == ''
409         raise ConnectionClosedException.new("Socket read 0 bytes.")
410       end
411       buffer.write(chunk)
412     end
413     message << buffer.string
414   end
415 end
transceive(request) click to toggle source
    # File lib/avro/ipc.rb
393 def transceive(request)
394   write_framed_message(request)
395   read_framed_message
396 end
write_buffer(chunk) click to toggle source
    # File lib/avro/ipc.rb
433 def write_buffer(chunk)
434   buffer_length = chunk.bytesize
435   write_buffer_length(buffer_length)
436   total_bytes_sent = 0
437   while total_bytes_sent < buffer_length
438     bytes_sent = self.sock.write(chunk[total_bytes_sent..-1])
439     if bytes_sent == 0
440       raise ConnectionClosedException.new("Socket sent 0 bytes.")
441     end
442     total_bytes_sent += bytes_sent
443   end
444 end
write_buffer_length(n) click to toggle source
    # File lib/avro/ipc.rb
446 def write_buffer_length(n)
447   bytes_sent = sock.write([n].pack('N'))
448   if bytes_sent == 0
449     raise ConnectionClosedException.new("socket sent 0 bytes")
450   end
451 end
write_framed_message(message) click to toggle source
    # File lib/avro/ipc.rb
417 def write_framed_message(message)
418   message_length = message.bytesize
419   total_bytes_sent = 0
420   while message_length - total_bytes_sent > 0
421     if message_length - total_bytes_sent > BUFFER_SIZE
422       buffer_length = BUFFER_SIZE
423     else
424       buffer_length = message_length - total_bytes_sent
425     end
426     write_buffer(message[total_bytes_sent,buffer_length])
427     total_bytes_sent += buffer_length
428   end
429   # A message is always terminated by a zero-length buffer.
430   write_buffer_length(0)
431 end