class Tipi::HTTP2StreamHandler

Manages a single HTTP/2 stream.

Attributes

__next__[RW]
conn[R]

Public Class Methods

new(adapter, stream, conn, first, &block) click to toggle source
# File lib/tipi/http2_stream.rb, line 12
# Sets up the handler for one stream: stores its collaborators, starts the
# fiber that will run the request handler, and registers the stream callbacks.
#
# @param adapter [Object] owning adapter; other methods of this class query it
#   for rx/tx counts and use it for transfer accounting
# @param stream [Object] stream object; must respond to #on here, and to
#   #headers, #data and #close for the response methods
# @param conn [Object] connection object, exposed through the conn reader
# @param first [Boolean] when truthy, the request built in #on_headers is
#   tagged with a ':first' header
# @yield [request] request handler, passed on to #run
def initialize(adapter, stream, conn, first, &block)
  @adapter = adapter
  @stream = stream
  @conn = conn
  @first = first
  # The fiber constructing this handler is remembered so #run can wake it up
  # once the request handler has finished.
  @connection_fiber = Fiber.current
  @stream_fiber = spin { run(&block) }

  # Stream callbacks occur on the connection fiber (see HTTP2Adapter#each).
  # The request handler is run on a separate fiber for each stream, allowing
  # concurrent handling of incoming requests on the same HTTP/2 connection.
  #
  # The different stream adapter APIs suspend the stream fiber, waiting for
  # stream callbacks to be called. The callbacks, in turn, transfer control to
  # the stream fiber, effectively causing the return of the adapter API calls.
  #
  # Note: the request handler is run once headers are received. Reading the
  # request body, if present, is at the discretion of the request handler.
  # This mirrors the behaviour of the HTTP/1 adapter.
  stream.on(:headers, &method(:on_headers))
  stream.on(:data, &method(:on_data))
  stream.on(:half_close, &method(:on_half_close))
end

Public Instance Methods

complete?(request) click to toggle source
# File lib/tipi/http2_stream.rb, line 112
# Returns the completion flag that #on_half_close sets when the stream's
# :half_close callback fires.
#
# @param request [Object] not used by this implementation
# @return [true, nil] true once the flag has been set, nil before that
def complete?(request)
  @complete
end
finish(request) click to toggle source
# File lib/tipi/http2_stream.rb, line 179
# Finishes the response for the given request.
#
# If headers were already sent, the stream is simply closed. Otherwise a
# header-only response with a NO_CONTENT status is sent, with the end-of-stream
# flag set.
#
# @param request [Object] request used for transfer accounting
# @return [Object, nil] result of the underlying stream call, or nil if the
#   stream was already closed
def finish(request)
  if @headers_sent
    @stream.close
  else
    # No headers hash is passed to this method, so the status-only header set
    # is built locally.
    headers = { ':status' => Qeweney::Status::NO_CONTENT }
    with_transfer_count(request) do
      @stream.headers(transform_headers(headers), end_stream: true)
    end
  end
rescue HTTP2::Error::StreamClosed
  # The stream is already closed, so there is nothing left to finish.
end
get_body(request) click to toggle source
# File lib/tipi/http2_stream.rb, line 97
# Returns the whole request body as a single string.
#
# Suspends the calling fiber repeatedly until the completion flag is set,
# publishing the fiber in @get_body_chunk_fiber while it waits so that the
# stream callbacks can schedule it. The buffered chunks are then joined.
#
# @param request [Object] not used by this implementation
# @return [String] all buffered chunks concatenated
def get_body(request)
  @buffered_chunks ||= []

  until @complete
    begin
      @get_body_chunk_fiber = Fiber.current
      suspend
    ensure
      # Never leave a stale fiber reference behind, even if the wait is
      # interrupted by an exception.
      @get_body_chunk_fiber = nil
    end
  end

  @buffered_chunks.join
end
get_body_chunk(request, buffered_only = false) click to toggle source
# File lib/tipi/http2_stream.rb, line 83
# Returns the next chunk of the request body.
#
# A buffered chunk is returned immediately if one is available. Otherwise the
# calling fiber is suspended until a stream callback schedules it, unless the
# completion flag is set or the caller asked for buffered data only.
#
# @param request [Object] not used by this implementation
# @param buffered_only [Boolean] when truthy, never suspend: return nil if no
#   chunk is currently buffered
# @return [String, nil] the next chunk, or nil if none is available
def get_body_chunk(request, buffered_only = false)
  @buffered_chunks ||= []
  return @buffered_chunks.shift unless @buffered_chunks.empty?
  # Nothing buffered: do not wait if no more data can arrive or if the caller
  # only wants what is already buffered.
  return nil if @complete || buffered_only

  begin
    @get_body_chunk_fiber = Fiber.current
    suspend
  ensure
    # Never leave a stale fiber reference behind, even if the wait is
    # interrupted by an exception.
    @get_body_chunk_fiber = nil
  end
  # May be nil if the fiber was woken without new data being buffered.
  @buffered_chunks.shift
