跳转至

11. 在Group中管理多个协程

Manage Multiple Coroutines in a Group

我们可以使用 **asyncio.TaskGroup** 来一起管理多个协程。

如果组中的某个任务因异常失败,组中的所有任务都会自动被取消。

We can manage multiple coroutines together in a group using the asyncio.TaskGroup.

If a task in the group fails with an exception, all tasks in the group are cancelled automatically.

11.1 asyncio.TaskGroup简介

What is an asyncio.TaskGroup

Python 3.11 引入了用于管理关联 asyncio 任务组的 asyncio.TaskGroup

添加了 TaskGroup 类,这是一种异步上下文管理器,持有一个任务组,并在退出时等待所有任务完成。对于新代码,推荐使用 TaskGroup 而不是直接使用 create_task()gather()

asyncio.TaskGroup 类旨在替代用于创建任务的 asyncio.create_task() 函数和用于等待任务组的 asyncio.gather() 函数。

历史上,我们使用 asyncio.create_task() 函数创建并启动协程作为 asyncio.Task

例如:

...
# 创建并启动协程作为任务
task = asyncio.create_task(coro())

这会创建一个新的 asyncio.Task 对象,并将其发送到 asyncio 事件循环中尽快执行。

然后,我们可以选择等待该任务完成。

例如:

...
# 等待任务完成
result = await task

你可以在教程中了解更多关于将协程作为 asyncio.Task 对象执行的信息:

正如我们所见,asyncio.gather() 函数用于同时创建和启动多个协程作为 asyncio.Task 对象发送到事件循环中,允许调用者将它们作为一个组来处理。

最常见的用法是等待所有启动的任务完成。

例如:

...
# 启动协程作为任务并等待它们完成
results = await asyncio.gather(coro1(), coro2(), coro3())

asyncio.TaskGroup 可以执行这些操作,并且是首选的方法。

一个持有任务组的异步上下文管理器。可以使用 create_task() 将任务添加到组中。当上下文管理器退出时,所有任务都会被等待完成。

Python 3.11 introduced the asyncio.TaskGroup task for managing a group of associated asyncio task.

Added the TaskGroup class, an asynchronous context manager holding a group of tasks that will wait for all of them upon exit. For new code this is recommended over using create_task() and gather() directly.

The asyncio.TaskGroup class is intended as a replacement for the asyncio.create_task() function for creating tasks and the asyncio.gather() function for waiting on a group of tasks.

Historically, we create and issue a coroutine as an asyncio.Task using the asyncio.create_task() function.

For example:

...
# create and issue coroutine as task
task = asyncio.create_task(coro())

This creates a new asyncio.Task object and issues it to the asyncio event loop for execution as soon as it is able.

We can then choose to await the task and wait for it to be completed.

For example:

...
# wait for task to complete
result = await task

You can learn more about executing coroutines as asyncio.Task objects in the tutorial:

As we have seen, the asyncio.gather() function is used to create and issue many coroutines simultaneously as asyncio.Task objects to the event loop, allowing the caller to treat them all as a group.

The most common usage is to wait for all issued tasks to complete.

For example:

...
# issue coroutines as tasks and wait for them to complete
results = await asyncio.gather(coro1(), coro2(), coro2)

The asyncio.TaskGroup can perform both of these activities and is the preferred approach.

An asynchronous context manager holding a group of tasks. Tasks can be added to the group using create_task(). All tasks are awaited when the context manager exits.

11.2 如何创建一个asyncio.TaskGroup

How to Create an asyncio.TaskGroup

asyncio.TaskGroup 对象实现了异步上下文管理器接口,这是该类的首选使用方式。

这意味着该类的实例是通过 “async with” 表达式创建和使用的。

例如:

...
# 创建一个 TaskGroup
async with asyncio.TaskGroup() as group:
    # ...

如果你对 “async with” 表达式不熟悉,可以参考以下教程:

回想一下,异步上下文管理器实现了可以等待的 __aenter__()__aexit__() 方法。

