跳转至

Asyncio 表达式及API备忘录

为什么是asyncio ?

Asyncio 提供适合非阻塞套接字 I/O 应用程序的基于协程的并发性。

协程

导入:

import asyncio

定义一个协程:

async def custom_coroutine():
    # ...

创建协程对象:

coro = custom_coroutine()

运行协程的入口:

asyncio.run(main())

从协程中挂起和运行协程:

await custom_coroutine()

sleep 一个协程

await asyncio.sleep(1)

异步推导和循环

异步推导式

res = [r async for r in async_gen()]

Await 推导式

res = [r await a in awaitables]

异步for循环

async for item in async_gen():
    print(item)

任务(Task)

将任务作为协程并调度为独立运行。

创建并调度一个任务(高级):

task = asyncio.create_task(coro)

创建并调度一个任务(低级):

task = asyncio.create_task(coro)

创建并调度一个任务

task = asyncio.ensure_future(coro)

挂起并等待任务完成

await task

获取当前任务

task = asyncio.current_task()

获取所有正在运行的任务

tasks = asyncio.all_tasks()

获取任务结果

value = task.result()

获取任务未处理的异常

ex = task.exception()

取消一个任务

was_canceled = task.cancel()

检查任务是否运行完毕(未运行)

if task.done():
    # ...

检查任务是否被取消

if task.cancelled():
    # ...

添加执行完后的回调函数

task.add_done_callback(handler)

删除任务的执行后的回调函数

task.remove_done_callback(handler)

设置和获取任务名称

task.set_name("MyTask")
name = task.get_name()

操作任务合集

操作一个可等待任务合集、任务、或者任务合集

带超时的使用Await

try:
    await asyncio.wait_for(tk, timeout=1)
except asyncio.TimeoutError:
    # ...

防止任务被取消

shielded = asyncio.shield(task)

在新线程中运行阻塞任务

coro = asyncio.to_thread(myfunc)

在asyncio事件循环中运行任务

fut = run_coroutine_threadsafe(coro, loop)

将许多可等待协程对象作为组运行

await asyncio.gather(c1(), c2())

等待一个集合中的所有协程任务

done, pen = await asyncio.wait(tasks)

带超时的等待一个集合中的所有协程任务

try:
    done, pen = await asyncio.wait(tasks, timeout=5)
except asyncio.TimeoutError:
    # ...

等待一个任务合集中的第一个任务完成

done, pen = await asyncio.wait(tasks, return_when=FIRST_COMPLETED)

等待一个任务合集中的第一个任务失败

done, pen = await asyncio.wait(tasks, return_when=FIRST_EXCEPTION)

按任务完成顺序获取结果

for c in asyncio.as_completed(tasks):
    result = await c

非阻塞I/O的子进程

在子进程中运行命令

p = await create_subprocess_exec('ls')

使用shell和子进程运行命令

p = await create_subprocess_shell('ls')

从子进程中读取数据

await process.communicate(input=data)

终止一个子进程

process.terminate()

非阻塞I/O 流

打开一个客户端TCP链接

reader, writer = await open_connection('google.com', 80)

启动一个TCP服务

server = await start_server(hanler, '127.0.0.1', 9876)

从套接字读取数据

data = await reader.readline()

往套接字写入数据

writer.write(data)

清空socket并等待就绪

await writer.drain()

关闭套接字连接

writer.close()
await writer.wait_closed()

信号量和事件和条件

信号量,设置 num 个位置

semaphore = asyncio.Semaphore(10)
await semaphore.acquire()
# ...
semaphore.release()

信号量, 上下文管理

async with semaphore:
    # ...

创建事件,并设置事件

event = asyncio.Event()
event.set()

检查事件是否设置

if event.is_set():
    # ...

等待事件被设置(阻塞)

await event.wait()

条件变量

condition = asyncio.Condition()
await condition.acquired()
# ...
condition.release()

等待条件被通知(阻塞)

async with condition:
    await condition.wait()

等待表达式的条件(阻塞)

async with condition:
    await condition.wait_for(check)

通知任何等待条件的单个线程

async with condition:
    condition.notify(n=1)

通知所有等待条件的线程

async with condition:
    condition.notify_all()

异步锁(Async Locks)

互斥锁

lock = asyncio.Lock()
await lock.acquire()
# ...
lock.release()

互斥锁和上下文管理器

async with lock:
    # ...

队列

Queue 、LifoQueue, PriorityQueue

普通队列、后进先出队列、优先级队列

创建队列

queue = asyncio.Queue()

创建队列并限制数量

queue = asyncio.Queue(100)

添加元素到队列中(阻塞,如果限制了数量):

await queue.put(item)

从队列中获取元素

item = await queue.get()

检查队列是否为空

if queue.empty():
    # ...

检查队列是否已满

if queue.full():
    # ...

获取队列当前容量

capacity = queue.qsize()

将工作单元标记为完成

queue.task_done()

等待所有单元完成

await queue.join()

异步生成器和迭代器

定义异步生成器

async def async_generator():
    for i in range(10):
        await asyncio.sleep(1)
        yield i

定义异步迭代器

class AsyncIterator():
    def __init__(self):
        self.counter = 0
    def __aiter__(self):
        return self
    async def __anext__(self):
        if self.counter >= 10:
            raise StopAsyncIteration
        await asyncio.sleep(1)
        self.counter += 1
        return self.counter

使用异步迭代器

async for value in AsyncIterator():
    # ...

异步上下文管理器

定义异步上下文管理器

class AsyncContextManager():
    async def __aenter__(self):
        await asyncio.sleep(1)
    async def __aexit__(self, et, exc, tb):
        await asyncio.sleep(1)

使用异步迭代器

async with CustomClass() as mgmr:
    # ...

最后更新: 2023年7月3日
创建日期: 2023年7月3日