end
on_data(data) click to toggle source
# File lib/tipi/http2_stream.rb, line 60
# Stream :data callback: buffers an incoming body chunk and schedules the
# fiber waiting for body data, if there is one.
#
# @param data [#to_s] incoming chunk; converted with #to_s because chunks
#   might be wrapped in a HTTP2::Buffer
def on_data(data)
  @buffered_chunks ||= []
  @buffered_chunks << data.to_s
  @get_body_chunk_fiber&.schedule
end
on_half_close() click to toggle source
# File lib/tipi/http2_stream.rb, line 67
# Stream :half_close callback: schedules the fiber waiting for body data, if
# there is one, and sets the completion flag read by #complete?, #get_body,
# #get_body_chunk and #stop.
#
# NOTE(review): the flag is set after the waiting fiber is scheduled; this
# presumably relies on #schedule only queueing the fiber rather than switching
# to it immediately — confirm.
def on_half_close
  @get_body_chunk_fiber&.schedule
  @complete = true
end
on_headers(headers) click to toggle source
# File lib/tipi/http2_stream.rb, line 49
# Stream :headers callback: builds the request object and hands it to the
# stream fiber.
#
# @param headers [#to_h] incoming headers
def on_headers(headers)
  # The handler itself is passed as the request's second argument.
  @request = Qeweney::Request.new(headers.to_h, self)
  # Seed the request's rx/tx counters with the counts reported by the adapter.
  @request.rx_incr(@adapter.get_rx_count)
  @request.tx_incr(@adapter.get_tx_count)
  # Tag the request with ':first' at most once per handler.
  if @first
    @request.headers[':first'] = true
    @first = false
  end
  # Presumably picked up by the `receive` call in #run — confirm.
  @stream_fiber << @request
end
protocol() click to toggle source
# File lib/tipi/http2_stream.rb, line 72
# Returns the protocol identifier for this handler.
#
# @return [String] always 'h2'
def protocol
  'h2'
end
respond(request, chunk, headers) click to toggle source

response API

# File lib/tipi/http2_stream.rb, line 117
# Sends a complete response: headers followed by a single body chunk.
#
# The ':status' entry of the given headers hash is defaulted to
# Qeweney::Status::OK and stored back as a string, so the caller's hash is
# modified in place.
#
# @param request [Object] request used for transfer accounting
# @param chunk [String, nil] response body; nil is sent as an empty string
# @param headers [Hash] response headers
# @return [Object, nil] result of the stream's data call, or nil if the
#   stream was already closed
def respond(request, chunk, headers)
  status = headers[':status'] || Qeweney::Status::OK
  headers[':status'] = status.to_s

  with_transfer_count(request) do
    @stream.headers(transform_headers(headers))
    @headers_sent = true
    body = chunk || ''
    @stream.data(body)
  end
rescue HTTP2::Error::StreamClosed
  # ignore
end
respond_from_io(request, io, headers, chunk_size = 2**16) click to toggle source
# File lib/tipi/http2_stream.rb, line 129
# Sends a response whose body is read from an IO object in chunks.
#
# The ':status' entry of the given headers hash is defaulted to
# Qeweney::Status::OK and stored back as a string, so the caller's hash is
# modified in place.
#
# The end-of-stream flag is passed explicitly and is only set on the last
# chunk, so bodies larger than chunk_size are sent in full.
# NOTE(review): the http-2 gem's Stream#data is understood to default to
# end_stream: true, which would end the stream on the first chunk if the flag
# were omitted — confirm against the gem version in use.
#
# @param request [Object] request used for transfer accounting
# @param io [#read] source of the response body
# @param headers [Hash] response headers
# @param chunk_size [Integer] maximum number of bytes read per chunk
# @return [Object, nil] nil if the stream was already closed
def respond_from_io(request, io, headers, chunk_size = 2**16)
  headers[':status'] ||= Qeweney::Status::OK
  headers[':status'] = headers[':status'].to_s
  with_transfer_count(request) do
    @stream.headers(transform_headers(headers))
    @headers_sent = true
    # Read one chunk ahead so the last chunk can be recognised when it is sent.
    chunk = io.read(chunk_size)
    while chunk
      next_chunk = io.read(chunk_size)
      @stream.data(chunk, end_stream: next_chunk.nil?)
      chunk = next_chunk
    end
  end