对于 asyncio.TaskGroup,当上下文管理器块退出时,__aexit__() 方法会自动被调用,并会等待所有由 asyncio.TaskGroup 创建的任务完成。

这意味着无论是正常退出 TaskGroup 对象的块,还是通过异常退出,都会自动等待直到所有组内任务完成。

...
# 创建一个 TaskGroup
async with asyncio.TaskGroup() as group:
    # ...
# 等待所有组内任务完成

你可以在以下教程中了解更多关于异步上下文管理器的信息:

An asyncio.TaskGroup object implements the asynchronous context manager interface, and this is the preferred usage of the class.

This means that an instance of the class is created and is used via the “async with” expression.

For example:

...
# create a taskgroup
async with asyncio.TaskGroup() as group:
    # ...

If you are new to the “async with” expression, see the tutorial:

Recall that an asynchronous context manager implements the __aenter__() and __aexit__() methods which can be awaited.

In the case of the asyncio.TaskGroup, the __aexit__() method which is called automatically when the context manager block is exited will await all tasks created by the asyncio.TaskGroup.

This means that exiting the TaskGroup object’s block normally or via an exception will automatically await until all group tasks are done.

...
# create a taskgroup
async with asyncio.TaskGroup() as group:
    # ...
# wait for all group tasks are done

You can learn more about asynchronous context managers in the tutorial:

11.3 如何使用 asyncio.TaskGroup 创建任务

How to Create Tasks Using asyncio.TaskGroup

我们可以通过 asyncio.TaskGroup 对象的 create_task() 方法在任务组中创建一个任务。

例如:

...
# 创建一个 TaskGroup
async with asyncio.TaskGroup() as group:
    # 创建并启动一个任务
    task = group.create_task(coro())

这将创建一个 asyncio.Task 对象,并将其发送到 asyncio 事件循环中执行,类似于 asyncio.create_task() 函数,不同之处在于这个任务与任务组相关联。

如果我们选择直接等待该任务,可以获取结果。

例如:

...
# 创建一个 TaskGroup
async with asyncio.TaskGroup() as group:
    # 创建并启动一个任务
    result = await group.create_task(coro())

使用 asyncio.TaskGroup 的好处是我们可以在组中启动多个任务,并在任务之间执行代码,例如检查结果或收集更多数据。

We can create a task in the task group via the create_task() method on the asyncio.TaskGroup object.

For example:

...
# create a taskgroup
async with asyncio.TaskGroup() as group:
    # create and issue a task
    task = group.create_task(coro())

This will create an asyncio.Task object and issue it to the asyncio event loop for execution, just like the asyncio.create_task() function, except that the task is associated with the group.

We can await the task directly if we choose and get results.

For example:

...
# create a taskgroup
async with asyncio.TaskGroup() as group:
    # create and issue a task
    result = await group.create_task(coro())

The benefit of using the asyncio.TaskGroup is that we can issue multiple tasks in the group and execute code in between. such as checking results or gathering more data.

11.4 如何等待使用 asyncio.TaskGroup 的任务

How to Wait on Tasks Using asyncio.TaskGroup

我们可以通过退出异步上下文管理器块来等待组中的所有任务。

因此,这些任务会自动被等待,不需要额外的操作。

例如:

...
# 创建一个 TaskGroup
async with asyncio.TaskGroup() as group:
    # ...
# 等待所有组内任务完成

如果不希望这种行为,则必须在退出上下文管理器之前,确保所有任务已经“完成”(结束、取消或失败)。

We can wait on all tasks in the group by exiting the asynchronous context manager block.

As such, the tasks are awaited automatically and nothing additional is required.

For example:

...
# create a taskgroup
async with asyncio.TaskGroup() as group:
    # ...
# wait for all group tasks are done

If this behavior is not preferred, then we must ensure all tasks are “done” (finished, canceled, or failed) before exiting the context manager.

11.5 在一个任务失败时如何取消所有使用 asyncio.TaskGroup 的任务

How to Cancel All Tasks If One Task Fails Using asyncio.TaskGroup

