Asyncio Example Server

This example is a basic HTTP/2 server written using asyncio, relying on some functionality that was introduced in Python 3.5. The server is essentially the same JSON-headers-returning server that was built in the Getting Started: Writing Your Own HTTP/2 Server document.

This example demonstrates some basic asyncio techniques.

  1# -*- coding: utf-8 -*-
  2"""
  3asyncio-server.py
  4~~~~~~~~~~~~~~~~~
  5
  6A fully-functional HTTP/2 server using asyncio. Requires Python 3.5+.
  7
  8This example demonstrates handling requests with bodies, as well as handling
  9those without. In particular, it demonstrates the fact that DataReceived may
 10be called multiple times, and that applications must handle that possibility.
 11"""
 12import asyncio
 13import io
 14import json
 15import ssl
 16import collections
 17from typing import List, Tuple
 18
 19from h2.config import H2Configuration
 20from h2.connection import H2Connection
 21from h2.events import (
 22    ConnectionTerminated, DataReceived, RemoteSettingsChanged,
 23    RequestReceived, StreamEnded, StreamReset, WindowUpdated
 24)
 25from h2.errors import ErrorCodes
 26from h2.exceptions import ProtocolError, StreamClosedError
 27from h2.settings import SettingCodes
 28
 29
 30RequestData = collections.namedtuple('RequestData', ['headers', 'data'])
 31
 32
 33class H2Protocol(asyncio.Protocol):
 34    def __init__(self):
 35        config = H2Configuration(client_side=False, header_encoding='utf-8')
 36        self.conn = H2Connection(config=config)
 37        self.transport = None
 38        self.stream_data = {}
 39        self.flow_control_futures = {}
 40
 41    def connection_made(self, transport: asyncio.Transport):
 42        self.transport = transport
 43        self.conn.initiate_connection()
 44        self.transport.write(self.conn.data_to_send())
 45
 46    def connection_lost(self, exc):
 47        for future in self.flow_control_futures.values():
 48            future.cancel()
 49        self.flow_control_futures = {}
 50
 51    def data_received(self, data: bytes):
 52        try:
 53            events = self.conn.receive_data(data)
 54        except ProtocolError as e:
 55            self.transport.write(self.conn.data_to_send())
 56            self.transport.close()
 57        else:
 58            self.transport.write(self.conn.data_to_send())
 59            for event in events:
 60                if isinstance(event, RequestReceived):
 61                    self.request_received(event.headers, event.stream_id)
 62                elif isinstance(event, DataReceived):
 63                    self.receive_data(
 64                        event.data, event.flow_controlled_length, event.stream_id
 65                    )
 66                elif isinstance(event, StreamEnded):
 67                    self.stream_complete(event.stream_id)
 68                elif isinstance(event, ConnectionTerminated):
 69                    self.transport.close()
 70                elif isinstance(event, StreamReset):
 71                    self.stream_reset(event.stream_id)
 72                elif isinstance(event, WindowUpdated):
 73                    self.window_updated(event.stream_id, event.delta)
 74                elif isinstance(event, RemoteSettingsChanged):
 75                    if SettingCodes.INITIAL_WINDOW_SIZE in event.changed_settings:
 76                        self.window_updated(None, 0)
 77
 78                self.transport.write(self.conn.data_to_send())
 79
 80    def request_received(self, headers: List[Tuple[str, str]], stream_id: int):
 81        headers = collections.OrderedDict(headers)
 82        method = headers[':method']
 83
 84        # Store off the request data.
 85        request_data = RequestData(headers, io.BytesIO())
 86        self.stream_data[stream_id] = request_data
 87
 88    def stream_complete(self, stream_id: int):
 89        """
 90        When a stream is complete, we can send our response.
 91        """
 92        try:
 93            request_data = self.stream_data[stream_id]
 94        except KeyError:
 95            # Just return, we probably 405'd this already
 96            return
 97
 98        headers = request_data.headers
 99        body = request_data.data.getvalue().decode('utf-8')
