跳转至

10. 同时运行多个协程

10. Run Many Coroutines Concurrently

asyncio 的一个好处是我们可以同时运行许多协程。

这些协程可以在一个组中创建并存储,然后同时一起执行。

这可以使用 asyncio.gather() 函数来实现。

让我们仔细看看。

A benefit of asyncio is that we can run many coroutines concurrently.

These coroutines can be created in a group and stored, then executed all together at the same time.

This can be achieved using the asyncio.gather() function.

Let’s take a closer look.

10.1 什么是 Asyncio Gather()

10.1 What is Asyncio gather()

asyncio.gather() 模块函数允许调用者将多个可等待项组合在一起。

一旦分组,可等待项就可以并发执行、等待和取消。

同时运行 aws 序列中的可等待对象。

COROUTINES AND TASKS

对于分组和执行多个协程或多个任务来说,它是一个有用的实用函数。

例如:

...
# 运行可等待对象集合
results = await asyncio.gather(coro1(), asyncio.create_task(coro2()))

在我们可能预先创建许多任务或协程,然后希望一次执行所有任务或协程并等待它们全部完成然后再继续的情况下,我们可以使用 asyncio.gather() 函数。

这是一种可能的情况,其中需要许多类似任务的结果,例如 具有不同数据的相同任务或协程。

可等待对象可以并发执行,返回结果,并且主程序可以通过使用它所依赖的结果来恢复。

gather() 函数比简单地等待任务完成更强大。

它允许将一组可等待对象视为单个可等待对象。

这允许:

  • 通过 await 表达式执行并等待组中的所有可等待任务完成。
  • 从所有分组的可等待项中获取结果,以便稍后通过 result() 方法检索。
  • 通过 cancel() 方法取消的可等待组。
  • 检查组中的所有可等待项是否已通过 done() 方法完成。
  • 仅当组中的所有任务完成时才执行回调函数。

和更多。

The asyncio.gather() module function allows the caller to group multiple awaitables together.

Once grouped, the awaitables can be executed concurrently, awaited, and canceled.

Run awaitable objects in the aws sequence concurrently.

COROUTINES AND TASKS

It is a helpful utility function for both grouping and executing multiple coroutines or multiple tasks.

For example:

...
# run a collection of awaitables
results = await asyncio.gather(coro1(), asyncio.create_task(coro2()))

We may use the asyncio.gather() function in situations where we may create many tasks or coroutines up-front and then wish to execute them all at once and wait for them all to complete before continuing on.

This is a likely situation where the result is required from many like-tasks, e.g. same task or coroutine with different data.

The awaitables can be executed concurrently, results returned, and the main program can resume by making use of the results on which it is dependent.

The gather() function is more powerful than simply waiting for tasks to complete.

It allows a group of awaitables to be treated as a single awaitable.

This allows:

  • Executing and waiting for all awaitables in the group to be done via an await expression.
  • Getting results from all grouped awaitables to be retrieved later via the result() method.
  • The group of awaitables to be canceled via the cancel() method.
  • Checking if all awaitables in the group are done via the done() method.
  • Executing callback functions only when all tasks in the group are done.

And more.

10.2 如何使用 Asyncio Gather()

10.2 How to use Asyncio gather()

在本节中,我们将仔细研究如何使用 asyncio.gather() 函数。

asyncio.gather() 函数采用一个或多个可等待项作为参数。

回想一下,可等待的可能是协程、FutureTask

因此,我们可以调用 gather() 函数:

  • 多项任务
  • 多个协程
  • 任务和协程的混合

例如:

...
# 执行多个协程
asyncio.gather(coro1(), coro2())

如果将 Task 对象提供给 gather(),它们将已经在运行,因为 Task 是作为创建的一部分进行调度的。

asyncio.gather() 函数将可等待对象作为位置参数。

我们无法创建可等待项的列表或集合并将其提供给收集,因为这会导致错误。

例如:

...
# 无法直接提供可等待列表
asyncio.gather([coro1(), coro2()])

如果首先使用星号运算符 (*) 将其解包到单独的表达式中,则可以提供可等待列表。

例如:

