15. 在 Asyncio 中运行阻塞任务¶
15. Run a Blocking Task in Asyncio
阻塞任务是阻止当前线程继续执行的任务。
如果在 asyncio 程序中执行阻塞任务,它将停止整个事件循环,从而阻止任何其他协程进行。
我们可以通过 asyncio.to_thread() 和 loop.run_in_executor() 函数在 asyncio 程序中异步运行阻塞调用。
A blocking task is a task that stops the current thread from progressing.
If a blocking task is executed in an asyncio program it stops the entire event loop, preventing any other coroutines from progressing.
We can run blocking calls asynchronously in an asyncio program via the asyncio.to_thread() and loop.run_in_executor() functions.
15.1 需要在 Asyncio 中运行阻塞任务¶
15.1 Need to Run Blocking Tasks in Asyncio
asyncio的重点是异步编程和非阻塞IO。
然而,我们经常需要在异步应用程序中执行阻塞函数调用。
这可能有多种原因,例如:
- 执行 CPU 密集型任务,例如计算某些内容。
- 执行阻塞 IO 密集型任务,例如从文件中读取或写入。
- 调用尚不支持 asyncio 的第三方库。
直接在 asyncio 程序中进行阻塞调用将导致事件循环在阻塞调用执行时停止。 它不会允许其他协程在后台运行。
我们如何在 asyncio 程序中异步执行阻塞调用?
The focus of asyncio is asynchronous programming and non-blocking IO.
Nevertheless, we often need to execute a blocking function call within an asyncio application.
This could be for many reasons, such as:
- To execute a CPU-bound task like calculating something.
- To execute a blocking IO-bound task like reading or writing from a file.
- To call into a third-party library that does not support asyncio yet.
Making a blocking call directly in an asyncio program will cause the event loop to stop while the blocking call is executing. It will not allow other coroutines to run in the background.
How can we execute a blocking call in an asyncio program asynchronously?
15.2 如何运行阻塞任务¶
15.2 How to Run Blocking Tasks
asyncio 模块提供了两种在 asyncio 程序中执行阻塞调用的方法。
第一种是使用 asyncio.to_thread() 函数。
这是高级 API 中的内容,适用于应用程序开发人员。
asyncio.to_thread() 函数采用要执行的函数名称和任何参数。
该函数在单独的线程中执行。 它返回一个可以作为独立任务等待或调度的协程。
例如:
...
# 在单独的线程中执行函数
await asyncio.to_thread(task)
在返回的协程有机会在事件循环中运行之前,任务不会开始执行。
asyncio.to_thread() 函数在后台创建一个 ThreadPoolExecutor 来执行阻塞调用。
因此, asyncio.to_thread() 函数仅适用于 IO 密集型任务。
另一种方法是使用 loop.run_in_executor() 函数。
这是在低级 asyncio API 中,首先需要访问事件循环,例如通过 asyncio.get_running_loop() 函数。
loop.run_in_executor() 函数需要一个执行器和一个要执行的函数。
如果为执行器提供 None,则使用默认执行器,即 ThreadPoolExecutor。
loop.run_in_executor() 函数返回一个可等待的对象,如果需要,可以等待。 该任务将立即开始执行,因此不需要等待或安排返回的可等待对象来开始执行阻塞调用。
例如:
...
# 获取事件循环
loop = asyncio.get_running_loop()
# 在单独的线程中执行函数
await loop.run_in_executor(None, task)
或者,可以创建一个执行器并将其传递给 loop.run_in_executor() 函数,该函数将在执行器中执行异步调用。
在这种情况下,调用者必须管理执行器,在调用者完成后将其关闭。
例如:
...
# 创建进程池
with ProcessPoolExecutor as exe:
# 获取事件循环
loop = asyncio.get_running_loop()
# 在单独的线程中执行函数
await loop.run_in_executor(exe, task)
# 进程池自动关闭...
这两种方法允许阻塞调用作为 asyncio 程序中的异步任务执行。
现在我们知道如何在 asyncio 程序中执行阻塞调用,让我们看一些有效的示例。
The asyncio module provides two approaches for executing blocking calls in asyncio programs.
The first is to use the asyncio.to_thread() function.
This is in the high-level API and is intended for application developers.
The asyncio.to_thread() function takes a function name to execute and any arguments.
The function is executed in a separate thread. It returns a coroutine that can be awaited or scheduled as an independent task.
For example:
...
# execute a function in a separate thread
await asyncio.to_thread(task)
The task will not begin executing until the returned coroutine is given an opportunity to run in the event loop.
The asyncio.to_thread() function creates a ThreadPoolExecutor behind the scenes to execute blocking calls.
As such, the asyncio.to_thread() function is only appropriate for IO-bound tasks.
An alternative approach is to use the loop.run_in_executor() function.
This is in the low-level asyncio API and first requires access to the event loop, such as via the asyncio.get_running_loop() function.
The loop.run_in_executor() function takes an executor and a function to execute.
If None is provided for the executor, then the default executor is used, which is a ThreadPoolExecutor.
The loop.run_in_executor() function returns an awaitable that can be awaited if needed. The task will begin executing immediately, so the returned awaitable does not need to be awaited or scheduled for the blocking call to start executing.
For example:
...
# get the event loop
loop = asyncio.get_running_loop()
# execute a function in a separate thread
await loop.run_in_executor(None, task)
Alternatively, an executor can be created and passed to the loop.run_in_executor() function, which will execute the asynchronous call in the executor.
The caller must manage the executor in this case, shutting it down once the caller is finished with it.
For example:
...
# create a process pool
with ProcessPoolExecutor as exe:
# get the event loop
loop = asyncio.get_running_loop()
# execute a function in a separate thread
await loop.run_in_executor(exe, task)
# process pool is shutdown automatically...
These two approaches allow a blocking call to be executed as an asynchronous task in an asyncio program.
Now that we know how to execute blocking calls in an asyncio program, let’s look at some worked examples.
15.3 使用 to_thread() 在 Asyncio 中运行 I/O 密集型任务的示例¶
15.3 Example of Running I/O-Bound Task in Asyncio with to_thread()
我们可以探索如何使用 asyncio.to_thread() 在 asyncio 程序中执行阻塞 IO 绑定调用。
在此示例中,我们将定义一个函数来阻止调用者几秒钟。 然后,我们将使用 asyncio.to_thread() 函数在 asyncio 的线程池中异步执行此函数。
这将使调用者能够自由地继续其他活动。
下面列出了完整的示例。
# SuperFastPython.com
# 在 asyncio 中运行阻塞 io 绑定任务的示例
import asyncio
import time
# 阻塞 io 绑定任务
def blocking_task():
# 报告消息
print('Task starting')
# 阻塞暂停2s
time.sleep(2)
# 报告消息
print('Task done')
# 主协程
async def main():
# 报告消息
print('Main running the blocking task')
# 为阻塞任务创建一个协程
coro = asyncio.to_thread(blocking_task)
# 安排任务
task = asyncio.create_task(coro)
# 报告消息
print('Main doing other things')
# 允许计划任务启动
await asyncio.sleep(0)
# 等待任务
await task
# 运行异步程序
asyncio.run(main())
运行该示例首先创建 main() 协程,并将其作为 asyncio 程序的入口点运行。
main() 协程运行并报告一条消息。 然后它向线程池发出对阻塞函数的调用。 这返回一个协程,
然后协程被包装在 Task 中并独立执行。
main() 协程可以自由地继续其他活动。 在这种情况下,它会休眠一会儿以允许计划任务开始执行。 这使得目标函数可以在后台发布到ThreadPoolExecutor并开始运行。
然后 main() 协程挂起并等待任务完成。
阻塞函数报告一条消息,休眠2秒,然后报告最后一条消息。
这突出显示了我们如何在单独的线程中从 asyncio 程序异步执行阻塞 IO 绑定任务。
Main running the blocking task
Main doing other things
Task starting
Task done
您可以在教程中了解有关 to_thread() 函数的更多信息:
接下来,我们将探讨如何开发和使用异步迭代器。
We can explore how to execute a blocking IO-bound call in an asyncio program using asyncio.to_thread().
In this example, we will define a function that blocks the caller for a few seconds. We will then execute this function asynchronously in a thread pool from asyncio using the asyncio.to_thread() function.
This will free the caller to continue with other activities.
The complete example is listed below.
# SuperFastPython.com
# example of running a blocking io-bound task in asyncio
import asyncio
import time
# a blocking io-bound task
def blocking_task():
# report a message
print('Task starting')
# block for a while
time.sleep(2)
# report a message
print('Task done')
# main coroutine
async def main():
# report a message
print('Main running the blocking task')
# create a coroutine for the blocking task
coro = asyncio.to_thread(blocking_task)
# schedule the task
task = asyncio.create_task(coro)
# report a message
print('Main doing other things')
# allow the scheduled task to start
await asyncio.sleep(0)
# await the task
await task
# run the asyncio program
asyncio.run(main())
Running the example first creates the main() coroutine and runs it as the entry point into the asyncio program.
The main() coroutine runs and reports a message. It then issues a call to the blocking function call to the thread pool. This returns a coroutine,
The coroutine is then wrapped in a Task and executed independently.
The main() coroutine is free to continue with other activities. In this case, it sleeps for a moment to allow the scheduled task to start executing. This allows the target function to be issued to the ThreadPoolExecutor behind the scenes and start running.
The main() coroutine then suspends and waits for the task to complete.
The blocking function reports a message, sleeps for 2 seconds, then reports a final message.
This highlights how we can execute a blocking IO-bound task in a separate thread asynchronously from an asyncio program.
Main running the blocking task
Main doing other things
Task starting
Task done
You can learn more about the to_thread() function in the tutorial:
Next, we will explore how to develop and use asynchronous iterators.
创建日期: 2024年9月4日