class Protocol::HTTP2::Stream

A single HTTP 2.0 connection can multiplex multiple streams in parallel: multiple requests and responses can be in flight simultaneously and stream data can be interleaved and prioritized.

This class encapsulates all of the state, transition, flow-control, and error management as defined by the HTTP 2.0 specification. All you have to do is subscribe to appropriate events (marked with “:” prefix in diagram below) and provide your application logic to handle request and response processing.

                             +--------+
                     send PP |        | recv PP
                    ,--------|  idle  |--------.
                   /         |        |         \
                  v          +--------+          v
           +----------+          |           +----------+
           |          |          | send H /  |          |
    ,------| reserved |          | recv H    | reserved |------.
    |      | (local)  |          |           | (remote) |      |
    |      +----------+          v           +----------+      |
    |          |             +--------+             |          |
    |          |     recv ES |        | send ES     |          |
    |   send H |     ,-------|  open  |-------.     | recv H   |
    |          |    /        |        |        \    |          |
    |          v   v         +--------+         v   v          |
    |      +----------+          |           +----------+      |
    |      |   half   |          |           |   half   |      |
    |      |  closed  |          | send R /  |  closed  |      |
    |      | (remote) |          | recv R    | (local)  |      |
    |      +----------+          |           +----------+      |
    |           |                |                 |           |
    |           | send ES /      |       recv ES / |           |
    |           | send R /       v        send R / |           |
    |           | recv R     +--------+   recv R   |           |
    | send R /  `----------->|        |<-----------'  send R / |
    | recv R                 | closed |               recv R   |
    `----------------------->|        |<----------------------'
                             +--------+

send:   endpoint sends this frame
recv:   endpoint receives this frame

H:  HEADERS frame (with implied CONTINUATIONs)
PP: PUSH_PROMISE frame (with implied CONTINUATIONs)
ES: END_STREAM flag
R:  RST_STREAM frame

State transition methods use a trailing “!”.

Attributes

connection[R]

The connection this stream belongs to.

dependency[R]
id[R]

Stream ID (odd for client initiated streams, even otherwise).

local_window[R]
remote_window[R]
state[RW]

Stream state, e.g. `idle`, `closed`.

Public Class Methods

create(connection, id) click to toggle source
# File lib/protocol/http2/stream.rb, line 77
# Build a stream for `id` and register it in `connection.streams` under that id.
# @param connection the connection whose `streams` table receives the new stream.
# @param id the key under which the stream is stored.
# @return the newly constructed stream.
def self.create(connection, id)
        new(connection, id).tap do |stream|
                connection.streams[id] = stream
        end
end
new(connection, id, state = :idle) click to toggle source
# File lib/protocol/http2/stream.rb, line 85
# Set up a stream attached to `connection`.
# @param connection the owning connection; its local and remote settings supply the initial window sizes.
# @param id the stream identifier.
# @param state the starting state, `:idle` unless given.
def initialize(connection, id, state = :idle)
        @connection, @id, @state = connection, id, state
        
        # Each direction gets its own window, sized from the matching settings:
        local = @connection.local_settings
        @local_window = Window.new(local.initial_window_size)
        
        remote = @connection.remote_settings
        @remote_window = Window.new(remote.initial_window_size)
        
        @dependency = Dependency.create(@connection, @id)
end

Public Instance Methods

accept_push_promise_stream(stream_id, headers) click to toggle source

Override this function to implement your own push promise logic.

# File lib/protocol/http2/stream.rb, line 433
# Default implementation: ask the connection to accept the promised stream.
# @param stream_id the promised stream identifier, forwarded to the connection.
# @param headers the headers that came with the promise; not used by this default implementation.
# @return whatever `@connection.accept_push_promise_stream` returns.
def accept_push_promise_stream(stream_id, headers)
        @connection.accept_push_promise_stream(stream_id)
end
active?() click to toggle source
# File lib/protocol/http2/stream.rb, line 135
# @return [Boolean] true when the state is neither `:closed` nor `:idle`.
def active?
        ![:closed, :idle].include?(@state)
end
close(error = nil) click to toggle source

Transition directly to closed state. Do not pass go, do not collect $200. This method should only be used by `Connection#close`.