...
# 收集一份已解压的等待列表
asyncio.gather(*[coro1(), coro2()])

如果向 gather() 提供协程,它们会自动包装在 Task 对象中。

gather() 函数不会阻塞。

相反,它返回一个代表可等待组的 asyncio.Future 对象。

例如:

...
# 得到一个代表多个可等待对象的Future对象
group = asyncio.gather(coro1(), coro2())

一旦创建了 Future 对象,它就会在事件循环中自动调度。

可等待代表组,组中的所有可等待将尽快执行。

这意味着如果调用者没有执行任何其他操作,则预定的等待组将运行(假设调用者挂起)。

这也意味着您不必等待从 gather() 返回的 Future

例如:

...
# 得到一个代表多个可等待对象的Future对象
group = asyncio.gather(coro1(), coro2())
# 挂起并等待一段时间,该组可能正在执行..
await asyncio.sleep(10)

可以等待返回的 Future 对象,它将等待组中的所有可等待任务完成。

例如:

...
# 运行可等待对象组
await group

等待从 gather() 返回的 Future 将返回可等待项的返回值列表。

如果可等待项不返回值,则此列表将包含默认的 “None” 返回值。

例如:

...
# 运行可等待组并获取返回值
results = await group

这通常在一行中执行。

例如:

...
# 在一行中运行任务并获取结果
results = await asyncio.gather(coro1(), coro2())

In this section, we will take a closer look at how we might use the asyncio.gather() function.

The asyncio.gather() function takes one or more awaitables as arguments.

Recall an awaitable may be a coroutine, a Future or a Task.

Therefore, we can call the gather() function with:

  • Multiple tasks
  • Multiple coroutines
  • Mixture of tasks and coroutines

For example:

...
# execute multiple coroutines
asyncio.gather(coro1(), coro2())

If Task objects are provided to gather(), they will already be running because Tasks are scheduled as part of being created.

The asyncio.gather() function takes awaitables as position arguments.

We cannot create a list or collection of awaitables and provide it to gather, as this will result in an error.

For example:

...
# cannot provide a list of awaitables directly
asyncio.gather([coro1(), coro2()])

A list of awaitables can be provided if it is first unpacked into separate expressions using the star operator (*).

For example:

...
# gather with an unpacked list of awaitables
asyncio.gather(*[coro1(), coro2()])

If coroutines are provided to gather(), they are wrapped in Task objects automatically.

The gather() function does not block.

Instead, it returns an asyncio.Future object that represents the group of awaitables.

For example:

...
# get a future that represents multiple awaitables
group = asyncio.gather(coro1(), coro2())

Once the Future object is created it is scheduled automatically within the event loop.

The awaitable represents the group, and all awaitables in the group will execute as soon as they are able.

This means that if the caller did nothing else, the scheduled group of awaitables will run (assuming the caller suspends).

It also means that you do not have to await the Future that is returned from gather().

For example:

...
# get a future that represents multiple awaitables
group = asyncio.gather(coro1(), coro2())
# suspend and wait a while, the group may be executing..
await asyncio.sleep(10)

The returned Future object can be awaited which will wait for all awaitables in the group to be done.

For example:

...
# run the group of awaitables
await group

Awaiting the Future returned from gather() will return a list of return values from the awaitables.

If the awaitables do not return a value, then this list will contain the default “None” return value.

For example:

...
# run the group of awaitables and get return values
results = await group

This is more commonly performed in one line.

For example:

...
# run tasks and get results on one line
results = await asyncio.gather(coro1(), coro2())

10.3 对于列表中的多个协程的 Gather() 示例

10.3 Example of gather() For Many Coroutines in a List

预先创建多个协程然后稍后收集它们是很常见的。

这允许程序准备要并发执行的任务,然后立即触发它们的并发执行并等待它们完成。

我们可以手动或使用列表理解将许多协程收集到一个列表中。

例如:

...
# 创建许多协程
coros = [task_coro(i) for i in range(10)]

然后我们可以对列表中的所有协程调用gather()

协程列表不能直接提供给 gather() 函数,因为这会导致错误。

相反, gather() 函数要求将每个可等待项作为单独的位置参数提供。

