class Async::IO::Stream

Constants

BLOCK_SIZE

Attributes

block_size[R]
io[R]

Public Class Methods

new(io, block_size: BLOCK_SIZE, maximum_read_size: MAXIMUM_READ_SIZE, sync: true, deferred: false) click to toggle source
# File lib/async/io/stream.rb, line 45
def initialize(io, block_size: BLOCK_SIZE, maximum_read_size: MAXIMUM_READ_SIZE, sync: true, deferred: false)
        @io = io
        @eof = false
        
        @pending = 0
        # This field is ignored, but used to mean, try to buffer packets in a single iteration of the reactor.
        # @deferred = deferred
        
        @writing = Async::Semaphore.new(1)
        
        # We don't want Ruby to do any IO buffering.
        @io.sync = sync
        
        @block_size = block_size
        @maximum_read_size = maximum_read_size
        
        @read_buffer = Buffer.new
        @write_buffer = Buffer.new
        @drain_buffer = Buffer.new
        
        # Used as destination buffer for underlying reads.
        @input_buffer = Buffer.new
end
open(path, mode = "r+", **options) { |stream| ... } click to toggle source
# File lib/async/io/stream.rb, line 33
def self.open(path, mode = "r+", **options)
        stream = self.new(File.open(path, mode), **options)
        
        return stream unless block_given?
        
        begin
                yield stream
        ensure
                stream.close
        end
end

Public Instance Methods

<<(string) click to toggle source

Writes `string` to the stream and returns self.

# File lib/async/io/stream.rb, line 183
def <<(string)
        write(string)
        
        return self
end
close() click to toggle source

Best effort to flush any unwritten data, and then close the underling IO.

# File lib/async/io/stream.rb, line 216
def close
        return if @io.closed?
        
        begin
                flush
        rescue
                # We really can't do anything here unless we want #close to raise exceptions.
        ensure
                @io.close
        end
end
close_read() click to toggle source
# File lib/async/io/stream.rb, line 205
def close_read
        @io.close_read
end
close_write() click to toggle source
# File lib/async/io/stream.rb, line 209
def close_write
        flush
ensure
        @io.close_write
end
closed?() click to toggle source
# File lib/async/io/stream.rb, line 201
def closed?
        @io.closed?
end
connected?() click to toggle source
# File lib/async/io/stream.rb, line 197
def connected?
        @io.connected?
end
eof()
Alias for: eof?
eof!() click to toggle source
# File lib/async/io/stream.rb, line 241
def eof!
        @read_buffer.clear
        @eof = true
        
        raise EOFError
end
eof?() click to toggle source

Returns true if the stream is at file which means there is no more data to be read.

# File lib/async/io/stream.rb, line 229
def eof?
        if !@read_buffer.empty?
                return false
        elsif @eof
                return true
        else
                return @io.eof?
        end
end
Also aliased as: eof
flush() click to toggle source

Flushes buffered data to the stream.

# File lib/async/io/stream.rb, line 152
def flush
        return if @write_buffer.empty?
        
        @writing.acquire do
                # Flip the write buffer and drain buffer:
                @write_buffer, @drain_buffer = @drain_buffer, @write_buffer
                
                begin
                        @io.write(@drain_buffer)
                ensure
                        # If the write operation fails, we still need to clear this buffer, and the data is essentially lost.
                        @drain_buffer.clear
                end
        end
end
gets(separator = $/, **options) click to toggle source
# File lib/async/io/stream.rb, line 147
def gets(separator = $/, **options)
        read_until(separator, **options)
end
peek() click to toggle source
# File lib/async/io/stream.rb, line 141
def peek
        until yield(@read_buffer) or @eof
                fill_read_buffer
        end
end
puts(*arguments, separator: $/) click to toggle source
# File lib/async/io/stream.rb, line 189
def puts(*arguments, separator: $/)
        arguments.each do |argument|
                @write_buffer << argument << separator
        end
        
        flush
end
read(size = nil) click to toggle source

Reads `size` bytes from the stream. If size is not specified, read until end of file.

# File lib/async/io/stream.rb, line 74
def read(size = nil)
        return String.new(encoding: Encoding::BINARY) if size == 0
        
        if size
                until @eof or @read_buffer.bytesize >= size
                        # Compute the amount of data we need to read from the underlying stream:
                        read_size = size - @read_buffer.bytesize
                        
                        # Don't read less than @block_size to avoid lots of small reads:
                        fill_read_buffer(read_size > @block_size ? read_size : @block_size)
                end
        else
                until @eof
                        fill_read_buffer
                end
        end
        
        return consume_read_buffer(size)