如果组中的一个任务因异常失败,那么组内所有未完成的任务将被取消。

这一过程是自动执行的,不需要额外的代码。

例如:

# 处理组中任何任务的失败
try:
    ...
    # 创建一个 TaskGroup
    async with asyncio.TaskGroup() as group:
        # 创建并启动任务
        task1 = group.create_task(coro1())
        # 创建并启动任务
        task2 = group.create_task(coro2())
        # 创建并启动任务
        task3 = group.create_task(coro3())
    # 等待所有组内任务完成
except:
    # 所有未完成的任务都会被取消
    pass

如果不希望这种行为,那么每个任务的失败必须在任务内部自行管理,例如在协程内部使用 try-except 块。

现在我们已经了解了如何使用 asyncio.TaskGroup,接下来让我们看看一些实际的示例。

If one task in the group fails with an exception, then all non-done tasks remaining in the group will be canceled.

This is performed automatically and does not require any additional code.

For example:

# handle the failure of any tasks in the group
try:
    ...
    # create a taskgroup
    async with asyncio.TaskGroup() as group:
        # create and issue a task
        task1 = group.create_task(coro1())
        # create and issue a task
        task2 = group.create_task(coro2())
        # create and issue a task
        task3 = group.create_task(coro3())
    # wait for all group tasks are done
except:
    # all non-done tasks are cancelled
    pass

If this behavior is not preferred, then the failure of each task must be managed within the tasks themselves, e.g. by a try-except block within the coroutine.

Now that we know how to use the asyncio.TaskGroup, let’s look at some worked examples.

11.6 使用 TaskGroup 等待多个任务的示例

Example of Waiting on Multiple Tasks with a TaskGroup

我们可以通过在 asyncio.TaskGroup 中创建多个任务,然后等待所有任务完成的方式来进行探索。

这可以通过首先定义一组表示我们希望完成的任务的不同协程来实现。

在这个示例中,我们将定义 3 个协程,每个协程报告一条不同的信息,然后休眠一秒钟。

# 协程任务
async def task1():
    # 输出信息
    print('来自协程 1 的问候')
    # 休眠以模拟等待
    await asyncio.sleep(1)

# 协程任务
async def task2():
    # 输出信息
    print('来自协程 2 的问候')
    # 休眠以模拟等待
    await asyncio.sleep(1)

# 协程任务
async def task3():
    # 输出信息
    print('来自协程 3 的问候')
    # 休眠以模拟等待
    await asyncio.sleep(1)

接下来,我们可以定义一个 main() 协程,通过上下文管理器接口创建 asyncio.TaskGroup

# asyncio 入口点
async def main():
    # 创建任务组
    async with asyncio.TaskGroup() as group:
    # ...

然后,我们可以将每个协程作为任务创建并发送到事件循环中,尽管它们作为组的一部分被一起收集。

...
# 运行第一个任务
group.create_task(task1())
# 运行第二个任务
group.create_task(task2())
# 运行第三个任务
group.create_task(task3())

注意,我们不需要保留 asyncio.Task 对象的引用,因为 asyncio.TaskGroup 会为我们跟踪它们。

此外,我们也不需要等待这些任务,因为当我们退出 asyncio.TaskGroup 的上下文管理器块时,我们将等待组中的所有任务。

结合起来,完整的示例如下所示。

# asyncio 任务组示例
import asyncio

# 协程任务
async def task1():
    # 输出信息
    print('来自协程 1 的问候')
    # 休眠以模拟等待
    await asyncio.sleep(1)

# 协程任务
async def task2():
    # 输出信息
    print('来自协程 2 的问候')
    # 休眠以模拟等待
    await asyncio.sleep(1)

# 协程任务
async def task3():
    # 输出信息
    print('来自协程 3 的问候')
    # 休眠以模拟等待
    await asyncio.sleep(1)

# asyncio 入口点
async def main():
    # 创建任务组
    async with asyncio.TaskGroup() as group:
        # 运行第一个任务
        group.create_task(task1())
        # 运行第二个任务
        group.create_task(task2())
        # 运行第三个任务
        group.create_task(task3())
    # 等待所有任务完成...
    print('完成')

