Asyncio Example ServerΒΆ
This example is a basic HTTP/2 server written using asyncio, using some functionality that was introduced in Python 3.5. This server represents basically just the same JSON-headers-returning server that was built in the Getting Started: Writing Your Own HTTP/2 Server document.
This example demonstrates some basic asyncio techniques.
1# -*- coding: utf-8 -*-
2"""
3asyncio-server.py
4~~~~~~~~~~~~~~~~~
5
6A fully-functional HTTP/2 server using asyncio. Requires Python 3.5+.
7
8This example demonstrates handling requests with bodies, as well as handling
9those without. In particular, it demonstrates the fact that DataReceived may
10be called multiple times, and that applications must handle that possibility.
11"""
12import asyncio
13import io
14import json
15import ssl
16import collections
17from typing import List, Tuple
18
19from h2.config import H2Configuration
20from h2.connection import H2Connection
21from h2.events import (
22 ConnectionTerminated, DataReceived, RemoteSettingsChanged,
23 RequestReceived, StreamEnded, StreamReset, WindowUpdated
24)
25from h2.errors import ErrorCodes
26from h2.exceptions import ProtocolError, StreamClosedError
27from h2.settings import SettingCodes
28
29
30RequestData = collections.namedtuple('RequestData', ['headers', 'data'])
31
32
33class H2Protocol(asyncio.Protocol):
34 def __init__(self):
35 config = H2Configuration(client_side=False, header_encoding='utf-8')
36 self.conn = H2Connection(config=config)
37 self.transport = None
38 self.stream_data = {}
39 self.flow_control_futures = {}
40
41 def connection_made(self, transport: asyncio.Transport):
42 self.transport = transport
43 self.conn.initiate_connection()
44 self.transport.write(self.conn.data_to_send())
45
46 def connection_lost(self, exc):
47 for future in self.flow_control_futures.values():
48 future.cancel()
49 self.flow_control_futures = {}
50
51 def data_received(self, data: bytes):
52 try:
53 events = self.conn.receive_data(data)
54 except ProtocolError as e:
55 self.transport.write(self.conn.data_to_send())
56 self.transport.close()
57 else:
58 self.transport.write(self.conn.data_to_send())
59 for event in events:
60 if isinstance(event, RequestReceived):
61 self.request_received(event.headers, event.stream_id)
62 elif isinstance(event, DataReceived):
63 self.receive_data(
64 event.data, event.flow_controlled_length, event.stream_id
65 )
66 elif isinstance(event, StreamEnded):
67 self.stream_complete(event.stream_id)
68 elif isinstance(event, ConnectionTerminated):
69 self.transport.close()
70 elif isinstance(event, StreamReset):
71 self.stream_reset(event.stream_id)
72 elif isinstance(event, WindowUpdated):
73 self.window_updated(event.stream_id, event.delta)
74 elif isinstance(event, RemoteSettingsChanged):
75 if SettingCodes.INITIAL_WINDOW_SIZE in event.changed_settings:
76 self.window_updated(None, 0)
77
78 self.transport.write(self.conn.data_to_send())
79
80 def request_received(self, headers: List[Tuple[str, str]], stream_id: int):
81 headers = collections.OrderedDict(headers)
82 method = headers[':method']
83
84 # Store off the request data.
85 request_data = RequestData(headers, io.BytesIO())
86 self.stream_data[stream_id] = request_data
87
88 def stream_complete(self, stream_id: int):
89 """
90 When a stream is complete, we can send our response.
91 """
92 try:
93 request_data = self.stream_data[stream_id]
94 except KeyError:
95 # Just return, we probably 405'd this already
96 return
97
98 headers = request_data.headers
99 body = request_data.data.getvalue().decode('utf-8')
100
101 data = json.dumps(
102 {"headers": headers, "body": body}, indent=4
103 ).encode("utf8")
104
105 response_headers = (
106 (':status', '200'),
107 ('content-type', 'application/json'),
108 ('content-length', str(len(data))),
109 ('server', 'asyncio-h2'),
110 )
111 self.conn.send_headers(stream_id, response_headers)
112 asyncio.ensure_future(self.send_data(data, stream_id))
113
114 def receive_data(self, data: bytes, flow_controlled_length: int, stream_id: int):
115 """
116 We've received some data on a stream. If that stream is one we're
117 expecting data on, save it off (and account for the received amount of
118 data in flow control so that the client can send more data).
119 Otherwise, reset the stream.
120 """
121 try:
122 stream_data = self.stream_data[stream_id]
123 except KeyError:
124 self.conn.reset_stream(
125 stream_id, error_code=ErrorCodes.PROTOCOL_ERROR
126 )
127 else:
128 stream_data.data.write(data)
129 self.conn.acknowledge_received_data(flow_controlled_length, stream_id)
130
131 def stream_reset(self, stream_id):
132 """
133 A stream reset was sent. Stop sending data.
134 """
135 if stream_id in self.flow_control_futures:
136 future = self.flow_control_futures.pop(stream_id)
137 future.cancel()
138
139 async def send_data(self, data, stream_id):
140 """
141 Send data according to the flow control rules.
142 """
143 while data:
144 while self.conn.local_flow_control_window(stream_id) < 1:
145 try:
146 await self.wait_for_flow_control(stream_id)
147 except asyncio.CancelledError:
148 return
149
150 chunk_size = min(
151 self.conn.local_flow_control_window(stream_id),
152 len(data),
153 self.conn.max_outbound_frame_size,
154 )
155
156 try:
157 self.conn.send_data(
158 stream_id,
159 data[:chunk_size],
160 end_stream=(chunk_size == len(data))
161 )
162 except (StreamClosedError, ProtocolError):
163 # The stream got closed and we didn't get told. We're done
164 # here.
165 break
166
167 self.transport.write(self.conn.data_to_send())
168 data = data[chunk_size:]
169
170 async def wait_for_flow_control(self, stream_id):
171 """
172 Waits for a Future that fires when the flow control window is opened.
173 """
174 f = asyncio.Future()
175 self.flow_control_futures[stream_id] = f
176 await f
177
178 def window_updated(self, stream_id, delta):
179 """
180 A window update frame was received. Unblock some number of flow control
181 Futures.
182 """
183 if stream_id and stream_id in self.flow_control_futures:
184 f = self.flow_control_futures.pop(stream_id)
185 f.set_result(delta)
186 elif not stream_id:
187 for f in self.flow_control_futures.values():
188 f.set_result(delta)
189
190 self.flow_control_futures = {}
191
192
193ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
194ssl_context.options |= (
195 ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_COMPRESSION
196)
197ssl_context.load_cert_chain(certfile="cert.crt", keyfile="cert.key")
198ssl_context.set_alpn_protocols(["h2"])
199
200loop = asyncio.get_event_loop()
201# Each client connection will create a new protocol instance
202coro = loop.create_server(H2Protocol, '127.0.0.1', 8443, ssl=ssl_context)
203server = loop.run_until_complete(coro)
204
205# Serve requests until Ctrl+C is pressed
206print('Serving on {}'.format(server.sockets[0].getsockname()))
207try:
208 loop.run_forever()
209except KeyboardInterrupt:
210 pass
211finally:
212 # Close the server
213 server.close()
214 loop.run_until_complete(server.wait_closed())
215 loop.close()
You can use cert.crt
and cert.key
files provided within the repository
or generate your own certificates using OpenSSL:
$ openssl req -x509 -newkey rsa:2048 -keyout cert.key -out cert.crt -days 365 -nodes