# File lib/protocol/http2/stream.rb, line 145
# Force the state to `:closed` and fire the `closed` hook. Does nothing if the stream is already closed.
# @param error passed through to the `closed` hook.
def close(error = nil)
        return if closed?
        
        @state = :closed
        self.closed(error)
end
close!(error_code = nil) click to toggle source

Transition the stream into the closed state. @param error_code [Integer] the error code if the stream was closed due to a stream reset.

# File lib/protocol/http2/stream.rb, line 258
# Move to `:closed`, remove this stream's id from the connection, and fire the `closed` hook.
# @param error_code when truthy, it is wrapped in a StreamError that is handed to the hook; otherwise the hook receives nil.
# @return [self]
def close!(error_code = nil)
        @state = :closed
        @connection.delete(@id)
        
        error = error_code ? StreamError.new("Stream closed!", error_code) : nil
        self.closed(error)
        
        self
end
closed(error = nil) click to toggle source

The stream has been closed. If closed due to a stream reset, the error will be set.

# File lib/protocol/http2/stream.rb, line 253
# Hook called by `close` and `close!` once the state has been set to `:closed`.
# The body is intentionally empty here.
# @param error the value passed by the caller; `close!` passes a StreamError when it was given an error code, otherwise nil.
def closed(error = nil)
end
closed?() click to toggle source
# File lib/protocol/http2/stream.rb, line 139
# @return [Boolean] true when the current state is `:closed`.
def closed?
        @state == :closed
end
consume_remote_window(frame) click to toggle source
# File lib/protocol/http2/stream.rb, line 201
# Run the inherited `consume_remote_window` for this stream, then apply the same frame to the connection.
# @param frame the frame being accounted for; passed unchanged to both calls.
def consume_remote_window(frame)
        super
        
        # The connection is told about the frame as well:
        @connection.consume_remote_window(frame)
end
create_push_promise_stream(headers) click to toggle source

Override this function to implement your own push promise logic.

# File lib/protocol/http2/stream.rb, line 413
# Default implementation: ask the connection to create the promised stream.
# @param headers the headers for the promise; not used by this default implementation.
# @return whatever `@connection.create_push_promise_stream` returns.
def create_push_promise_stream(headers)
        @connection.create_push_promise_stream
end
ignore_data(frame) click to toggle source
# File lib/protocol/http2/stream.rb, line 336
# Called by `receive_data` when a data frame arrives on a closed stream. Does nothing.
# @param frame the frame being discarded.
def ignore_data(frame)
        # Async.logger.warn(self) {"Received data in state: #{@state}!"}
end
inspect() click to toggle source
# File lib/protocol/http2/stream.rb, line 448
# @return [String] a short description containing the class, stream id and current state.
def inspect
        Kernel.format("#<%s id=%s state=%s>", self.class, @id, @state)
end
maximum_frame_size() click to toggle source
# File lib/protocol/http2/stream.rb, line 127
# @return the connection's `available_frame_size`.
def maximum_frame_size
        @connection.available_frame_size
end
open!() click to toggle source
# File lib/protocol/http2/stream.rb, line 240
# Transition from `:idle` to `:open` and fire the `opened` hook.
# @raise [ProtocolError] if the stream is not currently `:idle`.
# @return [self]
def open!
        unless @state == :idle
                raise ProtocolError, "Cannot open stream in state: #{@state}"
        end
        
        @state = :open
        self.opened
        
        self
end
opened(error = nil) click to toggle source

The stream has been opened.

# File lib/protocol/http2/stream.rb, line 237
# Hook called by `open!` after the state has been set to `:open`.
# The body is intentionally empty here.
# @param error not used by this implementation; `open!` calls the hook without arguments.
def opened(error = nil)
end
parent=(stream) click to toggle source
# File lib/protocol/http2/stream.rb, line 123
# Make this stream's dependency a child of another stream's dependency.
# @param stream the stream whose `dependency` becomes the parent.
def parent=(stream)
        @dependency.parent = stream.dependency
end
priority() click to toggle source
# File lib/protocol/http2/stream.rb, line 115
# @return the `priority` reported by this stream's dependency.
def priority
        @dependency.priority
end
priority=(priority) click to toggle source
# File lib/protocol/http2/stream.rb, line 119
# Assign a new priority through this stream's dependency.
# @param priority the value handed to `@dependency.priority=`.
def priority=(priority)
        @dependency.priority = priority
end
process_data(frame) click to toggle source

@return [String] the data that was received.

