使用线程

Working with threads

实际的异步应用程序有时需要执行网络、文件或计算开销大的操作。这些操作通常会阻塞异步事件循环,导致性能问题。解决方案是将此类代码放在 工作线程(worker threads) 中运行。使用工作线程可以让事件循环继续运行其他任务,同时工作线程运行阻塞调用。

在工作线程中运行函数

Running a function in a worker thread

在工作线程中运行一个(同步的)可调用对象:

import time

from anyio import to_thread, run


async def main():
    await to_thread.run_sync(time.sleep, 5)

run(main)

默认情况下,当任务在等待工作线程完成时,它们会被保护免受取消。您可以传递 cancellable=True 参数来允许这些任务被取消。然而,请注意,线程仍然会继续运行——只是它的结果将被忽略。

从工作线程调用异步代码

Calling asynchronous code from a worker thread

如果您需要从工作线程调用一个协程函数,可以这样做:

from anyio import from_thread, sleep, to_thread, run


def blocking_function():
    from_thread.run(sleep, 5)


async def main():
    await to_thread.run_sync(blocking_function)

run(main)

备注

必须使用 run_sync() 启动工作线程,才能使此功能正常工作。

从工作线程调用同步代码

Calling synchronous code from a worker thread

有时您可能需要从工作线程在事件循环线程中调用同步代码。常见的情况包括设置异步事件或将数据发送到内存对象流。因为这些方法不是线程安全的,所以您需要安排它们在事件循环线程中调用,使用 run_sync():

import time

from anyio import Event, from_thread, to_thread, run

def worker(event):
    time.sleep(1)
    from_thread.run_sync(event.set)

async def main():
    event = Event()
    await to_thread.run_sync(worker, event)
    await event.wait()

run(main)

从外部线程调用异步代码

Calling asynchronous code from an external thread

如果您需要从一个不是由事件循环生成的工作线程中运行异步代码,您需要一个 阻塞门户(blocking portal) 。这个门户需要在事件循环线程中获取。

一种方法是使用 start_blocking_portal 启动一个新事件循环并获取门户(它的参数与 run() 基本相同):

from anyio.from_thread import start_blocking_portal


with start_blocking_portal(backend='trio') as portal:
    portal.call(...)

如果您已经有一个正在运行的事件循环,并希望允许外部线程访问,您可以直接创建一个 BlockingPortal:

from anyio import run
from anyio.from_thread import BlockingPortal


async def main():
    async with BlockingPortal() as portal:
        # ...将门户交给外部线程...
        await portal.sleep_until_stopped()

run(main)

从工作线程生成任务

Spawning tasks from worker threads

当您需要生成一个在后台运行的任务时,可以使用 start_task_soon() 来实现:

from concurrent.futures import as_completed

from anyio import sleep
from anyio.from_thread import start_blocking_portal


async def long_running_task(index):
    await sleep(1)
    print(f'Task {index} running...')
    await sleep(index)
    return f'Task {index} return value'


with start_blocking_portal() as portal:
    futures = [portal.start_task_soon(long_running_task, i) for i in range(1, 5)]
    for future in as_completed(futures):
        print(future.result())

通过取消返回的 Future 来取消这种方式生成的任务。

阻塞门户还提供了一个与 TaskGroup.start() 类似的方法 start_task(),它与其对等方法一样,通过调用 task_status.started() 等待可调用对象信号准备就绪:

from anyio import sleep, TASK_STATUS_IGNORED
from anyio.from_thread import start_blocking_portal


async def service_task(*, task_status=TASK_STATUS_IGNORED):
    task_status.started('STARTED')
    await sleep(1)
    return 'DONE'


with start_blocking_portal() as portal:
    future, start_value = portal.start_task(service_task)
    print('Task has started with value', start_value)

    return_value = future.result()
    print('Task has finished with return value', return_value)

从工作线程使用异步上下文管理器

Using asynchronous context managers from worker threads

您可以使用 wrap_async_context_manager() 将异步上下文管理器包装为同步上下文管理器:

from anyio.from_thread import start_blocking_portal


class AsyncContextManager:
    async def __aenter__(self):
        print('entering')

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print('exiting with', exc_type)


async_cm = AsyncContextManager()
with start_blocking_portal() as portal, portal.wrap_async_context_manager(async_cm):
    print('inside the context manager block')

备注

不能在事件循环线程中的同步回调中使用包装的异步上下文管理器。

上下文传播

Context propagation

在工作线程中运行函数时,当前上下文会被复制到工作线程。因此,任务中可用的任何上下文变量也将在线程中运行的代码中可用。与上下文变量一样,对它们所做的任何更改将不会传播回调用的异步任务。

从工作线程调用异步代码时,上下文再次被复制到事件循环线程中调用目标函数的任务。

调整默认最大工作线程数

Adjusting the default maximum worker thread count

默认的 AnyIO 工作线程限制器值为 40,这意味着任何没有显式 limiter 参数的 to_thread.run_sync() 调用将最多会启动 40 个线程。您可以像这样调整此限制:

from anyio import to_thread

async def foo():

# 设置最大工作线程数为 60 to_thread.current_default_thread_limiter().total_tokens = 60

备注

AnyIO 的默认线程池限制器不会影响 asyncio 中的默认线程池执行器。

对工作线程中的取消做出反应

Reacting to cancellation in worker threads

虽然 Python 没有机制来取消在线程中运行的代码,但 AnyIO 提供了一种机制,允许用户代码自愿检查宿主任务的作用域是否已被取消,如果已取消,则引发取消异常。这可以通过简单地调用 from_thread.check_cancelled() 来实现:

from anyio import to_thread, from_thread

def sync_function():
while True:

from_thread.check_cancelled() print("还没有取消") sleep(1)

async def foo():
with move_on_after(3):

await to_thread.run_sync(sync_function)

按需共享阻塞门户

Sharing a blocking portal on demand

如果您正在构建一个需要按需启动阻塞门户的同步 API,您可能需要比每次调用都启动一个阻塞门户更高效的解决方案。为此,您可以使用 BlockingPortalProvider:

from anyio.from_thread import BlockingPortalProvider

class MyAPI:
    def __init__(self, async_obj) -> None:
        self._async_obj = async_obj
        self._portal_provider = BlockingPortalProvider()

    def do_stuff(self) -> None:
        with self._portal_provider as portal:
            portal.call(async_obj.do_async_stuff)

现在,无论有多少线程同时调用 MyAPI 实例上的 do_stuff() 方法,都会使用相同的阻塞门户来处理内部的异步调用。很容易看出,这比每个调用都启动自己的阻塞门户要高效得多。