nullprogram.com/blog/2020/07/30/
This article was discussed on Hacker News.
A common situation in asyncio Python programs is asynchronous
initialization. Some resource must be initialized exactly once before it
can be used, but the initialization itself is asynchronous — such as an
asyncpg database. Let’s talk about a couple of solutions.
The naive “solution” would be to track the initialization state in a
variable:
initialized = False
async def one_time_setup():
"Do not call more than once!"
...
async def maybe_initialize():
global initialized
if not initialized:
await one_time_setup()
initialized = True
The reasoning for initialized
is the expectation of calling the
function more than once. However, if it might be called from concurrent
tasks there’s a race condition. If the second caller arrives while the
first is awaiting one_time_setup()
, the function will be called a
second time.
Switching the order of the call and the assignment won’t help:
async def maybe_initialize():
global initialized
if not initialized:
initialized = True
await one_time_setup()
Since asyncio is cooperative, the first caller doesn’t give up control
until to other tasks until the await
, meaning one_time_setup()
will
never be called twice. However, the second caller may return before
one_time_setup()
has completed. What we want is for one_time_setup()
to be called exactly once, but for no caller to return until it has
returned.
Mutual exclusion
My first thought was to use a mutex lock. This will protect the
variable and prevent followup callers from progressing too soon. Tasks
arriving while one_time_setup()
is still running will block on the
lock.
initialized = False
initialized_lock = asyncio.Lock()
async def maybe_initialize():
global initialized
async with initialized_lock:
if not initialized:
await one_time_setup()
initialized = True
Unfortunately this has a serious downside: asyncio locks are
associated with the loop where they were created. Since the
lock variable is global, maybe_initialize()
can only be called from
the same loop that loaded the module. asyncio.run()
creates a new loop
so it’s incompatible.
# create a loop: always an error
asyncio.run(maybe_initialize())
# reuse the loop: maybe an error
loop = asyncio.get_event_loop()
loop.run_until_complete((maybe_initialize()))
(IMHO, it was a mistake for the asyncio API to include explicit loop
objects. It’s a low-level concept that unavoidably leaks through most
high-level abstractions.)
A workaround is to create the lock lazily. Thank goodness creating a
lock isn’t itself asynchronous!
initialized = False
initialized_lock = None
async def maybe_initialize():
global initialized, initialized_lock
if not initialized_lock:
initialized_lock = asyncio.Lock()
async with initialized_lock:
if not initialized:
await one_time_setup()
initialized = True
This is better, but maybe_initialize()
can still only ever be called
from a single loop.
asyncio.run(maybe_initialize()) # ok
asyncio.run(maybe_initialize()) # error!
Once
The pthreads API provides pthread_once
to solve this problem.
C++11 has similarly has std::call_once
. We can build something
similar using a future-like object.
future = None
async def maybe_initialize():
global future
if not future:
future = asyncio.create_task(one_time_setup())
await future
Awaiting a coroutine more than once is an error, but tasks are
future-like objects and can be awaited more than once. At least on
CPython, they can also be awaited in other loops! So not only is this
simpler, it also solves the loop problem!
asyncio.run(maybe_initialize()) # ok
asyncio.run(maybe_initialize()) # still ok
This can be tidied up nicely in a @once
decorator:
def once(func):
future = None
async def once_wrapper(*args, **kwargs):
nonlocal future
if not future:
future = asyncio.create_task(func(*args, **kwargs))
return await future
return once_wrapper
No more need for maybe_initialize()
, just decorate the original
one_time_setup()
:
@once
async def one_time_setup():
...