Part 10 - Our own ASGI server

(The repository for the ASGI part of this series is here.)

Let’s prepare another repository to follow along with all the ASGI code we’re going to write now. We’ll start out with a top-level ./setup.py.

#./setup.py
from setuptools import setup, find_packages

setup(
    name="asgi",
    description="A tutorial implementation of an ASGI server and application.",
    version="0.0.1",
    packages=find_packages(),
)

Now we’ll create the actual package directory ./asgi/, add an empty ./asgi/__init__.py to it and then add the submodule ./asgi/server/, where we’ll also create an empty ./asgi/server/__init__.py. Now we’re all prepared to get started.

TCP server

Let’s again start from a very simple, synchronous TCP echo server.

#./asgi/server/server.py
import socket


def handle_socket(client_socket):
    while True:
        data = client_socket.recv(1024)  # blocking
        print(f"Received {data}")
        if data == b"":
            break
        client_socket.sendall(data)  # blocking
    client_socket.close()


def serve_forever(host, port):
    server_socket = socket.socket()
    server_socket.bind((host, port))
    server_socket.listen(1)

    while True:
        client_socket, address = server_socket.accept()  # blocking
        print(f"Socket established with {address}.")
        handle_socket(client_socket)  # only returns once this client disconnects

I’ve marked the calls that can block with a # blocking comment. Before any socket is established, the server blocks while waiting for an incoming connection. Once a socket is up, it blocks over and over, waiting for new incoming data and then sending that data back out on the same socket.

Concurrency using coroutines

As we saw in the synchronous TCP server post, this means the server can only ever communicate with a single client at a time. Back then we solved the problem by handling each socket in a separate thread. This time, let’s look at how we can serve concurrent clients using asyncio coroutines. I’m not going to say much more about coroutines or the event loop here, but I can wholeheartedly recommend David Beazley’s fantastic talk “Python Concurrency From the Ground Up”, where he compares different ways of achieving concurrency in Python, especially in the context of network programming. In that talk he even implements a tiny coroutine event loop: highly educational.
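
For comparison, here is a minimal sketch of the thread-per-connection approach mentioned above. The echo handler is restated so the block runs on its own, and serve_forever_threaded is a hypothetical name for this sketch, not the code from the earlier post.

```python
import socket
import threading


def handle_socket(client_socket):
    # Echo everything back until the client closes its side (recv returns b"").
    with client_socket:
        while True:
            data = client_socket.recv(1024)
            if not data:
                break
            client_socket.sendall(data)


def serve_forever_threaded(host, port):
    server_socket = socket.socket()
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server_socket.bind((host, port))
    server_socket.listen()
    while True:
        # accept() still blocks, but only this accepting thread.
        client_socket, address = server_socket.accept()
        print(f"Socket established with {address}.")
        # Each client gets its own thread, so one idle client no longer stalls the rest.
        threading.Thread(target=handle_socket, args=(client_socket,), daemon=True).start()
```

Every connection costs a whole OS thread here, which is exactly what the coroutine version below avoids.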

The necessary changes to enable coroutine-based concurrency in our server are as follows:

#./asgi/server/server.py
import asyncio
import socket


async def handle_socket(client_socket):
    loop = asyncio.get_event_loop()
    while True:
        data = await loop.sock_recv(client_socket, 1024)
        print(f"Received {data}")
        if data == b"":
            break
        await loop.sock_sendall(client_socket, data)
    client_socket.close()


async def serve_forever(host, port):
    server_socket = socket.socket()
    server_socket.bind((host, port))
    server_socket.listen(1)
    server_socket.setblocking(False)

    loop = asyncio.get_event_loop()
    while True:
        client_socket, address = await loop.sock_accept(server_socket)
        print(f"Socket established with {address}.")
        loop.create_task(handle_socket(client_socket))

The first thing you should notice is that the code looks very similar to the initial synchronous implementation. This is one of the big advantages of concurrency through coroutines: the code almost looks like synchronous code and can be reasoned about in a straightforward way.

The actual changes start with both functions now being async, i.e. they can be suspended and resumed at await expressions. The server socket is set to non-blocking, and all interactions with sockets (accepting a new connection, receiving data, sending data) are replaced with the corresponding non-blocking asyncio loop methods sock_accept(), sock_recv() and sock_sendall(). Those can be awaited, i.e. at those points the event loop gets the chance to switch to running some other piece of code. Each handle_socket() coroutine for a specific client_socket runs in its own asyncio task. The entire program, even once hundreds or thousands of clients connect, runs in a single thread in a single process.

The interesting part really starts with setting server_socket to non-blocking. This means it won’t block on accept anymore, and the client sockets that asyncio’s sock_accept() hands back are non-blocking as well, so they won’t block on recv/send either. This is what makes them usable in coroutines. However, you would now have to carefully check whether a socket is ready to accept/recv/send before you actually call those methods. Luckily, asyncio does the heavy lifting for this in the background (on Unix-like systems its default event loop uses the selectors module, a wrapper around select, epoll or kqueue) and exposes the necessary functions to us through the loop.
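
To see what asyncio is shielding us from, here is a small sketch built on a connected socket pair (socket.socketpair(), so no server is needed). On a non-blocking socket, recv() raises BlockingIOError instead of waiting, and a selectors.DefaultSelector reports when the socket has become readable. It only illustrates the readiness-checking idea; it is not asyncio’s actual event loop code, and try_recv/wait_readable are names made up for this example.

```python
import selectors
import socket


def try_recv(sock):
    """Return received bytes, or None if nothing is available yet."""
    try:
        return sock.recv(1024)
    except BlockingIOError:
        # A non-blocking socket raises instead of waiting for data.
        return None


def wait_readable(sock, timeout):
    """Wait at most `timeout` seconds for `sock` to become readable."""
    with selectors.DefaultSelector() as selector:
        selector.register(sock, selectors.EVENT_READ)
        events = selector.select(timeout)
        return any(key.fileobj is sock for key, _ in events)


a, b = socket.socketpair()
b.setblocking(False)
print(try_recv(b))            # None: no data yet
print(wait_readable(b, 0.1))  # False: the selector times out
a.sendall(b"ping")
print(wait_readable(b, 1.0))  # True: data has arrived
print(try_recv(b))            # b'ping'
a.close()
b.close()
```

An event loop essentially does this for many sockets at once and resumes whichever coroutine was waiting on a socket that just became ready.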

It is not surprising that asyncio provides these low-level ways of interacting with sockets. Network programming is one of the main areas where concurrency with coroutines really shines. Having this functionality in asyncio is therefore only natural.

The documentation for async socket interaction is here. It also points you to loop.create_server(), a much higher-level way of setting up an async socket server.
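
For a taste of that higher-level API, here is a sketch of the same echo server built with loop.create_server() and an asyncio.Protocol subclass. EchoProtocol is a name chosen for this example; with this variant the event loop handles accepting and reading, and calls data_received() for us.

```python
import asyncio


class EchoProtocol(asyncio.Protocol):
    """One instance is created per client connection."""

    def connection_made(self, transport):
        self.transport = transport
        print(f"Socket established with {transport.get_extra_info('peername')}.")

    def data_received(self, data):
        # Called by the event loop whenever bytes arrive; no explicit recv() needed.
        self.transport.write(data)

    def connection_lost(self, exc):
        print("Socket closed.")


async def serve_forever(host, port):
    loop = asyncio.get_running_loop()
    # create_server binds, listens and accepts, creating one protocol per client.
    server = await loop.create_server(EchoProtocol, host, port)
    async with server:
        await server.serve_forever()
```

It could be started the same way as our version, with asyncio.run(serve_forever("127.0.0.1", 5000)). We stick with the low-level socket functions in this post because they keep every step visible.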

To run this code, we’ll adjust ./asgi/server/__init__.py.

#./asgi/server/__init__.py
from .server import serve_forever

Now we can run the server from a top-level ./run.py as follows:

#./run.py
import asyncio
from asgi.server import serve_forever


if __name__ == "__main__":
    asyncio.run(serve_forever("127.0.0.1", 5000))

You can use netcat again (e.g. nc localhost 5000 in several terminals) and see that, indeed, several clients can now interact with the server concurrently.
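
If you prefer a scripted check, here is a sketch of an asyncio client that keeps one connection open but silent and then checks that a second client still gets its echo. A sequential server would be stuck on the idle client. echo_once and check_concurrent are hypothetical helper names, and the sketch assumes the echo server from ./run.py is listening.

```python
import asyncio


async def echo_once(host, port, message: bytes) -> bytes:
    """Open one connection, send `message` and return whatever is echoed back."""
    reader, writer = await asyncio.open_connection(host, port)
    writer.write(message)
    await writer.drain()
    reply = await reader.read(1024)
    writer.close()
    await writer.wait_closed()
    return reply


async def check_concurrent(host, port, timeout=2.0) -> bool:
    """True if the server answers a second client while a first one sits idle."""
    # Keep one client connected but silent; a sequential server would be stuck on it.
    _, idle_writer = await asyncio.open_connection(host, port)
    try:
        reply = await asyncio.wait_for(echo_once(host, port, b"hello"), timeout)
    finally:
        idle_writer.close()
        await idle_writer.wait_closed()
    return reply == b"hello"
```

Run it with asyncio.run(check_concurrent("127.0.0.1", 5000)) while ./run.py is running. Against the synchronous server from the beginning of this post, the second client never gets its echo and wait_for() raises a timeout error instead.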

An asynchronous HTTP server

To add HTTP capabilities to our server, we can reuse the parser we wrote for the WSGI server. We’ll also reuse the functionality for creating a valid HTTP response. Let’s put those files at ./asgi/server/http_parse.py, ./asgi/server/splitbuffer.py and ./asgi/server/http_response.py respectively (check out this commit for a summary).
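
The parser relies on SplitBuffer from the WSGI part, which isn’t repeated in this post. If you are following along without that file, a minimal stand-in with the three methods the parser calls (feed_data(), pop(separator=...) and flush()) could look like the sketch below. It is inferred from how the parser uses it, not the original implementation.

```python
class SplitBuffer:
    """Accumulates bytes and hands them out line by line or all at once."""

    def __init__(self):
        self.data = b""

    def feed_data(self, data: bytes):
        self.data += data

    def pop(self, separator: bytes):
        # Return everything before the first separator (dropping the separator),
        # or None if no complete line has arrived yet.
        first, sep, rest = self.data.partition(separator)
        if not sep:
            return None
        self.data = rest
        return first

    def flush(self) -> bytes:
        # Hand out whatever is buffered, e.g. a chunk of the request body.
        data, self.data = self.data, b""
        return data


buffer = SplitBuffer()
buffer.feed_data(b"GET / HTTP/1.1\r\nHost: local")
print(buffer.pop(separator=b"\r\n"))  # b'GET / HTTP/1.1'
print(buffer.pop(separator=b"\r\n"))  # None: the header line is incomplete
buffer.feed_data(b"host\r\n\r\n")
print(buffer.pop(separator=b"\r\n"))  # b'Host: localhost'
print(buffer.pop(separator=b"\r\n"))  # b'': the empty line ending the headers
```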

Now we can implement the parser callbacks in our server in ./asgi/server/server.py and also take this opportunity to rewrite it in a more object-oriented way.

#./asgi/server/server.py
import asyncio
import socket
from .http_parse import HttpRequestParser
from .http_response import make_response


class Session:
    def __init__(self, client_socket, address):
        self.loop = asyncio.get_event_loop()
        self.client_socket = client_socket
        self.address = address
        self.trigger_send_response = asyncio.Event()
        self.response_sent = False
        self.parser = HttpRequestParser(self)

    async def run(self):
        self.loop.create_task(self.send_response())
        while True:
            if self.response_sent:
                break
            data = await self.loop.sock_recv(self.client_socket, 1024)
            print(f"Received {data}")
            self.parser.feed_data(data)
        self.client_socket.close()
        print(f"Socket with {self.address} closed.")

    async def send_response(self):
        await self.trigger_send_response.wait()
        body = b"<html><body>Hello World</body></html>"
        response = make_response(status_code=200, headers=[], body=body)
        await self.loop.sock_sendall(self.client_socket, response)
        print("Response sent.")
        self.response_sent = True

    # HTTP parser callbacks
    def on_url(self, url):
        print(f"Received url: {url}")

    def on_header(self, name: bytes, value: bytes):
        print(f"Received header: ({name}, {value})")

    def on_body(self, body: bytes):
        print(f"Received body: {body}")

    def on_message_complete(self):
        print("Received request completely.")
        self.trigger_send_response.set()


async def serve_forever(host, port):
    server_socket = socket.socket()
    server_socket.bind((host, port))
    server_socket.listen(1)
    server_socket.setblocking(False)

    loop = asyncio.get_event_loop()
    while True:
        client_socket, address = await loop.sock_accept(server_socket)
        print(f"Socket established with {address}.")
        session = Session(client_socket, address)
        loop.create_task(session.run())

In general, the behavior still looks very similar to the synchronous HTTP server we implemented in the WSGI part of this series. The important differences are in the async parts of the server, so let’s take a closer look at those.

First of all, both run() and send_response() are now async methods. They need to be, because they interact directly with the asynchronous socket API. But since the parser is synchronous, it needs some way to trigger the send_response() method. This is solved by starting send_response() in a separate task right at the beginning of running the Session; that task immediately awaits self.trigger_send_response, which is an asyncio.Event. The task stays suspended (without blocking the event loop) until the event is set in the on_message_complete() parser callback.
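
The same pattern in isolation: a plain synchronous callback releases a task that is parked on an asyncio.Event. MiniSession is a stripped-down stand-in for the Session class above, kept only to show the ordering.

```python
import asyncio


class MiniSession:
    """A sync callback wakes up an async task via asyncio.Event."""

    def __init__(self):
        self.trigger_send_response = asyncio.Event()
        self.log = []

    async def send_response(self):
        # Suspends only this task, not the whole event loop, until the event is set.
        await self.trigger_send_response.wait()
        self.log.append("response sent")

    def on_message_complete(self):
        # Plain synchronous callback, like the ones the parser calls.
        self.log.append("request complete")
        self.trigger_send_response.set()


async def main():
    session = MiniSession()
    task = asyncio.create_task(session.send_response())
    await asyncio.sleep(0)  # let the task start and park on wait()
    session.log.append("still waiting")
    session.on_message_complete()
    await task
    return session.log


print(asyncio.run(main()))
# ['still waiting', 'request complete', 'response sent']
```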

You can test that everything works by running ./run.py and firing off some curl requests, for example:

$ curl localhost:5000 -i
HTTP/1.1 200 OK
Content-Length: 37

<html><body>Hello World</body></html>

The server should log something like the following:

$ python run.py 
Socket established with ('127.0.0.1', 36394).
Received b'GET / HTTP/1.1\r\nHost: localhost:5000\r\nUser-Agent: curl/7.69.1\r\nAccept: */*\r\n\r\n'
Received url: b'/'
Received header: (b'Host', b'localhost:5000')
Received header: (b'User-Agent', b'curl/7.69.1')
Received header: (b'Accept', b'*/*')
Received request completely.
Response sent.
Received b''
Received request completely.
Socket with ('127.0.0.1', 36394) closed.

If you want to have a step-by-step introduction to an async HTTP server with an emphasis on using the higher-level abstractions in the asyncio library and making the server a bit more resilient, check out this insightful article.

For now, the server just returns a dummy reply and ignores the request content entirely. Let’s change that by turning it into a basic ASGI server and wiring it up with an ASGI application.

ASGI server

To start with, we need to add one new callback to the HTTP request parser in ./asgi/server/http_parse.py. It’s called on_headers_complete() and is triggered once all request headers have been read successfully. In the ASGI server, this is the point at which we should start awaiting the app and then stream the request body into it via events. The parser now also saves the request’s HTTP method (parsed from the request line).

#./asgi/server/http_parse.py
from .splitbuffer import SplitBuffer


class HttpRequestParser:
    def __init__(self, protocol):
        self.protocol = protocol
        self.buffer = SplitBuffer()
        self.http_method = b""
        self.done_parsing_start = False
        self.done_parsing_headers = False
        self.expected_body_length = 0

    def feed_data(self, data: bytes):
        self.buffer.feed_data(data)
        self.parse()

    def parse(self):
        if not self.done_parsing_start:
            self.parse_startline()
        elif not self.done_parsing_headers:
            self.parse_headerline()
        elif self.expected_body_length:
            data = self.buffer.flush()
            # Only continue if new body bytes arrived; recursing on an empty
            # buffer would loop forever while waiting for the next recv().
            if data:
                self.expected_body_length -= len(data)
                self.protocol.on_body(data)
                self.parse()
        else:
            self.protocol.on_message_complete()

    def parse_startline(self):
        line = self.buffer.pop(separator=b"\r\n")
        if line is not None:
            http_method, url, http_version = line.strip().split()
            self.http_method = http_method
            self.done_parsing_start = True
            self.protocol.on_url(url)
            self.parse()

    def parse_headerline(self):
        line = self.buffer.pop(separator=b"\r\n")
        if line is not None:
            if line:
                name, value = line.strip().split(b": ", maxsplit=1)
                if name.lower() == b"content-length":
                    self.expected_body_length = int(value.decode("utf-8"))
                self.protocol.on_header(name, value)
            else:
                self.done_parsing_headers = True
                self.protocol.on_headers_complete()
            self.parse()

With that out of the way we can now implement the ASGI protocol in the Session in ./asgi/server/server.py.

#./asgi/server/server.py
import asyncio
import socket
from .http_parse import HttpRequestParser
from .asgi import ASGIRequest, ASGIResponse


class Session:
    def __init__(self, client_socket, address, app):
        self.loop = asyncio.get_event_loop()
        self.client_socket = client_socket
        self.address = address
        self.app = app
        self.trigger_run_asgi = asyncio.Event()
        self.parser = HttpRequestParser(self)
        self.request = ASGIRequest()
        self.response = ASGIResponse()
        self.response_sent = False

    async def run(self):
        self.loop.create_task(self.run_asgi())
        while not self.response_sent:
            data = await self.loop.sock_recv(self.client_socket, 1024)
            print(f"Received {data}")
            if data == b"":
                # The client closed the connection; stop reading.
                break
            self.parser.feed_data(data)
        self.client_socket.close()
        print(f"Socket with {self.address} closed.")

    # ASGI Server protocol methods
    async def run_asgi(self):
        await self.trigger_run_asgi.wait()
        await self.app(self.request.to_scope(), self.receive, self.send)

    async def receive(self):
        await self.request.trigger_more_body.wait()
        # Reset the event so that the next receive() waits for new body data
        # instead of returning an empty chunk right away.
        self.request.trigger_more_body.clear()
        return self.request.to_event()

    async def send(self, event):
        self.response.feed_event(event)
        if self.response.is_complete:
            resp_http = self.response.to_http()
            await self.loop.sock_sendall(self.client_socket, resp_http)
            print("Response sent.")
            self.response_sent = True

    # HTTP parser callbacks
    def on_url(self, url):
        print(f"Received url: {url}")
        self.request.http_method = self.parser.http_method.decode("utf-8")
        self.request.path = url.decode("utf-8")

    def on_header(self, name: bytes, value: bytes):
        print(f"Received header: ({name}, {value})")
        # ASGI expects lowercased header names in the scope.
        self.request.headers.append((name.lower(), value))

    def on_headers_complete(self):
        print("Received all headers.")
        self.trigger_run_asgi.set()

    def on_body(self, body: bytes):
        print(f"Received body: {body}")
        self.request.body_buffer += body
        self.request.trigger_more_body.set()

    def on_message_complete(self):
        print("Received request completely.")
        self.request.last_body = True
        self.request.trigger_more_body.set()

...

There are some changes related to initializing the Session, namely that we now hand in the ASGI application object and initialize ASGIRequest and ASGIResponse objects (implementation details coming up shortly). Those objects will be used to aggregate request/response information and translate between HTTP and ASGI events.

Right at the start of a Session, when awaiting the run() method, we create a new task for the run_asgi() coroutine. This task, in turn, immediately awaits the self.trigger_run_asgi asyncio.Event. That way, we can later set that event in the new on_headers_complete() parser callback and thus trigger awaiting the ASGI application.

The main change to the server is that the HTTP request parser callbacks now aggregate the request information into the new request object and trigger ASGI events via asyncio.Event variables. Once we reach on_headers_complete(), we start awaiting the ASGI application and hand in the scope (which is created from the request object) along with the two awaitable callables receive() and send().

Subsequently, each incoming request body chunk is buffered in the request object and sets an event that lets a pending receive() call return, handing that chunk to the application as an ASGI event. This repeats until the on_message_complete() callback fires. At that point we send in an event with more_body set to False, which lets the application know that no more request body will follow. In the send() coroutine, the server processes the events coming back from the application until the response is complete, and then sends the HTTP response back to the client.
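
To make this event flow concrete from the application’s side, here is a sketch of a hand-written ASGI application (no framework) that collects the streamed request body and echoes it back, plus a tiny in-memory driver that plays the server’s role. echo_app and run_app are hypothetical names; the event types and keys follow the ASGI HTTP spec as used in this post.

```python
import asyncio


async def echo_app(scope, receive, send):
    assert scope["type"] == "http"
    # Read http.request events until more_body is False.
    body = b""
    while True:
        event = await receive()
        body += event.get("body", b"")
        if not event.get("more_body", False):
            break
    # Respond with one start event and one final body event.
    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [(b"content-type", b"text/plain"),
                    (b"content-length", str(len(body)).encode())],
    })
    await send({"type": "http.response.body", "body": body, "more_body": False})


async def run_app(app, chunks):
    """Feed body chunks to `app` as http.request events and collect what it sends."""
    chunks = chunks or [b""]
    incoming = [
        {"type": "http.request", "body": chunk, "more_body": i < len(chunks) - 1}
        for i, chunk in enumerate(chunks)
    ]
    sent = []

    async def receive():
        return incoming.pop(0)

    async def send(event):
        sent.append(event)

    scope = {"type": "http", "method": "POST", "path": "/", "headers": []}
    await app(scope, receive, send)
    return sent


events = asyncio.run(run_app(echo_app, [b"hello ", b"world"]))
print(events[0]["status"], events[1]["body"])  # 200 b'hello world'
```

Our Session plays exactly the role of run_app here, except that its receive() waits for real body data from the socket.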

Now, let’s implement the ASGIRequest and ASGIResponse classes in ./asgi/server/asgi.py.

#./asgi/server/asgi.py
import asyncio
from dataclasses import dataclass, field
from typing import List, Tuple
from .http_response import make_response


@dataclass
class ASGIRequest:
    http_method: str = ""
    path: str = ""
    headers: List[Tuple[bytes, bytes]] = field(default_factory=list)
    body_buffer: bytes = b""
    # default_factory gives every request its own Event; a plain default
    # would be a single Event shared by all requests.
    trigger_more_body: asyncio.Event = field(default_factory=asyncio.Event)
    last_body: bool = False

    def to_scope(self):
        path, _, query_string = self.path.partition("?")
        scope = {
            "type": "http",
            # We call app(scope, receive, send), i.e. the ASGI 3 interface.
            "asgi": {"version": "3.0", "spec_version": "2.1"},
            "http_version": "1.1",
            "method": self.http_method,
            "scheme": "http",
            "path": path,
            "query_string": query_string.encode("utf-8"),  # ASGI expects bytes here
            "headers": self.headers,
        }
        return scope

    def to_event(self):
        event = {
            "type": "http.request",
            "body": self.body_buffer,
            "more_body": not self.last_body,
        }
        self.body_buffer = b""
        return event


@dataclass
class ASGIResponse:
    status_code: int = 200
    headers: List[Tuple[bytes, bytes]] = field(default_factory=list)
    body: bytes = b""
    is_complete: bool = False

    def to_http(self):
        return make_response(self.status_code, self.headers, self.body)

    def feed_event(self, event):
        if event["type"] == "http.response.start":
            self.status_code = event["status"]
            self.headers = event["headers"]
        elif event["type"] == "http.response.body":
            self.body += event.get("body", b"")
            if not event.get("more_body", False):
                self.is_complete = True

Both are simple dataclasses that can aggregate request/response state. You can see how the creation of the scope object is very similar to the environ object in WSGI. The big difference is that there is no file object, like wsgi.input, to hand the request body into the application with. Instead the request body can be streamed into the application using events. The ASGIRequest class contains a body_buffer and can create a correctly formatted event to hand into the application. You can stream as many body chunks to the application as you like, until you stream an event where you set the more_body field to False. At that point it is assumed that there is no more request body to come and the application can start creating an appropriate response.

The ASGIResponse class is very similar, but instead of creating events, it is fed with events coming back from the application. In feed_event() you can see that there are two possible event types: http.response.start and http.response.body. The first is the initial event from the application to the server and contains the response status code and headers (much like the start_response() callable in the WSGI protocol). After that, a stream of http.response.body events is expected, which work similarly to the events going from the server to the application that we just discussed. In this simple implementation we just aggregate all body chunks until more_body is False. At that point we set the is_complete flag to True, which makes the Session send the HTTP response to the client via the socket. A more capable ASGI server would send the HTTP response in chunks as they arrive, but we’re making our life a bit easier here.
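
As a rough sketch of that chunked alternative, assuming HTTP/1.1 chunked transfer coding: the server would write the status line and headers (announcing Transfer-Encoding: chunked instead of a Content-Length) as soon as http.response.start arrives, then frame each http.response.body payload as it comes in. The helper names below are hypothetical and independent of make_response().

```python
from http import HTTPStatus


def encode_response_head(status: int, headers) -> bytes:
    """Status line and headers, announcing a chunked body instead of Content-Length."""
    lines = [f"HTTP/1.1 {status} {HTTPStatus(status).phrase}".encode("latin-1")]
    for name, value in headers:
        if name.lower() != b"content-length":  # the total length isn't known up front
            lines.append(name + b": " + value)
    lines.append(b"Transfer-Encoding: chunked")
    return b"\r\n".join(lines) + b"\r\n\r\n"


def encode_chunk(body: bytes, more_body: bool) -> bytes:
    """Frame one http.response.body payload; a zero-size chunk ends the body."""
    out = b""
    if body:
        # Chunk size in hex, CRLF, the data, CRLF.
        out += f"{len(body):x}\r\n".encode("ascii") + body + b"\r\n"
    if not more_body:
        out += b"0\r\n\r\n"
    return out
```

For example, encode_chunk(b"Hello", more_body=True) gives b'5\r\nHello\r\n'. An empty body event with more_body=True produces nothing, because an empty chunk on the wire would end the response early. In Session.send(), each of these byte strings could be passed to sock_sendall() as soon as the corresponding event arrives.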

You can read the details about the events and the scope object when using ASGI for HTTP here.
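
A few scope details from the spec are easy to get wrong: path is percent-decoded text, while query_string stays raw bytes, and header names should be lowercased. Here is a sketch of a scope builder working directly on the bytes the parser hands us. build_scope is a hypothetical alternative to ASGIRequest.to_scope() and deliberately covers only the keys used in this post plus raw_path and root_path.

```python
from urllib.parse import unquote_to_bytes


def build_scope(method: bytes, target: bytes, headers):
    """Build an ASGI HTTP connection scope from raw request-line parts."""
    raw_path, _, query_string = target.partition(b"?")
    return {
        "type": "http",
        "asgi": {"version": "3.0", "spec_version": "2.1"},
        "http_version": "1.1",
        "method": method.decode("ascii").upper(),
        "scheme": "http",
        # path: percent-escapes decoded, then interpreted as UTF-8
        "path": unquote_to_bytes(raw_path).decode("utf-8", errors="replace"),
        # raw_path and query_string keep the bytes that arrived on the wire
        "raw_path": raw_path,
        "query_string": query_string,
        "root_path": "",
        # header names lowercased, as the spec asks
        "headers": [(name.lower(), value) for name, value in headers],
    }
```

For example, build_scope(b"GET", b"/caf%C3%A9?q=1", [(b"Host", b"localhost")]) yields the path "/café", the query_string b"q=1" and the headers [(b"host", b"localhost")].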

Let’s also turn the serve_forever() function into a method on an ASGIServer class in ./asgi/server/server.py to be more in line with the WSGI server implementation.

#./asgi/server/server.py
...

class ASGIServer:
    def __init__(self, host: str, port: int, app):
        self.host = host
        self.port = port
        self.app = app

    async def serve_forever(self):
        server_socket = socket.socket()
        server_socket.bind((self.host, self.port))
        server_socket.listen(1)
        server_socket.setblocking(False)

        loop = asyncio.get_event_loop()
        while True:
            client_socket, address = await loop.sock_accept(server_socket)
            print(f"Socket established with {address}.")
            session = Session(client_socket, address, self.app)
            loop.create_task(session.run())

This also requires a change to the import in ./asgi/server/__init__.py.

#./asgi/server/__init__.py
from .server import ASGIServer

And finally, we just need to set up a little ASGI test application in ./run.py to verify our implementation. We’re using FastAPI in this example.

#./run.py
import asyncio
from fastapi import FastAPI
from pydantic import BaseModel
from asgi.server import ASGIServer


app = FastAPI()


@app.get("/")
async def root():
    print("hello from /")
    return {"hello": "world"}


class Model(BaseModel):
    id: int
    name: str


@app.post("/create")
async def create(i: Model):
    print("hello from /create")
    return f"created {i}"


if __name__ == "__main__":
    server = ASGIServer("127.0.0.1", 5000, app)
    asyncio.run(server.serve_forever())

An example curl request should work just fine:

$ curl localhost:5000/create -i -H "Content-Type: application/json" -d '{"id": 123, "name": "abc"}'
HTTP/1.1 200 OK
content-length: 27
content-type: application/json

"created id=123 name='abc'"

Notes

And just like that, with a very manageable codebase, we’ve implemented a functional ASGI server. Again, much of the simplicity is due to us not handling errors, edge cases or any kind of server resilience, but it’s still pretty cool to see. Take a look at uvicorn if you want to check out a proper, mature ASGI server. In particular, its httptools-based protocol implementation shows a lot of parallels with the code we’ve just written.

Another point to keep in mind is that this ASGI server implementation only works for HTTP. An implementation for WebSockets would exchange slightly different events between the server and application. You can dig into the uvicorn code to see what that would look like.

Now the only thing left to do in this series is to implement our own little ASGI application framework; let’s do that in the next post.