# File lib/protocol/http2/stream.rb, line 332
# Extract the payload of a received data frame.
# @param frame the received frame.
# @return the result of `frame.unpack`.
def process_data(frame)
        frame.unpack
end
process_headers(frame) click to toggle source
# File lib/protocol/http2/stream.rb, line 284
# Unpack a received headers frame, apply any priority it carries, and decode the header block.
# @param frame a frame whose `unpack` yields a priority (possibly nil) and the encoded header data.
# @return the result of `@connection.decode_headers`.
def process_headers(frame)
        priority, header_block = frame.unpack
        
        # A priority is optional; only forward it when present.
        @dependency.process_priority(priority) if priority
        
        @connection.decode_headers(header_block)
end
receive_data(frame) click to toggle source

DATA frames are subject to flow control and can only be sent when a stream is in the “open” or “half-closed (remote)” state. The entire DATA frame payload is included in flow control, including the Pad Length and Padding fields if present. If a DATA frame is received whose stream is not in “open” or “half-closed (local)” state, the recipient MUST respond with a stream error of type STREAM_CLOSED.

# File lib/protocol/http2/stream.rb, line 341
# Handle an incoming data frame according to the current state.
#
# - `:open`: update the local window, move to `:half_closed_remote` on end-of-stream, then process the data.
# - `:half_closed_local`: update the local window, process the data, then `close!` on end-of-stream.
# - closed: hand the frame to `ignore_data`.
# - anything else: reset the stream with `Error::STREAM_CLOSED`.
#
# @param frame the received data frame.
def receive_data(frame)
        case @state
        when :open
                update_local_window(frame)
                @state = :half_closed_remote if frame.end_stream?
                
                process_data(frame)
        when :half_closed_local
                update_local_window(frame)
                process_data(frame)
                
                close! if frame.end_stream?
        else
                if self.closed?
                        ignore_data(frame)
                else
                        # If a DATA frame is received whose stream is not in "open" or "half-closed (local)" state, the recipient MUST respond with a stream error (Section 5.4.2) of type STREAM_CLOSED.
                        self.send_reset_stream(Error::STREAM_CLOSED)
                end
        end
end
receive_headers(frame) click to toggle source
# File lib/protocol/http2/stream.rb, line 299
# Handle an incoming headers frame according to the current state.
#
# - `:idle`: become `:half_closed_remote` on end-of-stream, otherwise `open!`; then process the headers.
# - `:reserved_remote`: process the headers, then become `:half_closed_local`.
# - `:open`: process the headers, then become `:half_closed_remote` on end-of-stream.
# - `:half_closed_local`: process the headers, then `close!` on end-of-stream.
# - closed: hand the frame to `ignore_headers`.
# - anything else: reset the stream with `Error::STREAM_CLOSED`.
#
# @param frame the received headers frame.
def receive_headers(frame)
        case @state
        when :idle
                if frame.end_stream?
                        @state = :half_closed_remote
                else
                        open!
                end
                
                process_headers(frame)
        when :reserved_remote
                process_headers(frame)
                
                @state = :half_closed_local
        when :open
                process_headers(frame)
                
                @state = :half_closed_remote if frame.end_stream?
        when :half_closed_local
                process_headers(frame)
                
                close! if frame.end_stream?
        else
                if self.closed?
                        ignore_headers(frame)
                else
                        self.send_reset_stream(Error::STREAM_CLOSED)
                end
        end
end
receive_push_promise(frame) click to toggle source
# File lib/protocol/http2/stream.rb, line 437
# Handle an incoming push promise: decode its headers, accept the promised stream, make this stream its parent and mark it reserved (remote).
# @param frame a frame whose `unpack` yields the promised stream id and the encoded header data.
# @return [Array] the promised stream followed by the decoded headers.
def receive_push_promise(frame)
        promised_stream_id, header_block = frame.unpack
        headers = @connection.decode_headers(header_block)
        
        promised = self.accept_push_promise_stream(promised_stream_id, headers)
        promised.parent = self
        promised.reserved_remote!
        
        [promised, headers]
