Part 10 - Our own ASGI server
(The repository for the ASGI part of this series is here.)
Let’s prepare another repository to follow along with all the ASGI code we’re going to write now. We’ll start out with a top-level ./setup.py.
```python
# ./setup.py
from setuptools import setup, find_packages

setup(
    name="asgi",
    description="A tutorial implementation of an ASGI server and application.",
    version="0.0.1",
    packages=find_packages(),
)
```
Now we’ll create the actual package directory ./asgi/, add an empty ./asgi/__init__.py to it and then add the submodule ./asgi/server/, where we’ll also create an empty ./asgi/server/__init__.py. With that, we’re all prepared to get started.
TCP server
Let’s start again from a very simple, synchronous TCP echo server.
```python
# ./asgi/server/server.py
import socket


def handle_socket(client_socket):
    while True:
        data = client_socket.recv(1024)  # blocking
        print(f"Received {data}")
        if data == b"":
            break
        client_socket.sendall(data)  # blocking
    client_socket.close()


def serve_forever(host, port):
    server_socket = socket.socket()
    server_socket.bind((host, port))
    server_socket.listen(1)
    while True:
        client_socket, address = server_socket.accept()  # blocking
        print(f"Socket established with {address}.")
        handle_socket(client_socket)
```
I’ve marked the lines that can block in this program with a # blocking comment. Before any socket is established, it blocks while waiting for incoming connections. And once a socket is up, it perpetually blocks on waiting for new incoming data and on sending data out over that socket.
Concurrency using coroutines
As we saw in the synchronous TCP server post, this leads to the server only ever being able to communicate with a single client at a time. Back then we solved the problem by handling each socket in a separate thread. This time, let’s look at how we can serve concurrent clients using asyncio coroutines. I’m not going to say much more about coroutines or the event loop here, but I can wholeheartedly recommend David Beazley’s fantastic talk “Python Concurrency From the Ground Up”, where he compares different ways of achieving concurrency in Python, especially in the context of network programming. In that talk he eventually even implements a tiny coroutine event loop: highly educational.
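For comparison, a thread-per-connection variant of the echo server above might look roughly like the following sketch. It is a reconstruction for illustration, not the code from the earlier post; the name serve_with_threads and the choice to pass in an already bound, listening socket are assumptions made here.

```python
import socket
import threading


def handle_socket(client_socket):
    # Same echo loop as above; recv() still blocks, but only this client's thread
    with client_socket:
        while True:
            data = client_socket.recv(1024)
            if data == b"":
                break
            client_socket.sendall(data)


def serve_with_threads(server_socket):
    # Hypothetical variant: expects a socket that is already bound and listening
    while True:
        client_socket, address = server_socket.accept()  # blocking
        print(f"Socket established with {address}.")
        # One thread per client, so a slow client no longer stalls the others
        threading.Thread(
            target=handle_socket, args=(client_socket,), daemon=True
        ).start()
```

To try it, create the socket as in serve_forever() (socket(), bind(), listen()) and pass it in. Every connected client now costs an OS thread, which is exactly the overhead the coroutine approach avoids.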
The necessary changes to enable coroutine-based concurrency in our server are as follows:
```python
# ./asgi/server/server.py
import asyncio
import socket


async def handle_socket(client_socket):
    loop = asyncio.get_running_loop()
    while True:
        data = await loop.sock_recv(client_socket, 1024)
        print(f"Received {data}")
        if data == b"":
            break
        await loop.sock_sendall(client_socket, data)
    client_socket.close()


async def serve_forever(host, port):
    server_socket = socket.socket()
    server_socket.bind((host, port))
    server_socket.listen(1)
    server_socket.setblocking(False)
    loop = asyncio.get_running_loop()
    while True:
        client_socket, address = await loop.sock_accept(server_socket)
        print(f"Socket established with {address}.")
        loop.create_task(handle_socket(client_socket))
```
The first thing you should notice is that the code looks very similar to the initial synchronous implementation. This is one of the big advantages of concurrency through coroutines: the code almost looks like synchronous code and can be reasoned about in a straightforward way.
The actual changes start with both functions now being async, i.e. they can be suspended and resumed at await statements. The server socket is set to non-blocking, and all interactions with sockets (establishing a new socket, receiving data, sending data) are replaced with the corresponding non-blocking asyncio loop functions. Those can be awaited, i.e. at those points the event loop gets the chance to switch to running some other piece of code. Each handle_socket for a specific client_socket runs in a separate asyncio task. The entire program, even once 100s or 1000s of clients connect, runs in a single thread in a single process.
The interesting part really starts with the server_socket being set to non-blocking. This means that it, and the client_sockets that loop.sock_accept() hands back, won’t block on accept/recv/send anymore. This makes them usable in coroutines. However, you would now have to carefully check whether a socket is ready to accept/recv/send before you actually call those methods. Luckily, asyncio does the heavy lifting for this in the background (in its default selector-based event loop, via the selectors module, a higher-level wrapper around select) and exposes the necessary functions to us through the loop.
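To get a feeling for what that heavy lifting involves, here is a small standalone sketch of the underlying pattern, with a socket.socketpair() standing in for a real client connection: a non-blocking recv() raises BlockingIOError instead of waiting, and a selector reports when the socket is actually readable. The function name readiness_demo is made up for this illustration; asyncio’s selector-based loop wraps the same idea in callbacks and futures rather than calling it like this.

```python
import selectors
import socket


def readiness_demo():
    # A connected pair of sockets stands in for server side and client side
    server_side, client_side = socket.socketpair()
    server_side.setblocking(False)

    # 1. With no data available, a non-blocking recv() raises instead of waiting
    try:
        server_side.recv(1024)
        would_block = False
    except BlockingIOError:
        would_block = True

    # 2. Ask the selector which registered sockets are ready to be read from
    selector = selectors.DefaultSelector()
    selector.register(server_side, selectors.EVENT_READ)
    ready_before = selector.select(timeout=0)  # nothing sent yet -> []

    client_side.sendall(b"hello")
    ready_after = selector.select(timeout=1)  # now server_side is reported ready
    data = server_side.recv(1024)  # safe: the selector said data is waiting

    selector.close()
    server_side.close()
    client_side.close()
    return would_block, ready_before, len(ready_after), data
```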
It is not surprising that asyncio provides these low-level ways of interacting with sockets. Network programming is one of the main areas where concurrency with coroutines really shines, so having this functionality in asyncio is only natural.
The documentation for async socket interaction is here. It also advises you to check out loop.create_server, which is a much higher-level way of setting up an async socket server.
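For comparison, here is a rough sketch of the same echo server on top of loop.create_server() and an asyncio.Protocol. The names EchoProtocol and serve_with_protocol are chosen for this example. The loop now calls data_received() whenever bytes arrive, so there is no explicit receive loop at all; the rest of this post keeps using the low-level socket functions because they keep the mechanics visible.

```python
import asyncio


class EchoProtocol(asyncio.Protocol):
    """Echo server written against asyncio's transport/protocol API."""

    def connection_made(self, transport):
        # One protocol instance is created per connection
        self.transport = transport
        print(f"Socket established with {transport.get_extra_info('peername')}.")

    def data_received(self, data):
        # Called by the event loop whenever bytes arrive; no recv() needed
        self.transport.write(data)

    def connection_lost(self, exc):
        print("Socket closed.")


async def serve_with_protocol(host, port):
    loop = asyncio.get_running_loop()
    server = await loop.create_server(EchoProtocol, host, port)
    async with server:
        await server.serve_forever()
```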
To run this code, we’ll adjust ./asgi/server/__init__.py.
```python
# ./asgi/server/__init__.py
from .server import serve_forever
```
Now we can run the server from a top-level ./run.py as follows:
```python
# ./run.py
import asyncio

from asgi.server import serve_forever

if __name__ == "__main__":
    asyncio.run(serve_forever("127.0.0.1", 5000))
```
You can use netcat again and see that, indeed, several concurrent clients can interact with the server.
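If you’d rather script this check than juggle several netcat sessions, a small asyncio client along these lines should do. It is a sketch that assumes the echo server from ./run.py is listening on 127.0.0.1:5000; the function name check_concurrent_clients is made up for this example. It opens all connections first and then talks to them newest-first, which a server that handles one client at a time could not answer before the timeout.

```python
import asyncio


async def check_concurrent_clients(host="127.0.0.1", port=5000, n_clients=3, timeout=2.0):
    # Open every connection first and keep them all open at the same time
    connections = [
        await asyncio.wait_for(asyncio.open_connection(host, port), timeout)
        for _ in range(n_clients)
    ]
    replies = []
    try:
        # Talk to the newest connection first: a server handling one client
        # at a time would still be stuck on the oldest one
        for i, (reader, writer) in reversed(list(enumerate(connections))):
            writer.write(f"hello from client {i}\n".encode())
            await writer.drain()
            replies.append(await asyncio.wait_for(reader.read(1024), timeout))
    finally:
        for _, writer in connections:
            writer.close()
    return replies
```

Run it with asyncio.run(check_concurrent_clients()) while ./run.py is up. Against the synchronous server from the beginning of this post, the newest connection is not served while the oldest one is open, so asyncio.wait_for() should give up with a timeout error instead.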
An asynchronous HTTP server
To add HTTP capabilities to our server, we can reuse the parser we wrote for the WSGI server. We’ll also reuse the functionality for creating a valid HTTP response. Let’s put those files at ./asgi/server/http_parse.py, ./asgi/server/splitbuffer.py and ./asgi/server/http_response.py respectively (check out this commit for a summary).
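The parser relies on the SplitBuffer class from the WSGI part. If you don’t have that file at hand, a minimal stand-in could look like the sketch below. It only assumes the interface the parser code in this post actually uses: feed_data() appends bytes, pop(separator=...) returns everything before the first separator (an empty line yields b"", an incomplete one yields None), and flush() returns and clears whatever is left. The original implementation may differ in its details.

```python
class SplitBuffer:
    """Hypothetical stand-in for ./asgi/server/splitbuffer.py."""

    def __init__(self):
        self.data = b""

    def feed_data(self, data: bytes):
        # Append newly received bytes to the buffer
        self.data += data

    def pop(self, separator: bytes):
        # Return the bytes before the first separator, or None if it hasn't arrived yet
        first, sep, rest = self.data.partition(separator)
        if not sep:
            return None
        self.data = rest
        return first

    def flush(self):
        # Return everything that is buffered and empty the buffer
        data, self.data = self.data, b""
        return data
```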
Now we can implement the parser callbacks in our server in ./asgi/server/server.py and also take this opportunity to rewrite it in a more object-oriented way.
```python
# ./asgi/server/server.py
import asyncio
import socket

from .http_parse import HttpRequestParser
from .http_response import make_response


class Session:
    def __init__(self, client_socket, address):
        self.loop = asyncio.get_running_loop()
        self.client_socket = client_socket
        self.address = address
        self.trigger_send_response = asyncio.Event()
        self.response_sent = False
        self.parser = HttpRequestParser(self)

    async def run(self):
        self.loop.create_task(self.send_response())
        while True:
            if self.response_sent:
                break
            data = await self.loop.sock_recv(self.client_socket, 1024)
            print(f"Received {data}")
            self.parser.feed_data(data)
        self.client_socket.close()
        print(f"Socket with {self.address} closed.")

    async def send_response(self):
        # Suspended until on_message_complete() sets the event
        await self.trigger_send_response.wait()
        body = b"<html><body>Hello World</body></html>"
        response = make_response(status_code=200, headers=[], body=body)
        await self.loop.sock_sendall(self.client_socket, response)
        print("Response sent.")
        self.response_sent = True

    # HTTP parser callbacks
    def on_url(self, url):
        print(f"Received url: {url}")

    def on_header(self, name: bytes, value: bytes):
        print(f"Received header: ({name}, {value})")

    def on_body(self, body: bytes):
        print(f"Received body: {body}")

    def on_message_complete(self):
        print("Received request completely.")
        self.trigger_send_response.set()


async def serve_forever(host, port):
    server_socket = socket.socket()
    server_socket.bind((host, port))
    server_socket.listen(1)
    server_socket.setblocking(False)
    loop = asyncio.get_running_loop()
    while True:
        client_socket, address = await loop.sock_accept(server_socket)
        print(f"Socket established with {address}.")
        session = Session(client_socket, address)
        loop.create_task(session.run())
```
In general, the behavior still looks very similar to the synchronous HTTP server we implemented in the WSGI part of this series. The interesting part is how the async pieces fit together, so let’s see what’s going on here.
First of all, both run() and send_response() are now async methods. They need to be, because they interact directly with the asynchronous socket API. But since the parser is synchronous, it needs some way to trigger the send_response() method. This is solved by starting send_response() in a separate task right at the beginning of running the Session, which then immediately awaits self.trigger_send_response, an asyncio.Event. This suspends the task (without blocking the event loop) until the event is set in the on_message_complete() parser callback.
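Stripped of sockets and parsing, this hand-off between a synchronous callback and a waiting task boils down to the following sketch. The Handoff class, its log list and demo_handoff() are made up for this illustration; the point is that wait() only suspends the send_response() task, while the rest of the code keeps running until the plain, synchronous callback calls set().

```python
import asyncio


class Handoff:
    """Hypothetical miniature of the Event pattern used in Session."""

    def __init__(self):
        self.trigger = asyncio.Event()
        self.log = []

    async def send_response(self):
        # Suspends only this task until the event is set
        await self.trigger.wait()
        self.log.append("response sent")

    def on_message_complete(self):
        # A plain synchronous callback, as the parser would call it
        self.log.append("request complete")
        self.trigger.set()


async def demo_handoff():
    session = Handoff()
    task = asyncio.create_task(session.send_response())
    await asyncio.sleep(0)  # let the task start and park on the event
    session.log.append("still parsing")
    session.on_message_complete()  # wakes the waiting task
    await task
    return session.log
```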
You can test that everything works by running ./run.py and firing some curl requests, e.g.:
```console
$ curl localhost:5000 -i
HTTP/1.1 200 OK
Content-Length: 37

<html><body>Hello World</body></html>
```
The server should log something like the following:
```console
$ python run.py
Socket established with ('127.0.0.1', 36394).
Received b'GET / HTTP/1.1\r\nHost: localhost:5000\r\nUser-Agent: curl/7.69.1\r\nAccept: */*\r\n\r\n'
Received url: b'/'
Received header: (b'Host', b'localhost:5000')
Received header: (b'User-Agent', b'curl/7.69.1')
Received header: (b'Accept', b'*/*')
Received request completely.
Response sent.
Received b''
Received request completely.
Socket with ('127.0.0.1', 36394) closed.
```
If you want a step-by-step introduction to an async HTTP server with an emphasis on using the higher-level abstractions in the asyncio library and making the server a bit more resilient, check out this insightful article.
The server just returns a dummy reply for now and doesn’t care about the input at all. Let’s change that by turning it into a basic ASGI server and wiring it up with an ASGI application.
ASGI server
To start with, we need to add one new callback to the HTTP request parser in ./asgi/server/http_parse.py. It’s called on_headers_complete() and will be triggered once all request headers have been read successfully. In the ASGI server, this is the point where we should start awaiting the app and then stream the request body in via events. In the parser we’ll now also save the request’s HTTP method (which is parsed from the request line, i.e. the start line of the request).
```python
# ./asgi/server/http_parse.py
from .splitbuffer import SplitBuffer


class HttpRequestParser:
    def __init__(self, protocol):
        self.protocol = protocol
        self.buffer = SplitBuffer()
        self.http_method = b""
        self.done_parsing_start = False
        self.done_parsing_headers = False
        self.expected_body_length = 0

    def feed_data(self, data: bytes):
        self.buffer.feed_data(data)
        self.parse()

    def parse(self):
        if not self.done_parsing_start:
            self.parse_startline()
        elif not self.done_parsing_headers:
            self.parse_headerline()
        elif self.expected_body_length:
            data = self.buffer.flush()
            if not data:
                # The rest of the body hasn't arrived yet: wait for the next
                # feed_data() call instead of recursing forever
                return
            self.expected_body_length -= len(data)
            self.protocol.on_body(data)
            self.parse()
        else:
            self.protocol.on_message_complete()

    def parse_startline(self):
        line = self.buffer.pop(separator=b"\r\n")
        if line is not None:
            http_method, url, http_version = line.strip().split()
            self.http_method = http_method
            self.done_parsing_start = True
            self.protocol.on_url(url)
            self.parse()

    def parse_headerline(self):
        line = self.buffer.pop(separator=b"\r\n")
        if line is not None:
            if line:
                # The whitespace after the colon is optional in HTTP
                name, value = line.strip().split(b":", maxsplit=1)
                value = value.strip()
                if name.lower() == b"content-length":
                    self.expected_body_length = int(value.decode("utf-8"))
                self.protocol.on_header(name, value)
            else:
                # An empty line terminates the headers
                self.done_parsing_headers = True
                self.protocol.on_headers_complete()
            self.parse()
```
With that out of the way, we can now implement the ASGI protocol in the Session in ./asgi/server/server.py.
```python
# ./asgi/server/server.py
import asyncio
import socket

from .http_parse import HttpRequestParser
from .asgi import ASGIRequest, ASGIResponse


class Session:
    def __init__(self, client_socket, address, app):
        self.loop = asyncio.get_running_loop()
        self.client_socket = client_socket
        self.address = address
        self.app = app
        self.trigger_run_asgi = asyncio.Event()
        self.parser = HttpRequestParser(self)
        self.request = ASGIRequest()
        self.response = ASGIResponse()
        self.response_sent = False

    async def run(self):
        self.loop.create_task(self.run_asgi())
        while True:
            if self.response_sent:
                break
            data = await self.loop.sock_recv(self.client_socket, 1024)
            print(f"Received {data}")
            self.parser.feed_data(data)
        self.client_socket.close()
        print(f"Socket with {self.address} closed.")

    # ASGI Server protocol methods
    async def run_asgi(self):
        await self.trigger_run_asgi.wait()
        await self.app(self.request.to_scope(), self.receive, self.send)

    async def receive(self):
        # Wait until the parser has buffered body data (or the request is
        # complete), then reset the event so the next call waits for new data
        await self.request.trigger_more_body.wait()
        self.request.trigger_more_body.clear()
        return self.request.to_event()

    async def send(self, event):
        self.response.feed_event(event)
        if self.response.is_complete:
            resp_http = self.response.to_http()
            await self.loop.sock_sendall(self.client_socket, resp_http)
            print("Response sent.")
            self.response_sent = True

    # HTTP parser callbacks
    def on_url(self, url):
        print(f"Received url: {url}")
        self.request.http_method = self.parser.http_method.decode("utf-8")
        self.request.path = url.decode("utf-8")

    def on_header(self, name: bytes, value: bytes):
        print(f"Received header: ({name}, {value})")
        # ASGI expects lowercased header names in the scope
        self.request.headers.append((name.lower(), value))

    def on_headers_complete(self):
        print("Received all headers.")
        self.trigger_run_asgi.set()

    def on_body(self, body: bytes):
        print(f"Received body: {body}")
        self.request.body_buffer += body
        self.request.trigger_more_body.set()

    def on_message_complete(self):
        print("Received request completely.")
        self.request.last_body = True
        self.request.trigger_more_body.set()

...
```
There are some changes related to initializing the Session, namely that we now hand in the ASGI application object and initialize ASGIRequest and ASGIResponse objects (implementation details coming up shortly). Those objects will be used to aggregate request/response information and translate between HTTP and ASGI events.
Right at the start of a Session, when awaiting the run() method, we create a new task for the run_asgi() coroutine. This one, in turn, directly awaits the self.trigger_run_asgi asyncio.Event. That way, we can later set that event (in the new on_headers_complete() parser callback) and thus trigger awaiting the ASGI application.
The main changes to the server are that the HTTP request parser callbacks now aggregate the request information into the new request object and trigger ASGI events via asyncio.Event variables. Once we reach on_headers_complete, we start awaiting the ASGI application and hand in the scope (which can be created based on the request object) and the two async callables receive() and send().
Subsequently, any incoming request body chunk is buffered in the request object and then triggers another cycle in receive(), which sends that body chunk into the application via an ASGI event. This is repeated until the on_message_complete callback. At that point we send in a final event with more_body set to False, which lets the application know that there is no more request body to come. In send(), the server processes the events coming back from the application until the response is finished, and then sends back the HTTP response.
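To see the protocol from the application’s side without any sockets involved, here is a sketch of a framework-free ASGI application together with a tiny in-memory stand-in for the server. The names echo_app and drive are made up for this example, and the scope only contains a handful of the keys a real server would send. The driver plays the role of our Session: it feeds body chunks in through receive() and collects whatever the application hands to send().

```python
import asyncio


async def echo_app(scope, receive, send):
    """A framework-free ASGI app that echoes the request body back."""
    assert scope["type"] == "http"
    body = b""
    more_body = True
    while more_body:
        event = await receive()  # {"type": "http.request", ...}
        body += event.get("body", b"")
        more_body = event.get("more_body", False)
    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [(b"content-type", b"text/plain")],
    })
    await send({"type": "http.response.body", "body": body, "more_body": False})


async def drive(app, chunks):
    """Feed body chunks into an ASGI app and collect the events it sends back."""
    incoming = [
        {"type": "http.request", "body": chunk, "more_body": i < len(chunks) - 1}
        for i, chunk in enumerate(chunks)
    ]
    sent = []

    async def receive():
        return incoming.pop(0)

    async def send(event):
        sent.append(event)

    # Trimmed-down scope; a real server sends more keys
    scope = {"type": "http", "method": "POST", "path": "/", "headers": []}
    await app(scope, receive, send)
    return sent
```

Note that the application has to keep calling receive() until more_body is False; only then does it know that the whole request body has arrived.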
Now, let’s implement the ASGIRequest and ASGIResponse classes in ./asgi/server/asgi.py.
```python
# ./asgi/server/asgi.py
import asyncio
from dataclasses import dataclass, field
from typing import List, Tuple

from .http_response import make_response


@dataclass
class ASGIRequest:
    http_method: str = ""
    path: str = ""
    headers: List[Tuple[bytes, bytes]] = field(default_factory=list)
    body_buffer: bytes = b""
    # default_factory gives every request its own Event; a plain default
    # value would be a single Event shared by all requests
    trigger_more_body: asyncio.Event = field(default_factory=asyncio.Event)
    last_body: bool = False

    def to_scope(self):
        path, _, query_string = self.path.partition("?")
        scope = {
            "type": "http",
            # Apps are called as app(scope, receive, send), i.e. ASGI 3.0
            "asgi": {"version": "3.0", "spec_version": "2.1"},
            "http_version": "1.1",
            "method": self.http_method,
            "scheme": "http",
            "path": path,
            # ASGI expects the query string as bytes
            "query_string": query_string.encode("utf-8"),
            "headers": self.headers,
        }
        return scope

    def to_event(self):
        event = {
            "type": "http.request",
            "body": self.body_buffer,
            "more_body": not self.last_body,
        }
        self.body_buffer = b""
        return event


@dataclass
class ASGIResponse:
    status_code: int = 200
    headers: List[Tuple[bytes, bytes]] = field(default_factory=list)
    body: bytes = b""
    is_complete: bool = False

    def to_http(self):
        return make_response(self.status_code, self.headers, self.body)

    def feed_event(self, event):
        if event["type"] == "http.response.start":
            self.status_code = event["status"]
            self.headers = event.get("headers", [])
        elif event["type"] == "http.response.body":
            self.body += event.get("body", b"")
            if not event.get("more_body", False):
                self.is_complete = True
```
Both are simple dataclasses that aggregate request/response state. You can see that creating the scope object is very similar to creating the environ object in WSGI. The big difference is that there is no file object, like wsgi.input, through which the request body is handed to the application. Instead, the request body is streamed into the application using events. The ASGIRequest class contains a body_buffer and can create a correctly formatted event to hand into the application. You can stream as many body chunks to the application as you like, until you send an event with the more_body field set to False. At that point the application assumes that there is no more request body to come and can start creating an appropriate response.
The ASGIResponse class is very similar, but instead of creating events, it is fed with events coming back from the application. In feed_event() you can see that there are two possible event types: http.response.start and http.response.body. The first is the initial event from the application to the server and contains the response status code and headers (much like the start_response() callable in the WSGI protocol). After that, a stream of http.response.body events is expected, which work similarly to the events going from the server to the application that we just discussed. In this simple implementation we just aggregate all body chunks until more_body is False. At that point we set the is_complete flag to True, which makes the Session send the HTTP response to the client via the socket. A more capable ASGI server should be able to send the HTTP response in chunks, but we’re making our lives a bit easier here.
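For a rough idea of what sending in chunks could involve: one common option in HTTP/1.1 is Transfer-Encoding: chunked, where each piece of the body is prefixed with its length in hexadecimal and the body ends with a zero-length chunk. The helpers below are a hypothetical sketch of just that framing and are not part of our server. A streaming Session could, as an assumption, send the status line and headers (including Transfer-Encoding: chunked) on http.response.start, pass every http.response.body chunk through encode_chunk(), and finish with LAST_CHUNK once more_body is False.

```python
def encode_chunk(data: bytes) -> bytes:
    """Frame one piece of a response body for Transfer-Encoding: chunked."""
    if not data:
        # A zero-length chunk would end the body early, so emit nothing
        return b""
    return f"{len(data):X}\r\n".encode("ascii") + data + b"\r\n"


# Terminates a chunked body (no trailer fields)
LAST_CHUNK = b"0\r\n\r\n"


def encode_chunked_body(chunks) -> bytes:
    """Frame a whole sequence of body pieces, including the terminator."""
    return b"".join(encode_chunk(chunk) for chunk in chunks) + LAST_CHUNK
```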
You can read the details about the events and the scope object when using ASGI for HTTP here.
Let’s also turn the serve_forever() function into a method on an ASGIServer class in ./asgi/server/server.py, to be more in line with the WSGI server implementation.
```python
# ./asgi/server/server.py
...


class ASGIServer:
    def __init__(self, host: str, port: int, app):
        self.host = host
        self.port = port
        self.app = app

    async def serve_forever(self):
        server_socket = socket.socket()
        server_socket.bind((self.host, self.port))
        server_socket.listen(1)
        server_socket.setblocking(False)
        loop = asyncio.get_running_loop()
        while True:
            client_socket, address = await loop.sock_accept(server_socket)
            print(f"Socket established with {address}.")
            session = Session(client_socket, address, self.app)
            loop.create_task(session.run())
```
This also requires a change to the import in ./asgi/server/__init__.py.
```python
# ./asgi/server/__init__.py
from .server import ASGIServer
```
And finally, we just need to set up a little ASGI test application in ./run.py to verify our implementation. We’re using FastAPI in this example.
```python
# ./run.py
import asyncio

from fastapi import FastAPI
from pydantic import BaseModel

from asgi.server import ASGIServer

app = FastAPI()


@app.get("/")
async def root():
    print("hello from /")
    return {"hello": "world"}


class Model(BaseModel):
    id: int
    name: str


@app.post("/create")
async def create(i: Model):
    print("hello from /create")
    return f"created {i}"


if __name__ == "__main__":
    server = ASGIServer("127.0.0.1", 5000, app)
    asyncio.run(server.serve_forever())
```
An example curl request should work just fine:
```console
$ curl localhost:5000/create -i -H "Content-Type: application/json" -d '{"id": 123, "name": "abc"}'
HTTP/1.1 200 OK
content-length: 27
content-type: application/json

"created id=123 name='abc'"
```
Notes
And just like that, with a very manageable codebase, we’ve implemented a functional ASGI server. Again, much of the simplicity is due to us not handling errors, edge cases or any kind of server resilience, but it’s still pretty cool to see. Take a look at uvicorn if you want to check out a proper, mature ASGI server. Especially its httptools-based HTTP implementation shows a lot of parallels with the code we’ve just written.
Another point to keep in mind is that this ASGI server implementation only works for HTTP. An implementation for WebSockets would exchange slightly different events between the server and the application. You can dig into the uvicorn code to see what that would look like.
Now the only thing left to do in this series is to implement our own little ASGI application framework. Let’s do that in the next post.