class Protocol::HTTP2::Connection
Attributes
Current settings value for local and peer
Our window for receiving data. When we receive data, it reduces this window. If the window gets too small, we must send a window update.
The highest stream_id that has been successfully accepted by this connection.
Our window for sending data. When we send data, it reduces this window.
Connection
state (:new, :open, :closed).
Public Class Methods
# File lib/protocol/http2/connection.rb, line 32 def initialize(framer, local_stream_id) super() @state = :new # Hash(Integer, Stream) @streams = {} # Hash(Integer, Dependency) @dependency = Dependency.new(self, 0) @dependencies = {0 => @dependency} @framer = framer # The next stream id to use: @local_stream_id = local_stream_id # The biggest remote stream id seen thus far: @remote_stream_id = 0 @local_settings = PendingSettings.new @remote_settings = Settings.new @decoder = HPACK::Context.new @encoder = HPACK::Context.new @local_window = LocalWindow.new @remote_window = Window.new end
Public Instance Methods
# File lib/protocol/http2/connection.rb, line 66 def [] id if id.zero? self else @streams[id] end end
Accept an incoming push promise from the other side of the connection. On the client side, we accept push promise streams. On the server side, existing streams create push promise streams.
# File lib/protocol/http2/connection.rb, line 347 def accept_push_promise_stream(stream_id, &block) accept_stream(stream_id, &block) end
Accept an incoming stream from the other side of the connnection. On the server side, we accept requests.
# File lib/protocol/http2/connection.rb, line 336 def accept_stream(stream_id, &block) unless valid_remote_stream_id?(stream_id) raise ProtocolError, "Invalid stream id: #{stream_id}" end create_stream(stream_id, &block) end
# File lib/protocol/http2/connection.rb, line 430 def client_stream_id?(id) id.odd? end
Close the underlying framer and all streams.
# File lib/protocol/http2/connection.rb, line 113 def close(error = nil) # The underlying socket may already be closed by this point. @streams.each_value{|stream| stream.close(error)} @streams.clear if @framer @framer.close @framer = nil end end
Transition the connection into the closed state.
# File lib/protocol/http2/connection.rb, line 194 def close! @state = :closed return self end
Whether the connection is effectively or actually closed.
# File lib/protocol/http2/connection.rb, line 103 def closed? @state == :closed || @framer.nil? end
# File lib/protocol/http2/connection.rb, line 438 def closed_stream_id?(id) if id.zero? # The connection "stream id" can never be closed: false elsif id.even? # Server-initiated streams are even. if @local_stream_id.even? id < @local_stream_id else id <= @remote_stream_id end elsif id.odd? # Client-initiated streams are odd. if @local_stream_id.odd? id < @local_stream_id else id <= @remote_stream_id end end end
Traverse active streams in order of priority and allow them to consume the available flow-control window. @param amount [Integer] the amount of data to write. Defaults to the current window capacity.
# File lib/protocol/http2/connection.rb, line 473 def consume_window(size = self.available_size) # Return if there is no window to consume: return unless size > 0 # Console.logger.debug(self) do |buffer| # @dependencies.each do |id, dependency| # buffer.puts "- #{dependency}" # end # # buffer.puts # # @dependency.print_hierarchy(buffer) # end @dependency.consume_window(size) end
# File lib/protocol/http2/connection.rb, line 362 def create_push_promise_stream(&block) create_stream(&block) end
Create a stream, defaults to an outgoing stream. On the client side, we create requests. @return [Stream] the created stream.
# File lib/protocol/http2/connection.rb, line 354 def create_stream(id = next_stream_id, &block) if block_given? return yield(self, id) else return Stream.create(self, id) end end
# File lib/protocol/http2/connection.rb, line 128 def decode_headers(data) HPACK::Decompressor.new(data, @decoder, table_size_limit: @local_settings.header_table_size).decode end
# File lib/protocol/http2/connection.rb, line 107 def delete(id) @streams.delete(id) @dependencies[id]&.delete! end
# File lib/protocol/http2/connection.rb, line 124 def encode_headers(headers, buffer = String.new.b) HPACK::Compressor.new(buffer, @encoder, table_size_limit: @remote_settings.header_table_size).encode(headers) end
# File lib/protocol/http2/connection.rb, line 62 def id 0 end
# File lib/protocol/http2/connection.rb, line 399 def idle_stream_id?(id) if id.even? # Server-initiated streams are even. if @local_stream_id.even? id >= @local_stream_id else id > @remote_stream_id end elsif id.odd? # Client-initiated streams are odd. if @local_stream_id.odd? id >= @local_stream_id else id > @remote_stream_id end end end
6.8. GOAWAY There is an inherent race condition between an endpoint starting new streams and the remote sending a GOAWAY frame. To deal with this case, the GOAWAY contains the stream identifier of the last peer-initiated stream that was or might be processed on the sending endpoint in this connection. For instance, if the server sends a GOAWAY frame, the identified stream is the highest-numbered stream initiated by the client. Once sent, the sender will ignore frames sent on streams initiated by the receiver if the stream has an identifier higher than the included last stream identifier. Receivers of a GOAWAY frame MUST NOT open additional streams on the connection, although a new connection can be established for new streams.
# File lib/protocol/http2/connection.rb, line 150 def ignore_frame?(frame) if self.closed? # puts "ignore_frame? #{frame.stream_id} -> #{valid_remote_stream_id?(frame.stream_id)} > #{@remote_stream_id}" if valid_remote_stream_id?(frame.stream_id) return frame.stream_id > @remote_stream_id end end end
# File lib/protocol/http2/connection.rb, line 79 def maximum_concurrent_streams [@local_settings.maximum_concurrent_streams, @remote_settings.maximum_concurrent_streams].min end
The size of a frame payload is limited by the maximum size that a receiver advertises in the SETTINGS_MAX_FRAME_SIZE setting.
# File lib/protocol/http2/connection.rb, line 75 def maximum_frame_size @remote_settings.maximum_frame_size end
Streams are identified with an unsigned 31-bit integer. Streams initiated by a client MUST use odd-numbered stream identifiers; those initiated by the server MUST use even-numbered stream identifiers. A stream identifier of zero (0x0) is used for connection control messages; the stream identifier of zero cannot be used to establish a new stream.
# File lib/protocol/http2/connection.rb, line 133 def next_stream_id id = @local_stream_id @local_stream_id += 2 return id end
# File lib/protocol/http2/connection.rb, line 274 def open! @state = :open return self end
In addition to changing the flow-control window for streams that are not yet active, a SETTINGS frame can alter the initial flow-control window size for streams with active flow-control windows (that is, streams in the “open” or “half-closed (remote)” state). When the value of SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST adjust the size of all stream flow-control windows that it maintains by the difference between the new value and the old value.
@return [Boolean] whether the frame was an acknowledgement
# File lib/protocol/http2/connection.rb, line 251 def process_settings(frame) if frame.acknowledgement? # The remote end has confirmed the settings have been received: changes = @local_settings.acknowledge update_local_settings(changes) return true else # The remote end is updating the settings, we reply with acknowledgement: reply = frame.acknowledge write_frame(reply) changes = frame.unpack @remote_settings.update(changes) update_remote_settings(changes) return false end end
Reads one frame from the network and processes. Processing the frame updates the state of the connection and related streams. If the frame triggers an error, e.g. a protocol error, the connection will typically emit a goaway frame and re-raise the exception. You should continue processing frames until the underlying connection is closed.
# File lib/protocol/http2/connection.rb, line 160 def read_frame frame = @framer.read_frame(@local_settings.maximum_frame_size) # puts "#{self.class} #{@state} read_frame: class=#{frame.class} stream_id=#{frame.stream_id} flags=#{frame.flags} length=#{frame.length} (remote_stream_id=#{@remote_stream_id})" # puts "Windows: local_window=#{@local_window.inspect}; remote_window=#{@remote_window.inspect}" return if ignore_frame?(frame) yield frame if block_given? frame.apply(self) return frame rescue GoawayError => error # Go directly to jail. Do not pass go, do not collect $200. raise rescue ProtocolError => error send_goaway(error.code || PROTOCOL_ERROR, error.message) raise rescue HPACK::Error => error send_goaway(COMPRESSION_ERROR, error.message) raise end
# File lib/protocol/http2/connection.rb, line 509 def receive_continuation(frame) raise ProtocolError, "Received unexpected continuation: #{frame.class}" end
# File lib/protocol/http2/connection.rb, line 318 def receive_data(frame) update_local_window(frame) if stream = @streams[frame.stream_id] stream.receive_data(frame) elsif closed_stream_id?(frame.stream_id) # This can occur if one end sent a stream reset, while the other end was sending a data frame. It's mostly harmless. else raise ProtocolError, "Cannot receive data for stream id #{frame.stream_id}" end end
# File lib/protocol/http2/connection.rb, line 513 def receive_frame(frame) # ignore. end
# File lib/protocol/http2/connection.rb, line 210 def receive_goaway(frame) # We capture the last stream that was processed. @remote_stream_id, error_code, message = frame.unpack self.close! if error_code != 0 # Shut down immediately. raise GoawayError.new(message, error_code) end end
On the server side, starts a new request.
# File lib/protocol/http2/connection.rb, line 367 def receive_headers(frame) stream_id = frame.stream_id if stream_id.zero? raise ProtocolError, "Cannot receive headers for stream 0!" end if stream = @streams[stream_id] stream.receive_headers(frame) else if stream_id <= @remote_stream_id raise ProtocolError, "Invalid stream id: #{stream_id} <= #{@remote_stream_id}!" end if @streams.size < self.maximum_concurrent_streams stream = accept_stream(stream_id) @remote_stream_id = stream_id stream.receive_headers(frame) else raise ProtocolError, "Exceeded maximum concurrent streams" end end end
# File lib/protocol/http2/connection.rb, line 302 def receive_ping(frame) if @state != :closed if frame.stream_id != 0 raise ProtocolError, "Ping received for non-zero stream!" end unless frame.acknowledgement? reply = frame.acknowledge write_frame(reply) end else raise ProtocolError, "Cannot receive ping in state #{@state}" end end
Sets the priority for an incoming stream.
# File lib/protocol/http2/connection.rb, line 418 def receive_priority(frame) if dependency = @dependencies[frame.stream_id] dependency.receive_priority(frame) elsif idle_stream_id?(frame.stream_id) Dependency.create(self, frame.stream_id, frame.unpack) end end
# File lib/protocol/http2/connection.rb, line 426 def receive_push_promise(frame) raise ProtocolError, "Unable to receive push promise!" end
# File lib/protocol/http2/connection.rb, line 459 def receive_reset_stream(frame) if frame.connection? raise ProtocolError, "Cannot reset connection!" elsif stream = @streams[frame.stream_id] stream.receive_reset_stream(frame) elsif closed_stream_id?(frame.stream_id) # Ignore. else raise StreamClosed, "Cannot reset stream #{frame.stream_id}" end end
# File lib/protocol/http2/connection.rb, line 280 def receive_settings(frame) if @state == :new # We transition to :open when we receive acknowledgement of first settings frame: open! if process_settings(frame) elsif @state != :closed process_settings(frame) else raise ProtocolError, "Cannot receive settings in state #{@state}" end end
Protocol::HTTP2::FlowControlled#receive_window_update
# File lib/protocol/http2/connection.rb, line 490 def receive_window_update(frame) if frame.connection? super self.consume_window elsif stream = @streams[frame.stream_id] begin stream.receive_window_update(frame) rescue ProtocolError => error stream.send_reset_stream(error.code) end elsif closed_stream_id?(frame.stream_id) # Ignore. else # Receiving any frame other than HEADERS or PRIORITY on a stream in this state (idle) MUST be treated as a connection error of type PROTOCOL_ERROR. raise ProtocolError, "Cannot update window of idle stream #{frame.stream_id}" end end
Tell the remote end that the connection is being shut down. If the `error_code` is 0, this is a graceful shutdown. The other end of the connection should not make any new streams, but existing streams may be completed.
# File lib/protocol/http2/connection.rb, line 201 def send_goaway(error_code = 0, message = "") frame = GoawayFrame.new frame.pack @remote_stream_id, error_code, message write_frame(frame) ensure self.close! end
# File lib/protocol/http2/connection.rb, line 291 def send_ping(data) if @state != :closed frame = PingFrame.new frame.pack data write_frame(frame) else raise ProtocolError, "Cannot send ping in state #{@state}" end end
# File lib/protocol/http2/connection.rb, line 392 def send_priority(stream_id, priority) frame = PriorityFrame.new(stream_id) frame.pack(priority) write_frame(frame) end
# File lib/protocol/http2/connection.rb, line 184 def send_settings(changes) @local_settings.append(changes) frame = SettingsFrame.new frame.pack(changes) write_frame(frame) end
# File lib/protocol/http2/connection.rb, line 434 def server_stream_id?(id) id.even? end
# File lib/protocol/http2/connection.rb, line 230 def update_local_settings(changes) capacity = @local_settings.initial_window_size @streams.each_value do |stream| stream.local_window.capacity = capacity end @local_window.desired = capacity end
# File lib/protocol/http2/connection.rb, line 240 def update_remote_settings(changes) capacity = @remote_settings.initial_window_size @streams.each_value do |stream| stream.remote_window.capacity = capacity end end
# File lib/protocol/http2/connection.rb, line 330 def valid_remote_stream_id? false end
# File lib/protocol/http2/connection.rb, line 222 def write_frame(frame) @framer.write_frame(frame) end
# File lib/protocol/http2/connection.rb, line 226 def write_frames yield @framer end