rescue HTTP2::Error::StreamClosed
  # ignore
end
run(&block) click to toggle source
# File lib/tipi/http2_stream.rb, line 36
# Body of the stream fiber (started in #initialize): obtains the request, runs
# the request handler, and wakes the connection fiber when done.
#
# @yield [request] the request handler
def run(&block)
  # Presumably returns the request that #on_headers pushes with
  # `@stream_fiber << @request` — confirm.
  request = receive
  error = nil
  block.(request)
  # NOTE(review): the ensure clause below schedules the connection fiber as
  # well, so this call looks redundant on the success path — confirm.
  @connection_fiber.schedule
rescue Polyphony::BaseException
  # Re-raised untouched rather than being captured as a handler error.
  raise
rescue Exception => e
  # Any other exception is captured so it can be handed to the connection
  # fiber instead of propagating out of this fiber.
  error = e
ensure
  # Always wake the connection fiber, passing the captured error (nil when the
  # handler completed normally or the exception was re-raised above).
  @connection_fiber.schedule error
end
send_chunk(request, chunk, done: false) click to toggle source
# File lib/tipi/http2_stream.rb, line 165
# Sends one chunk of the response body, sending default headers first if none
# have been sent yet.
#
# @param request [Object] request used for transfer accounting
# @param chunk [String, nil] data to send; nil sends no data
# @param done [Boolean] when truthy, ends the stream: the flag is passed along
#   with the chunk, or the stream is closed if there is no chunk
# @return [Object, nil] nil if the stream was already closed
def send_chunk(request, chunk, done: false)
  # send_headers takes (request, headers, empty_response:); pass the request
  # and an empty headers hash so the default status is applied.
  send_headers(request, {}) unless @headers_sent

  if chunk
    with_transfer_count(request) do
      @stream.data(chunk, end_stream: done)
    end
  elsif done
    @stream.close
  end
rescue HTTP2::Error::StreamClosed
  # ignore
end
send_headers(request, headers, empty_response: false) click to toggle source
# File lib/tipi/http2_stream.rb, line 153
# Sends the response headers without ending the stream. Does nothing if
# headers were already sent.
#
# When the headers hash has no ':status' entry, one is added in place:
# NO_CONTENT for an empty response, OK otherwise, stored as a string.
#
# @param request [Object] request used for transfer accounting
# @param headers [Hash] response headers
# @param empty_response [Boolean] selects the default status
# @return [true, nil] true once the headers were sent; nil if they had been
#   sent before or the stream was already closed
def send_headers(request, headers, empty_response: false)
  return if @headers_sent

  unless headers[':status']
    fallback = empty_response ? Qeweney::Status::NO_CONTENT : Qeweney::Status::OK
    headers[':status'] = fallback.to_s
  end

  with_transfer_count(request) do
    @stream.headers(transform_headers(headers), end_stream: false)
  end
  @headers_sent = true
rescue HTTP2::Error::StreamClosed
  # ignore
end
stop() click to toggle source
# File lib/tipi/http2_stream.rb, line 192
# Stops handling of this stream: closes the stream and schedules the stream
# fiber with a Polyphony::MoveOn instance. Does nothing once the completion
# flag is set.
#
# @return [Object, nil] result of scheduling the stream fiber, or nil if the
#   completion flag was already set
def stop
  unless @complete
    @stream.close
    @stream_fiber.schedule(Polyphony::MoveOn.new)
  end
end
transform_headers(headers) click to toggle source
# File lib/tipi/http2_stream.rb, line 143
# Flattens a headers hash into a list of [name, value] pairs with string
# values. An Array value yields one pair per element, in order.
#
# @param headers [Hash] header names mapped to a value or an Array of values
# @return [Array<Array>] list of [name, string value] pairs
def transform_headers(headers)
  headers.flat_map do |name, value|
    if value.is_a?(Array)
      value.map { |item| [name, item.to_s] }
    else
      [[name, value.to_s]]
    end
  end
end
with_transfer_count(request) { || ... } click to toggle source
# File lib/tipi/http2_stream.rb, line 76
# Runs the given block with the request registered on the adapter for
# transfer counting, and unregisters it afterwards, even if the block raises.
#
# @param request [Object] request to register with the adapter
# @yield the work to perform while the request is registered
# @return [Object] the block's return value
def with_transfer_count(request)
  begin
    @adapter.set_request_for_transfer_count(request)
    yield
  ensure
    @adapter.unset_request_for_transfer_count(request)
  end
end