end
read_exactly(size, exception: EOFError) click to toggle source
# File lib/async/io/stream.rb, line 105
def read_exactly(size, exception: EOFError)
        if buffer = read(size)
                if buffer.bytesize != size
                        raise exception, "could not read enough data"
                end
                
                return buffer
        end
        
        raise exception, "encountered eof while reading data"
end
read_partial(size = nil) click to toggle source

Read at most `size` bytes from the stream. Will avoid reading from the underlying stream if possible.

# File lib/async/io/stream.rb, line 95
def read_partial(size = nil)
        return String.new(encoding: Encoding::BINARY) if size == 0

        if !@eof and @read_buffer.empty?
                fill_read_buffer
        end
        
        return consume_read_buffer(size)
end
Also aliased as: readpartial
read_until(pattern, offset = 0, chomp: true) click to toggle source

Efficiently read data from the stream until encountering pattern. @param pattern [String] The pattern to match. @return [String] The contents of the stream up until the pattern, which is consumed but not returned.

# File lib/async/io/stream.rb, line 122
def read_until(pattern, offset = 0, chomp: true)
        # We don't want to split on the pattern, so we subtract the size of the pattern.
        split_offset = pattern.bytesize - 1
        
        until index = @read_buffer.index(pattern, offset)
                offset = @read_buffer.bytesize - split_offset
                
                offset = 0 if offset < 0
                
                return unless fill_read_buffer
        end
        
        @read_buffer.freeze
        matched = @read_buffer.byteslice(0, index+(chomp ? 0 : pattern.bytesize))
        @read_buffer = @read_buffer.byteslice(index+pattern.bytesize, @read_buffer.bytesize)
        
        return matched
end
readpartial(size = nil)
Alias for: read_partial
write(string) click to toggle source

Writes `string` to the buffer. When the buffer is full or sync is true the buffer is flushed to the underlying `io`. @param string the string to write to the buffer. @return the number of bytes appended to the buffer.

# File lib/async/io/stream.rb, line 172
def write(string)
        @write_buffer << string
        
        if @write_buffer.bytesize >= @block_size
                flush
        end
        
        return string.bytesize
end

Private Instance Methods

consume_read_buffer(size = nil) click to toggle source

Consumes at most `size` bytes from the buffer. @param size [Integer|nil] The amount of data to consume. If nil, consume entire buffer.

# File lib/async/io/stream.rb, line 281
def consume_read_buffer(size = nil)
        # If we are at eof, and the read buffer is empty, we can't consume anything.
        return nil if @eof && @read_buffer.empty?
        
        result = nil
        
        if size.nil? or size >= @read_buffer.bytesize
                # Consume the entire read buffer:
                result = @read_buffer
                @read_buffer = Buffer.new
        else
                # This approach uses more memory.
                # result = @read_buffer.slice!(0, size)
                
                # We know that we are not going to reuse the original buffer.
                # But byteslice will generate a hidden copy. So let's freeze it first:
                @read_buffer.freeze
                
                result = @read_buffer.byteslice(0, size)
                @read_buffer = @read_buffer.byteslice(size, @read_buffer.bytesize)
        end
        
        return result
end
fill_read_buffer(size = @block_size) click to toggle source

Fills the buffer from the underlying stream.

# File lib/async/io/stream.rb, line 251
def fill_read_buffer(size = @block_size)
        # We impose a limit because the underlying `read` system call can fail if we request too much data in one go.
        if size > @maximum_read_size
                size = @maximum_read_size
        end
        
        # This effectively ties the input and output stream together.
        flush
        
        if @read_buffer.empty?
                if @io.read_nonblock(size, @read_buffer, exception: false)
                        # Console.logger.debug(self, name: "read") {@read_buffer.inspect}
                        return true
                end
        else
                if chunk = @io.read_nonblock(size, @input_buffer, exception: false)
                        @read_buffer << chunk
                        # Console.logger.debug(self, name: "read") {@read_buffer.inspect}
                        
                        return true
                end
        end
        
        # else for both cases above:
        @eof = true
        return false
end