Part 11 - Our own ASGI framework
(The changes introduced in this post start here.)
We’ve already seen a minimal HTTP
ASGI
application in the form of an awaitable function. It looks like this:
async def appplication(scope, receive, send):
# make sure this is a HTTP connection
assert scope["type"] == "http"
await send({
"type": "http.response.start",
"status": 200,
"headers": [
[b"Content-Type", b"text/plain"],
],
})
await send({
"type": "http.response.body",
"body": b"Hello, World!",
"more_body": False,
})
It totally disregards the request (other than making sure it’s HTTP
) and then sends 2 response events to the ASGI
server to submit status, headers and a simple body.
The basic framework
To build up our framework, let’s first just copy and adjust the functionality we had in our WSGI
framework. That was mainly:
- a class-based Application
object
- being able to define path operations via decorators
- get access to the HTTP
request data via a Request
class instance
- Response
class abstractions for a number of common response types (e.g. plain-text, JSON
, …)
The ASGIApplication
class is implemented in ./asgi/application/application.py
(also create a new, empty __init__.py
in that new module).
#./asgi/application/application.py
from typing import Callable
from dataclasses import dataclass
from .request import Request
from .response import PlainTextResponse, BaseResponse
class ASGIApplication:
def __init__(self):
self.path_operations = dict()
def _register_path_operation(
self, path: str, http_method: str, func: Callable
):
po = PathOperation(path, http_method)
self.path_operations[po] = func
def _create_register_decorator(self, path: str, http_method: str):
def decorator(func: Callable):
self._register_path_operation(path, http_method, func)
return func
return decorator
def get(self, path: str):
return self._create_register_decorator(path, "GET")
def post(self, path: str):
return self._create_register_decorator(path, "POST")
async def __call__(self, scope, receive, send):
po = PathOperation(scope["path"], scope["method"])
func = self.path_operations.get(po)
if func is None:
response = PlainTextResponse(status_code=404)
else:
request = Request.from_scope(scope)
await request.fetch_body(receive)
# now the request is fully processed, let's await the
# path operation function
ret = await func(request=request)
if isinstance(ret, BaseResponse):
response = ret
else:
response = PlainTextResponse(body=ret)
await send(response.get_response_start_event())
await send(response.get_response_body_event())
@dataclass(frozen=True, eq=True)
class PathOperation:
path: str
http_method: str
The way that path operations are registered is exactly the same as in the WSGIApplication
(see here for reference). The only difference in functionality is the __call__
implementation, which is now an async
awaitable with the function signature that is expected of an ASGI
application.
Just like in our WSGIApplication
application, we first check whether we have a registered path operation for the request path and method. If so, we’re building a request
object based on the scope
. The request body is not part of the scope
, instead it is streamed into the application through ASGI
events. This is implemented on the Request
object fetch_body()
method (we’ll look at it shortly).
At this point the request has fully arrived in the application and we can start awaiting the actual path operation function. If the return from that is already a Response
-type object, we’ll use it as the response. If not we create a default response using the return value as the body.
Finally, just like in the super simple application we looked at in the beginning of this post, the response is sent back to the ASGI
server by awaiting on send
twice. Once with the event that transfers the response status and headers, and then once more for the event that transfers the response body. N.b. that in a more sophisticated ASGI
application implementation, it should be possible to stream the response body in chunks, and not necessarily accumulate it (like we do) and send it to the ASGI
server in a single event.
Now let’s look at the Request
and Response
implementations that do some of the heavy lifting in the background. In ./asgi/application/request.py
we implement the following:
#./asgi/application/request.py
from typing import Dict
from dataclasses import dataclass
@dataclass
class Request:
query: Dict[str, str]
headers: Dict[bytes, bytes]
body: bytes = b""
@classmethod
def from_scope(cls, scope: Dict):
query = {}
if scope["query_string"]:
qs = scope["query_string"]
query = dict(entry.split("=") for entry in qs.split("&"))
headers = dict(scope["headers"])
return cls(query, headers)
async def fetch_body(self, receive):
while True:
event = await receive()
self.body += event["body"]
if not event["more_body"]:
break
Again, this is super similar to what we did in the WSGI
application framework implementation (check here). The most significant addition is the new fetch_body()
async
method. It is used to listen for the request body events coming from the ASGI
server, and accumulates those into the body
variable until the request body is exhausted.
In ./asgi/application/response.py
the following classes are implemented:
#./asgi/application/response.py
import json
from typing import List, Tuple, Optional, Any
class BaseResponse:
def __init__(
self,
status_code: int = 200,
headers: Optional[List[Tuple[bytes, bytes]]] = None,
body: Optional[Any] = None,
):
self.status_code = status_code
self.headers = headers if headers is not None else []
self.body = self.body_conversion(body) if body is not None else b""
self.add_content_type_and_content_length()
def add_content_type_and_content_length(self):
header_names = {name for name, value in self.headers}
if not b"Content-Type" in header_names:
self.headers.append((b"Content-Type", self.content_type))
if self.body and not b"Content-Length" in header_names:
content_length = str(len(self.body)).encode("utf-8")
self.headers.append((b"Content-Length", content_length))
def get_response_start_event(self):
return {
"type": "http.response.start",
"status": self.status_code,
"headers": self.headers,
}
def get_response_body_event(self):
return {
"type": "http.response.body",
"body": self.body,
"more_body": False,
}
class PlainTextResponse(BaseResponse):
content_type = b"plain/text"
@classmethod
def body_conversion(cls, body):
return body.encode("utf-8")
class HTMLResponse(BaseResponse):
content_type = b"plain/html"
@classmethod
def body_conversion(cls, body):
return body.encode("utf-8")
class JSONResponse(BaseResponse):
content_type = b"application/json"
@classmethod
def body_conversion(cls, body):
return json.dumps(body).encode("utf-8")
Once more, this is basically identical to what we did in WSGI
land (check here). The only big change are the additional methods get_response_start_event()
and get_response_body_event()
which are used to create the necessary ASGI
events for sending a response back to the ASGI
server.
And that’s it. With only a few minor changes we’ve ported our WSGI
application framework to the ASGI
protocol. Let’s define some imports in ./asgi/application/__init__.py
#./asgi/application/__init__.py
from .application import ASGIApplication
from .request import Request
and adjust the ./run.py
to use our own application framework instead of fastAPI
:
#./run.py
from typing import List
import asyncio
from asgi.server import ASGIServer
from asgi.application import ASGIApplication, Request
from asgi.application.response import JSONResponse
app = ASGIApplication()
@app.get("/")
async def root(request: Request):
print("hello from /")
return {"hello": "world"}
@app.post("/create")
async def create(request: Request):
print("hello from /create")
return JSONResponse(body=f"created {request.body}")
if __name__ == "__main__":
server = ASGIServer("127.0.0.1", 5000, app)
asyncio.run(server.serve_forever())
A quick curl
test reveals that all is in order:
$ curl localhost:5000/create -i -H "Content-Type: application/json" -d '{"id": 123, "name": "abc"}'
HTTP/1.1 200 OK
Content-Type: application/json
Content-Length: 45
"created b'{\"id\": 123, \"name\": \"abc\"}'"
Rethinking access to request data
In my opinion, one of the ugliest parts of the current framework is the need to define request
as an input to every single path operation function, regardless of whether it actually wants to access any request data. Different frameworks have solved this in different ways. E.g. in flask
you have the context-local request
object (imported globally) to avoid this problem. But I find the way fastAPI
approaches this to be much more straightforward and descriptive.
In fastAPI
you explicitly define which parameters from the request you want to get access to (and their type) and fastAPI
will make sure to parse them out of the request data and call your path operation function with them as inputs.
class InputModel(BaseModel):
id: int
name: str
@app.post("/")
async def root(i: InputModel, limit: int):
pass
With this path operation function signature fastAPI
will try to create the i
variable from the request body (because the type is a pydantic
model), and the limit
variable from a query
parameter (because it is a simple python type) of that name. If any of those expected parameters are not present on the request, or can’t be converted to the specified types, fastAPI
will return with a descriptive error response right away, before even calling your path operation function.
The beauty of this approach is that it’s super concise and you can see at one glance which request data some path operation function needs. Also, once the function is called you can be sure that the required data was indeed present on the request and was properly validated and converted by fastAPI
. So in the function itself, you don’t need to do any data validation anymore, you can assume that the request was as expected.
Now, there are many different parameters you might want to get from a request (body, query, path, header, …) and inference from the specified type (e.g. if something is a complex pydantic
model or a simple python type) will only let you differentiate between very few different parameter categories. That’s why fastAPI
also provides a more explicit way of defining the kind of parameter that a certain path operation function input represents:
from fastapi import Body, Query, Header
class InputModel(BaseModel):
id: int
name: str
@app.post("/")
async def root(
i: InputModel = Body(...),
limit: int = Query(...),
x_my_header: str = Header(...),
):
pass
Now it’s made explicit where each of the parameters should be taken from. The ellipsis input ...
simply means that even though a default value is used (one should say abused), this still doesn’t count as a default value for the function input, i.e. the parameter is still expected to be filled in from the request.
Those kinds of explicitly stated path operation parameters are what I’d like us to add to our framework now. Basically, we now need to do a bunch of stuff (grabbing data from the request, validating it, piping it into specific variables) before we actually await
the path operation function. Let’s implement such a function in a new file ./asgi/application/path_operation.py
. We’ll also put the PathOperation
class there (which previously was in ./asgi/application/application.py
), since it fits better in that new file.
#./asgi/application/path_operation.py
from typing import Awaitable
from dataclasses import dataclass
from inspect import signature
import json
from pydantic import ValidationError
from .request import Request
from .response import PlainTextResponse
@dataclass(frozen=True, eq=True)
class PathOperation:
path: str
http_method: str
class Body:
pass
class Query:
pass
class Header:
pass
async def await_path_operation_function(func: Awaitable, req: Request):
sig = signature(func)
parameters = {}
try:
for param_name, param in sig.parameters.items():
if isinstance(param.default, Body):
param = param.annotation(**json.loads(req.body))
parameters[param_name] = param
elif isinstance(param.default, Query):
try:
param = param.annotation(req.query[param_name])
except KeyError as e:
raise ValueError(f"No query parameter {str(e)} found.")
parameters[param_name] = param
elif isinstance(param.default, Header):
# transform to kebab case (which is usually used for headers)
name = param_name.replace("_", "-").encode("utf-8")
try:
param = param.annotation(req.headers[name])
except KeyError as e:
raise ValueError(f"No header {str(e)} found.")
parameters[param_name] = param
return await func(**parameters)
except (ValidationError, ValueError) as e:
return PlainTextResponse(status_code=403, body=str(e))
The meat of this new file is the await_path_operation_function()
awaitable. It takes the path operation function and a Request
instance as inputs. It then uses the python inspect
module to parse the path operation function signature (i.e. get us the information about function inputs and their (optional) type annotations and default values).
We then simply iterate over all parameters in the path operation function signature and check whether the default value is one of our 3 parameter types (Body
, Query
, Header
). If so, it gets a special treatment. For a Body
parameter, we try to create the pydantic
model instance from the JSON
of the request body. For Query
or Header
parameters we try to look them up by name from the req
object and convert them to the specified annotation type. All of the parsed parameters are saved and then finally the path operation function is awaited with those parsed parameters as keyword argument inputs.
There is also some basic exception handling going on that returns a 403
error response with information about what went wrong when trying to parse the request.
All that is left to do now is to actually use this new way of awaiting the path operation function in our application in ./asgi/application/application.py
:
#./asgi/application/application.py
@@ -2,6 +2,7 @@ from typing import Callable
from dataclasses import dataclass
from .request import Request
from .response import PlainTextResponse, BaseResponse
+from .path_operation import PathOperation, await_path_operation_function
class ASGIApplication:
@@ -35,16 +36,10 @@ class ASGIApplication:
else:
request = Request.from_scope(scope)
await request.fetch_body(receive)
- ret = await func(request=request)
+ ret = await await_path_operation_function(func, request)
if isinstance(ret, BaseResponse):
response = ret
else:
response = PlainTextResponse(body=ret)
await send(response.get_response_start_event())
await send(response.get_response_body_event())
-
-
-@dataclass(frozen=True, eq=True)
-class PathOperation:
- path: str
- http_method: str
We’ll also add the parameter categories to ./asgi/application/__init__.py
:
#./asgi/application/__init__.py
from .application import ASGIApplication
from .request import Request
from .path_operation import Body, Query, Header
and change the test application in ./run.py
to the following:
#./run.py
from typing import List
import asyncio
from asgi.server import ASGIServer
from asgi.application import ASGIApplication, Body, Query, Header
from asgi.application.response import JSONResponse
from pydantic import BaseModel
app = ASGIApplication()
@app.get("/")
async def root():
print("hello from /")
return {"hello": "world"}
class InputModel(BaseModel):
id: int
name: str
@app.post("/create")
async def create(
i: InputModel = Body(), limit: int = Query(), x_my_header: str = Header()
):
print("hello from /create")
msg = f"created {dict(i)}, limit: {limit}, header: {x_my_header}"
return JSONResponse(body=msg)
if __name__ == "__main__":
server = ASGIServer("127.0.0.1", 5000, app)
asyncio.run(server.serve_forever())
Now we can check out the new application in action. Let’s fire some queries.
$ curl localhost:5000/create?limit=13 -i -H "Content-Type: application/json" -H "x-my-header: test" -d '{"id": 123, "name": "abc"}'
HTTP/1.1 200 OK
Content-Type: application/json
Content-Length: 64
"created {'id': 123, 'name': 'abc'}, limit: 13, header: b'test'"
That seems to work fine. Let’s see some of the error cases in action. For example, a missing field in the request body:
$ curl localhost:5000/create?limit=13 -i -H "Content-Type: application/json" -H "x-my-header: test" -d '{"id": 123, "title": "abc"}'
HTTP/1.1 403 Forbidden
Content-Type: plain/text
Content-Length: 82
1 validation error for InputModel
name
field required (type=value_error.missing)
Or not sending the required header along:
$ curl localhost:5000/create?limit=13 -i -H "Content-Type: application/json" -d '{"id": 123, "name": "abc"}'
HTTP/1.1 403 Forbidden
Content-Type: plain/text
Content-Length: 31
No header b'x-my-header' found.
Or a wrong type on the limit
query parameter:
$ curl localhost:5000/create?limit=abc -i -H "Content-Type: application/json" -H "x-my-header: test" -d '{"id": 123, "name": "abc"}'
HTTP/1.1 403 Forbidden
Content-Type: plain/text
Content-Length: 45
invalid literal for int() with base 10: 'abc'
Of course, the current implementation is riddled with all kinds of problems: How can we use path parameters? What happens if you define more than 1 body parameter? How would you define actual default values for some of the parameters, so that they don’t necessarily have to come in through the request?
But despite all of these issues, it looks quite a lot like fastAPI
already. And additional features would not require any logical leaps anymore, you would simply need to pile on additional business logic on top of the established structure.
Notes
Five things that I use all the time in fastAPI
are not implemented in our little framework yet:
- response marshalling (i.e. doing to responses what we already do to requests)
- authentication abstractions
- dependency injection
- error handlers
- openAPI
schema generation
Adding those features is left as an exercise to the readers. The fastapi
documentation is excellent and will give you plenty of information about each of those. Also if you want to check out a mature, more low-level ASGI
framework, have a look at starlette
. It’s what fastAPI
is based on.
Originally I also wanted to implement a little pydantic
clone in this series, to be completely independent from any 3rd party libraries. But I decided to scrap it. Writing all of these posts already took a lot more time than I originally imagined. And the focus here is really on the networking aspects, so a deeper understanding of pydantic
is not really essential for the knowledge I wanted to impart.
I hope you’ve learned something and had some fun along the way.