class Async::IO::Stream
Constants
- BLOCK_SIZE
Attributes
Public Class Methods
# File lib/async/io/stream.rb, line 45
# Builds a buffered stream on top of `io`.
#
# @param io The underlying IO object; its `sync` attribute is assigned here.
# @param block_size [Integer] Stored as `@block_size`.
# @param maximum_read_size [Integer] Stored as `@maximum_read_size`.
# @param sync [Boolean] Value assigned to `io.sync`.
# @param deferred [Boolean] Accepted but not stored (see the note in the body).
def initialize(io, block_size: BLOCK_SIZE, maximum_read_size: MAXIMUM_READ_SIZE, sync: true, deferred: false)
  @io = io
  @block_size = block_size
  @maximum_read_size = maximum_read_size

  @eof = false
  @pending = 0

  # The `deferred` option is ignored, but used to mean: try to buffer packets
  # in a single iteration of the reactor.
  # @deferred = deferred

  @writing = Async::Semaphore.new(1)

  # We don't want Ruby to do any IO buffering.
  @io.sync = sync

  @read_buffer = Buffer.new
  @write_buffer = Buffer.new
  @drain_buffer = Buffer.new

  # Used as destination buffer for underlying reads.
  @input_buffer = Buffer.new
end
# File lib/async/io/stream.rb, line 33
# Opens the file at `path` and wraps it in a new stream.
#
# Without a block the stream is returned and the caller is responsible for
# closing it. With a block the stream is yielded, closed when the block
# finishes (normally or by raising), and the block's value is returned.
#
# @param path The path handed to `File.open`.
# @param mode [String] The mode handed to `File.open`.
# @param options Keyword options forwarded to `new`.
# @return The stream, or the block's result when a block is given.
def self.open(path, mode = "r+", **options)
  file = File.open(path, mode)
  constructed = false

  begin
    stream = self.new(file, **options)
    constructed = true
  ensure
    # If the stream could not be built, nothing else owns the file handle,
    # so close it here rather than leaking the descriptor.
    file.close unless constructed
  end

  return stream unless block_given?

  begin
    yield stream
  ensure
    stream.close
  end
end
Public Instance Methods
Writes `string` to the stream and returns self.
# File lib/async/io/stream.rb, line 183
# Appends `string` via `write` and returns the stream itself so that calls
# can be chained.
#
# @param string The data to write.
# @return [self]
def <<(string)
  write(string)

  self
end
Best effort to flush any unwritten data, and then close the underlying IO.
# File lib/async/io/stream.rb, line 216
# Flushes pending output on a best-effort basis and then closes the
# underlying IO. Does nothing when the IO is already closed.
def close
  unless @io.closed?
    begin
      flush
    rescue
      # We really can't do anything here unless we want #close to raise exceptions.
    ensure
      # The IO is closed whether or not the flush succeeded.
      @io.close
    end
  end
end
# File lib/async/io/stream.rb, line 205
# Delegates to `close_read` on the underlying IO and returns its result.
def close_read
  return @io.close_read
end
# File lib/async/io/stream.rb, line 209
# Flushes buffered output, then calls `close_write` on the underlying IO.
# The `close_write` call happens even if the flush raises.
def close_write
  begin
    flush
  ensure
    @io.close_write
  end
end
# File lib/async/io/stream.rb, line 201
# Reports whether the underlying IO says it is closed.
#
# @return The result of `closed?` on the underlying IO.
def closed?
  return @io.closed?
end
# File lib/async/io/stream.rb, line 197
# Reports whether the underlying IO says it is connected.
#
# @return The result of `connected?` on the underlying IO.
def connected?
  return @io.connected?
end
# File lib/async/io/stream.rb, line 241
# Forces the stream into the end-of-file state: buffered input is discarded,
# the eof flag is set, and EOFError is raised.
#
# @raise [EOFError] Always.
def eof!
  # Drop any input that was buffered but not yet consumed.
  @read_buffer.clear
  @eof = true

  raise(EOFError)
end
Returns true if the stream is at end of file, which means there is no more data to be read.
# File lib/async/io/stream.rb, line 229
# Tells whether there is nothing left to read.
#
# @return `false` while buffered input remains, `true` once the eof flag has
#   been set, and otherwise whatever the underlying IO's `eof?` reports.
def eof?
  # Buffered data still has to be consumed, so we are not at the end yet.
  return false unless @read_buffer.empty?

  # A previous read already hit the end of the underlying stream.
  return true if @eof

  @io.eof?
end
Flushes buffered data to the stream.
# File lib/async/io/stream.rb, line 152
# Writes everything in the write buffer to the underlying IO.
#
# Returns immediately when there is nothing buffered. Otherwise the work is
# done inside `@writing.acquire`, and the drain buffer is cleared afterwards
# even if the write raises.
def flush
  return if @write_buffer.empty?

  @writing.acquire do
    # Flip the write buffer and drain buffer, so new writes accumulate in the
    # (empty) former drain buffer while the pending data is written out.
    pending = @write_buffer
    @write_buffer = @drain_buffer
    @drain_buffer = pending

    begin
      @io.write(@drain_buffer)
    ensure
      # If the write operation fails, we still need to clear this buffer, and the data is essentially lost.
      @drain_buffer.clear
    end
  end
end
# File lib/async/io/stream.rb, line 147
# Reads up to the next `separator` by delegating to `read_until`.
#
# @param separator The pattern to read up to; defaults to `$/`.
# @param options Keyword options forwarded to `read_until`.
# @return Whatever `read_until` returns.
def gets(separator = $/, **options)
  return read_until(separator, **options)
end
# File lib/async/io/stream.rb, line 141
# Repeatedly yields the read buffer to the block, filling it with more data
# between calls, until the block returns a truthy value or the eof flag is
# set. The buffer is not consumed here.
#
# @yield [buffer] The current read buffer.
def peek
  while !yield(@read_buffer) && !@eof
    fill_read_buffer
  end
end
# File lib/async/io/stream.rb, line 189
# Appends each argument followed by `separator` to the write buffer, then
# flushes.
#
# @param arguments The items to append.
# @param separator The terminator appended after every item; defaults to `$/`.
def puts(*arguments, separator: $/)
  arguments.each { |line| @write_buffer << line << separator }

  flush
end
Reads `size` bytes from the stream. If size is not specified, read until end of file.
# File lib/async/io/stream.rb, line 74
# Reads from the stream.
#
# With a `size`, the read buffer is filled until it holds at least that many
# bytes or the eof flag is set; without one, it is filled until eof. The
# result is then taken from the read buffer via `consume_read_buffer`.
#
# @param size [Integer, nil] The number of bytes wanted, or nil for everything.
# @return An empty binary string when `size` is 0, otherwise the result of
#   `consume_read_buffer(size)`.
def read(size = nil)
  return String.new(encoding: Encoding::BINARY) if size == 0

  if size
    while !@eof && @read_buffer.bytesize < size
      # Compute the amount of data we still need from the underlying stream:
      missing = size - @read_buffer.bytesize

      # Don't request less than @block_size to avoid lots of small reads:
      if missing > @block_size
        fill_read_buffer(missing)
      else
        fill_read_buffer(@block_size)
      end
    end
  else
    fill_read_buffer until @eof
  end

  consume_read_buffer(size)
end
# File lib/async/io/stream.rb, line 105
# Reads exactly `size` bytes or raises.
#
# @param size [Integer] The number of bytes required.
# @param exception The exception class to raise on failure; defaults to EOFError.
# @return The data returned by `read(size)`, whose bytesize equals `size`.
# @raise [exception] When `read` returns nothing, or returns a different
#   number of bytes than requested.
def read_exactly(size, exception: EOFError)
  buffer = read(size)

  raise exception, "encountered eof while reading data" unless buffer
  raise exception, "could not read enough data" unless buffer.bytesize == size

  buffer
end
Read at most `size` bytes from the stream. Will avoid reading from the underlying stream if possible.
# File lib/async/io/stream.rb, line 95
# Returns up to `size` bytes, preferring data that is already buffered.
#
# The underlying stream is read (once) only when the read buffer is empty and
# the eof flag is not set.
#
# @param size [Integer, nil] The maximum number of bytes, or nil for the whole buffer.
# @return An empty binary string when `size` is 0, otherwise the result of
#   `consume_read_buffer(size)`.
def read_partial(size = nil)
  return String.new(encoding: Encoding::BINARY) if size == 0

  needs_fill = !@eof && @read_buffer.empty?
  fill_read_buffer if needs_fill

  consume_read_buffer(size)
end
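Efficiently read data from the stream until encountering pattern. @param pattern [String] The pattern to match. @param offset [Integer] The position in the read buffer at which to start searching. @param chomp [Boolean] When true (the default) the pattern is left out of the result; when false it is included. @return [String, nil] The contents of the stream up to the pattern. The pattern is always consumed from the stream. Returns nil if the stream ends before the pattern is found.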
# File lib/async/io/stream.rb, line 122
# Reads up to the next occurrence of `pattern`.
#
# @param pattern [String] The pattern to search for.
# @param offset [Integer] Where in the read buffer to start searching.
# @param chomp [Boolean] When true the pattern is excluded from the result,
#   otherwise it is included. Either way it is removed from the read buffer.
# @return The data preceding the pattern (plus the pattern itself when
#   `chomp` is false), or nil when `fill_read_buffer` reports no more data
#   before a match is found.
def read_until(pattern, offset = 0, chomp: true)
  # We don't want to split on the pattern, so each retry re-scans the last
  # (pattern size - 1) bytes in case a match straddles two reads.
  overlap = pattern.bytesize - 1

  until (index = @read_buffer.index(pattern, offset))
    offset = [@read_buffer.bytesize - overlap, 0].max

    return nil unless fill_read_buffer
  end

  tail_start = index + pattern.bytesize
  head_length = chomp ? index : tail_start

  # The buffer is frozen before slicing; both slices below replace it.
  @read_buffer.freeze
  matched = @read_buffer.byteslice(0, head_length)
  @read_buffer = @read_buffer.byteslice(tail_start, @read_buffer.bytesize)

  matched
end
Writes `string` to the buffer. Once the buffered data reaches `block_size` bytes, the buffer is flushed to the underlying `io`. @param string the string to write to the buffer. @return the number of bytes in `string`.
# File lib/async/io/stream.rb, line 172
# Appends `string` to the write buffer and flushes once the buffer holds at
# least `@block_size` bytes.
#
# @param string The data to buffer.
# @return The bytesize of `string`.
def write(string)
  @write_buffer << string

  flush if @write_buffer.bytesize >= @block_size

  string.bytesize
end
Private Instance Methods
Consumes at most `size` bytes from the buffer. @param size [Integer|nil] The amount of data to consume. If nil, consume entire buffer.
# File lib/async/io/stream.rb, line 281
# Removes and returns up to `size` bytes from the front of the read buffer.
#
# @param size [Integer, nil] The number of bytes to take; nil takes everything.
# @return nil when the eof flag is set and the buffer is empty; otherwise the
#   consumed data.
def consume_read_buffer(size = nil)
  # If we are at eof, and the read buffer is empty, we can't consume anything.
  return nil if @eof && @read_buffer.empty?

  if size.nil? || size >= @read_buffer.bytesize
    # Hand over the entire read buffer and start a fresh one:
    whole = @read_buffer
    @read_buffer = Buffer.new

    return whole
  end

  # `slice!(0, size)` would also work here, but uses more memory.
  # We know that we are not going to reuse the original buffer. But byteslice
  # will generate a hidden copy, so let's freeze it first:
  @read_buffer.freeze

  head = @read_buffer.byteslice(0, size)
  @read_buffer = @read_buffer.byteslice(size, @read_buffer.bytesize)

  head
end
Fills the buffer from the underlying stream.
# File lib/async/io/stream.rb, line 251
# Performs one read from the underlying IO into the read buffer.
#
# @param size [Integer] The number of bytes to request, capped at
#   `@maximum_read_size`; defaults to `@block_size`.
# @return [Boolean] true when the read returned something; false otherwise,
#   in which case the eof flag is also set.
def fill_read_buffer(size = @block_size)
  # We impose a limit because the underlying `read` system call can fail if we request too much data in one go.
  size = @maximum_read_size if size > @maximum_read_size

  # This effectively ties the input and output stream together.
  flush

  if @read_buffer.empty?
    # Nothing is buffered, so read directly into the read buffer.
    return true if @io.read_nonblock(size, @read_buffer, exception: false)
  elsif (chunk = @io.read_nonblock(size, @input_buffer, exception: false))
    # Existing data must be kept, so read into the input buffer and append.
    @read_buffer << chunk

    return true
  end

  # Neither read produced anything:
  @eof = true

  false
end