end
receive_reset_stream(frame) click to toggle source
# File lib/protocol/http2/stream.rb, line 366
# Handle an incoming reset: close the stream with the error code carried by the frame.
# @param frame a frame whose `unpack` yields the error code.
# @raise [ProtocolError] if the stream is still `:idle`.
# @return the error code taken from the frame.
def receive_reset_stream(frame)
        if @state == :idle
                # If a RST_STREAM frame identifying an idle stream is received, the recipient MUST treat this as a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
                raise ProtocolError, "Cannot receive reset stream in state: #{@state}!"
        end
        
        error_code = frame.unpack
        close!(error_code)
        
        error_code
end
reserved_local!() click to toggle source
# File lib/protocol/http2/stream.rb, line 396
# Transition from `:idle` to `:reserved_local`.
# @raise [ProtocolError] if the stream is not currently `:idle`.
def reserved_local!
        unless @state == :idle
                raise ProtocolError, "Cannot reserve stream in state: #{@state}"
        end
        
        @state = :reserved_local
end
reserved_remote!() click to toggle source
# File lib/protocol/http2/stream.rb, line 404
# Transition from `:idle` to `:reserved_remote`.
# @raise [ProtocolError] if the stream is not currently `:idle`.
def reserved_remote!
        unless @state == :idle
                raise ProtocolError, "Cannot reserve stream in state: #{@state}"
        end
        
        @state = :reserved_remote
end
send_data(*arguments, **options) click to toggle source
# File lib/protocol/http2/stream.rb, line 218
# Write a data frame and advance the state when the frame ends the stream.
#
# - `:open`: becomes `:half_closed_local` on end-of-stream.
# - `:half_closed_remote`: `close!` on end-of-stream.
#
# @param arguments positional arguments forwarded to `write_data`.
# @param options keyword arguments forwarded to `write_data`.
# @raise [ProtocolError] in any other state.
def send_data(*arguments, **options)
        case @state
        when :open
                frame = write_data(*arguments, **options)
                
                @state = :half_closed_local if frame.end_stream?
        when :half_closed_remote
                frame = write_data(*arguments, **options)
                
                close! if frame.end_stream?
        else
                raise ProtocolError, "Cannot send data in state: #{@state}"
        end
end
send_headers(*arguments) click to toggle source

The HEADERS frame is used to open a stream, and additionally carries a header block fragment. HEADERS frames can be sent on a stream in the “idle”, “reserved (local)”, “open”, or “half-closed (remote)” state.

# File lib/protocol/http2/stream.rb, line 171
# Write a headers frame and advance the state.
#
# - `:idle`: becomes `:half_closed_local` on end-of-stream, otherwise `open!`.
# - `:reserved_local`: becomes `:half_closed_remote`.
# - `:open`: becomes `:half_closed_local` on end-of-stream.
# - `:half_closed_remote`: `close!` on end-of-stream.
#
# @param arguments forwarded to `write_headers`.
# @raise [ProtocolError] in any other state.
def send_headers(*arguments)
        case @state
        when :idle
                frame = write_headers(*arguments)
                
                if frame.end_stream?
                        @state = :half_closed_local
                else
                        open!
                end
        when :reserved_local
                write_headers(*arguments)
                
                @state = :half_closed_remote
        when :open
                frame = write_headers(*arguments)
                
                @state = :half_closed_local if frame.end_stream?
        when :half_closed_remote
                frame = write_headers(*arguments)
                
                close! if frame.end_stream?
        else
                raise ProtocolError, "Cannot send headers in state: #{@state}"
        end
end
send_headers?() click to toggle source
# File lib/protocol/http2/stream.rb, line 152
# @return [Boolean] true when the state is `:idle`, `:reserved_local`, `:open` or `:half_closed_remote`.
def send_headers?
        [:idle, :reserved_local, :open, :half_closed_remote].include?(@state)
end
send_push_promise(headers) click to toggle source

Server push is semantically equivalent to a server responding to a request; however, in this case, that request is also sent by the server, as a PUSH_PROMISE frame. @param headers [Hash] contains a complete set of request header fields that the server attributes to the request.

# File lib/protocol/http2/stream.rb, line 419
# Create a promised stream, mark it reserved (local), and write a push promise frame announcing it.
# @param headers passed to `create_push_promise_stream` and `write_push_promise`.
# @raise [ProtocolError] unless the state is `:open` or `:half_closed_remote`.
# @return the promised stream.
def send_push_promise(headers)
        unless @state == :open || @state == :half_closed_remote
                raise ProtocolError, "Cannot send push promise in state: #{@state}"
        end
        
        promised = self.create_push_promise_stream(headers)
        promised.reserved_local!
        
        write_push_promise(promised.id, headers)
        
        promised
