class Avro::IPC::SocketTransport
Attributes
protocol[RW]
remote_name[R]
A simple socket-based Transport implementation.
sock[R]
A simple socket-based Transport implementation.
Public Class Methods
new(sock)
click to toggle source
# File lib/avro/ipc.rb 384 def initialize(sock) 385 @sock = sock 386 @protocol = nil 387 end
Public Instance Methods
close()
click to toggle source
# File lib/avro/ipc.rb 461 def close 462 sock.close 463 end
is_connected?()
click to toggle source
# File lib/avro/ipc.rb 389 def is_connected?() 390 !!@protocol 391 end
read_buffer_length()
click to toggle source
# File lib/avro/ipc.rb 453 def read_buffer_length 454 read = sock.read(BUFFER_HEADER_LENGTH) 455 if read == '' || read == nil 456 raise ConnectionClosedException.new("Socket read 0 bytes.") 457 end 458 read.unpack('N')[0] 459 end
read_framed_message()
click to toggle source
# File lib/avro/ipc.rb 398 def read_framed_message 399 message = [] 400 loop do 401 buffer = StringIO.new(String.new('', encoding: 'BINARY')) 402 buffer_length = read_buffer_length 403 if buffer_length == 0 404 return message.join 405 end 406 while buffer.tell < buffer_length 407 chunk = sock.read(buffer_length - buffer.tell) 408 if chunk == '' 409 raise ConnectionClosedException.new("Socket read 0 bytes.") 410 end 411 buffer.write(chunk) 412 end 413 message << buffer.string 414 end 415 end
transceive(request)
click to toggle source
# File lib/avro/ipc.rb 393 def transceive(request) 394 write_framed_message(request) 395 read_framed_message 396 end
write_buffer(chunk)
click to toggle source
# File lib/avro/ipc.rb 433 def write_buffer(chunk) 434 buffer_length = chunk.bytesize 435 write_buffer_length(buffer_length) 436 total_bytes_sent = 0 437 while total_bytes_sent < buffer_length 438 bytes_sent = self.sock.write(chunk[total_bytes_sent..-1]) 439 if bytes_sent == 0 440 raise ConnectionClosedException.new("Socket sent 0 bytes.") 441 end 442 total_bytes_sent += bytes_sent 443 end 444 end
write_buffer_length(n)
click to toggle source
# File lib/avro/ipc.rb 446 def write_buffer_length(n) 447 bytes_sent = sock.write([n].pack('N')) 448 if bytes_sent == 0 449 raise ConnectionClosedException.new("socket sent 0 bytes") 450 end 451 end
write_framed_message(message)
click to toggle source
# File lib/avro/ipc.rb 417 def write_framed_message(message) 418 message_length = message.bytesize 419 total_bytes_sent = 0 420 while message_length - total_bytes_sent > 0 421 if message_length - total_bytes_sent > BUFFER_SIZE 422 buffer_length = BUFFER_SIZE 423 else 424 buffer_length = message_length - total_bytes_sent 425 end 426 write_buffer(message[total_bytes_sent,buffer_length]) 427 total_bytes_sent += buffer_length 428 end 429 # A message is always terminated by a zero-length buffer. 430 write_buffer_length(0) 431 end