class Protocol::HTTP2::Connection

Attributes

dependencies[R]
dependency[R]
framer[R]
local_settings[RW]

Current settings value for local and peer

local_window[R]

Our window for receiving data. When we receive data, it reduces this window. If the window gets too small, we must send a window update.

remote_settings[RW]
remote_stream_id[R]

The highest stream_id that has been successfully accepted by this connection.

remote_window[R]

Our window for sending data. When we send data, it reduces this window.

state[RW]

Connection state (:new, :open, :closed).

streams[R]

Public Class Methods

new(framer, local_stream_id) click to toggle source
Calls superclass method
# File lib/protocol/http2/connection.rb, line 32
def initialize(framer, local_stream_id)
        super()
        
        @state = :new
        
        # Hash(Integer, Stream)
        @streams = {}
        
        # Hash(Integer, Dependency)
        @dependency = Dependency.new(self, 0)
        @dependencies = {0 => @dependency}
        
        @framer = framer
        
        # The next stream id to use:
        @local_stream_id = local_stream_id
        
        # The biggest remote stream id seen thus far:
        @remote_stream_id = 0
        
        @local_settings = PendingSettings.new
        @remote_settings = Settings.new
        
        @decoder = HPACK::Context.new
        @encoder = HPACK::Context.new
        
        @local_window = LocalWindow.new
        @remote_window = Window.new
end

Public Instance Methods

[](id) click to toggle source
# File lib/protocol/http2/connection.rb, line 66
def [] id
        if id.zero?
                self
        else
                @streams[id]
        end
end
accept_push_promise_stream(stream_id, &block) click to toggle source

Accept an incoming push promise from the other side of the connection. On the client side, we accept push promise streams. On the server side, existing streams create push promise streams.

# File lib/protocol/http2/connection.rb, line 347
def accept_push_promise_stream(stream_id, &block)
        accept_stream(stream_id, &block)
end
accept_stream(stream_id, &block) click to toggle source

Accept an incoming stream from the other side of the connnection. On the server side, we accept requests.

# File lib/protocol/http2/connection.rb, line 336
def accept_stream(stream_id, &block)
        unless valid_remote_stream_id?(stream_id)
                raise ProtocolError, "Invalid stream id: #{stream_id}"
        end
        
        create_stream(stream_id, &block)
end
client_stream_id?(id) click to toggle source
# File lib/protocol/http2/connection.rb, line 430
def client_stream_id?(id)
        id.odd?
end
close(error = nil) click to toggle source

Close the underlying framer and all streams.

# File lib/protocol/http2/connection.rb, line 113
def close(error = nil)
        # The underlying socket may already be closed by this point.
        @streams.each_value{|stream| stream.close(error)}
        @streams.clear
        
        if @framer
                @framer.close
                @framer = nil
        end
end
close!() click to toggle source

Transition the connection into the closed state.

# File lib/protocol/http2/connection.rb, line 194
def close!
        @state = :closed
        
        return self
end
closed?() click to toggle source

Whether the connection is effectively or actually closed.

# File lib/protocol/http2/connection.rb, line 103
def closed?
        @state == :closed || @framer.nil?
end
closed_stream_id?(id) click to toggle source
# File lib/protocol/http2/connection.rb, line 438
def closed_stream_id?(id)
        if id.zero?
                # The connection "stream id" can never be closed:
                false
        elsif id.even?
                # Server-initiated streams are even.
                if @local_stream_id.even?
                        id < @local_stream_id
                else
                        id <= @remote_stream_id
                end
        elsif id.odd?
                # Client-initiated streams are odd.
                if @local_stream_id.odd?
                        id < @local_stream_id
                else
                        id <= @remote_stream_id
                end
        end
end
consume_window(size = self.available_size) click to toggle source

Traverse active streams in order of priority and allow them to consume the available flow-control window. @param amount [Integer] the amount of data to write. Defaults to the current window capacity.

# File lib/protocol/http2/connection.rb, line 473
def consume_window(size = self.available_size)
        # Return if there is no window to consume:
        return unless size > 0
        
        # Console.logger.debug(self) do |buffer|
        #   @dependencies.each do |id, dependency|
        #           buffer.puts "- #{dependency}"
        #   end
        #
        #   buffer.puts
        #
        #   @dependency.print_hierarchy(buffer)
        # end
        
        @dependency.consume_window(size)