这可以通过将列表展开为单独的表达式并将它们传递给 gather() 函数来实现。 星号运算符(*)将为我们执行此操作。

例如:

...
# 运行任务
await asyncio.gather(*coros)

将它们结合在一起,下面列出了使用 Gather() 运行预先准备的协程列表的完整示例。

# SuperFastPython.com
# 收集列表中许多协程的示例
import asyncio

# 用于任务的协程
async def task_coro(value):
    # 报告消息
    print(f'>task {value} executing')
    # 睡一会儿
    await asyncio.sleep(1)

# coroutine used for the entry point
async def main():
    # 报告消息
    print('main starting')
    # 创建许多协程
    coros = [task_coro(i) for i in range(10)]
    # 运行任务
    await asyncio.gather(*coros)
    # 报告消息
    print('main done')

# 启动异步程序
asyncio.run(main())

运行该示例将执行 main() 协程作为程序的入口点。

然后,main() 协程使用列表理解创建一个包含 10 个协程对象的列表。

然后将该列表提供给 gather() 函数,并使用星号运算符将其解包为 10 个单独的表达式。

然后,main() 协程等待从调用 Gather() 返回的 Future 对象,挂起并等待所有已调度的协程完成其执行。

协程会尽快运行,报告其独特的消息并在终止前休眠。

仅当组中的所有协程完成后, main() 协程才会恢复并报告其最终消息。

这强调了我们如何准备协程集合并将它们作为单独的表达式提供给 gather() 函数。

main starting
>task 0 executing
>task 1 executing
>task 2 executing
>task 3 executing
>task 4 executing
>task 5 executing
>task 6 executing
>task 7 executing
>task 8 executing
>task 9 executing
main done

您可以在教程中了解有关如何使用 Gather() 函数的更多信息:

接下来,我们将探讨如何等待一组异步任务。

It is common to create multiple coroutines beforehand and then gather them later.

This allows a program to prepare the tasks that are to be executed concurrently and then trigger their concurrent execution all at once and wait for them to complete.

We can collect many coroutines together into a list either manually or using a list comprehension.

For example:

...
# create many coroutines
coros = [task_coro(i) for i in range(10)]

We can then call gather() with all coroutines in the list.

The list of coroutines cannot be provided directly to the gather() function as this will result in an error.

Instead, the gather() function requires each awaitable to be provided as a separate positional argument.

This can be achieved by unwrapping the list into separate expressions and passing them to the gather() function. The star operator (*) will perform this operation for us.

For example:

...
# run the tasks
await asyncio.gather(*coros)
Tying this together, the complete example of running a list of pre-prepared coroutines with gather() is listed below.

# SuperFastPython.com
# example of gather for many coroutines in a list
import asyncio

# coroutine used for a task
async def task_coro(value):
    # report a message
    print(f'>task {value} executing')
    # sleep for a moment
    await asyncio.sleep(1)

# coroutine used for the entry point
async def main():
    # report a message
    print('main starting')
    # create many coroutines
    coros = [task_coro(i) for i in range(10)]
    # run the tasks
    await asyncio.gather(*coros)
    # report a message
    print('main done')

# start the asyncio program
asyncio.run(main())

Running the example executes the main() coroutine as the entry point to the program.

The main() coroutine then creates a list of 10 coroutine objects using a list comprehension.

This list is then provided to the gather() function and unpacked into 10 separate expressions using the star operator.

The main() coroutine then awaits the Future object returned from the call to gather(), suspending and waiting for all scheduled coroutines to complete their execution.

The coroutines run as soon as they are able, reporting their unique messages and sleeping before terminating.

Only after all coroutines in the group are complete does the main() coroutine resume and report its final message.

This highlights how we might prepare a collection of coroutines and provide them as separate expressions to the gather() function.

main starting
>task 0 executing
>task 1 executing
>task 2 executing
>task 3 executing
>task 4 executing
>task 5 executing
>task 6 executing
>task 7 executing
>task 8 executing
>task 9 executing
main done

You can learn more about how to use the gather() function in the tutorial:

Next, we will explore how to wait on a group of asyncio tasks.


最后更新: 2024年9月4日
创建日期: 2024年9月4日