Part 11 - Our own ASGI framework

(The changes introduced in this post start here.)

We’ve already seen a minimal HTTP ASGI application in the form of an awaitable function. It looks like this:

async def application(scope, receive, send):
    # make sure this is an HTTP connection
    assert scope["type"] == "http"

    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [
            [b"Content-Type", b"text/plain"],
        ],
    })
    await send({
        "type": "http.response.body",
        "body": b"Hello, World!",
        "more_body": False,
    })

It totally disregards the request (other than making sure it’s HTTP) and then sends 2 response events to the ASGI server to submit status, headers and a simple body.
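To see those two events without running an ASGI server, the application can be driven by hand. This is a minimal sketch: the helper run_once and the stand-ins fake_receive and fake_send are made up for this illustration and are not part of the series' code.

```python
import asyncio


async def application(scope, receive, send):
    # same minimal application as above
    assert scope["type"] == "http"
    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [[b"Content-Type", b"text/plain"]],
    })
    await send({
        "type": "http.response.body",
        "body": b"Hello, World!",
        "more_body": False,
    })


async def run_once(app, scope):
    """Call an ASGI app once and collect every event it sends (hypothetical helper)."""
    sent = []

    async def fake_receive():
        # stand-in for the server: an empty request body
        return {"type": "http.request", "body": b"", "more_body": False}

    async def fake_send(event):
        sent.append(event)

    await app(scope, fake_receive, fake_send)
    return sent


events = asyncio.run(run_once(application, {"type": "http"}))
for event in events:
    print(event["type"])
```

The application never calls receive here, which is exactly the "disregards the request" part.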

The basic framework

To build up our framework, let’s first just copy and adjust the functionality we had in our WSGI framework. That was mainly:

- a class-based Application object
- being able to define path operations via decorators
- getting access to the HTTP request data via a Request class instance
- Response class abstractions for a number of common response types (e.g. plain-text, JSON, …)

The ASGIApplication class is implemented in ./asgi/application/application.py (also create a new, empty __init__.py in that new package).

#./asgi/application/application.py
from typing import Callable
from dataclasses import dataclass
from .request import Request
from .response import PlainTextResponse, BaseResponse


class ASGIApplication:
    def __init__(self):
        self.path_operations = dict()

    def _register_path_operation(
        self, path: str, http_method: str, func: Callable
    ):
        po = PathOperation(path, http_method)
        self.path_operations[po] = func

    def _create_register_decorator(self, path: str, http_method: str):
        def decorator(func: Callable):
            self._register_path_operation(path, http_method, func)
            return func

        return decorator

    def get(self, path: str):
        return self._create_register_decorator(path, "GET")

    def post(self, path: str):
        return self._create_register_decorator(path, "POST")

    async def __call__(self, scope, receive, send):
        po = PathOperation(scope["path"], scope["method"])
        func = self.path_operations.get(po)
        if func is None:
            response = PlainTextResponse(status_code=404)
        else:
            request = Request.from_scope(scope)
            await request.fetch_body(receive)

            # now the request is fully processed, let's await the
            # path operation function
            ret = await func(request=request)
            if isinstance(ret, BaseResponse):
                response = ret
            else:
                response = PlainTextResponse(body=ret)
        await send(response.get_response_start_event())
        await send(response.get_response_body_event())


@dataclass(frozen=True, eq=True)
class PathOperation:
    path: str
    http_method: str

The way that path operations are registered is exactly the same as in the WSGIApplication (see here for reference). The only difference in functionality is the __call__ implementation, which is now a coroutine function with the signature expected of an ASGI application: (scope, receive, send).
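Registration and lookup both hinge on PathOperation being usable as a dictionary key. A frozen dataclass with eq=True is hashable and compares field by field, so a fresh instance built from the incoming scope finds the function that was registered earlier. A small sketch of that behaviour (handler and registry are placeholder names):

```python
from dataclasses import dataclass


@dataclass(frozen=True, eq=True)
class PathOperation:
    path: str
    http_method: str


def handler():
    return "hello"


# what the decorators do behind the scenes
registry = {PathOperation("/", "GET"): handler}

# a new instance with the same field values finds the registered function
found = registry.get(PathOperation("/", "GET"))
# same path, different method: no match, so the app would answer with a 404
missing = registry.get(PathOperation("/", "POST"))

print(found, missing)
```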

Just like in our WSGIApplication, we first check whether we have a registered path operation for the request path and method. If so, we build a request object based on the scope. The request body is not part of the scope; instead, it is streamed into the application through ASGI events. This is implemented in the Request object’s fetch_body() method (we’ll look at it shortly).

At this point the request has fully arrived in the application and we can await the actual path operation function. If its return value is already a Response-type object, we use it as the response. If not, we create a default response using the return value as the body.

Finally, just like in the super simple application we looked at in the beginning of this post, the response is sent back to the ASGI server by awaiting send twice: once with the event that transfers the response status and headers, and once more with the event that transfers the response body. Note that a more sophisticated ASGI application should be able to stream the response body in chunks rather than accumulate it (like we do) and send it to the ASGI server in a single event.
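As a rough sketch of what such chunked streaming could look like (it is not part of the framework built here, and stream_body, demo and fake_send are hypothetical names), every chunk goes out in its own body event with more_body set to True, followed by an empty final event:

```python
import asyncio


async def stream_body(send, chunks):
    """Send each chunk as its own http.response.body event."""
    for chunk in chunks:
        await send({
            "type": "http.response.body",
            "body": chunk,
            "more_body": True,
        })
    # an empty final event signals that the body is complete
    await send({
        "type": "http.response.body",
        "body": b"",
        "more_body": False,
    })


async def demo():
    sent = []

    async def fake_send(event):  # stand-in for the server's send callable
        sent.append(event)

    await stream_body(fake_send, [b"Hello, ", b"World!"])
    return sent


sent = asyncio.run(demo())
print([(e["body"], e["more_body"]) for e in sent])
```

With this approach the total length is not known when the headers are sent, so a Content-Length header would have to be computed up front or left out.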

Now let’s look at the Request and Response implementations that do some of the heavy lifting in the background. In ./asgi/application/request.py we implement the following:

#./asgi/application/request.py
from typing import Dict
from dataclasses import dataclass


@dataclass
class Request:
    query: Dict[str, str]
    headers: Dict[bytes, bytes]
    body: bytes = b""

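    @classmethod
    def from_scope(cls, scope: Dict):
        query = {}
        qs = scope["query_string"]
        # the ASGI spec delivers query_string as bytes, so decode it if needed
        if isinstance(qs, bytes):
            qs = qs.decode("utf-8")
        if qs:
            # split on the first "=" only, so values may contain "=" themselves
            query = dict(entry.split("=", 1) for entry in qs.split("&"))
        headers = dict(scope["headers"])
        return cls(query, headers)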

    async def fetch_body(self, receive):
        while True:
            event = await receive()
            # both keys are optional in the ASGI spec, hence the defaults
            self.body += event.get("body", b"")
            if not event.get("more_body", False):
                break

Again, this is very similar to what we did in the WSGI application framework implementation (check here). The most significant addition is the new fetch_body() async method. It awaits the request body events coming from the ASGI server and accumulates them in the body attribute until the request body is exhausted.
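The accumulation can be tried out without a server by feeding fetch_body() a fake receive callable that hands out one chunk per call. The sketch below uses a stripped-down copy of the Request class (body only), and make_receive is a hypothetical test helper:

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Request:
    body: bytes = b""

    async def fetch_body(self, receive):
        while True:
            event = await receive()
            self.body += event["body"]
            if not event["more_body"]:
                break


def make_receive(chunks):
    """Return a receive() stand-in that yields one http.request event per call."""
    events = iter(
        {"type": "http.request", "body": chunk, "more_body": i < len(chunks) - 1}
        for i, chunk in enumerate(chunks)
    )

    async def receive():
        return next(events)

    return receive


request = Request()
receive = make_receive([b'{"id": 1', b'23, "name"', b': "abc"}'])
asyncio.run(request.fetch_body(receive))
print(request.body)
```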

In ./asgi/application/response.py the following classes are implemented:

#./asgi/application/response.py
import json
from typing import List, Tuple, Optional, Any


class BaseResponse:
    def __init__(
        self,
        status_code: int = 200,
        headers: Optional[List[Tuple[bytes, bytes]]] = None,
        body: Optional[Any] = None,
    ):
        self.status_code = status_code
        self.headers = headers if headers is not None else []
        self.body = self.body_conversion(body) if body is not None else b""
        self.add_content_type_and_content_length()

    def add_content_type_and_content_length(self):
        header_names = {name for name, value in self.headers}
        if b"Content-Type" not in header_names:
            self.headers.append((b"Content-Type", self.content_type))
        if self.body and b"Content-Length" not in header_names:
            content_length = str(len(self.body)).encode("utf-8")
            self.headers.append((b"Content-Length", content_length))

    def get_response_start_event(self):
        return {
            "type": "http.response.start",
            "status": self.status_code,
            "headers": self.headers,
        }

    def get_response_body_event(self):
        return {
            "type": "http.response.body",
            "body": self.body,
            "more_body": False,
        }


class PlainTextResponse(BaseResponse):
    content_type = b"text/plain"

    @classmethod
    def body_conversion(cls, body):
        return body.encode("utf-8")


class HTMLResponse(BaseResponse):
    content_type = b"text/html"

    @classmethod
    def body_conversion(cls, body):
        return body.encode("utf-8")


class JSONResponse(BaseResponse):
    content_type = b"application/json"

    @classmethod
    def body_conversion(cls, body):
        return json.dumps(body).encode("utf-8")

Once more, this is basically identical to what we did in WSGI land (check here). The only big change is the addition of the methods get_response_start_event() and get_response_body_event(), which create the ASGI events needed for sending a response back to the ASGI server.
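To make the shape of those two events concrete, here is a condensed sketch of what a JSON response boils down to. The function build_events is hypothetical; it only mirrors the outcome of the class-based code above for a JSON body, with Content-Length computed from the encoded bytes:

```python
import json


def build_events(body, status_code=200):
    """Build the start and body events for a JSON response (illustrative only)."""
    encoded = json.dumps(body).encode("utf-8")
    headers = [
        (b"Content-Type", b"application/json"),
        (b"Content-Length", str(len(encoded)).encode("utf-8")),
    ]
    start_event = {
        "type": "http.response.start",
        "status": status_code,
        "headers": headers,
    }
    body_event = {
        "type": "http.response.body",
        "body": encoded,
        "more_body": False,
    }
    return start_event, body_event


start_event, body_event = build_events({"hello": "world"})
print(start_event)
print(body_event)
```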

And that’s it. With only a few minor changes we’ve ported our WSGI application framework to the ASGI protocol. Let’s define some imports in ./asgi/application/__init__.py

#./asgi/application/__init__.py
from .application import ASGIApplication
from .request import Request

and adjust the ./run.py to use our own application framework instead of fastAPI:

#./run.py
from typing import List
import asyncio
from asgi.server import ASGIServer
from asgi.application import ASGIApplication, Request
from asgi.application.response import JSONResponse


app = ASGIApplication()


@app.get("/")
async def root(request: Request):
    print("hello from /")
    # a dict cannot be encoded by PlainTextResponse, so wrap it explicitly
    return JSONResponse(body={"hello": "world"})


@app.post("/create")
async def create(request: Request):
    print("hello from /create")
    return JSONResponse(body=f"created {request.body}")


if __name__ == "__main__":
    server = ASGIServer("127.0.0.1", 5000, app)
    asyncio.run(server.serve_forever())

A quick curl test reveals that all is in order:

$ curl localhost:5000/create -i -H "Content-Type: application/json" -d '{"id": 123, "name": "abc"}'
HTTP/1.1 200 OK
Content-Type: application/json
Content-Length: 45

"created b'{\"id\": 123, \"name\": \"abc\"}'"

Rethinking access to request data

In my opinion, one of the ugliest parts of the current framework is the need to define request as an input to every single path operation function, regardless of whether it actually wants to access any request data. Different frameworks have solved this in different ways. Flask, for example, avoids the problem with a context-local request object that is imported globally. But I find the way fastAPI approaches this to be much more straightforward and descriptive.

In fastAPI you explicitly define which parameters from the request you want to get access to (and their type) and fastAPI will make sure to parse them out of the request data and call your path operation function with them as inputs.

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class InputModel(BaseModel):
    id: int
    name: str


@app.post("/")
async def root(i: InputModel, limit: int):
    pass

With this path operation function signature fastAPI will try to create the i variable from the request body (because the type is a pydantic model), and the limit variable from a query parameter (because it is a simple python type) of that name. If any of those expected parameters are not present on the request, or can’t be converted to the specified types, fastAPI will return with a descriptive error response right away, before even calling your path operation function.

The beauty of this approach is that it’s super concise and you can see at one glance which request data some path operation function needs. Also, once the function is called you can be sure that the required data was indeed present on the request and was properly validated and converted by fastAPI. So in the function itself, you don’t need to do any data validation anymore, you can assume that the request was as expected.

Now, there are many different parameters you might want to get from a request (body, query, path, header, …) and inference from the specified type (e.g. if something is a complex pydantic model or a simple python type) will only let you differentiate between very few different parameter categories. That’s why fastAPI also provides a more explicit way of defining the kind of parameter that a certain path operation function input represents:

from fastapi import Body, FastAPI, Header, Query
from pydantic import BaseModel

app = FastAPI()


class InputModel(BaseModel):
    id: int
    name: str


@app.post("/")
async def root(
    i: InputModel = Body(...),
    limit: int = Query(...),
    x_my_header: str = Header(...),
):
    pass

Now it’s made explicit where each of the parameters should be taken from. The ellipsis input ... simply means that even though a default value is used (one should say abused), this still doesn’t count as a default value for the function input, i.e. the parameter is still expected to be filled in from the request.
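The trick works because ... is an ordinary Python object (Ellipsis) and because default values are visible through the function signature. The sketch below illustrates the idea with a hypothetical Query marker of our own; it is not fastAPI's actual implementation:

```python
from inspect import signature


class Query:
    """Hypothetical marker, loosely modelled on the idea behind fastapi.Query."""

    def __init__(self, default=...):
        self.default = default

    @property
    def required(self):
        # Ellipsis means "no real default, the request must supply a value"
        return self.default is ...


async def root(limit: int = Query(...), offset: int = Query(0)):
    pass


params = signature(root).parameters
for name, param in params.items():
    # param.default is the marker instance, not a plain value
    print(name, param.annotation, param.default.required)
```

A framework can inspect the marker before calling the function and decide whether a missing request value is an error or should fall back to the stored default.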

Those kinds of explicitly stated path operation parameters are what I’d like us to add to our framework now. Basically, we now need to do a bunch of stuff (grabbing data from the request, validating it, piping it into specific variables) before we actually await the path operation function. Let’s implement such a function in a new file ./asgi/application/path_operation.py. We’ll also put the PathOperation class there (which previously was in ./asgi/application/application.py), since it fits better in that new file.

#./asgi/application/path_operation.py
from typing import Awaitable, Callable
from dataclasses import dataclass
from inspect import signature
import json
from pydantic import ValidationError
from .request import Request
from .response import PlainTextResponse


@dataclass(frozen=True, eq=True)
class PathOperation:
    path: str
    http_method: str


class Body:
    pass


class Query:
    pass


class Header:
    pass


async def await_path_operation_function(
    func: Callable[..., Awaitable], req: Request
):
    sig = signature(func)
    parameters = {}
    try:
        for param_name, param in sig.parameters.items():
            if isinstance(param.default, Body):
                param = param.annotation(**json.loads(req.body))
                parameters[param_name] = param
            elif isinstance(param.default, Query):
                try:
                    param = param.annotation(req.query[param_name])
                except KeyError as e:
                    raise ValueError(f"No query parameter {str(e)} found.")
                parameters[param_name] = param
            elif isinstance(param.default, Header):
                # transform to kebab case (which is usually used for headers)
                name = param_name.replace("_", "-").encode("utf-8")
                try:
                    param = param.annotation(req.headers[name])
                except KeyError as e:
                    raise ValueError(f"No header {str(e)} found.")
                parameters[param_name] = param
        return await func(**parameters)
    except (ValidationError, ValueError) as e:
        return PlainTextResponse(status_code=403, body=str(e))

The meat of this new file is the await_path_operation_function() coroutine. It takes the path operation function and a Request instance as inputs. It then uses the python inspect module to read the path operation function’s signature, i.e. the function’s inputs together with their (optional) type annotations and default values.

We then simply iterate over all parameters in the path operation function signature and check whether the default value is one of our 3 parameter types (Body, Query, Header). If so, it gets a special treatment. For a Body parameter, we try to create the pydantic model instance from the JSON of the request body. For Query or Header parameters we try to look them up by name from the req object and convert them to the specified annotation type. All of the parsed parameters are saved and then finally the path operation function is awaited with those parsed parameters as keyword argument inputs.
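The core of that loop can be tried in isolation with plain dictionaries standing in for the request. This is a simplified sketch without Body handling or error responses; call_with_request_data is a hypothetical name, and the header parameter is annotated as bytes here to keep the conversion trivial:

```python
import asyncio
from inspect import signature


class Query:
    pass


class Header:
    pass


async def create(limit: int = Query(), x_my_header: bytes = Header()):
    return limit, x_my_header


async def call_with_request_data(func, query, headers):
    """Fill Query and Header parameters from dicts, then await func."""
    kwargs = {}
    for name, param in signature(func).parameters.items():
        if isinstance(param.default, Query):
            # convert the raw string via the annotation, e.g. int("13")
            kwargs[name] = param.annotation(query[name])
        elif isinstance(param.default, Header):
            # x_my_header -> b"x-my-header"
            key = name.replace("_", "-").encode("utf-8")
            kwargs[name] = param.annotation(headers[key])
    return await func(**kwargs)


result = asyncio.run(
    call_with_request_data(create, {"limit": "13"}, {b"x-my-header": b"test"})
)
print(result)
```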

There is also some basic exception handling going on that returns a 403 error response with information about what went wrong when trying to parse the request.

All that is left to do now is to actually use this new way of awaiting the path operation function in our application in ./asgi/application/application.py:

#./asgi/application/application.py
@@ -2,6 +2,7 @@ from typing import Callable
 from dataclasses import dataclass
 from .request import Request
 from .response import PlainTextResponse, BaseResponse
+from .path_operation import PathOperation, await_path_operation_function
 
 
 class ASGIApplication:
@@ -35,16 +36,10 @@ class ASGIApplication:
         else:
             request = Request.from_scope(scope)
             await request.fetch_body(receive)
-            ret = await func(request=request)
+            ret = await await_path_operation_function(func, request)
             if isinstance(ret, BaseResponse):
                 response = ret
             else:
                 response = PlainTextResponse(body=ret)
         await send(response.get_response_start_event())
         await send(response.get_response_body_event())
-
-
-@dataclass(frozen=True, eq=True)
-class PathOperation:
-    path: str
-    http_method: str

We’ll also add the parameter categories to ./asgi/application/__init__.py:

#./asgi/application/__init__.py
from .application import ASGIApplication
from .request import Request
from .path_operation import Body, Query, Header

and change the test application in ./run.py to the following:

#./run.py
from typing import List
import asyncio
from asgi.server import ASGIServer
from asgi.application import ASGIApplication, Body, Query, Header
from asgi.application.response import JSONResponse
from pydantic import BaseModel


app = ASGIApplication()


@app.get("/")
async def root():
    print("hello from /")
    # a dict cannot be encoded by PlainTextResponse, so wrap it explicitly
    return JSONResponse(body={"hello": "world"})


class InputModel(BaseModel):
    id: int
    name: str


@app.post("/create")
async def create(
    i: InputModel = Body(), limit: int = Query(), x_my_header: str = Header()
):
    print("hello from /create")
    msg = f"created {dict(i)}, limit: {limit}, header: {x_my_header}"
    return JSONResponse(body=msg)


if __name__ == "__main__":
    server = ASGIServer("127.0.0.1", 5000, app)
    asyncio.run(server.serve_forever())

Now we can check out the new application in action. Let’s fire some queries.

$ curl localhost:5000/create?limit=13 -i -H "Content-Type: application/json" -H "x-my-header: test" -d '{"id": 123, "name": "abc"}'
HTTP/1.1 200 OK
Content-Type: application/json
Content-Length: 64

"created {'id': 123, 'name': 'abc'}, limit: 13, header: b'test'"

That seems to work fine. Let’s see some of the error cases in action. For example, a missing field in the request body:

$ curl localhost:5000/create?limit=13 -i -H "Content-Type: application/json" -H "x-my-header: test" -d '{"id": 123, "title": "abc"}'
HTTP/1.1 403 Forbidden
Content-Type: plain/text
Content-Length: 82

1 validation error for InputModel
name
  field required (type=value_error.missing)

Or not sending the required header along:

$ curl localhost:5000/create?limit=13 -i -H "Content-Type: application/json" -d '{"id": 123, "name": "abc"}'
HTTP/1.1 403 Forbidden
Content-Type: plain/text
Content-Length: 31

No header b'x-my-header' found.

Or a wrong type on the limit query parameter:

$ curl localhost:5000/create?limit=abc -i -H "Content-Type: application/json" -H "x-my-header: test" -d '{"id": 123, "name": "abc"}'
HTTP/1.1 403 Forbidden
Content-Type: plain/text
Content-Length: 45

invalid literal for int() with base 10: 'abc'

Of course, the current implementation is riddled with all kinds of problems: How can we use path parameters? What happens if you define more than 1 body parameter? How would you define actual default values for some of the parameters, so that they don’t necessarily have to come in through the request?
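Taking the first of these questions as an example, path parameters could be handled by turning a path template into a regular expression. This is only a sketch of one possible approach, not something the framework above implements; the {name} placeholder syntax and the helpers compile_path and match_path are assumptions made for illustration.

```python
import re

PLACEHOLDER = re.compile(r"\{([A-Za-z_]\w*)\}")


def compile_path(template):
    """Turn '/items/{item_id}' into a regex with one named group per placeholder."""
    # splitting on a pattern with a capture group alternates
    # literal text and placeholder names
    parts = PLACEHOLDER.split(template)
    regex = ""
    for index, part in enumerate(parts):
        if index % 2 == 0:
            regex += re.escape(part)  # literal path text
        else:
            regex += f"(?P<{part}>[^/]+)"  # exactly one path segment
    return re.compile(regex)


def match_path(template, path):
    """Return the extracted parameters as a dict, or None if the path does not match."""
    match = compile_path(template).fullmatch(path)
    return match.groupdict() if match else None


print(match_path("/items/{item_id}", "/items/42"))
print(match_path("/items/{item_id}", "/users/42"))
```

The extracted values are still strings, so they could go through the same annotation-based conversion as the query parameters. A plain dictionary lookup by PathOperation would no longer be enough for such routes; the registered templates would have to be tried one by one.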

But despite all of these issues, it looks quite a lot like fastAPI already. And additional features would not require any logical leaps anymore, you would simply need to pile on additional business logic on top of the established structure.

Notes

Five things that I use all the time in fastAPI are not implemented in our little framework yet:

- response marshalling (i.e. doing to responses what we already do to requests)
- authentication abstractions
- dependency injection
- error handlers
- OpenAPI schema generation

Adding those features is left as an exercise to the readers. The fastapi documentation is excellent and will give you plenty of information about each of those. Also if you want to check out a mature, more low-level ASGI framework, have a look at starlette. It’s what fastAPI is based on.

Originally I also wanted to implement a little pydantic clone in this series, to be completely independent from any 3rd party libraries. But I decided to scrap it. Writing all of these posts already took a lot more time than I originally imagined. And the focus here is really on the networking aspects, so a deeper understanding of pydantic is not really essential for the knowledge I wanted to impart.

I hope you’ve learned something and had some fun along the way.