# 入口点
asyncio.run(main())

运行该示例首先执行 main() 协程,为我们启动一个新的事件循环。

main() 协程运行并创建一个 asyncio.TaskGroup

然后,所有三个协程都作为 asyncio.Task 对象创建,并通过 asyncio.TaskGroup 发送到事件循环中。

asyncio.TaskGroup 的上下文管理器块退出,自动等待所有三个任务。

这些任务会输出信息并进入休眠状态。

一旦所有任务完成,main() 协程会输出一条最终信息。

来自协程 1 的问候
来自协程 2 的问候
来自协程 3 的问候
完成

接下来,让我们探索如何使用带有参数和返回值的任务的 asyncio.TaskGroup

你可以在教程中了解更多关于如何使用 TaskGroup 的信息:

接下来,我们将探索如何在有限时间内等待单个协程。

We can explore the case of creating multiple tasks within an asyncio.TaskGroup and then waiting for all tasks to complete.

This can be achieved by first defining a suite of different coroutines that represent the tasks we want to complete.

In this case, we will define 3 coroutines that each report a different message and then sleep for one second.

# coroutine task
async def task1():
    # report a message
    print('Hello from coroutine 1')
    # sleep to simulate waiting
    await asyncio.sleep(1)

# coroutine task
async def task2():
    # report a message
    print('Hello from coroutine 2')
    # sleep to simulate waiting
    await asyncio.sleep(1)

# coroutine task
async def task3():
    # report a message
    print('Hello from coroutine 3')
    # sleep to simulate waiting
    await asyncio.sleep(1)

Next, we can define a main() coroutine that creates the asyncio.TaskGroup via the context manager interface.

# asyncio entry point
async def main():
    # create task group
    async with asyncio.TaskGroup() as group:
    # ...

We can then create and issue each coroutine as a task into the event loop, although collected together as part of the group.

...
# run first task
group.create_task(task1())
# run second task
group.create_task(task2())
# run third task
group.create_task(task3())

Notice that we don’t need to keep a reference to the asyncio.Task objects as the asyncio.TaskGroup will keep track of them for us.

Also, notice that we don’t need to await the tasks because when we exit the context manager block for the asyncio.TaskGroup we will await all tasks in the group.

Tying this together, the complete example is listed below.

# example of asyncio task group
import asyncio

# coroutine task
async def task1():
    # report a message
    print('Hello from coroutine 1')
    # sleep to simulate waiting
    await asyncio.sleep(1)

# coroutine task
async def task2():
    # report a message
    print('Hello from coroutine 2')
    # sleep to simulate waiting
    await asyncio.sleep(1)

# coroutine task
async def task3():
    # report a message
    print('Hello from coroutine 3')
    # sleep to simulate waiting
    await asyncio.sleep(1)

# asyncio entry point
async def main():
    # create task group
    async with asyncio.TaskGroup() as group:
        # run first task
        group.create_task(task1())
        # run second task
        group.create_task(task2())
        # run third task
        group.create_task(task3())
    # wait for all tasks to complete...
    print('Done')

# entry point
asyncio.run(main())

Running the example first executes the main() coroutine, starting a new event loop for us.

The main() coroutine runs and creates an asyncio.TaskGroup.

All three coroutines are then created as asyncio.Task objects and issued to the event loop via the asyncio.TaskGroup.

The context manager block for the asyncio.TaskGroup is exited which automatically awaits all three tasks.

The tasks report their message and sleep.

Once all tasks are completed the main() coroutine reports a final message.

Hello from coroutine 1
Hello from coroutine 2
Hello from coroutine 3
Done

Next, let’s explore how we might use an asyncio.TaskGroup with tasks that take arguments and return values.

You can learn more about how to use the TaskGroup in the tutorial:

Next, we will explore how to wait for a single coroutine with a time limit.


最后更新: 2024年9月4日
创建日期: 2024年9月4日