From yield to async/await
Generators
Python added the yield statement to the language all the way back in 2001 (python 2.2). This addition was motivated by PEP 255 and was designed to enable generators.
A generator is basically a lazy iterator, a lazy producer of values. Instead of holding all the items in memory beforehand, a generator can always create the next value on the fly once requested.
A typical motivating example for generators is a function that computes all Fibonacci numbers up to the n-th. It’s a good example because it encapsulates two essential features of generators: not having to hold all produced items in memory at once and the ability to maintain state between iterations.
Let’s check what a traditional function version would look like. It first calculates all required numbers and then returns a list of results:
def fibonacci_list(n):
    a, b = 1, 1
    numbers = []
    for _ in range(n):
        numbers.append(a)
        a, b = b, a + b
    return numbers
A generator version would be the following. It only ever yields the latest requested value and maintains the function state between iterations.
def fibonacci_generator(n):
    a, b = 1, 1
    for _ in range(n):
        yield a
        a, b = b, a + b
Although both constructs look like functions, they are driven/executed in a very different way. fibonacci_list, being a regular function, just needs to be called.
>>> fibonacci_list(7)
[1, 1, 2, 3, 5, 8, 13]
Since you know it returns an iterable list, you can also use it directly in a for loop.
>>> for n in fibonacci_list(3):
... print(n)
...
1
1
2
If you try to call fibonacci_generator, nothing really seems to happen at first. You only get a generator object back, not any actual output values.
>>> fibonacci_generator(7)
<generator object fibonacci_generator at 0x7efdb354d120>
The way you drive it is by using its __next__ interface (called by the python built-in function next()). This advances the generator to the next yield statement and returns you the value of the expression to the right of yield. You can do that again and again until the generator function runs out. At this point a StopIteration exception is raised.
>>> g = fibonacci_generator(3)
>>> next(g)
1
>>> next(g)
1
>>> next(g)
2
>>> next(g)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration
This is exactly the python iterator protocol, except that a generator creates values lazily. So you can, of course, use it in a for loop naturally. At this point, the technical differences between a function that returns a list of values and a generator effectively vanish behind the iterator protocol.
>>> for n in fibonacci_generator(7):
... print(n)
...
1
1
2
3
5
8
13
The simple Fibonacci example fails to really showcase the advantages that a generator brings to the table. Holding a couple of integers in memory isn’t really a challenge. But how about iterating over all lines in some large file? If you implement that using a generator, you would never need to hold more than a single line of the file in memory at a time.
def lines():
    with open("file.txt") as f:
        for line in f:
            yield line
That’s definitely a lot better than holding the entire file in memory if all you ever need to do is look at a single line at a time. Or how about a generator that can yield you a range of integers indefinitely? That wouldn’t even be possible with a list.
def indefinite():
    n = 0
    while True:
        yield n
        n += 1
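As a quick illustration (not part of the original example), you can pull a handful of values out of this endless generator with next() or itertools.islice:

>>> import itertools
>>> gen = indefinite()
>>> next(gen)
0
>>> list(itertools.islice(gen, 4))
[1, 2, 3, 4]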
So, in general, this is a pretty cool concept. You essentially have something that looks very much like a function but its execution can be suspended and resumed at yield statements. It also integrates nicely with the iterator protocol, so in many cases you don’t have to think about how to “drive it”, because a for loop will do it for you automatically.
Why not use a custom iterator?
Nothing shown in the previous examples couldn’t also be done with a custom object implementing the iterator protocol. For example, the Fibonacci generator could be implemented as follows:
class Fibonacci:
    def __init__(self, n):
        self.a, self.b = 1, 1
        self.n = n
        self.i = 0

    def __next__(self):
        # need a special 0-case here
        if self.i == 0:
            self.i += 1
            return self.a
        elif self.i < self.n:
            self.a, self.b = self.b, self.a + self.b
            self.i += 1
            return self.a
        else:
            raise StopIteration

    def __iter__(self):
        return self
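If you want to convince yourself that the two behave identically under the iterator protocol, a quick check (shown here purely for illustration) is to exhaust both into a list:

>>> list(Fibonacci(7))
[1, 1, 2, 3, 5, 8, 13]
>>> list(fibonacci_generator(7))
[1, 1, 2, 3, 5, 8, 13]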
But I hope you would agree that there is quite a lot of boilerplate going on in that code. You manually need to keep track of the loop counter, you manually need to raise a StopIteration, you have to write a class with 3 methods instead of a single function. The generator implementation of the same functionality is much cleaner. And since this kind of functionality is so commonly needed in programs, generator functions with yield statements were considered a useful addition to python.
Complex internal state
Since all variables in a generator function persist when suspending, you can store complex internal state. For example, we could easily change the lines() generator to only yield lines that weren’t seen before and that contain more characters than the previous line.
def lines():
    lines_seen = set()
    previous_line_len = 0
    with open("file.txt") as f:
        for line in f:
            if hash(line) not in lines_seen and len(line) > previous_line_len:
                yield line
            lines_seen.add(hash(line))
            previous_line_len = len(line)
This generator packs quite a punch, despite being so concise.
Returning from a generator
When generators were first introduced, it wasn’t possible to return a final value with return. You could use a blank return statement (which would then raise a StopIteration subsequently), but you could not use return <expression>. So essentially it was just a way to explicitly end your generator.
Note that this behavior is also implicit in the examples we’ve looked at so far. All the finite generators we’ve seen simply ran the function until completion to end the iteration. Running a function to completion, without an explicit return, triggers an implicit empty return in python.
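As a small sketch (the limited() generator is made up for this illustration), a bare return simply ends the iteration early, exactly like falling off the end of the function:

def limited(n):
    for i in range(100):
        if i >= n:
            return  # bare return: the generator ends here, StopIteration is raised in the consumer
        yield i

>>> list(limited(3))
[0, 1, 2]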
Not being able to return some kind of final value makes sense when you only want to use generator functions as lazy iterators. If you think of e.g. a for loop, after all values are exhausted, what should a final, additional value even be?
So, initially, the only way to get values out of a generator function was to use yield <expression>. We’ll see how that changed in later versions of the language, and why.
Why do we reuse the syntax of regular function definitions?
One point of contention with generators has always been that they look too similar to regular functions. They reuse the def syntax, and the only way to know whether something is a function or a generator function is to notice yield statements in the function body.
Given that the way you “drive” regular functions and generator functions is very different, this was a source of confusion, especially for people new to the language. Maybe one could have introduced some new syntax to explicitly define a generator?
gen fibonacci():
    yield <something>
Actually, the discussion around this issue is part of PEP 255. Check it out to get some more insight into why the def syntax was reused for generator functions, and also some of the other design decisions around generator functions.
Coroutines
In generators, the behavior at yield statements is limited to “producing” values. There is no way to send in a value when resuming a generator. Once that is possible, you get a full-fledged coroutine: a function that can be stopped/resumed at specific points and can both produce and receive values at these points.
Coroutines enable straightforward solutions to many programming problems. One example would be simulations, where every time you resume the coroutine, the simulation can advance “one step”. This is not something we want to focus on in this post, but you can see a nice example implementation of Conway’s Game Of Life using coroutines here. Another use for coroutines would be to build a lightweight concurrency system, where a central scheduler drives a bunch of coroutines (e.g. in a round-robin fashion). Each of them can make progress before any of them necessarily needs to have finished.
This is quite cool. It enables you to implement a multitasking system without additional threads or processes (which are usually costly to spin up). Also, the scheduler lies entirely in your program, so you have full control over when certain coroutines get scheduled for execution. This is very different than e.g. using threads, where the kernel would switch rapidly between threads at arbitrary points (without you having any control over where exactly those points will be).
Exactly those features were added on top of generators through PEP 342, written in 2005 and shipped with python 2.5 (2006). It’s interesting to see the evolution from PEP 255 to PEP 342. The first motivates generators mainly as lazy data producers; the second one puts much more emphasis on using coroutines for implementing lightweight concurrency schemes. From now on, in this post, I won’t call those functions with yield in them “generators” or “generator functions” anymore; instead I’ll call them “coroutines”.
The changes introduced through PEP 342 give coroutines some new abilities, chief among them three new methods: .send(), .throw() and .close(). The first one is used to advance a coroutine and send in a value. The second is used to advance a coroutine and send in an exception that then gets raised in the coroutine. The third one is used to shut down a coroutine, which raises a special GeneratorExit exception in the coroutine that you can handle to do some cleanup if you want. To facilitate the “receiving” of a value within the coroutine, the yield statement became a yield expression, i.e. it can be used on the right-hand side of an assignment.
def coro():
    while True:
        value = yield
        print(f"Got: {value}")
>>> c = coro()
>>> c.send(None)
>>> c.send("Hello")
Got: Hello
>>> c.send("World")
Got: World
As you can see, yield still suspends the coroutine execution. A peculiarity of working with coroutines in python is that after creating the object, you manually need to advance it to the first yield. This can be done by sending in an empty value, either through c.send(None) or next(c) (those two ways of advancing a coroutine are equivalent).
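The .throw() and .close() methods follow the same pattern. Here is a small sketch (the resilient() coroutine is invented for this illustration) of a coroutine that recovers from an exception thrown into it and runs cleanup code when it is closed:

def resilient():
    try:
        while True:
            try:
                value = yield
                print(f"Got: {value}")
            except ValueError as exc:
                # .throw() raises the exception right at the suspended yield
                print(f"Handled: {exc}")
    except GeneratorExit:
        # .close() raises GeneratorExit at the suspended yield
        print("Cleaning up")

>>> c = resilient()
>>> c.send(None)
>>> c.send("Hello")
Got: Hello
>>> c.throw(ValueError("boom"))
Handled: boom
>>> c.close()
Cleaning up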
The way you interact with a coroutine is conceptually slightly different to a generator. A generator is driven by the iterator protocol, e.g. through a for loop, and “pulling” out a value at every step. A coroutine instead is driven by “pushing” in a value (which can be None) at every step.
You could also both produce and consume values in a coroutine at the same time. Consider the following example that uses its internal state to keep track of the maximum value it has seen.
def maximum():
    maxi = None
    while True:
        n = yield maxi
        maxi = n if maxi is None or n > maxi else maxi
>>> m = maximum()
>>> m.send(None)
>>> m.send(13)
13
>>> m.send(5)
13
>>> m.send(20)
20
Cooperative Multitasking
Now let’s have a look at how you could use those coroutines for a simple concurrency scheme. We’ll define 2 coroutines and we’ll have a scheduler call them round-robin style.
def coro1():
    print("coro1 doing some work")
    yield
    print("coro1 doing some work")
    yield

def coro2():
    print("coro2 doing some work")
    yield
    print("coro2 doing some work")
    yield

def scheduler():
    c1 = coro1()
    c2 = coro2()
    c1.send(None)
    c2.send(None)
    c1.send(None)
    c2.send(None)
>>> scheduler()
coro1 doing some work
coro2 doing some work
coro1 doing some work
coro2 doing some work
We make intermittent progress in 2 separate coroutines without the need for separate threads or processes. This really doesn’t have anything to do with producing or consuming data anymore. Instead yield is used as a signal that gives control back to the caller of a coroutine (in this case, the scheduler). A feature that started out as a simple way of writing lazy iterators is now being used for a totally different purpose.
You might not immediately appreciate the value of this possibility, but it’s a pretty big deal e.g. in network programming. If you want to write a server in python that can handle thousands of concurrent clients, you simply won’t be able to do that using threads. They consume too many resources. But you can easily have thousands of tiny coroutine workers that are suspended and resumed by a smart scheduler. In fact, we’ll do exactly that in the next blog post where we’ll implement a tiny event loop entirely in python.
In the following examples you’ll see that we’re not really using any of the new coroutine features (like .throw() or .close()). We’re mostly making use of .send(None) to advance coroutines, which is the same thing as next(coro), which was already available with simple generator functions. So it might seem like those new coroutine features from PEP 342 were not necessary after all. While that is technically true for our examples, the new capabilities (especially .throw() and .close()) are vital e.g. for more mature implementations of schedulers.
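To give a flavour of why (purely as a hypothetical sketch, not something our scheduler implements): a scheduler that performs real I/O on behalf of its coroutines needs .throw() to report a failure at the exact point where a coroutine is waiting, and .close() to cancel coroutines cleanly on shutdown. Something like this:

def fetch():
    try:
        # hypothetical request asking the scheduler to read from a socket
        data = yield ("read", "socket-1")
    except ConnectionError as exc:
        print(f"fetch failed: {exc}")
        return
    print(f"fetch got: {data}")

>>> c = fetch()
>>> c.send(None)          # run until the coroutine asks for a read
('read', 'socket-1')
>>> try:
...     # the scheduler reports the failed read at the waiting yield
...     c.throw(ConnectionError("connection reset"))
... except StopIteration:
...     pass
...
fetch failed: connection reset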
Automatic scheduler
The simple manual scheduler we built could also be automated and chew away on a number of coroutines until they are all completely exhausted. The heart of the scheduler function is a queue of coroutines to advance. Then, as long as there is any coroutine left to run, the scheduler pops it off the queue, executes it until the next yield (or its end) and then appends it to the queue again for the next cycle.
from collections import deque

def scheduler(coros):
    ready = deque(coros)
    while ready:
        try:
            # take next coroutine that is ready to run
            coro = ready.popleft()
            # run it until the next "yield"
            result = coro.send(None)
            # schedule it for another execution
            ready.append(coro)
        except StopIteration:
            pass
>>> scheduler([coro1(), coro2()])
coro1 doing some work
coro2 doing some work
coro1 doing some work
coro2 doing some work
Asynchronous execution
It’s already pretty interesting that you can switch between coroutines in that way. But it gets really interesting when you consider that a program quite often needs to wait for I/O (communication with a network service, read/write of files, read/write of data in a database, etc.).
In traditional, synchronous code, your program simply needs to wait at those points. And this might take a while. E.g. downloading a website from a webserver might take a couple of seconds, during which your program blocks and can’t do anything else.
Let’s simulate those blocking I/O calls with time.sleep() and write a mock program that both fetches a website and reads some data from a database.
import time

def get_page():
    print("Starting to download page")
    time.sleep(1)
    print("Done downloading page")
    return "<html>Hello</html>"

def read_db():
    print("Starting to retrieve data from db")
    time.sleep(0.5)
    print("Connected to db")
    time.sleep(1)
    print("Done retrieving data from db")
    return "db-data"

def run():
    start = time.time()
    get_page()
    read_db()
    print(f"Time elapsed: {time.time()-start:.3}s")
>>> run()
Starting to download page
Done downloading page
Starting to retrieve data from db
Connected to db
Done retrieving data from db
Time elapsed: 2.51s
As expected, the program takes ~2.5s to complete, as everything is done sequentially. The program first waits for the page to download, then for a connection to the db, then on fetching data from the db. So the total execution time is roughly the sum of all the blocking calls, because not much else is happening (except for some prints).
Now, wouldn’t it be nice if, while waiting for the page to download, we could simultaneously make progress on our database query? If only there was a way to hand execution control back to a central scheduler while making blocking I/O calls, such that the scheduler could run something else in the meantime … I guess you can see where this is going.
We’ll extend our simple scheduler with an additional list of sleeping tasks. A coroutine can request to be “put to sleep” for a specified amount of time by yielding the tuple ("sleep", <delay>). In that case the scheduler computes a deadline at which to wake this coroutine, based on the current time and the given delay. The coroutine is then put into the sleeping list instead of being scheduled for execution again right away. Python’s heapq is used to push/pop items from the sleeping list in order of nearest deadline.
The behavior of the scheduler loop is still to execute coroutines from the ready queue if anything is available. But if nothing is ready to run right now, the scheduler will wait for the sleeping coroutine with the nearest deadline to become available and execute that one.
import heapq
import itertools

def scheduler(coros):
    start = time.time()
    ready = deque(coros)
    sleeping = []
    # tie-breaker counter so that heapq never has to compare two
    # (unorderable) coroutine objects when deadlines happen to be equal
    sequence = itertools.count()
    while True:
        if not ready and not sleeping:
            break
        # wait for nearest sleeper,
        # if no coro can be executed immediately right now
        if not ready:
            deadline, _, coro = heapq.heappop(sleeping)
            if deadline > time.time():
                time.sleep(deadline - time.time())
            ready.append(coro)
        try:
            coro = ready.popleft()
            result = coro.send(None)
            # the special case of a coro that wants to be put to sleep
            if len(result) == 2 and result[0] == "sleep":
                deadline = time.time() + result[1]
                heapq.heappush(sleeping, (deadline, next(sequence), coro))
            else:
                print(f"Got: {result}")
                ready.append(coro)
        except StopIteration:
            pass
    print(f"Time elapsed: {time.time()-start:.3}s")
At this point we can rewrite get_page() and read_db() as coroutines and run them concurrently using our scheduler.
def get_page():
    print("Starting to download page")
    yield ("sleep", 1)
    print("Done downloading page")
    yield "<html>Hello</html>"

def read_db():
    print("Starting to retrieve data from db")
    yield ("sleep", 0.5)
    print("Connected to db")
    yield ("sleep", 1)
    print("Done retrieving data from db")
    yield "db-data"
>>> scheduler([get_page(), read_db()])
Starting to download page
Starting to retrieve data from db
Connected to db
Done downloading page
Got: <html>Hello</html>
Done retrieving data from db
Got: db-data
Time elapsed: 1.5s
Since the overwhelming majority of time in these 2 functions is spent waiting on I/O (mock I/O in our example, of course) we get a nice speed improvement. The total elapsed time is now roughly equal to the amount of time it takes to run the slowest coroutine.
This is roughly how the scheduling would be sequenced:
It gets even better once you have many more coroutines to schedule concurrently. Here is an example with 500 get_page() and 500 read_db() coroutines.
>>> scheduler([get_page() if i%2 == 0 else read_db() for i in range(1000)])
# ... a lot of output ...
Time elapsed: 1.6s
It’s basically still ~1.5s; the slight overhead here is mainly due to printing so many lines to stdout. The synchronous execution of this would have taken around 1250s - a bit more than 20 minutes.
As mentioned before, you can use other approaches for this kind of concurrent behavior (e.g. threads), but it would be quite costly to start 1000 threads in python. Our little example program runs in a single thread, with very little overhead for the 1000 coroutines we’re executing. I hope this makes it clear why coroutines are so interesting, especially in programs with lots of I/O interaction. Like e.g. a network server.
This style of program execution is called “asynchronous” or “non-blocking”.
Nested coroutines
One requirement that comes up frequently when working with coroutines is to be able to nest them. You want to be able to run a coroutine from within a coroutine but have the yields bubble up to the top-level scheduler. This is useful to split the work into different coroutines, just like you split normal program logic into different functions. The yields need to “bubble up” so that you can still have a central scheduler in charge of executing your set of coroutines.
For example, we could imagine a worker that first downloads a page and then dumps it into a database. And we might want to do that with massive concurrency.
Unfortunately that isn’t easily done with plain yield in python.
def get_page():
    print("Starting to download page")
    yield ("sleep", 1)
    print("Done downloading page")
    yield "<html>Hello</html>"

def write_db(data):
    print("Starting to write data to db")
    yield ("sleep", 0.5)
    print("Connected to db")
    yield ("sleep", 1)
    print("Done writing data to db")

def worker():
    page = get_page()  # HOW TO DO THIS?
    write_db(page)     # AND THIS?
>>> scheduler([worker()]) # would raise an error
This does not work because worker() is not even a proper coroutine. It is just a function that creates 2 coroutine objects but never interacts with them. So the scheduler can’t do anything with it. But we can’t schedule get_page() and write_db() concurrently in the scheduler either, because we first need to have downloaded the data before we can write it to the database. We definitely need some worker() coroutine that ties both the other coroutines together, but how can we do it?
Looping over nested coroutines
One way to solve this is to loop over nested coroutines, and explicitly yield once again for every step in the sub-coroutine. This basically flattens the entire nested structure into the topmost coroutine and all the nested yields bubble up correctly to the scheduler.
def worker():
    for step in get_page():
        yield step
    page = step
    for step in write_db(page):
        yield step
>>> scheduler([worker()])
Starting to download page
Done downloading page
Got: <html>Hello</html>
Starting to write data to db
Connected to db
Done writing data to db
It works as expected but having to write this construct each time, for something that is essentially a function call, is quite cumbersome. It also only solves the “data generator” part of the problem (i.e. you can get all nested yielded data bubbled up to the scheduler). But you can’t actually .send()/.throw()/.close() values from the scheduler to a nested coroutine.
Another complication is that you couldn’t use return <expression> statements in coroutines back then (as mentioned earlier). So there was no natural way of expressing the final value of a coroutine that has finished running. In the example we get around this by assuming that the final yielded value of the get_page() coroutine is the downloaded page. Quite hacky.
Again, you could run this in massive concurrency, downloading thousands of pages and dumping them into a db in a fraction of the time it would take to run this sequentially.
yield from
A real solution for calling nested coroutines came in the form of yield from, proposed in PEP 380 (2009) and added to the language in python 3.3 (2012). The new expression works as follows:
def get_page():
    print("Starting to download page")
    yield ("sleep", 1)
    print("Done downloading page")
    return "<html>Hello</html>"

def write_db(data):
    print("Starting to write data to db")
    yield ("sleep", 0.5)
    print("Connected to db")
    yield ("sleep", 1)
    print("Done writing data to db")

def worker():
    page = yield from get_page()
    yield from write_db(page)
>>> scheduler([worker(), worker(), worker()])
Starting to download page
Starting to download page
Starting to download page
Done downloading page
Starting to write data to db
Done downloading page
Starting to write data to db
Done downloading page
Starting to write data to db
Connected to db
Connected to db
Connected to db
Done writing data to db
Done writing data to db
Done writing data to db
Now this looks basically just like calling a function. It also became possible to use return <expression> in a coroutine, with the yield from expression taking on the returned value.
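Here is a minimal sketch of this mechanism outside of our scheduler (inner() and outer() are made-up names for illustration): yield from forwards every yielded value to whoever drives the outer generator, and evaluates to the inner generator’s return value once it finishes.

def inner():
    yield 1
    yield 2
    return "done"

def outer():
    # all values yielded by inner() bubble up to whoever drives outer()
    result = yield from inner()
    print(f"inner() returned: {result}")

>>> list(outer())
inner() returned: done
[1, 2]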
The sequencing for a single worker would look like the following:
The concurrent speed improvements for several workers would be gained at the “idling” points, where the scheduler would be free to execute coroutines before any of the sleeping ones reawaken.
async/await
The need to handle potentially blocking I/O calls in applications has increased massively over the last decades. With the rise of the internet and connectivity, waiting to receive or send data in a network is now a commonplace activity.
As we’ve seen, coroutines offer a straightforward way to implement asynchronous/non-blocking code execution. Once a coroutine e.g. needs to wait for some I/O data to become available, it simply yields control back to the scheduler and the scheduler can run some other coroutine in the meantime.
A basic way to implement a coroutine execution scheme has been possible in python since 2001 and the introduction of the yield statement. But as we’ve also discussed earlier in this post, the syntax around coroutines in python was very muddied. The yield statement started out as a way to implement lazy iterators: generators. Defining a generator looks just like defining a function, except for the presence of yield keywords that you need to scan the function body for. Then step-by-step new features were added to generators that allowed for more full-fledged coroutines, moving away from the initial motivation of a lazy data producer.
Acknowledging all these subtle issues that had accumulated over the years, PEP 492 from 2015 (python 3.5) set out to make coroutines a first-class language feature in python.
The core of the proposal is to introduce a new async/await syntax in python. This pattern was already established in other programming languages to define non-blocking functions and how they can call each other. See [here](https://en.wikipedia.org/wiki/Async/await) for some background and examples.
A coroutine can now be defined explicitly as:
async def coro():
    pass
Coroutines can call each other using the await syntax, which works exactly like yield from did for generator-based coroutines.
async def coro():
    result = await some_other_coro()
Now you can clearly see from the function header whether it is a regular function or a coroutine. It is also clear that this is not a generator function (a lazy producer of data); those can still be defined using the old syntax.
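Just like calling a generator function, calling an async def function does not run its body; it gives you back a coroutine object that still has to be driven. As a small illustrative sketch (greet() is made up here), you can even drive it by hand with .send(None), and its return value travels back inside the StopIteration:

>>> async def greet():
...     return "hi"
...
>>> c = greet()
>>> c
<coroutine object greet at 0x7efdb354d200>
>>> c.send(None)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration: hi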
Our example in async/await
Let’s rewrite our example to adhere to this new syntax.
async def get_page():
    print("Starting to download page")
    await sleep(1)
    print("Done downloading page")
    return "<html>Hello</html>"

async def write_db(data):
    print("Starting to write data to db")
    await sleep(0.5)
    print("Connected to db")
    await sleep(1)
    print("Done writing data to db")

async def worker():
    page = await get_page()
    await write_db(page)
All previous defs are now async defs. All previous yield froms are now awaits.
An interesting point is how the yield expressions have changed. Previously we yielded tuples like ("sleep", <delay>) that signaled to the scheduler that this coroutine wanted to be put to sleep for a certain amount of time. However, in async coroutines we can’t use yield expressions anymore; we can only await on Awaitables. An Awaitable is any object that implements the magic __await__ method. An instance of an async coroutine would be one example of an Awaitable.
So if the only thing an async coroutine can do is await on some other Awaitable, how can we ever yield back control to the scheduler? A solution to this riddle is the following implementation of the sleep() function:
class Sleep:
    def __init__(self, delay):
        self.delay = delay

    def __await__(self):
        yield ("sleep", self.delay)

def sleep(delay):
    return Sleep(delay)
The Sleep class is an Awaitable (it implements the __await__ method). In that magic method it yields our familiar sleep signal to the scheduler.
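You can see this plumbing directly if you drive an async coroutine by hand (a quick sketch; nap() is just an illustrative name):

>>> async def nap():
...     await sleep(1)
...
>>> c = nap()
>>> c.send(None)
('sleep', 1)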
So under the hood, deep down, the mechanics of async/await coroutines are still based on yield. Because of that our simple scheduler actually still works without any modification.
>>> scheduler([worker(), worker(), worker()])
Starting to download page
Starting to download page
Starting to download page
Done downloading page
Starting to write data to db
Done downloading page
Starting to write data to db
Done downloading page
Starting to write data to db
Connected to db
Connected to db
Connected to db
Done writing data to db
Done writing data to db
Done writing data to db
The only difference is that now, using async/await syntax, the purpose of the program is much clearer. You see an async def and you know right away that it’s a coroutine that can be suspended/resumed. You see await and you know right away that work is being delegated to some other Awaitable.
asyncio
You may have noticed that I have not mentioned asyncio even once in this post (well, now it happened). Many introductions to async/await coroutines in python are tightly coupled with an introduction to asyncio. It’s almost as if one concept can’t exist without the other.
But that’s not true, as we’ve seen. We were able to make perfectly good use of async/await coroutines with our own scheduler implementation. asyncio, with its event loop and utilities, is just one way to drive a set of coroutines. Admittedly, being part of the standard library puts it in a position to probably be the only way that 90% of python users will ever use. Still, I think it’s very important to understand the mechanics of async coroutines and that the whole async/await syntax should be seen more like an API that you can drive however you want.
We can easily port our little example to run on asyncio; the only thing we need to replace is the sleep() function.
import asyncio
from asyncio import sleep

async def get_page():
    print("Starting to download page")
    await sleep(1)
    print("Done downloading page")
    return "<html>Hello</html>"

async def write_db(data):
    print("Starting to write data to db")
    await sleep(0.5)
    print("Connected to db")
    await sleep(1)
    print("Done writing data to db")

async def worker():
    page = await get_page()
    await write_db(page)
>>> async def main():
...     await asyncio.gather(worker(), worker(), worker())
...
>>> asyncio.run(main())
Starting to download page
Starting to download page
Starting to download page
Done downloading page
Starting to write data to db
Done downloading page
Starting to write data to db
Done downloading page
Starting to write data to db
Connected to db
Connected to db
Connected to db
Done writing data to db
Done writing data to db
Done writing data to db
Notes
The main takeaway I’d like you to have from this post is the fact that coroutines in a basic form have been possible in python since version 2.2 in 2001. All this new craze about async/await in python is just yield when you dig deep enough down.
In the coming post we’ll build our own event loop from scratch. So far we’ve only seen a basic scheduler that can either execute a coroutine or put it to sleep, which is not that interesting. The improved version will also be able to handle network I/O without blocking, and we’ll use that to build a network server with high concurrency in a single thread. Hopefully that will give you some more insight into how libraries like asyncio work under the hood.