class GrpcKit::Session::RecvBuffer

Public Class Methods

new() click to toggle source
# File lib/grpc_kit/session/recv_buffer.rb, line 8
# Sets up an empty receive buffer.
#
# State created here:
# * @buffer - mutable, binary-encoded string holding bytes already taken off the queue
# * @end    - flag reported by #end_read?; starts out false
# * @queue  - Queue that #write pushes chunks onto
def initialize
  @buffer = String.new(encoding: Encoding::BINARY)
  @end = false
  @queue = Queue.new
end

Public Instance Methods

close() click to toggle source

@return [void]

# File lib/grpc_kit/session/recv_buffer.rb, line 33
# Closes the underlying queue.
#
# @return [void]
def close
  @queue.close
end
closed?() click to toggle source

@return [Boolean]

# File lib/grpc_kit/session/recv_buffer.rb, line 28
# Reports whether the underlying queue has been closed.
#
# @return [Boolean] true once the queue is closed
def closed?
  @queue.closed?
end
empty?() click to toggle source

@return [Boolean]

# File lib/grpc_kit/session/recv_buffer.rb, line 23
# Reports whether the underlying queue currently holds no chunks.
# Only the queue is inspected; bytes already moved into @buffer are not considered.
#
# @return [Boolean] true when the queue is empty
def empty?
  @queue.empty?
end
end_read() click to toggle source

@return [void]

# File lib/grpc_kit/session/recv_buffer.rb, line 81
# Sets the end-of-read flag that #end_read? reports.
#
# @return [void]
def end_read
  @end = true
end
end_read?() click to toggle source

@return [Boolean]

# File lib/grpc_kit/session/recv_buffer.rb, line 76
# Reports the current value of the end-of-read flag.
#
# @return [Boolean] the value of @end
def end_read?
  @end
end
read(size = nil, last: false, blocking:) click to toggle source

This method is not thread-safe, because RecvBuffer is designed as a multi-producer/single-consumer buffer. @param size [Integer,nil] @param last [Boolean] @param blocking [Boolean] @return [String,Symbol,nil]

# File lib/grpc_kit/session/recv_buffer.rb, line 42
# Returns buffered bytes, refilling the internal buffer from the queue when it is empty.
#
# Not thread safe: meant to be called from the single consumer side.
#
# @param size [Integer, nil] maximum number of bytes to return; nil (or a size larger
#   than what is buffered) returns everything currently buffered
# @param last [Boolean] when true, the end-of-read flag is set before returning data
# @param blocking [Boolean] when true, waits on the queue for the first chunk;
#   when false, never waits
# @return [String, Symbol, nil] the bytes read; :wait_readable when nothing is queued
#   and blocking is false; nil when nothing is buffered and the queue is empty and closed
def read(size = nil, last: false, blocking:)
  if @buffer.empty?
    return nil if empty? && closed?
    return :wait_readable if !blocking && empty?

    # Drain every chunk that is already queued so the caller can keep going
    # (important on clients, which are single-threaded).
    begin
      loop do
        chunk = @queue.shift(!blocking)
        @buffer << chunk if chunk
        break if empty?
      end
    rescue ThreadError
      # A non-blocking shift found the queue empty: nothing more to drain.
    end
  end

  if size.nil? || @buffer.bytesize < size
    # Hand over the whole buffer and start a fresh one.
    out = @buffer
    @buffer = ''.b
  else
    # NOTE(review): freezing presumably lets byteslice share storage instead of copying — confirm.
    whole = @buffer.freeze
    out = whole.byteslice(0, size)
    @buffer = whole.byteslice(size, whole.bytesize)
  end

  end_read if last
  out
end
write(data) click to toggle source

@param data [String] @return [void]

# File lib/grpc_kit/session/recv_buffer.rb, line 16
# Enqueues a chunk of received data for a later #read.
#
# @param data [String] chunk to append to the queue
# @return [void]
# @raise [Closed] when the queue has already been closed
def write(data)
  @queue.push(data)
rescue ClosedQueueError
  raise Closed, "[BUG] write to closed queue"
end