end
send_reset_stream(error_code = 0) click to toggle source
# File lib/protocol/http2/stream.rb, line 271
# Write a reset frame carrying `error_code`, then `close!` the stream.
# @param error_code the code packed into the frame; defaults to 0.
# @raise [ProtocolError] if the stream is `:idle` or `:closed`.
# @return the result of `close!`.
def send_reset_stream(error_code = 0)
        if @state == :idle || @state == :closed
                raise ProtocolError, "Cannot send reset stream (#{error_code}) in state: #{@state}"
        end
        
        frame = ResetStreamFrame.new(@id)
        frame.pack(error_code)
        write_frame(frame)
        
        close!
end
to_s() click to toggle source
# File lib/protocol/http2/stream.rb, line 452
# @return the same text as `inspect`.
def to_s
        inspect
end
weight() click to toggle source
# File lib/protocol/http2/stream.rb, line 111
# @return the `weight` reported by this stream's dependency.
def weight
        @dependency.weight
end
write_frame(frame) click to toggle source
# File lib/protocol/http2/stream.rb, line 131
# Hand a frame to the connection for writing.
# @param frame the frame to write.
# @return whatever `@connection.write_frame` returns.
def write_frame(frame)
        @connection.write_frame(frame)
end

Protected Instance Methods

ignore_headers(frame) click to toggle source
# File lib/protocol/http2/stream.rb, line 295
          # Called by `receive_headers` when a headers frame arrives on a closed stream. Does nothing.
          # @param frame the frame being discarded.
          def ignore_headers(frame)
        # Async.logger.warn(self) {"Received headers in state: #{@state}!"}
end

Private Instance Methods

write_data(data, flags = 0, **options) click to toggle source
# File lib/protocol/http2/stream.rb, line 207
        # Build a data frame for this stream, account for it in the remote window, and write it.
        # @param data the payload handed to `frame.pack`.
        # @param flags the frame flags; defaults to 0.
        # @param options keyword arguments forwarded to `frame.pack`.
        # @return the frame that was written.
        def write_data(data, flags = 0, **options)
                DataFrame.new(@id, flags).tap do |frame|
                        frame.pack(data, **options)
                        
                        # This might fail if the data payload was too big:
                        consume_remote_window(frame)
                        write_frame(frame)
                end
        end
write_headers(priority, headers, flags = 0) click to toggle source
# File lib/protocol/http2/stream.rb, line 156
        # Build a headers frame for this stream, then encode and write it inside the connection's `write_frames` block.
        # @param priority passed to `frame.pack` as the priority.
        # @param headers the headers handed to `@connection.encode_headers`.
        # @param flags the frame flags; defaults to 0.
        # @return the headers frame.
        def write_headers(priority, headers, flags = 0)
                HeadersFrame.new(@id, flags).tap do |frame|
                        @connection.write_frames do |framer|
                                encoded = @connection.encode_headers(headers)
                                
                                frame.pack(priority, encoded, maximum_size: @connection.maximum_frame_size)
                                
                                framer.write_frame(frame)
                        end
                end
        end
write_push_promise(stream_id, headers, flags = 0, **options) click to toggle source

A normal request is client request -> server response -> client. A push promise is server request -> client -> server response -> client. The server generates the same set of headers as if the client was sending a request, and sends these to the client. The client can reject the request by resetting the (new) stream. Otherwise, the server will start sending a response as if the client had sent the request.

# File lib/protocol/http2/stream.rb, line 382
        # Build a push promise frame on this stream, then encode and write it inside the connection's `write_frames` block.
        # @param stream_id the promised stream id packed into the frame.
        # @param headers the headers handed to `@connection.encode_headers`.
        # @param flags the frame flags; defaults to 0.
        # @param options accepted but not used by this implementation.
        # @return the push promise frame.
        def write_push_promise(stream_id, headers, flags = 0, **options)
                PushPromiseFrame.new(@id, flags).tap do |frame|
                        @connection.write_frames do |framer|
                                encoded = @connection.encode_headers(headers)
                                
                                frame.pack(stream_id, encoded, maximum_size: @connection.maximum_frame_size)
                                
                                framer.write_frame(frame)
                        end
                end
        end