创建和管理任务

Creating and managing tasks

任务 是一个执行单元,使您能够同时处理多个需要等待的操作。尽管可以创建任意数量的任务,但异步事件循环在任一时间只能运行其中的一个。当任务遇到一个需要任务暂停等待的 await 语句时,事件循环便可以自由地处理其他任务。当第一个任务等待的事件完成后,事件循环将在获得机会时恢复该任务的执行。

AnyIO 中的任务处理大致遵循 Trio 模型。可以使用 任务组 来创建(生成)任务。任务组是一个异步上下文管理器,确保在退出上下文块后,其所有子任务都以某种方式完成。如果子任务或上下文块中的代码引发了异常,则所有子任务都会被取消。否则,上下文管理器会等待所有子任务退出后再继续执行。

示例代码如下:

from anyio import sleep, create_task_group, run


async def sometask(num: int) -> None:
    print('Task', num, 'running')
    await sleep(1)
    print('Task', num, 'finished')


async def main() -> None:
    async with create_task_group() as tg:
        for num in range(5):
            tg.start_soon(sometask, num)

    print('All tasks finished!')

run(main)

启动和初始化任务

Starting and initializing tasks

有时能够等待任务成功初始化自身是非常有用的。例如,在启动网络服务时,您可以让任务启动监听器,然后向调用方发出初始化完成的信号。这样,调用方可以启动依赖于该服务已启动和运行的其他任务。此外,如果套接字绑定失败或在初始化期间出现其他问题,异常会传播给调用方,从而可以捕获并处理异常。

这可以通过 TaskGroup.start() 来实现:

from anyio import (
    TASK_STATUS_IGNORED,
    create_task_group,
    connect_tcp,
    create_tcp_listener,
    run,
)
from anyio.abc import TaskStatus


async def handler(stream):
    ...


async def start_some_service(
    port: int, *, task_status: TaskStatus[None] = TASK_STATUS_IGNORED
):
    async with await create_tcp_listener(
        local_host="127.0.0.1", local_port=port
    ) as listener:
        task_status.started()
        await listener.serve(handler)


async def main():
    async with create_task_group() as tg:
        await tg.start(start_some_service, 5000)
        async with await connect_tcp("127.0.0.1", 5000) as stream:
            ...


run(main)

目标协程函数 必须 调用 task_status.started(),因为调用 TaskGroup.start() 的任务将会阻塞,直到此方法被调用。如果生成的任务未调用它,那么 TaskGroup.start() 调用将引发 RuntimeError

注意

start_soon() 不同,start() 需要使用 await

处理任务组中的多个错误

Handling multiple errors in a task group

在任务组中,多个任务可能会引发异常。当任务响应取消操作时,可能进入异常处理块或 finally: 块,并在此期间引发异常。这就引出了一个问题:哪个异常会从任务组上下文管理器中传播出来?答案是“两个”。实际上,这意味着会引发一个特殊的异常 ExceptionGroup (或 BaseExceptionGroup ),其中包含了两个异常对象。

要捕获可能嵌套在组中的此类异常,需要采取特殊措施。在 Python 3.11 及更高版本中,可以使用 except* 语法来捕获多个异常:

from anyio import create_task_group

try:
    async with create_task_group() as tg:
        tg.start_soon(some_task)
        tg.start_soon(another_task)
except* ValueError as excgroup:
    for exc in excgroup.exceptions:
        ...  # 处理每个 ValueError
except* KeyError as excgroup:
    for exc in excgroup.exceptions:
        ...  # 处理每个 KeyError

如果需要兼容旧版本的 Python,可以使用 exceptiongroup 包中的 catch() 函数:

from anyio import create_task_group
from exceptiongroup import catch

def handle_valueerror(excgroup: ExceptionGroup) -> None:
    for exc in excgroup.exceptions:
        ...  # 处理每个 ValueError

def handle_keyerror(excgroup: ExceptionGroup) -> None:
    for exc in excgroup.exceptions:
        ...  # 处理每个 KeyError

with catch({
    ValueError: handle_valueerror,
    KeyError: handle_keyerror
}):
    async with create_task_group() as tg:
        tg.start_soon(some_task)
        tg.start_soon(another_task)

如果需要在处理器中设置局部变量,可以将其声明为 nonlocal:

def handle_valueerror(exc):
    nonlocal somevariable
    somevariable = 'whatever'

上下文传播

Context propagation

每当生成一个新任务时,context 将被复制到该新任务中。需要特别注意*哪个*上下文会被复制到新生成的任务中。被复制的不是任务组的宿主任务的上下文,而是调用 TaskGroup.start()TaskGroup.start_soon() 的任务的上下文。

与 asyncio.TaskGroup 的区别

Differences with asyncio.TaskGroup

asyncio.TaskGroup 类是在 Python 3.11 中新增的,其设计与 AnyIO 的 TaskGroup 类非常相似。然而,asyncio 的对应类在语义上有一些重要的区别:

  • 任务组本身是直接实例化的,而不是通过工厂函数创建

  • 任务仅通过 create_task() 生成;没有 start()start_soon() 方法

  • create_task() 方法返回一个任务对象,可以进行 await 操作(或取消)

  • 通过 create_task() 生成的任务只能单独取消(任务组中没有 cancel() 方法或类似的方法)

  • 当通过 create_task() 生成的任务在其协程开始运行之前被取消时,它将无法处理取消异常

  • asyncio.TaskGroup 不允许在某个任务发生异常并触发任务组关闭后再启动新任务