class Tipi::HTTP2StreamHandler
Manages an HTTP 2 stream
Attributes
__next__[RW]
conn[R]
Public Class Methods
new(adapter, stream, conn, first, &block)
click to toggle source
# File lib/tipi/http2_stream.rb, line 12 def initialize(adapter, stream, conn, first, &block) @adapter = adapter @stream = stream @conn = conn @first = first @connection_fiber = Fiber.current @stream_fiber = spin { run(&block) } # Stream callbacks occur on the connection fiber (see HTTP2Adapter#each). # The request handler is run on a separate fiber for each stream, allowing # concurrent handling of incoming requests on the same HTTP/2 connection. # # The different stream adapter APIs suspend the stream fiber, waiting for # stream callbacks to be called. The callbacks, in turn, transfer control to # the stream fiber, effectively causing the return of the adapter API calls. # # Note: the request handler is run once headers are received. Reading the # request body, if present, is at the discretion of the request handler. # This mirrors the behaviour of the HTTP/1 adapter. stream.on(:headers, &method(:on_headers)) stream.on(:data, &method(:on_data)) stream.on(:half_close, &method(:on_half_close)) end
Public Instance Methods
complete?(request)
click to toggle source
# File lib/tipi/http2_stream.rb, line 112 def complete?(request) @complete end
finish(request)
click to toggle source
# File lib/tipi/http2_stream.rb, line 179 def finish(request) if @headers_sent @stream.close else headers[':status'] ||= Qeweney::Status::NO_CONTENT with_transfer_count(request) do @stream.headers(transform_headers(headers), end_stream: true) end end rescue HTTP2::Error::StreamClosed # ignore end
get_body(request)
click to toggle source
# File lib/tipi/http2_stream.rb, line 97 def get_body(request) @buffered_chunks ||= [] return @buffered_chunks.join if @complete while !@complete begin @get_body_chunk_fiber = Fiber.current suspend ensure @get_body_chunk_fiber = nil end end @buffered_chunks.join end
get_body_chunk(request, buffered_only = false)
click to toggle source
# File lib/tipi/http2_stream.rb, line 83 def get_body_chunk(request, buffered_only = false) @buffered_chunks ||= [] return @buffered_chunks.shift unless @buffered_chunks.empty? return nil if @complete begin @get_body_chunk_fiber = Fiber.current suspend ensure @get_body_chunk_fiber = nil end @buffered_chunks.shift end
on_data(data)
click to toggle source
# File lib/tipi/http2_stream.rb, line 60 def on_data(data) data = data.to_s # chunks might be wrapped in a HTTP2::Buffer (@buffered_chunks ||= []) << data @get_body_chunk_fiber&.schedule end
on_half_close()
click to toggle source
# File lib/tipi/http2_stream.rb, line 67 def on_half_close @get_body_chunk_fiber&.schedule @complete = true end
on_headers(headers)
click to toggle source
# File lib/tipi/http2_stream.rb, line 49 def on_headers(headers) @request = Qeweney::Request.new(headers.to_h, self) @request.rx_incr(@adapter.get_rx_count) @request.tx_incr(@adapter.get_tx_count) if @first @request.headers[':first'] = true @first = false end @stream_fiber << @request end
protocol()
click to toggle source
# File lib/tipi/http2_stream.rb, line 72 def protocol 'h2' end
respond(request, chunk, headers)
click to toggle source
response API
# File lib/tipi/http2_stream.rb, line 117 def respond(request, chunk, headers) headers[':status'] ||= Qeweney::Status::OK headers[':status'] = headers[':status'].to_s with_transfer_count(request) do @stream.headers(transform_headers(headers)) @headers_sent = true @stream.data(chunk || '') end rescue HTTP2::Error::StreamClosed # ignore end
respond_from_io(request, io, headers, chunk_size = 2**16)
click to toggle source
# File lib/tipi/http2_stream.rb, line 129 def respond_from_io(request, io, headers, chunk_size = 2**16) headers[':status'] ||= Qeweney::Status::OK headers[':status'] = headers[':status'].to_s with_transfer_count(request) do @stream.headers(transform_headers(headers)) @headers_sent = true while (chunk = io.read(chunk_size)) @stream.data(chunk) end end rescue HTTP2::Error::StreamClosed # ignore end
run(&block)
click to toggle source
# File lib/tipi/http2_stream.rb, line 36 def run(&block) request = receive error = nil block.(request) @connection_fiber.schedule rescue Polyphony::BaseException raise rescue Exception => e error = e ensure @connection_fiber.schedule error end
send_chunk(request, chunk, done: false)
click to toggle source
# File lib/tipi/http2_stream.rb, line 165 def send_chunk(request, chunk, done: false) send_headers({}, false) unless @headers_sent if chunk with_transfer_count(request) do @stream.data(chunk, end_stream: done) end elsif done @stream.close end rescue HTTP2::Error::StreamClosed # ignore end
send_headers(request, headers, empty_response: false)
click to toggle source
# File lib/tipi/http2_stream.rb, line 153 def send_headers(request, headers, empty_response: false) return if @headers_sent headers[':status'] ||= (empty_response ? Qeweney::Status::NO_CONTENT : Qeweney::Status::OK).to_s with_transfer_count(request) do @stream.headers(transform_headers(headers), end_stream: false) end @headers_sent = true rescue HTTP2::Error::StreamClosed # ignore end
stop()
click to toggle source
# File lib/tipi/http2_stream.rb, line 192 def stop return if @complete @stream.close @stream_fiber.schedule(Polyphony::MoveOn.new) end
transform_headers(headers)
click to toggle source
# File lib/tipi/http2_stream.rb, line 143 def transform_headers(headers) headers.each_with_object([]) do |(k, v), a| if v.is_a?(Array) v.each { |vv| a << [k, vv.to_s] } else a << [k, v.to_s] end end end
with_transfer_count(request) { || ... }
click to toggle source
# File lib/tipi/http2_stream.rb, line 76 def with_transfer_count(request) @adapter.set_request_for_transfer_count(request) yield ensure @adapter.unset_request_for_transfer_count(request) end