100
101        data = json.dumps(
102            {"headers": headers, "body": body}, indent=4
103        ).encode("utf8")
104
105        response_headers = (
106            (':status', '200'),
107            ('content-type', 'application/json'),
108            ('content-length', str(len(data))),
109            ('server', 'asyncio-h2'),
110        )
111        self.conn.send_headers(stream_id, response_headers)
112        asyncio.ensure_future(self.send_data(data, stream_id))
113
114    def receive_data(self, data: bytes, flow_controlled_length: int, stream_id: int):
115        """
116        We've received some data on a stream. If that stream is one we're
117        expecting data on, save it off (and account for the received amount of
118        data in flow control so that the client can send more data).
119        Otherwise, reset the stream.
120        """
121        try:
122            stream_data = self.stream_data[stream_id]
123        except KeyError:
124            self.conn.reset_stream(
125                stream_id, error_code=ErrorCodes.PROTOCOL_ERROR
126            )
127        else:
128            stream_data.data.write(data)
129            self.conn.acknowledge_received_data(flow_controlled_length, stream_id)
130
131    def stream_reset(self, stream_id):
132        """
133        A stream reset was sent. Stop sending data.
134        """
135        if stream_id in self.flow_control_futures:
136            future = self.flow_control_futures.pop(stream_id)
137            future.cancel()
138
139    async def send_data(self, data, stream_id):
140        """
141        Send data according to the flow control rules.
142        """
143        while data:
144            while self.conn.local_flow_control_window(stream_id) < 1:
145                try:
146                    await self.wait_for_flow_control(stream_id)
147                except asyncio.CancelledError:
148                    return
149
150            chunk_size = min(
151                self.conn.local_flow_control_window(stream_id),
152                len(data),
153                self.conn.max_outbound_frame_size,
154            )
155
156            try:
157                self.conn.send_data(
158                    stream_id,
159                    data[:chunk_size],
160                    end_stream=(chunk_size == len(data))
161                )
162            except (StreamClosedError, ProtocolError):
163                # The stream got closed and we didn't get told. We're done
164                # here.
165                break
166
167            self.transport.write(self.conn.data_to_send())
168            data = data[chunk_size:]
169
170    async def wait_for_flow_control(self, stream_id):
171        """
172        Waits for a Future that fires when the flow control window is opened.
173        """
174        f = asyncio.Future()
175        self.flow_control_futures[stream_id] = f
176        await f
177
178    def window_updated(self, stream_id, delta):
179        """
180        A window update frame was received. Unblock some number of flow control
181        Futures.
182        """
183        if stream_id and stream_id in self.flow_control_futures:
184            f = self.flow_control_futures.pop(stream_id)
185            f.set_result(delta)
186        elif not stream_id:
187            for f in self.flow_control_futures.values():
188                f.set_result(delta)
189
190            self.flow_control_futures = {}
191
192
# TLS configuration: disable TLS 1.0/1.1 and TLS compression, load the
# certificate pair from the working directory, and offer "h2" via ALPN.
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.options |= (
    ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_COMPRESSION
)
ssl_context.load_cert_chain(certfile="cert.crt", keyfile="cert.key")
ssl_context.set_alpn_protocols(["h2"])

# Create and install the loop explicitly: asyncio.get_event_loop() is
# deprecated when no loop is running and raises RuntimeError on recent Python
# releases.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Each client connection will create a new protocol instance
coro = loop.create_server(H2Protocol, '127.0.0.1', 8443, ssl=ssl_context)
server = loop.run_until_complete(coro)

# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass
finally:
    # Close the server, wait for it to finish closing, then release the loop.
    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()

You can use cert.crt and cert.key files provided within the repository or generate your own certificates using OpenSSL:

$ openssl req -x509 -newkey rsa:2048 -keyout cert.key -out cert.crt -days 365 -nodes