使用同步原语

Using synchronization primitives

同步原语是任务用来相互通信和协调的对象。它们对于分配工作负载、通知其他任务以及保护对共享资源的访问等操作非常有用。

备注

AnyIO 原语不是线程安全的,因此不应直接从工作线程中使用。对于这种情况,请使用 run_sync()

事件

Events

事件用于通知任务它们一直在等待的事情已经发生。一个事件对象可以有多个监听者,当事件被触发时,所有监听者都会被通知。

示例:

from anyio import Event, create_task_group, run


async def notify(event):
    event.set()


async def main():
    event = Event()
    async with create_task_group() as tg:
        tg.start_soon(notify, event)
        await event.wait()
        print('Received notification!')

run(main)

备注

与标准库的事件不同,AnyIO 事件不能被重用,必须被替换。这种做法防止了一类竞争条件,并且与 Trio 库的语义一致。

信号量

Semaphores

信号量用于限制对共享资源的访问。信号量从一个最大值开始,每次任务获取信号量时,该值会递减,而释放时会递增。如果值降到零,任何获取信号量的尝试都会被阻塞,直到另一个任务释放它。

示例:

from anyio import Semaphore, create_task_group, sleep, run

async def use_resource(tasknum, semaphore):
    async with semaphore:
        print('Task number', tasknum, 'is now working with the shared resource')
        await sleep(1)


async def main():
    semaphore = Semaphore(2)
    async with create_task_group() as tg:
        for num in range(10):
            tg.start_soon(use_resource, num, semaphore)

run(main)

小技巧

如果信号量的性能对你来说很重要,你可以将 fast_acquire=True 传递给 Semaphore 。这样做会跳过 Semaphore.acquire() 中的 cancel_shielded_checkpoint() 调用,如果没有争用(即获取信号量立即成功)。在某些情况下,这可能会导致任务在使用信号量的循环中永远不会将控制权交还给事件循环,特别是在该循环没有其他 yield 点的情况下。

Locks

锁用于保护共享资源,以确保每次只有一个任务能够独占访问。它们的功能类似于最大值为 1 的信号量,唯一的区别是,只有获取锁的任务才被允许释放它。

示例:

from anyio import Lock, create_task_group, sleep, run


async def use_resource(tasknum, lock):
    async with lock:
        print('Task number', tasknum, 'is now working with the shared resource')
        await sleep(1)


async def main():
    lock = Lock()
    async with create_task_group() as tg:
        for num in range(4):
            tg.start_soon(use_resource, num, lock)

run(main)

小技巧

如果锁的性能对你来说很重要,你可以将 fast_acquire=True 传递给 Lock。这样做会跳过 Lock.acquire() 中的 cancel_shielded_checkpoint() 调用,如果没有争用(即获取锁立即成功)。在某些情况下,这可能会导致任务在使用锁的循环中永远不会将控制权交还给事件循环,特别是在该循环没有其他 yield 点的情况下。

条件

Conditions

条件实际上是事件和锁的结合体。它首先获取一个锁,然后等待来自事件的通知。一旦条件接收到通知,它将释放锁。通知任务还可以选择一次唤醒多个监听者,甚至是所有监听者。

Lock 类似,Condition 也要求获取锁的任务必须是释放锁的任务。

示例:

from anyio import Condition, create_task_group, sleep, run


async def listen(tasknum, condition):
    async with condition:
        await condition.wait()
        print('Woke up task number', tasknum)


async def main():
    condition = Condition()
    async with create_task_group() as tg:
        for tasknum in range(6):
            tg.start_soon(listen, tasknum, condition)

        await sleep(1)
        async with condition:
            condition.notify(1)

        await sleep(1)
        async with condition:
            condition.notify(2)

        await sleep(1)
        async with condition:
            condition.notify_all()

run(main)

容量限制器

Capacity limiters

容量限制器类似于信号量,不同之处在于单个借用者(默认是当前任务)一次只能持有一个令牌。也可以代表任何可哈希对象借用令牌。

示例:

from anyio import CapacityLimiter, create_task_group, sleep, run


async def use_resource(tasknum, limiter):
    async with limiter:
        print('Task number', tasknum, 'is now working with the shared resource')
        await sleep(1)


async def main():
    limiter = CapacityLimiter(2)
    async with create_task_group() as tg:
        for num in range(10):
            tg.start_soon(use_resource, num, limiter)

run(main)

你可以通过设置限制器的 total_tokens 属性来调整令牌的总数。

资源保护器

Resource guards

某些资源,如套接字,对于并发使用非常敏感,甚至不应允许尝试并发使用。在这种情况下,ResourceGuard 是合适的解决方案:

class Resource:
    def __init__(self):
        self._guard = ResourceGuard()

    async def do_something() -> None:
        with self._guard:
            ...

现在,如果另一个任务在第一个调用尚未完成之前尝试在同一个 Resource 实例上调用 do_something() 方法,这将引发 BusyResourceError

队列

Queues

作为队列的替代,AnyIO 提供了一个更强大的构造:内存对象流