end
create_push_promise_stream(&block) click to toggle source
# File lib/protocol/http2/connection.rb, line 362
def create_push_promise_stream(&block)
        create_stream(&block)
end
create_stream(id = next_stream_id) { |self, id| ... } click to toggle source

Create a stream, defaults to an outgoing stream. On the client side, we create requests. @return [Stream] the created stream.

# File lib/protocol/http2/connection.rb, line 354
def create_stream(id = next_stream_id, &block)
        if block_given?
                return yield(self, id)
        else
                return Stream.create(self, id)
        end
end
decode_headers(data) click to toggle source
# File lib/protocol/http2/connection.rb, line 128
def decode_headers(data)
        HPACK::Decompressor.new(data, @decoder, table_size_limit: @local_settings.header_table_size).decode
end
delete(id) click to toggle source
# File lib/protocol/http2/connection.rb, line 107
def delete(id)
        @streams.delete(id)
        @dependencies[id]&.delete!
end
encode_headers(headers, buffer = String.new.b) click to toggle source
# File lib/protocol/http2/connection.rb, line 124
def encode_headers(headers, buffer = String.new.b)
        HPACK::Compressor.new(buffer, @encoder, table_size_limit: @remote_settings.header_table_size).encode(headers)
end
id() click to toggle source
# File lib/protocol/http2/connection.rb, line 62
def id
        0
end
idle_stream_id?(id) click to toggle source
# File lib/protocol/http2/connection.rb, line 399
def idle_stream_id?(id)
        if id.even?
                # Server-initiated streams are even.
                if @local_stream_id.even?
                        id >= @local_stream_id
                else
                        id > @remote_stream_id
                end
        elsif id.odd?
                # Client-initiated streams are odd.
                if @local_stream_id.odd?
                        id >= @local_stream_id
                else
                        id > @remote_stream_id
                end
        end
end
ignore_frame?(frame) click to toggle source

6.8. GOAWAY There is an inherent race condition between an endpoint starting new streams and the remote sending a GOAWAY frame. To deal with this case, the GOAWAY contains the stream identifier of the last peer-initiated stream that was or might be processed on the sending endpoint in this connection. For instance, if the server sends a GOAWAY frame, the identified stream is the highest-numbered stream initiated by the client. Once sent, the sender will ignore frames sent on streams initiated by the receiver if the stream has an identifier higher than the included last stream identifier. Receivers of a GOAWAY frame MUST NOT open additional streams on the connection, although a new connection can be established for new streams.

# File lib/protocol/http2/connection.rb, line 150
def ignore_frame?(frame)
        if self.closed?
                # puts "ignore_frame? #{frame.stream_id} -> #{valid_remote_stream_id?(frame.stream_id)} > #{@remote_stream_id}"
                if valid_remote_stream_id?(frame.stream_id)
                        return frame.stream_id > @remote_stream_id
                end
        end
end
maximum_concurrent_streams() click to toggle source
# File lib/protocol/http2/connection.rb, line 79
def maximum_concurrent_streams
        [@local_settings.maximum_concurrent_streams, @remote_settings.maximum_concurrent_streams].min
end
maximum_frame_size() click to toggle source

The size of a frame payload is limited by the maximum size that a receiver advertises in the SETTINGS_MAX_FRAME_SIZE setting.

# File lib/protocol/http2/connection.rb, line 75
def maximum_frame_size
        @remote_settings.maximum_frame_size
end
next_stream_id() click to toggle source

Streams are identified with an unsigned 31-bit integer. Streams initiated by a client MUST use odd-numbered stream identifiers; those initiated by the server MUST use even-numbered stream identifiers. A stream identifier of zero (0x0) is used for connection control messages; the stream identifier of zero cannot be used to establish a new stream.

# File lib/protocol/http2/connection.rb, line 133
def next_stream_id
        id = @local_stream_id
        
        @local_stream_id += 2
        
        return id
end
open!() click to toggle source
# File lib/protocol/http2/connection.rb, line 274
def open!
        @state = :open
        
        return self
end
process_settings(frame) click to toggle source

In addition to changing the flow-control window for streams that are not yet active, a SETTINGS frame can alter the initial flow-control window size for streams with active flow-control windows (that is, streams in the “open” or “half-closed (remote)” state). When the value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust the size of all stream flow-control windows that it maintains by the difference between the new value and the old value.

@return [Boolean] whether the frame was an acknowledgement

# File lib/protocol/http2/connection.rb, line 251
def process_settings(frame)
        if frame.acknowledgement?
                # The remote end has confirmed the settings have been received:
                changes = @local_settings.acknowledge
                
                update_local_settings(changes)
                
                return true
        else
                # The remote end is updating the settings, we reply with acknowledgement:
                reply = frame.acknowledge
                
                write_frame(reply)
                
                changes = frame.unpack
                @remote_settings.update(changes)
                
                update_remote_settings(changes)
                
                return false
        end
end
read_frame() { |frame| ... } click to toggle source

Reads one frame from the network and processes. Processing the frame updates the state of the connection and related streams. If the frame triggers an error, e.g. a protocol error, the connection will typically emit a goaway frame and re-raise the exception. You should continue processing frames until the underlying connection is closed.

# File lib/protocol/http2/connection.rb, line 160
def read_frame
        frame = @framer.read_frame(@local_settings.maximum_frame_size)
        # puts "#{self.class} #{@state} read_frame: class=#{frame.class} stream_id=#{frame.stream_id} flags=#{frame.flags} length=#{frame.length} (remote_stream_id=#{@remote_stream_id})"
        # puts "Windows: local_window=#{@local_window.inspect}; remote_window=#{@remote_window.inspect}"
        
        return if ignore_frame?(frame)
        
        yield frame if block_given?
        frame.apply(self)
        
        return frame
rescue GoawayError => error
        # Go directly to jail. Do not pass go, do not collect $200.
        raise
rescue ProtocolError => error
        send_goaway(error.code || PROTOCOL_ERROR, error.message)
        
        raise
rescue HPACK::Error => error
        send_goaway(COMPRESSION_ERROR, error.message)
        
        raise
end
receive_continuation(frame) click to toggle source
# File lib/protocol/http2/connection.rb, line 509
def receive_continuation(frame)
        raise ProtocolError, "Received unexpected continuation: #{frame.class}"
end
receive_data(frame) click to toggle source
# File lib/protocol/http2/connection.rb, line 318
def receive_data(frame)
        update_local_window(frame)
        
        if stream = @streams[frame.stream_id]
                stream.receive_data(frame)
        elsif closed_stream_id?(frame.stream_id)
                # This can occur if one end sent a stream reset, while the other end was sending a data frame. It's mostly harmless.
        else
                raise ProtocolError, "Cannot receive data for stream id #{frame.stream_id}"
        end
end
receive_frame(frame) click to toggle source
# File lib/protocol/http2/connection.rb, line 513
def receive_frame(frame)
        # ignore.
end
receive_goaway(frame) click to toggle source
# File lib/protocol/http2/connection.rb, line 210
def receive_goaway(frame)
        # We capture the last stream that was processed.
        @remote_stream_id, error_code, message = frame.unpack
        
        self.close!
        
        if error_code != 0
                # Shut down immediately.
                raise GoawayError.new(message, error_code)
        end
end
receive_headers(frame) click to toggle source

On the server side, starts a new request.

# File lib/protocol/http2/connection.rb, line 367
def receive_headers(frame)
        stream_id = frame.stream_id
        
        if stream_id.zero?
                raise ProtocolError, "Cannot receive headers for stream 0!"
        end
        
        if stream = @streams[stream_id]
                stream.receive_headers(frame)
        else
                if stream_id <= @remote_stream_id
                        raise ProtocolError, "Invalid stream id: #{stream_id} <= #{@remote_stream_id}!"
                end
                
                if @streams.size < self.maximum_concurrent_streams
                        stream = accept_stream(stream_id)
                        @remote_stream_id = stream_id
                        
                        stream.receive_headers(frame)
                else
                        raise ProtocolError, "Exceeded maximum concurrent streams"
                end
        end
end
receive_ping(frame) click to toggle source
# File lib/protocol/http2/connection.rb, line 302
def receive_ping(frame)
        if @state != :closed
                if frame.stream_id != 0
                        raise ProtocolError, "Ping received for non-zero stream!"
                end
                
                unless frame.acknowledgement?
                        reply = frame.acknowledge
                        
                        write_frame(reply)
                end
        else
                raise ProtocolError, "Cannot receive ping in state #{@state}"
        end
end
receive_priority(frame) click to toggle source

Sets the priority for an incoming stream.

# File lib/protocol/http2/connection.rb, line 418
def receive_priority(frame)
        if dependency = @dependencies[frame.stream_id]
                dependency.receive_priority(frame)
        elsif idle_stream_id?(frame.stream_id)
                Dependency.create(self, frame.stream_id, frame.unpack)
        end
end
receive_push_promise(frame) click to toggle source
# File lib/protocol/http2/connection.rb, line 426
def receive_push_promise(frame)
        raise ProtocolError, "Unable to receive push promise!"
end
receive_reset_stream(frame) click to toggle source
# File lib/protocol/http2/connection.rb, line 459
def receive_reset_stream(frame)
        if frame.connection?
                raise ProtocolError, "Cannot reset connection!"
        elsif stream = @streams[frame.stream_id]
                stream.receive_reset_stream(frame)
        elsif closed_stream_id?(frame.stream_id)
                # Ignore.
        else
                raise StreamClosed, "Cannot reset stream #{frame.stream_id}"
        end
end
receive_settings(frame) click to toggle source
# File lib/protocol/http2/connection.rb, line 280
def receive_settings(frame)
        if @state == :new
                # We transition to :open when we receive acknowledgement of first settings frame:
                open! if process_settings(frame)
        elsif @state != :closed
                process_settings(frame)
        else
                raise ProtocolError, "Cannot receive settings in state #{@state}"
        end
end
receive_window_update(frame) click to toggle source
# File lib/protocol/http2/connection.rb, line 490
def receive_window_update(frame)
        if frame.connection?
                super
                
                self.consume_window
        elsif stream = @streams[frame.stream_id]
                begin
                        stream.receive_window_update(frame)
                rescue ProtocolError => error
                        stream.send_reset_stream(error.code)
                end
        elsif closed_stream_id?(frame.stream_id)
                # Ignore.
        else
                # Receiving any frame other than HEADERS or PRIORITY on a stream in this state (idle) MUST be treated as a connection error of type PROTOCOL_ERROR.
                raise ProtocolError, "Cannot update window of idle stream #{frame.stream_id}"
        end
end
send_goaway(error_code = 0, message = "") click to toggle source

Tell the remote end that the connection is being shut down. If the `error_code` is 0, this is a graceful shutdown. The other end of the connection should not make any new streams, but existing streams may be completed.

# File lib/protocol/http2/connection.rb, line 201
def send_goaway(error_code = 0, message = "")
        frame = GoawayFrame.new
        frame.pack @remote_stream_id, error_code, message
        
        write_frame(frame)
ensure
        self.close!
end
send_ping(data) click to toggle source
# File lib/protocol/http2/connection.rb, line 291
def send_ping(data)
        if @state != :closed
                frame = PingFrame.new
                frame.pack data
                
                write_frame(frame)
        else
                raise ProtocolError, "Cannot send ping in state #{@state}"
        end
end
send_priority(stream_id, priority) click to toggle source
# File lib/protocol/http2/connection.rb, line 392
def send_priority(stream_id, priority)
        frame = PriorityFrame.new(stream_id)
        frame.pack(priority)
        
        write_frame(frame)
end
send_settings(changes) click to toggle source
# File lib/protocol/http2/connection.rb, line 184
def send_settings(changes)
        @local_settings.append(changes)
        
        frame = SettingsFrame.new
        frame.pack(changes)
        
        write_frame(frame)
end
server_stream_id?(id) click to toggle source
# File lib/protocol/http2/connection.rb, line 434
def server_stream_id?(id)
        id.even?
end
update_local_settings(changes) click to toggle source
# File lib/protocol/http2/connection.rb, line 230
def update_local_settings(changes)
        capacity = @local_settings.initial_window_size
        
        @streams.each_value do |stream|
                stream.local_window.capacity = capacity
        end
        
        @local_window.desired = capacity
end
update_remote_settings(changes) click to toggle source
# File lib/protocol/http2/connection.rb, line 240
def update_remote_settings(changes)
        capacity = @remote_settings.initial_window_size
        
        @streams.each_value do |stream|
                stream.remote_window.capacity = capacity
        end
end
valid_remote_stream_id?() click to toggle source
# File lib/protocol/http2/connection.rb, line 330
def valid_remote_stream_id?
        false
end
write_frame(frame) click to toggle source
# File lib/protocol/http2/connection.rb, line 222
def write_frame(frame)
        @framer.write_frame(frame)
end
write_frames() { |framer| ... } click to toggle source
# File lib/protocol/http2/connection.rb, line 226
def write_frames
        yield @framer
end