任务同步和通信

Synchronizing and communicating between tasks

Trio 提供了一组标准的同步和任务间通信原语。这些对象的 API 通常是根据标准库中的类似类建模的,但也存在一些差异。

Trio provides a standard set of synchronization and inter-task communication primitives. These objects' APIs are generally modelled off of the analogous classes in the standard library, but with some differences.

阻塞和非阻塞方法

Blocking and non-blocking methods

标准库的同步原语提供了多种机制来指定超时和阻塞行为,并且可以指示一个操作是因成功还是因超时返回的。

在 Trio 中,我们统一采用以下约定:

  • 我们不提供超时参数。如果你需要超时,可以使用取消作用域(cancel scope)。

  • 对于具有非阻塞版本的操作,阻塞和非阻塞版本是不同的方法,分别命名为 X 和 X_nowait。(这类似于 queue.Queue,但不同于 threading 中的大多数类。)我们喜欢这种方法,因为它使我们能够将阻塞版本设为异步,而将非阻塞版本设为同步。

  • 当非阻塞方法无法成功执行时(例如通道为空、锁已经被占用等),它会引发 trio.WouldBlock 异常。这里没有类似 queue.Empty 与 queue.Full 的区分 —— 我们只使用这一个异常,并始终如一地使用它。

The standard library synchronization primitives have a variety of mechanisms for specifying timeouts and blocking behavior, and of signaling whether an operation returned due to success versus a timeout.

In Trio, we standardize on the following conventions:

  • We don't provide timeout arguments. If you want a timeout, then use

    a cancel scope.

  • For operations that have a non-blocking variant, the blocking and

    non-blocking variants are different methods with names like X and X_nowait, respectively. (This is similar to queue.Queue, but unlike most of the classes in threading.) We like this approach because it allows us to make the blocking version async and the non-blocking version sync.

  • When a non-blocking method cannot succeed (the channel is empty, the

    lock is already held, etc.), then it raises trio.WouldBlock. There's no equivalent to the queue.Empty versus queue.Full distinction – we just have the one exception that we use consistently.
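To make these conventions concrete, here is a minimal sketch (not from the original documentation) showing both halves: catching trio.WouldBlock from a _nowait method, and using a cancel scope where other libraries would take a timeout argument:

import trio


async def main():
    send_channel, receive_channel = trio.open_memory_channel(1)

    # Non-blocking variant: raises trio.WouldBlock instead of waiting.
    try:
        receive_channel.receive_nowait()
    except trio.WouldBlock:
        print("nothing buffered yet")

    await send_channel.send("hello")
    print(receive_channel.receive_nowait())  # now succeeds

    # Blocking variant with a "timeout": wrap the call in a cancel scope.
    with trio.move_on_after(0.1) as cancel_scope:
        await receive_channel.receive()  # buffer is empty again, so this blocks
    if cancel_scope.cancelled_caught:
        print("gave up waiting after 0.1 seconds")


trio.run(main)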

公平性

Fairness

这些类都保证是“公平的”,也就是说,当需要决定由哪个任务下一个获得锁、从队列中取出项等时,总是轮到等待时间最长的那个任务。虽然目前尚不完全清楚这是否是最好的选择(参见 issue #54),但目前就是这样工作的。

作为这个含义的一个例子,下面是一个小程序,其中两个任务竞争一个锁。请注意,释放锁的任务总是在其他任务有机会运行之前立即尝试重新获取锁。(请记住,我们这里使用的是协作式多任务,所以实际上可以确定,释放锁的任务会在其他任务醒来之前调用 acquire();在 Trio 中,释放锁不是一个检查点。)如果使用不公平的锁,这将导致同一个任务永远持有锁,另一个任务被饿死。但如果你运行这个程序,你会看到两个任务礼貌地轮流获得锁:

# fairness-demo.py

import trio

async def loopy_child(number, lock):
    while True:
        async with lock:
            print(f"Child {number} has the lock!")
            await trio.sleep(0.5)

async def main():
    async with trio.open_nursery() as nursery:
        lock = trio.Lock()
        nursery.start_soon(loopy_child, 1, lock)
        nursery.start_soon(loopy_child, 2, lock)

trio.run(main)

These classes are all guaranteed to be "fair", meaning that when it comes time to choose who will be next to acquire a lock, get an item from a queue, etc., then it always goes to the task which has been waiting longest. It's not entirely clear whether this is the best choice, but for now that's how it works.

As an example of what this means, here's a small program in which two tasks compete for a lock. Notice that the task which releases the lock always immediately attempts to re-acquire it, before the other task has a chance to run. (And remember that we're doing cooperative multi-tasking here, so it's actually deterministic that the task releasing the lock will call acquire() before the other task wakes up; in Trio releasing a lock is not a checkpoint.) With an unfair lock, this would result in the same task holding the lock forever and the other task being starved out. But if you run this, you'll see that the two tasks politely take turns:

# fairness-demo.py

import trio

async def loopy_child(number, lock):
    while True:
        async with lock:
            print(f"Child {number} has the lock!")
            await trio.sleep(0.5)

async def main():
    async with trio.open_nursery() as nursery:
        lock = trio.Lock()
        nursery.start_soon(loopy_child, 1, lock)
        nursery.start_soon(loopy_child, 2, lock)

trio.run(main)

使用 Event 广播事件

Broadcasting an event with Event

class trio.Event

基类:object

A waitable boolean value useful for inter-task synchronization, inspired by threading.Event.

An event object has an internal boolean flag, representing whether the event has happened yet. The flag is initially False, and the wait() method waits until the flag is True. If the flag is already True, then wait() returns immediately. (If the event has already happened, there's nothing to wait for.) The set() method sets the flag to True, and wakes up any waiters.

This behavior is useful because it helps avoid race conditions and lost wakeups: it doesn't matter whether set() gets called just before or after wait(). If you want a lower-level wakeup primitive that doesn't have this protection, consider Condition or trio.lowlevel.ParkingLot.

备注

Unlike threading.Event, trio.Event has no clear method. In Trio, once an Event has happened, it cannot un-happen. If you need to represent a series of events, consider creating a new Event object for each one (they're cheap!), or other synchronization methods like channels or trio.lowlevel.ParkingLot.

is_set()

Return the current value of the internal flag.

返回类型:

bool

set()

Set the internal flag value to True, and wake any waiting tasks.

返回类型:

None

statistics()

Return an object containing debugging information.

Currently the following fields are defined:

  • tasks_waiting: The number of tasks blocked on this event's wait() method.

返回类型:

EventStatistics

await wait()

Block until the internal flag value becomes True.

If it's already True, then this method returns immediately.

返回类型:

None

class trio.EventStatistics(tasks_waiting)

基类:object

An object containing debugging information.

Currently the following fields are defined:

  • tasks_waiting: The number of tasks blocked on this event's trio.Event.wait() method.
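As a quick illustration of the API above (a sketch, not part of the original documentation), one task sets the event and every waiter wakes up:

import trio


async def waiter(name, event):
    print(f"{name} is waiting for the event")
    await event.wait()
    print(f"{name} woke up")


async def main():
    event = trio.Event()
    async with trio.open_nursery() as nursery:
        nursery.start_soon(waiter, "task 1", event)
        nursery.start_soon(waiter, "task 2", event)
        await trio.sleep(1)
        event.set()  # wakes both waiters; the flag then stays True


trio.run(main)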

使用通道在任务之间传递值

Using channels to pass values between tasks

Channels 允许你在不同任务之间安全且便捷地传递对象。它们特别适用于实现生产者/消费者模式。

核心的通道 API 由抽象基类 trio.abc.SendChannel 和 trio.abc.ReceiveChannel 定义。你可以使用它们来实现自定义的通道,执行类似于在进程之间或通过网络传递对象的操作。但在许多情况下,你只想在单个进程内的不同任务之间传递对象,针对这种情况,你可以使用 trio.open_memory_channel():

trio.open_memory_channel(max_buffer_size)

Open a channel for passing objects between tasks within a process.

Memory channels are lightweight, cheap to allocate, and entirely in-memory. They don't involve any operating-system resources, or any kind of serialization. They just pass Python objects directly between tasks (with a possible stop in an internal buffer along the way).

Channel objects can be closed by calling aclose or using async with. They are not automatically closed when garbage collected. Closing memory channels isn't mandatory, but it is generally a good idea, because it helps avoid situations where tasks get stuck waiting on a channel when there's no-one on the other side. See 使用通道进行干净关闭 for details.

Memory channel operations are all atomic with respect to cancellation: either receive will successfully return an object, or it will raise Cancelled while leaving the channel unchanged.

参数:

max_buffer_size (int or math.inf) -- The maximum number of items that can be buffered in the channel before send() blocks. Choosing a sensible value here is important to ensure that backpressure is communicated promptly and avoid unnecessary latency; see 在通道中缓冲 for more details. If in doubt, use 0.

返回类型:

tuple[MemorySendChannel[TypeVar(T)], MemoryReceiveChannel[TypeVar(T)]]

返回:

A pair (send_channel, receive_channel). If you have trouble remembering which order these go in, remember: data flows from left → right.

In addition to the standard channel methods, all memory channel objects provide a statistics() method, which returns an object with the following fields:

  • current_buffer_used: The number of items currently stored in the channel buffer.

  • max_buffer_size: The maximum number of items allowed in the buffer, as passed to open_memory_channel().

  • open_send_channels: The number of open MemorySendChannel endpoints pointing to this channel. Initially 1, but can be increased by MemorySendChannel.clone().

  • open_receive_channels: Likewise, but for open MemoryReceiveChannel endpoints.

  • tasks_waiting_send: The number of tasks blocked in send on this channel (summing over all clones).

  • tasks_waiting_receive: The number of tasks blocked in receive on this channel (summing over all clones).
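For example, a small sketch (not from the original documentation) of reading those fields via statistics():

import trio


async def main():
    send_channel, receive_channel = trio.open_memory_channel(2)
    await send_channel.send("a")  # returns immediately, buffer size is 2

    stats = send_channel.statistics()
    print(stats.current_buffer_used)    # 1
    print(stats.max_buffer_size)        # 2
    print(stats.open_send_channels)     # 1
    print(stats.tasks_waiting_receive)  # 0


trio.run(main)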

备注

如果你曾经使用过 threading 或 asyncio 模块,你可能熟悉 queue.Queue 或 asyncio.Queue。在 Trio 中,open_memory_channel() 就是你在寻找队列时要使用的东西。主要的区别在于,Trio 将经典的队列接口拆分成了两个对象。这样做的优势是,它使得无需重写代码就可以把两端放到不同的进程中,并且我们可以分别关闭这两端。

MemorySendChannel 和 MemoryReceiveChannel 还提供了超出核心通道接口的其他一些功能:

class trio.MemorySendChannel(state, closed=False, tasks=NOTHING)

基类:SendChannel[SendType]

await aclose()

Close this send channel object asynchronously.

See MemorySendChannel.close.

返回类型:

None

clone()

Clone this send channel object.

This returns a new MemorySendChannel object, which acts as a duplicate of the original: sending on the new object does exactly the same thing as sending on the old object. (If you're familiar with os.dup, then this is a similar idea.)

However, closing one of the objects does not close the other, and receivers don't get EndOfChannel until all clones have been closed.

This is useful for communication patterns that involve multiple producers all sending objects to the same destination. If you give each producer its own clone of the MemorySendChannel, and then make sure to close each MemorySendChannel when it's finished, receivers will automatically get notified when all producers are finished. See 管理多个生产者和/或多个消费者 for examples.

抛出:

trio.ClosedResourceError -- if you already closed this MemorySendChannel object.

返回类型:

MemorySendChannel[TypeVar(SendType, contravariant=True)]

close()

Close this send channel object synchronously.

All channel objects have an asynchronous aclose method. Memory channels can also be closed synchronously. This has the same effect on the channel and other tasks using it, but close is not a trio checkpoint. This simplifies cleaning up in cancelled tasks.

Using with send_channel: will close the channel object on leaving the with block.

返回类型:

None

await send(value)

See trio.abc.SendChannel.send.

Memory channels allow multiple tasks to call send at the same time.

返回类型:

None

send_nowait(value)

Like trio.abc.SendChannel.send, but if the channel's buffer is full, raises WouldBlock instead of blocking.

返回类型:

None

statistics()

Returns a MemoryChannelStatistics for the memory channel this is associated with.

返回类型:

MemoryChannelStatistics

class trio.MemoryReceiveChannel(state, closed=False, tasks=NOTHING)

基类:ReceiveChannel[ReceiveType]

await aclose()

Close this receive channel object asynchronously.

See MemoryReceiveChannel.close.

返回类型:

None

clone()

Clone this receive channel object.

This returns a new MemoryReceiveChannel object, which acts as a duplicate of the original: receiving on the new object does exactly the same thing as receiving on the old object.

However, closing one of the objects does not close the other, and the underlying channel is not closed until all clones are closed. (If you're familiar with os.dup, then this is a similar idea.)

This is useful for communication patterns that involve multiple consumers all receiving objects from the same underlying channel. See 管理多个生产者和/或多个消费者 for examples.

警告

The clones all share the same underlying channel. Whenever a clone receive()s a value, it is removed from the channel and the other clones do not receive that value. If you want to send multiple copies of the same stream of values to multiple destinations, like itertools.tee(), then you need to find some other solution; this method does not do that.

抛出:

trio.ClosedResourceError -- if you already closed this MemoryReceiveChannel object.

返回类型:

MemoryReceiveChannel[TypeVar(ReceiveType, covariant=True)]

close()

Close this receive channel object synchronously.

All channel objects have an asynchronous aclose method. Memory channels can also be closed synchronously. This has the same effect on the channel and other tasks using it, but close is not a trio checkpoint. This simplifies cleaning up in cancelled tasks.

Using with receive_channel: will close the channel object on leaving the with block.

返回类型:

None

await receive()

See trio.abc.ReceiveChannel.receive.

Memory channels allow multiple tasks to call receive at the same time. The first task will get the first item sent, the second task will get the second item sent, and so on.

返回类型:

TypeVar(ReceiveType, covariant=True)

receive_nowait()

Like trio.abc.ReceiveChannel.receive, but if there's nothing ready to receive, raises WouldBlock instead of blocking.

返回类型:

TypeVar(ReceiveType, covariant=True)

statistics()

Returns a MemoryChannelStatistics for the memory channel this is associated with.

返回类型:

MemoryChannelStatistics

class trio.MemoryChannelStatistics(current_buffer_used, max_buffer_size, open_send_channels, open_receive_channels, tasks_waiting_send, tasks_waiting_receive)

基类:object

Channels allow you to safely and conveniently send objects between different tasks. They're particularly useful for implementing producer/consumer patterns.

The core channel API is defined by the abstract base classes trio.abc.SendChannel and trio.abc.ReceiveChannel. You can use these to implement your own custom channels, that do things like pass objects between processes or over the network. But in many cases, you just want to pass objects between different tasks inside a single process, and for that you can use trio.open_memory_channel():

trio.open_memory_channel(max_buffer_size)

Open a channel for passing objects between tasks within a process.

Memory channels are lightweight, cheap to allocate, and entirely in-memory. They don't involve any operating-system resources, or any kind of serialization. They just pass Python objects directly between tasks (with a possible stop in an internal buffer along the way).

Channel objects can be closed by calling aclose or using async with. They are not automatically closed when garbage collected. Closing memory channels isn't mandatory, but it is generally a good idea, because it helps avoid situations where tasks get stuck waiting on a channel when there's no-one on the other side. See 使用通道进行干净关闭 for details.

Memory channel operations are all atomic with respect to cancellation: either receive will successfully return an object, or it will raise Cancelled while leaving the channel unchanged.

参数:

max_buffer_size (int or math.inf) -- The maximum number of items that can be buffered in the channel before send() blocks. Choosing a sensible value here is important to ensure that backpressure is communicated promptly and avoid unnecessary latency; see 在通道中缓冲 for more details. If in doubt, use 0.

返回类型:

tuple[MemorySendChannel[TypeVar(T)], MemoryReceiveChannel[TypeVar(T)]]

返回:

A pair (send_channel, receive_channel). If you have trouble remembering which order these go in, remember: data flows from left → right.

In addition to the standard channel methods, all memory channel objects provide a statistics() method, which returns an object with the following fields:

  • current_buffer_used: The number of items currently stored in the channel buffer.

  • max_buffer_size: The maximum number of items allowed in the buffer, as passed to open_memory_channel().

  • open_send_channels: The number of open MemorySendChannel endpoints pointing to this channel. Initially 1, but can be increased by MemorySendChannel.clone().

  • open_receive_channels: Likewise, but for open MemoryReceiveChannel endpoints.

  • tasks_waiting_send: The number of tasks blocked in send on this channel (summing over all clones).

  • tasks_waiting_receive: The number of tasks blocked in receive on this channel (summing over all clones).

备注

If you've used the threading or asyncio modules, you may be familiar with queue.Queue or asyncio.Queue. In Trio, open_memory_channel() is what you use when you're looking for a queue. The main difference is that Trio splits the classic queue interface up into two objects. The advantage of this is that it makes it possible to put the two ends in different processes without rewriting your code, and that we can close the two sides separately.

MemorySendChannel and MemoryReceiveChannel also expose several more features beyond the core channel interface:

class trio.MemorySendChannel(state, closed=False, tasks=NOTHING)

基类:SendChannel[SendType]

await aclose()

Close this send channel object asynchronously.

See MemorySendChannel.close.

返回类型:

None

clone()

Clone this send channel object.

This returns a new MemorySendChannel object, which acts as a duplicate of the original: sending on the new object does exactly the same thing as sending on the old object. (If you're familiar with os.dup, then this is a similar idea.)

However, closing one of the objects does not close the other, and receivers don't get EndOfChannel until all clones have been closed.

This is useful for communication patterns that involve multiple producers all sending objects to the same destination. If you give each producer its own clone of the MemorySendChannel, and then make sure to close each MemorySendChannel when it's finished, receivers will automatically get notified when all producers are finished. See 管理多个生产者和/或多个消费者 for examples.

抛出:

trio.ClosedResourceError -- if you already closed this MemorySendChannel object.

返回类型:

MemorySendChannel[TypeVar(SendType, contravariant=True)]

close()

Close this send channel object synchronously.

All channel objects have an asynchronous aclose method. Memory channels can also be closed synchronously. This has the same effect on the channel and other tasks using it, but close is not a trio checkpoint. This simplifies cleaning up in cancelled tasks.

Using with send_channel: will close the channel object on leaving the with block.

返回类型:

None

await send(value)

See trio.abc.SendChannel.send.

Memory channels allow multiple tasks to call send at the same time.

返回类型:

None

send_nowait(value)

Like trio.abc.SendChannel.send, but if the channel's buffer is full, raises WouldBlock instead of blocking.

返回类型:

None

statistics()

Returns a MemoryChannelStatistics for the memory channel this is associated with.

返回类型:

MemoryChannelStatistics

class trio.MemoryReceiveChannel(state, closed=False, tasks=NOTHING)

基类:ReceiveChannel[ReceiveType]

await aclose()

Close this receive channel object asynchronously.

See MemoryReceiveChannel.close.

返回类型:

None

clone()

Clone this receive channel object.

This returns a new MemoryReceiveChannel object, which acts as a duplicate of the original: receiving on the new object does exactly the same thing as receiving on the old object.

However, closing one of the objects does not close the other, and the underlying channel is not closed until all clones are closed. (If you're familiar with os.dup, then this is a similar idea.)

This is useful for communication patterns that involve multiple consumers all receiving objects from the same underlying channel. See 管理多个生产者和/或多个消费者 for examples.

警告

The clones all share the same underlying channel. Whenever a clone receive()s a value, it is removed from the channel and the other clones do not receive that value. If you want to send multiple copies of the same stream of values to multiple destinations, like itertools.tee(), then you need to find some other solution; this method does not do that.

抛出:

trio.ClosedResourceError -- if you already closed this MemoryReceiveChannel object.

返回类型:

MemoryReceiveChannel[TypeVar(ReceiveType, covariant=True)]

close()

Close this receive channel object synchronously.

All channel objects have an asynchronous aclose method. Memory channels can also be closed synchronously. This has the same effect on the channel and other tasks using it, but close is not a trio checkpoint. This simplifies cleaning up in cancelled tasks.

Using with receive_channel: will close the channel object on leaving the with block.

返回类型:

None

await receive()

See trio.abc.ReceiveChannel.receive.

Memory channels allow multiple tasks to call receive at the same time. The first task will get the first item sent, the second task will get the second item sent, and so on.

返回类型:

TypeVar(ReceiveType, covariant=True)

receive_nowait()

Like trio.abc.ReceiveChannel.receive, but if there's nothing ready to receive, raises WouldBlock instead of blocking.

返回类型:

TypeVar(ReceiveType, covariant=True)

statistics()

Returns a MemoryChannelStatistics for the memory channel this is associated with.

返回类型:

MemoryChannelStatistics

class trio.MemoryChannelStatistics(current_buffer_used, max_buffer_size, open_send_channels, open_receive_channels, tasks_waiting_send, tasks_waiting_receive)

基类:object

一个简单的通道示例

A simple channel example

这是一个如何使用内存通道的简单示例:

import trio


async def main():
    async with trio.open_nursery() as nursery:
        # Open a channel:
        send_channel, receive_channel = trio.open_memory_channel(0)
        # Start a producer and a consumer, passing one end of the channel to
        # each of them:
        nursery.start_soon(producer, send_channel)
        nursery.start_soon(consumer, receive_channel)


async def producer(send_channel):
    # Producer sends 3 messages
    for i in range(3):
        # The producer sends using 'await send_channel.send(...)'
        await send_channel.send(f"message {i}")


async def consumer(receive_channel):
    # The consumer uses an 'async for' loop to receive the values:
    async for value in receive_channel:
        print(f"got value {value!r}")


trio.run(main)

如果你运行这个程序,它会打印:

got value "message 0"
got value "message 1"
got value "message 2"

然后它会一直挂起。(使用 Ctrl-C 退出。)

Here's a simple example of how to use memory channels:

import trio


async def main():
    async with trio.open_nursery() as nursery:
        # Open a channel:
        send_channel, receive_channel = trio.open_memory_channel(0)
        # Start a producer and a consumer, passing one end of the channel to
        # each of them:
        nursery.start_soon(producer, send_channel)
        nursery.start_soon(consumer, receive_channel)


async def producer(send_channel):
    # Producer sends 3 messages
    for i in range(3):
        # The producer sends using 'await send_channel.send(...)'
        await send_channel.send(f"message {i}")


async def consumer(receive_channel):
    # The consumer uses an 'async for' loop to receive the values:
    async for value in receive_channel:
        print(f"got value {value!r}")


trio.run(main)

If you run this, it prints:

got value "message 0"
got value "message 1"
got value "message 2"

And then it hangs forever. (Use control-C to quit.)

使用通道进行干净关闭

Clean shutdown with channels

当然,我们通常不希望程序挂起。发生了什么?问题在于生产者发送了 3 条消息后退出了,但消费者无法得知生产者已经消失:对它来说,可能随时会有另一条消息到来。所以它会一直挂起,等待第四条消息。

这是修复后的新版本:它产生与前一个版本相同的输出,然后干净地退出。唯一的变化是在生产者和消费者中增加了 async with 块:

import trio


async def main():
    async with trio.open_nursery() as nursery:
        send_channel, receive_channel = trio.open_memory_channel(0)
        nursery.start_soon(producer, send_channel)
        nursery.start_soon(consumer, receive_channel)


async def producer(send_channel):
    async with send_channel:
        for i in range(3):
            await send_channel.send(f"message {i}")


async def consumer(receive_channel):
    async with receive_channel:
        async for value in receive_channel:
            print(f"got value {value!r}")


trio.run(main)

这里最重要的是生产者的 async with。当生产者退出时,这会关闭 send_channel,这就告诉消费者没有更多的消息要来了,因此它可以干净地退出 async for 循环。然后程序会退出,因为两个任务都已退出。

我们还在消费者中添加了一个 async with。这并不是那么重要,但它有助于我们捕捉错误或其他问题。例如,假设消费者由于某种原因提前退出了——可能是因为一个 bug。那么生产者就会把消息发送到空洞中,并可能无限期地卡住。但是,如果消费者关闭了 receive_channel,生产者就会收到一个 BrokenResourceError,提醒它应该停止发送消息,因为没有人在接收。

如果你想查看消费者提前退出的效果,可以尝试在 async for 循环中添加一个 break 语句——你应该会看到生产者收到一个 BrokenResourceError

Of course we don't generally like it when programs hang. What happened? The problem is that the producer sent 3 messages and then exited, but the consumer has no way to tell that the producer is gone: for all it knows, another message might be coming along any moment. So it hangs forever waiting for the 4th message.

Here's a new version that fixes this: it produces the same output as the previous version, and then exits cleanly. The only change is the addition of async with blocks inside the producer and consumer:

import trio


async def main():
    async with trio.open_nursery() as nursery:
        send_channel, receive_channel = trio.open_memory_channel(0)
        nursery.start_soon(producer, send_channel)
        nursery.start_soon(consumer, receive_channel)


async def producer(send_channel):
    async with send_channel:
        for i in range(3):
            await send_channel.send(f"message {i}")


async def consumer(receive_channel):
    async with receive_channel:
        async for value in receive_channel:
            print(f"got value {value!r}")


trio.run(main)

The really important thing here is the producer's async with. When the producer exits, this closes the send_channel, and that tells the consumer that no more messages are coming, so it can cleanly exit its async for loop. Then the program shuts down because both tasks have exited.

We also added an async with to the consumer. This isn't as important, but it can help us catch mistakes or other problems. For example, suppose that the consumer exited early for some reason – maybe because of a bug. Then the producer would be sending messages into the void, and might get stuck indefinitely. But, if the consumer closes its receive_channel, then the producer will get a BrokenResourceError to alert it that it should stop sending messages because no-one is listening.

If you want to see the effect of the consumer exiting early, try adding a break statement to the async for loop – you should see a BrokenResourceError from the producer.
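If the producer should survive a consumer that goes away early instead of crashing, one option (a sketch under that assumption, not from the original documentation) is to catch BrokenResourceError around the send loop:

import trio


# Hypothetical variant of the producer above that tolerates a vanished consumer.
async def producer(send_channel):
    async with send_channel:
        try:
            for i in range(3):
                await send_channel.send(f"message {i}")
        except trio.BrokenResourceError:
            # The receiving side was closed; stop sending instead of crashing.
            print("consumer went away, stopping early")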

管理多个生产者和/或多个消费者

Managing multiple producers and/or multiple consumers

你也可以有多个生产者和多个消费者,共享同一个通道。不过,这样做会使得关闭过程稍微复杂一些。

例如,考虑一下我们之前示例的简单扩展,现在有两个生产者和两个消费者:

# This example usually crashes!

import trio
import random


async def main():
    async with trio.open_nursery() as nursery:
        send_channel, receive_channel = trio.open_memory_channel(0)
        # Start two producers
        nursery.start_soon(producer, "A", send_channel)
        nursery.start_soon(producer, "B", send_channel)
        # And two consumers
        nursery.start_soon(consumer, "X", receive_channel)
        nursery.start_soon(consumer, "Y", receive_channel)


async def producer(name, send_channel):
    async with send_channel:
        for i in range(3):
            await send_channel.send(f"{i} from producer {name}")
            # Random sleeps help trigger the problem more reliably
            await trio.sleep(random.random())


async def consumer(name, receive_channel):
    async with receive_channel:
        async for value in receive_channel:
            print(f"consumer {name} got value {value!r}")
            # Random sleeps help trigger the problem more reliably
            await trio.sleep(random.random())


trio.run(main)

这两个生产者 A 和 B 每个发送 3 条消息。然后这些消息会随机分配给两个消费者 X 和 Y。因此,我们希望看到类似这样的输出:

consumer Y got value '0 from producer B'
consumer X got value '0 from producer A'
consumer Y got value '1 from producer A'
consumer Y got value '1 from producer B'
consumer X got value '2 from producer B'
consumer X got value '2 from producer A'

然而,在大多数情况下,结果并非如此——输出的前一部分是正常的,但当程序运行到结束时,会因为 ClosedResourceError 崩溃。如果你运行几次这个程序,你会发现有时 tracebacks 显示是 send 崩溃,有时是 receive 崩溃,甚至有时它根本不会崩溃。

发生了什么呢?假设生产者 A 最先完成。它退出了,并且它的 async with 块关闭了 send_channel。但是等等!生产者 B 仍在使用这个 send_channel... 所以下次 B 调用 send 时,它会遇到 ClosedResourceError

然而,有时如果我们幸运的话,两个生产者可能同时完成(或者足够接近),这样它们都在关闭 send_channel 之前完成最后一次 send

但是,即使那样,我们仍然没有完全解决问题!在生产者退出后,两个消费者会争先恐后地检查 send_channel 是否已关闭。假设 X 赢得了比赛。它退出了 async for 循环,然后退出了 async with 块... 并关闭了 receive_channel,而 Y 仍在使用它。再次,这会导致崩溃。

我们本可以通过一些复杂的记录手段来确保只有最后一个生产者和最后一个消费者才关闭它们的通道端点……但那样做会非常繁琐且不可靠。幸运的是,有更好的方法!这是上面程序的修复版本:

import trio
import random


async def main():
    async with trio.open_nursery() as nursery:
        send_channel, receive_channel = trio.open_memory_channel(0)
        async with send_channel, receive_channel:
            # Start two producers, giving each its own private clone
            nursery.start_soon(producer, "A", send_channel.clone())
            nursery.start_soon(producer, "B", send_channel.clone())
            # And two consumers, giving each its own private clone
            nursery.start_soon(consumer, "X", receive_channel.clone())
            nursery.start_soon(consumer, "Y", receive_channel.clone())


async def producer(name, send_channel):
    async with send_channel:
        for i in range(3):
            await send_channel.send(f"{i} from producer {name}")
            # Random sleeps help trigger the problem more reliably
            await trio.sleep(random.random())


async def consumer(name, receive_channel):
    async with receive_channel:
        async for value in receive_channel:
            print(f"consumer {name} got value {value!r}")
            # Random sleeps help trigger the problem more reliably
            await trio.sleep(random.random())


trio.run(main)

这个示例演示了如何使用 MemorySendChannel.clone 和 MemoryReceiveChannel.clone 方法。它们的作用是创建通道端点的副本,这些副本就像原始端点一样工作——唯一的区别是它们可以独立地关闭。而且,只有在所有副本都关闭之后,底层的通道才会关闭。因此,这完全解决了我们的关闭问题,如果你运行这个程序,你会看到它打印六行输出后干净地退出。

请注意我们使用的一个小技巧:main 中的代码创建了克隆对象并将它们传递给所有子任务,然后使用 async with 关闭原始对象。另一种选择是将副本传递给所有任务(除了最后一个),然后将原始对象传递给最后一个任务,如下所示:

# 也有效,但更容易出错:
send_channel, receive_channel = trio.open_memory_channel(0)
nursery.start_soon(producer, "A", send_channel.clone())
nursery.start_soon(producer, "B", send_channel)
nursery.start_soon(consumer, "X", receive_channel.clone())
nursery.start_soon(consumer, "Y", receive_channel)

但是这样更容易出错,尤其是在你使用循环来启动生产者/消费者时。

请确保不要写:

# 错误,程序将挂起:
send_channel, receive_channel = trio.open_memory_channel(0)
nursery.start_soon(producer, "A", send_channel.clone())
nursery.start_soon(producer, "B", send_channel.clone())
nursery.start_soon(consumer, "X", receive_channel.clone())
nursery.start_soon(consumer, "Y", receive_channel.clone())

在这里,我们把副本传递给各个任务,但从未关闭原始对象。这意味着我们有 3 个发送通道对象(原始对象 + 两个副本),但我们只关闭了其中 2 个,因此消费者将永远等待最后一个被关闭。

You can also have multiple producers, and multiple consumers, all sharing the same channel. However, this makes shutdown a little more complicated.

For example, consider this naive extension of our previous example, now with two producers and two consumers:

# This example usually crashes!

import trio
import random


async def main():
    async with trio.open_nursery() as nursery:
        send_channel, receive_channel = trio.open_memory_channel(0)
        # Start two producers
        nursery.start_soon(producer, "A", send_channel)
        nursery.start_soon(producer, "B", send_channel)
        # And two consumers
        nursery.start_soon(consumer, "X", receive_channel)
        nursery.start_soon(consumer, "Y", receive_channel)


async def producer(name, send_channel):
    async with send_channel:
        for i in range(3):
            await send_channel.send(f"{i} from producer {name}")
            # Random sleeps help trigger the problem more reliably
            await trio.sleep(random.random())


async def consumer(name, receive_channel):
    async with receive_channel:
        async for value in receive_channel:
            print(f"consumer {name} got value {value!r}")
            # Random sleeps help trigger the problem more reliably
            await trio.sleep(random.random())


trio.run(main)

The two producers, A and B, send 3 messages apiece. These are then randomly distributed between the two consumers, X and Y. So we're hoping to see some output like:

consumer Y got value '0 from producer B'
consumer X got value '0 from producer A'
consumer Y got value '1 from producer A'
consumer Y got value '1 from producer B'
consumer X got value '2 from producer B'
consumer X got value '2 from producer A'

However, on most runs, that's not what happens – the first part of the output is OK, and then when we get to the end the program crashes with ClosedResourceError. If you run the program a few times, you'll see that sometimes the traceback shows send crashing, and other times it shows receive crashing, and you might even find that on some runs it doesn't crash at all.

Here's what's happening: suppose that producer A finishes first. It exits, and its async with block closes the send_channel. But wait! Producer B was still using that send_channel... so the next time B calls send, it gets a ClosedResourceError.

Sometimes, though, if we're lucky, the two producers might finish at the same time (or close enough), so they both make their last send before either of them closes the send_channel.

But, even if that happens, we're not out of the woods yet! After the producers exit, the two consumers race to be the first to notice that the send_channel has closed. Suppose that X wins the race. It exits its async for loop, then exits the async with block... and closes the receive_channel, while Y is still using it. Again, this causes a crash.

We could avoid this by using some complicated bookkeeping to make sure that only the last producer and the last consumer close their channel endpoints... but that would be tiresome and fragile. Fortunately, there's a better way! Here's a fixed version of our program above:

import trio
import random


async def main():
    async with trio.open_nursery() as nursery:
        send_channel, receive_channel = trio.open_memory_channel(0)
        async with send_channel, receive_channel:
            # Start two producers, giving each its own private clone
            nursery.start_soon(producer, "A", send_channel.clone())
            nursery.start_soon(producer, "B", send_channel.clone())
            # And two consumers, giving each its own private clone
            nursery.start_soon(consumer, "X", receive_channel.clone())
            nursery.start_soon(consumer, "Y", receive_channel.clone())


async def producer(name, send_channel):
    async with send_channel:
        for i in range(3):
            await send_channel.send(f"{i} from producer {name}")
            # Random sleeps help trigger the problem more reliably
            await trio.sleep(random.random())


async def consumer(name, receive_channel):
    async with receive_channel:
        async for value in receive_channel:
            print(f"consumer {name} got value {value!r}")
            # Random sleeps help trigger the problem more reliably
            await trio.sleep(random.random())


trio.run(main)

This example demonstrates using the MemorySendChannel.clone and MemoryReceiveChannel.clone methods. What these do is create copies of our endpoints, that act just like the original – except that they can be closed independently. And the underlying channel is only closed after all the clones have been closed. So this completely solves our problem with shutdown, and if you run this program, you'll see it print its six lines of output and then exit cleanly.

Notice a small trick we use: the code in main creates clone objects to pass into all the child tasks, and then closes the original objects using async with. Another option is to pass clones into all-but-one of the child tasks, and then pass the original object into the last task, like:

# Also works, but is more finicky:
send_channel, receive_channel = trio.open_memory_channel(0)
nursery.start_soon(producer, "A", send_channel.clone())
nursery.start_soon(producer, "B", send_channel)
nursery.start_soon(consumer, "X", receive_channel.clone())
nursery.start_soon(consumer, "Y", receive_channel)

But this is more error-prone, especially if you use a loop to spawn the producers/consumers.

Just make sure that you don't write:

# Broken, will cause program to hang:
send_channel, receive_channel = trio.open_memory_channel(0)
nursery.start_soon(producer, "A", send_channel.clone())
nursery.start_soon(producer, "B", send_channel.clone())
nursery.start_soon(consumer, "X", receive_channel.clone())
nursery.start_soon(consumer, "Y", receive_channel.clone())

Here we pass clones into the tasks, but never close the original objects. That means we have 3 send channel objects (the original + two clones), but we only close 2 of them, so the consumers will hang around forever waiting for that last one to be closed.
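When the producers and consumers are started in a loop, the same pattern applies: hand every worker its own clone and close the originals. A sketch (with an arbitrary worker count, not from the original documentation), reusing the producer() and consumer() functions from the fixed example above:

import trio


async def main():
    async with trio.open_nursery() as nursery:
        send_channel, receive_channel = trio.open_memory_channel(0)
        async with send_channel, receive_channel:
            for i in range(3):  # 3 workers of each kind, chosen arbitrarily
                nursery.start_soon(producer, f"P{i}", send_channel.clone())
                nursery.start_soon(consumer, f"C{i}", receive_channel.clone())
        # Leaving the inner 'async with' closes the originals, so only the
        # clones held by the workers keep the channel open.


trio.run(main)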

在通道中缓冲

Buffering in channels

当你调用 open_memory_channel() 时,必须指定通道中可以缓冲多少个值。如果缓冲区已满,那么任何调用 send() 的任务都会停下来,等待另一个任务调用 receive()。这很有用,因为它会产生背压:如果通道的生产者运行得比消费者快,它会迫使生产者放慢速度。

你可以通过 open_memory_channel(0) 完全禁用缓冲。在这种情况下,任何调用 send() 的任务都会等待,直到另一个任务调用 receive(),反之亦然。这类似于经典的“通信顺序进程”模型中通道的工作方式,如果你不确定应该使用多大的缓冲区,这是一个合理的默认值。(这就是我们在上面示例中使用它的原因。)

在另一个极端,你可以使用 open_memory_channel(math.inf) 让缓冲区没有上限。在这种情况下,send() 始终会立即返回。通常,这不是一个好主意。为了看清楚为什么,考虑一个生产者运行速度快于消费者的程序:

# Simulate a producer that generates values 10x faster than the
# consumer can handle them.

import trio
import math


async def producer(send_channel):
    count = 0
    while True:
        # Pretend that we have to do some work to create this message, and it
        # takes 0.1 seconds:
        await trio.sleep(0.1)
        await send_channel.send(count)
        print("Sent message:", count)
        count += 1


async def consumer(receive_channel):
    async for value in receive_channel:
        print("Received message:", value)
        # Pretend that we have to do some work to handle this message, and it
        # takes 1 second
        await trio.sleep(1)


async def main():
    send_channel, receive_channel = trio.open_memory_channel(math.inf)
    async with trio.open_nursery() as nursery:
        nursery.start_soon(producer, send_channel)
        nursery.start_soon(consumer, receive_channel)


trio.run(main)

如果你运行这个程序,你会看到类似这样的输出:

Sent message: 0
Received message: 0
Sent message: 1
Sent message: 2
Sent message: 3
Sent message: 4
Sent message: 5
Sent message: 6
Sent message: 7
Sent message: 8
Sent message: 9
Received message: 1
Sent message: 10
Sent message: 11
Sent message: 12
...

平均而言,生产者每秒发送十条消息,但消费者每秒只调用一次 receive。这意味着每秒,通道的内部缓冲区必须增长以容纳额外的九个项。经过一分钟,缓冲区将包含大约 540 条消息;经过一小时,它将增加到大约 32,400 条。最终,程序将耗尽内存。在我们耗尽内存之前,处理单个消息的延迟将变得极其糟糕。例如,在一分钟时,生产者正在发送大约第 600 条消息,但消费者仍在处理第 60 条消息。第 600 条消息将在通道中等待大约 9 分钟,直到消费者赶上并处理它。

现在,尝试将 open_memory_channel(math.inf) 替换为 open_memory_channel(0),然后再次运行它。我们会看到类似这样的输出:

Sent message: 0
Received message: 0
Received message: 1
Sent message: 1
Received message: 2
Sent message: 2
Sent message: 3
Received message: 3
...

现在, send 调用会等待 receive 调用完成,这迫使生产者放慢速度以匹配消费者的速度。(可能看起来有些奇怪的是,一些值在报告为 "Sent" 之前先报告为 "Received";这是因为实际的发送/接收操作是同时发生的,因此哪一行先打印是随机的。)

现在,让我们尝试设置一个小但非零的缓冲区大小,如 open_memory_channel(3)。你认为会发生什么?

我得到的是:

Sent message: 0
Received message: 0
Sent message: 1
Sent message: 2
Sent message: 3
Received message: 1
Sent message: 4
Received message: 2
Sent message: 5
...

所以你可以看到,生产者提前发送了 3 条消息,然后停止等待:当消费者读取消息 1 时,它发送消息 4,然后当消费者读取消息 2 时,它发送消息 5,依此类推。一旦它达到稳定状态,这个版本的行为就像我们之前将缓冲区大小设置为 0 的版本一样,只是它使用了更多的内存,并且每条消息在被处理之前在缓冲区中停留的时间更长(即消息的延迟更高)。

当然,真实的生产者和消费者通常比这更复杂,在某些情况下,适量的缓冲可能会提高吞吐量。但过多的缓冲会浪费内存并增加延迟,因此如果你想调整应用程序的性能,应该进行实验,以找出最适合你的缓冲区大小。

那么我们为什么还要支持无限缓冲呢?好问题!尽管存在上面看到的所有问题,在某些情况下,你确实需要一个无限缓冲区。例如,考虑一个网页爬虫,它使用通道来记录所有它仍想抓取的 URL。每个爬虫运行一个循环:从通道中取出一个 URL,抓取它,检查 HTML 中的外发链接,然后把新的 URL 添加到通道中。这就形成了一个循环流,其中每个消费者同时也是生产者。在这种情况下,如果通道缓冲区满了,爬虫在尝试把新 URL 加入通道时就会被阻塞;如果所有爬虫都被阻塞,它们就不会再从通道中取出任何 URL,于是它们会永远死锁下去。使用无限通道可以避免这种情况,因为这意味着 send() 永远不会阻塞。

When you call open_memory_channel(), you have to specify how many values can be buffered internally in the channel. If the buffer is full, then any task that calls send() will stop and wait for another task to call receive(). This is useful because it produces backpressure: if the channel producers are running faster than the consumers, then it forces the producers to slow down.

You can disable buffering entirely, by doing open_memory_channel(0). In that case any task that calls send() will wait until another task calls receive(), and vice versa. This is similar to how channels work in the classic Communicating Sequential Processes model, and is a reasonable default if you aren't sure what size buffer to use. (That's why we used it in the examples above.)

At the other extreme, you can make the buffer unbounded by using open_memory_channel(math.inf). In this case, send() always returns immediately. Normally, this is a bad idea. To see why, consider a program where the producer runs more quickly than the consumer:

# Simulate a producer that generates values 10x faster than the
# consumer can handle them.

import trio
import math


async def producer(send_channel):
    count = 0
    while True:
        # Pretend that we have to do some work to create this message, and it
        # takes 0.1 seconds:
        await trio.sleep(0.1)
        await send_channel.send(count)
        print("Sent message:", count)
        count += 1


async def consumer(receive_channel):
    async for value in receive_channel:
        print("Received message:", value)
        # Pretend that we have to do some work to handle this message, and it
        # takes 1 second
        await trio.sleep(1)


async def main():
    send_channel, receive_channel = trio.open_memory_channel(math.inf)
    async with trio.open_nursery() as nursery:
        nursery.start_soon(producer, send_channel)
        nursery.start_soon(consumer, receive_channel)


trio.run(main)

If you run this program, you'll see output like:

Sent message: 0
Received message: 0
Sent message: 1
Sent message: 2
Sent message: 3
Sent message: 4
Sent message: 5
Sent message: 6
Sent message: 7
Sent message: 8
Sent message: 9
Received message: 1
Sent message: 10
Sent message: 11
Sent message: 12
...

On average, the producer sends ten messages per second, but the consumer only calls receive once per second. That means that each second, the channel's internal buffer has to grow to hold an extra nine items. After a minute, the buffer will have ~540 items in it; after an hour, that grows to ~32,400. Eventually, the program will run out of memory. And well before we run out of memory, our latency on handling individual messages will become abysmal. For example, at the one minute mark, the producer is sending message ~600, but the consumer is still processing message ~60. Message 600 will have to sit in the channel for ~9 minutes before the consumer catches up and processes it.

Now try replacing open_memory_channel(math.inf) with open_memory_channel(0), and run it again. We get output like:

Sent message: 0
Received message: 0
Received message: 1
Sent message: 1
Received message: 2
Sent message: 2
Sent message: 3
Received message: 3
...

Now the send calls wait for the receive calls to finish, which forces the producer to slow down to match the consumer's speed. (It might look strange that some values are reported as "Received" before they're reported as "Sent"; this happens because the actual send/receive happen at the same time, so which line gets printed first is random.)

Now, let's try setting a small but nonzero buffer size, like open_memory_channel(3). What do you think will happen?

I get:

Sent message: 0
Received message: 0
Sent message: 1
Sent message: 2
Sent message: 3
Received message: 1
Sent message: 4
Received message: 2
Sent message: 5
...

So you can see that the producer runs ahead by 3 messages, and then stops to wait: when the consumer reads message 1, it sends message 4, then when the consumer reads message 2, it sends message 5, and so on. Once it reaches the steady state, this version acts just like our previous version where we set the buffer size to 0, except that it uses a bit more memory and each message sits in the buffer for a bit longer before being processed (i.e., the message latency is higher).

Of course real producers and consumers are usually more complicated than this, and in some situations, a modest amount of buffering might improve throughput. But too much buffering wastes memory and increases latency, so if you want to tune your application you should experiment to see what value works best for you.

Why do we even support unbounded buffers then? Good question! Despite everything we saw above, there are times when you actually do need an unbounded buffer. For example, consider a web crawler that uses a channel to keep track of all the URLs it still wants to crawl. Each crawler runs a loop where it takes a URL from the channel, fetches it, checks the HTML for outgoing links, and then adds the new URLs to the channel. This creates a circular flow, where each consumer is also a producer. In this case, if your channel buffer gets full, then the crawlers will block when they try to add new URLs to the channel, and if all the crawlers got blocked, then they aren't taking any URLs out of the channel, so they're stuck forever in a deadlock. Using an unbounded channel avoids this, because it means that send() never blocks.
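Here is a rough sketch of that crawler-style circular flow (not from the original documentation; fetch() and extract_links() are placeholder stubs standing in for real HTTP and HTML-parsing code, and a real crawler would also need some way to decide when it's finished):

import math
import trio


async def fetch(url):
    await trio.sleep(0.1)  # stand-in for a real HTTP request
    return ""


def extract_links(html):
    return []  # stand-in for real HTML parsing


async def crawler(receive_channel, send_channel, seen):
    async for url in receive_channel:
        html = await fetch(url)
        for link in extract_links(html):
            if link not in seen:
                seen.add(link)
                # With an unbounded buffer this never blocks, so the crawlers
                # can't all end up stuck waiting on each other.
                await send_channel.send(link)


async def main():
    send_channel, receive_channel = trio.open_memory_channel(math.inf)
    seen = {"https://example.com"}
    await send_channel.send("https://example.com")
    async with trio.open_nursery() as nursery:
        for _ in range(10):  # 10 crawler tasks, chosen arbitrarily
            nursery.start_soon(crawler, receive_channel, send_channel, seen)


trio.run(main)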

较低级别的同步原语

Lower-level synchronization primitives

就我个人而言,我发现事件和通道通常足以实现我关心的大多数功能,而且用它们写出的代码比使用本节讨论的低级原语更易读。但如果你确实需要低级原语,它们也在这里。(如果你发现自己是为了实现新的更高级别的同步原语才使用这些原语,那么你可能还想看看 trio.lowlevel 中的设施,它们更直接地暴露了 Trio 的底层同步逻辑。本节讨论的所有类都是基于 trio.lowlevel 中的公共 API 实现的;它们并没有对 Trio 内部的特殊访问权限。)

class trio.CapacityLimiter(total_tokens)

基类:AsyncContextManagerMixin

An object for controlling access to a resource with limited capacity.

Sometimes you need to put a limit on how many tasks can do something at the same time. For example, you might want to use some threads to run multiple blocking I/O operations in parallel... but if you use too many threads at once, then your system can become overloaded and it'll actually make things slower. One popular solution is to impose a policy like "run up to 40 threads at the same time, but no more". But how do you implement a policy like this?

That's what CapacityLimiter is for. You can think of a CapacityLimiter object as a sack that starts out holding some fixed number of tokens:

limit = trio.CapacityLimiter(40)

Then tasks can come along and borrow a token out of the sack:

# Borrow a token:
async with limit:
    # We are holding a token!
    await perform_expensive_operation()
# Exiting the 'async with' block puts the token back into the sack

And crucially, if you try to borrow a token but the sack is empty, then you have to wait for another task to finish what it's doing and put its token back first before you can take it and continue.

Another way to think of it: a CapacityLimiter is like a sofa with a fixed number of seats, and if they're all taken then you have to wait for someone to get up before you can sit down.

By default, trio.to_thread.run_sync() uses a CapacityLimiter to limit the number of threads running at once; see trio.to_thread.current_default_thread_limiter for details.

If you're familiar with semaphores, then you can think of this as a restricted semaphore that's specialized for one common use case, with additional error checking. For a more traditional semaphore, see Semaphore.

备注

Don't confuse this with the "leaky bucket" or "token bucket" algorithms used to limit bandwidth usage on networks. The basic idea of using tokens to track a resource limit is similar, but this is a very simple sack where tokens aren't automatically created or destroyed over time; they're just borrowed and then put back.

await acquire()

Borrow a token from the sack, blocking if necessary.

抛出:

RuntimeError -- if the current task already holds one of this sack's tokens.

返回类型:

None

acquire_nowait()

Borrow a token from the sack, without blocking.

抛出:
  • WouldBlock -- if no tokens are available.

  • RuntimeError -- if the current task already holds one of this sack's tokens.

返回类型:

None

await acquire_on_behalf_of(borrower)

Borrow a token from the sack on behalf of borrower, blocking if necessary.

参数:

borrower (Task | object) -- A trio.lowlevel.Task or arbitrary opaque object used to record who is borrowing this token; see acquire_on_behalf_of_nowait() for details.

抛出:

RuntimeError -- if borrower task already holds one of this sack's tokens.

返回类型:

None

acquire_on_behalf_of_nowait(borrower)

Borrow a token from the sack on behalf of borrower, without blocking.

参数:

borrower (Task | object) -- A trio.lowlevel.Task or arbitrary opaque object used to record who is borrowing this token. This is used by trio.to_thread.run_sync() to allow threads to "hold tokens", with the intention in the future of using it to allow deadlock detection and other useful things

抛出:
  • WouldBlock -- if no tokens are available.

  • RuntimeError -- if borrower already holds one of this sack's tokens.

返回类型:

None

property available_tokens: int | float

The amount of capacity that's available to use.

property borrowed_tokens: int

The amount of capacity that's currently in use.

release()

Put a token back into the sack.

抛出:

RuntimeError -- if the current task has not acquired one of this sack's tokens.

返回类型:

None

release_on_behalf_of(borrower)

Put a token back into the sack on behalf of borrower.

抛出:

RuntimeError -- if the given borrower has not acquired one of this sack's tokens.

返回类型:

None

statistics()

Return an object containing debugging information.

Currently the following fields are defined:

  • borrowed_tokens: The number of tokens currently borrowed from the sack.

  • total_tokens: The total number of tokens in the sack. Usually this will be larger than borrowed_tokens, but it's possible for it to be smaller if total_tokens was recently decreased.

  • borrowers: A list of all tasks or other entities that currently hold a token.

  • tasks_waiting: The number of tasks blocked on this CapacityLimiter's acquire() or acquire_on_behalf_of() methods.

返回类型:

CapacityLimiterStatistics

property total_tokens: int | float

The total capacity available.

You can change total_tokens by assigning to this attribute. If you make it larger, then the appropriate number of waiting tasks will be woken immediately to take the new tokens. If you decrease total_tokens below the number of tasks that are currently using the resource, then all current tasks will be allowed to finish as normal, but no new tasks will be allowed in until the total number of tasks drops below the new total_tokens.
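Putting the pieces above together, a minimal sketch (not from the original documentation) of capping how many tasks run an expensive section at once:

import trio

# At most 2 of the 5 workers may hold a token at the same time.
limit = trio.CapacityLimiter(2)


async def worker(n):
    async with limit:  # borrow a token, waiting if the sack is empty
        print(f"worker {n} is running")
        await trio.sleep(1)
    # leaving the block puts the token back into the sack


async def main():
    async with trio.open_nursery() as nursery:
        for n in range(5):
            nursery.start_soon(worker, n)


trio.run(main)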

class trio.Semaphore(initial_value, *, max_value=None)

基类:AsyncContextManagerMixin

A semaphore.

A semaphore holds an integer value, which can be incremented by calling release() and decremented by calling acquire() – but the value is never allowed to drop below zero. If the value is zero, then acquire() will block until someone calls release().

If you're looking for a Semaphore to limit the number of tasks that can access some resource simultaneously, then consider using a CapacityLimiter instead.

This object's interface is similar to, but different from, that of threading.Semaphore.

A Semaphore object can be used as an async context manager; it blocks on entry but not on exit.

参数:
  • initial_value (int) -- A non-negative integer giving the semaphore's initial value.

  • max_value (int or None) -- If given, makes this a "bounded" semaphore that raises an error if the value is about to exceed the given max_value.

await acquire()

Decrement the semaphore value, blocking if necessary to avoid letting it drop below zero.

返回类型:

None

acquire_nowait()

Attempt to decrement the semaphore value, without blocking.

抛出:

WouldBlock -- if the value is zero.

返回类型:

None

property max_value: int | None

The maximum allowed value. May be None to indicate no limit.

release()

Increment the semaphore value, possibly waking a task blocked in acquire().

抛出:

ValueError -- if incrementing the value would cause it to exceed max_value.

返回类型:

None

statistics()

Return an object containing debugging information.

Currently the following fields are defined:

  • tasks_waiting: The number of tasks blocked on this semaphore's acquire() method.

返回类型:

ParkingLotStatistics

property value: int

The current value of the semaphore.
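A small sketch (not from the original documentation) of a bounded semaphore guarding a fixed number of slots:

import trio


async def worker(n, semaphore):
    async with semaphore:  # decrements on entry, increments on exit
        print(f"worker {n} holds one of the three slots")
        await trio.sleep(1)


async def main():
    # max_value=3 makes this a "bounded" semaphore: an extra release()
    # that would push the value past 3 raises ValueError.
    semaphore = trio.Semaphore(3, max_value=3)
    async with trio.open_nursery() as nursery:
        for n in range(10):
            nursery.start_soon(worker, n, semaphore)


trio.run(main)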

class trio.Lock

基类:_LockImpl

A classic mutex.

This is a non-reentrant, single-owner lock. Unlike threading.Lock, only the owner of the lock is allowed to release it.

A Lock object can be used as an async context manager; it blocks on entry but not on exit.

await acquire()

Acquire the lock, blocking if necessary.

抛出:

BrokenResourceError -- if the owner of the lock exits without releasing.

返回类型:

None

acquire_nowait()

Attempt to acquire the lock, without blocking.

抛出:

WouldBlock -- if the lock is held.

返回类型:

None

locked()

Check whether the lock is currently held.

返回:

True if the lock is held, False otherwise.

返回类型:

bool

release()

Release the lock.

抛出:

RuntimeError -- if the calling task does not hold the lock.

返回类型:

None

statistics()

Return an object containing debugging information.

Currently the following fields are defined:

  • locked: boolean indicating whether the lock is held.

  • owner: the trio.lowlevel.Task currently holding the lock, or None if the lock is not held.

  • tasks_waiting: The number of tasks blocked on this lock's acquire() method.

返回类型:

LockStatistics


Personally, I find that events and channels are usually enough to implement most things I care about, and lead to easier-to-read code than the lower-level primitives discussed in this section. But if you need them, they're here. (If you find yourself reaching for these because you're trying to implement a new higher-level synchronization primitive, then you might also want to check out the facilities in trio.lowlevel for a more direct exposure of Trio's underlying synchronization logic. All of the classes discussed in this section are implemented on top of the public APIs in trio.lowlevel; they don't have any special access to Trio's internals.)

class trio.CapacityLimiter(total_tokens)

基类:AsyncContextManagerMixin

An object for controlling access to a resource with limited capacity.

Sometimes you need to put a limit on how many tasks can do something at the same time. For example, you might want to use some threads to run multiple blocking I/O operations in parallel... but if you use too many threads at once, then your system can become overloaded and it'll actually make things slower. One popular solution is to impose a policy like "run up to 40 threads at the same time, but no more". But how do you implement a policy like this?

That's what CapacityLimiter is for. You can think of a CapacityLimiter object as a sack that starts out holding some fixed number of tokens:

limit = trio.CapacityLimiter(40)

Then tasks can come along and borrow a token out of the sack:

# Borrow a token:
async with limit:
    # We are holding a token!
    await perform_expensive_operation()
# Exiting the 'async with' block puts the token back into the sack

And crucially, if you try to borrow a token but the sack is empty, then you have to wait for another task to finish what it's doing and put its token back first before you can take it and continue.

Another way to think of it: a CapacityLimiter is like a sofa with a fixed number of seats, and if they're all taken then you have to wait for someone to get up before you can sit down.

By default, trio.to_thread.run_sync() uses a CapacityLimiter to limit the number of threads running at once; see trio.to_thread.current_default_thread_limiter for details.
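
For example, here is a minimal sketch of passing your own limiter to trio.to_thread.run_sync() via its limiter argument (blocking_work and the numbers fed to it are made up for illustration):

import functools
import time

import trio

def blocking_work(n):
    # Hypothetical blocking call; stands in for real blocking I/O.
    time.sleep(0.1)
    return n * 2

async def main():
    # Cap the number of simultaneously running worker threads at 10,
    # instead of using Trio's default thread limiter.
    limiter = trio.CapacityLimiter(10)
    async with trio.open_nursery() as nursery:
        for n in range(100):
            # functools.partial lets us pass the limiter= keyword through
            # nursery.start_soon, which only forwards positional arguments.
            nursery.start_soon(
                functools.partial(
                    trio.to_thread.run_sync, blocking_work, n, limiter=limiter
                )
            )

trio.run(main)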

If you're familiar with semaphores, then you can think of this as a restricted semaphore that's specialized for one common use case, with additional error checking. For a more traditional semaphore, see Semaphore.

备注

Don't confuse this with the "leaky bucket" or "token bucket" algorithms used to limit bandwidth usage on networks. The basic idea of using tokens to track a resource limit is similar, but this is a very simple sack where tokens aren't automatically created or destroyed over time; they're just borrowed and then put back.

await acquire()

Borrow a token from the sack, blocking if necessary.

抛出:

RuntimeError -- if the current task already holds one of this sack's tokens.

返回类型:

None

acquire_nowait()

Borrow a token from the sack, without blocking.

抛出:
  • WouldBlock -- if no tokens are available.

  • RuntimeError -- if the current task already holds one of this sack's tokens.

返回类型:

None

await acquire_on_behalf_of(borrower)

Borrow a token from the sack on behalf of borrower, blocking if necessary.

参数:

borrower (Task | object) -- A trio.lowlevel.Task or arbitrary opaque object used to record who is borrowing this token; see acquire_on_behalf_of_nowait() for details.

抛出:

RuntimeError -- if borrower task already holds one of this sack's tokens.

返回类型:

None

acquire_on_behalf_of_nowait(borrower)

Borrow a token from the sack on behalf of borrower, without blocking.

参数:

borrower (Task | object) -- A trio.lowlevel.Task or arbitrary opaque object used to record who is borrowing this token. This is used by trio.to_thread.run_sync() to allow threads to "hold tokens", with the intention of using it in the future to support deadlock detection and other useful things.

抛出:
  • WouldBlock -- if no tokens are available.

  • RuntimeError -- if borrower already holds one of this sack's tokens.

返回类型:

None

property available_tokens: int | float

The amount of capacity that's available to use.

property borrowed_tokens: int

The amount of capacity that's currently in use.

release()

Put a token back into the sack.

抛出:

RuntimeError -- if the current task has not acquired one of this sack's tokens.

返回类型:

None

release_on_behalf_of(borrower)

Put a token back into the sack on behalf of borrower.

抛出:

RuntimeError -- if the given borrower has not acquired one of this sack's tokens.

返回类型:

None
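
The acquire_on_behalf_of()/release_on_behalf_of() pair lets something other than the current task hold a token. As a rough sketch (the job names here are just arbitrary opaque borrower objects invented for the example):

import trio

async def run_job(limiter, job):
    # 'job' is an arbitrary opaque object used only to record who holds
    # the token; it does not need to be a Trio task.
    await limiter.acquire_on_behalf_of(job)
    try:
        await trio.sleep(0.1)  # stand-in for the real work
    finally:
        limiter.release_on_behalf_of(job)

async def main():
    limiter = trio.CapacityLimiter(2)
    async with trio.open_nursery() as nursery:
        for name in ["job-a", "job-b", "job-c"]:
            nursery.start_soon(run_job, limiter, name)

trio.run(main)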

statistics()

Return an object containing debugging information.

返回类型:

CapacityLimiterStatistics

Currently the following fields are defined:

  • borrowed_tokens: The number of tokens currently borrowed from the sack.

  • total_tokens: The total number of tokens in the sack. Usually this will be larger than borrowed_tokens, but it's possible for it to be smaller if total_tokens was recently decreased.

  • borrowers: A list of all tasks or other entities that currently hold a token.

  • tasks_waiting: The number of tasks blocked on this CapacityLimiter's acquire() or acquire_on_behalf_of() methods.
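
A quick way to see these fields in action (a tiny sketch, using only the documented attributes):

import trio

async def main():
    limiter = trio.CapacityLimiter(3)
    async with limiter:
        stats = limiter.statistics()
        # borrowed_tokens=1, total_tokens=3, borrowers holds the current
        # task, tasks_waiting=0
        print(stats.borrowed_tokens, stats.total_tokens,
              stats.borrowers, stats.tasks_waiting)

trio.run(main)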

property total_tokens: int | float

The total capacity available.

You can change total_tokens by assigning to this attribute. If you make it larger, then the appropriate number of waiting tasks will be woken immediately to take the new tokens. If you decrease total_tokens below the number of tasks that are currently using the resource, then all current tasks will be allowed to finish as normal, but no new tasks will be allowed in until the total number of tasks drops below the new total_tokens.
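
For instance, a sketch of growing and shrinking the limit while tasks are using it (the worker function and the timings are made up):

import trio

async def worker(limiter, i):
    async with limiter:
        print(f"worker {i} running")
        await trio.sleep(0.2)

async def main():
    limiter = trio.CapacityLimiter(2)
    async with trio.open_nursery() as nursery:
        for i in range(8):
            nursery.start_soon(worker, limiter, i)
        await trio.sleep(0.1)
        # Raise the limit: waiting tasks are woken immediately to take
        # the new tokens.
        limiter.total_tokens = 4
        await trio.sleep(0.3)
        # Lower it again: current holders finish normally, but no new
        # borrowers are admitted until usage drops below the new limit.
        limiter.total_tokens = 1

trio.run(main)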

class trio.Semaphore(initial_value, *, max_value=None)

基类:AsyncContextManagerMixin

A semaphore.

A semaphore holds an integer value, which can be incremented by calling release() and decremented by calling acquire() – but the value is never allowed to drop below zero. If the value is zero, then acquire() will block until someone calls release().

If you're looking for a Semaphore to limit the number of tasks that can access some resource simultaneously, then consider using a CapacityLimiter instead.

This object's interface is similar to, but different from, that of threading.Semaphore.

A Semaphore object can be used as an async context manager; it blocks on entry but not on exit.

参数:
  • initial_value (int) -- A non-negative integer giving the semaphore's initial value.

  • max_value (int or None) -- If given, makes this a "bounded" semaphore that raises an error if the value is about to exceed the given max_value.
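
For example, a minimal sketch of a bounded semaphore shared by several tasks (the worker function and the limit of 3 are arbitrary):

import trio

async def worker(sem, i):
    async with sem:  # acquire() on entry, release() on exit
        print(f"worker {i} holds a slot")
        await trio.sleep(0.1)

async def main():
    # A "bounded" semaphore: release() beyond max_value raises ValueError.
    sem = trio.Semaphore(3, max_value=3)
    async with trio.open_nursery() as nursery:
        for i in range(10):
            nursery.start_soon(worker, sem, i)

trio.run(main)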

await acquire()

Decrement the semaphore value, blocking if necessary to avoid letting it drop below zero.

返回类型:

None

acquire_nowait()

Attempt to decrement the semaphore value, without blocking.

抛出:

WouldBlock -- if the value is zero.

返回类型:

None

property max_value: int | None

The maximum allowed value. May be None to indicate no limit.

release()

Increment the semaphore value, possibly waking a task blocked in acquire().

抛出:

ValueError -- if incrementing the value would cause it to exceed max_value.

返回类型:

None

statistics()

Return an object containing debugging information.

返回类型:

ParkingLotStatistics

Currently the following fields are defined:

  • tasks_waiting: The number of tasks blocked on this semaphore's acquire() method.

property value: int

The current value of the semaphore.

class trio.Lock

基类:_LockImpl

A classic mutex.

This is a non-reentrant, single-owner lock. Unlike threading.Lock, only the owner of the lock is allowed to release it.

A Lock object can be used as an async context manager; it blocks on entry but not on exit.
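
As a small sketch of both acquisition styles described above (the prints are just for illustration):

import trio

async def main():
    lock = trio.Lock()

    # Blocking style: waits for the lock, releases it on exit.
    async with lock:
        print("got the lock")

    # Non-blocking style: either take the lock immediately or handle
    # trio.WouldBlock without suspending.
    try:
        lock.acquire_nowait()
    except trio.WouldBlock:
        print("someone else holds the lock")
    else:
        try:
            print("got the lock without blocking")
        finally:
            lock.release()

trio.run(main)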

await acquire()

Acquire the lock, blocking if necessary.

抛出:

BrokenResourceError -- if the owner of the lock exits without releasing.

返回类型:

None

acquire_nowait()

Attempt to acquire the lock, without blocking.

抛出:

WouldBlock -- if the lock is held.

返回类型:

None

locked()

Check whether the lock is currently held.

返回:

True if the lock is held, False otherwise.

返回类型:

bool

release()

Release the lock.

抛出:

RuntimeError -- if the calling task does not hold the lock.

返回类型:

None

statistics()

Return an object containing debugging information.

返回类型:

LockStatistics

Currently the following fields are defined:

  • locked: boolean indicating whether the lock is held.

  • owner: the trio.lowlevel.Task currently holding the lock, or None if the lock is not held.

  • tasks_waiting: The number of tasks blocked on this lock's acquire() method.
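
A tiny sketch of how these fields change as the lock is taken (using only the documented attributes):

import trio

async def main():
    lock = trio.Lock()
    stats = lock.statistics()
    print(stats.locked, stats.owner)  # False None
    async with lock:
        stats = lock.statistics()
        # locked is now True and owner is the current task.
        print(stats.locked, stats.owner is trio.lowlevel.current_task())

trio.run(main)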

class trio.StrictFIFOLock

基类:_LockImpl

A variant of Lock where tasks are guaranteed to acquire the lock in strict first-come-first-served order.

An example of when this is useful is if you're implementing something like trio.SSLStream or an HTTP/2 server using h2, where you have multiple concurrent tasks that are interacting with a shared state machine, and at unpredictable moments the state machine requests that a chunk of data be sent over the network. (For example, when using h2 simply reading incoming data can occasionally create outgoing data to send.) The challenge is to make sure that these chunks are sent in the correct order, without being garbled.

One option would be to use a regular Lock, and wrap it around every interaction with the state machine:

# This approach is sometimes workable but often sub-optimal; see below
async with lock:
    state_machine.do_something()
    if state_machine.has_data_to_send():
        await conn.sendall(state_machine.get_data_to_send())

But this can be problematic. If you're using h2 then usually reading incoming data doesn't create the need to send any data, so we don't want to force every task that tries to read from the network to sit and wait a potentially long time for sendall to finish. And in some situations this could even potentially cause a deadlock, if the remote peer is waiting for you to read some data before it accepts the data you're sending.

StrictFIFOLock provides an alternative. We can rewrite our example like:

# Note: no awaits between when we start using the state machine and
# when we block to take the lock!
state_machine.do_something()
if state_machine.has_data_to_send():
    # Notice that we fetch the data to send out of the state machine
    # *before* sleeping, so that other tasks won't see it.
    chunk = state_machine.get_data_to_send()
    async with strict_fifo_lock:
        await conn.sendall(chunk)

First we do all our interaction with the state machine in a single scheduling quantum (notice there are no awaits in there), so it's automatically atomic with respect to other tasks. And then if and only if we have data to send, we get in line to send it – and StrictFIFOLock guarantees that each task will send its data in the same order that the state machine generated it.

Currently, StrictFIFOLock is identical to Lock, but (a) this may not always be true in the future, especially if Trio ever implements more sophisticated scheduling policies, and (b) the above code is relying on a pretty subtle property of its lock. Using a StrictFIFOLock acts as an executable reminder that you're relying on this property.
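
To make the pattern above concrete, here is a self-contained sketch. ToyStateMachine is a made-up stand-in for something like an h2 connection object, and the memory stream pair from trio.testing stands in for the real network connection:

import trio
import trio.testing

class ToyStateMachine:
    # Made-up stand-in for something like an h2 connection object.
    def __init__(self):
        self._outgoing = bytearray()

    def do_something(self, data):
        self._outgoing += data

    def has_data_to_send(self):
        return bool(self._outgoing)

    def get_data_to_send(self):
        chunk, self._outgoing = bytes(self._outgoing), bytearray()
        return chunk

async def handle(state_machine, stream, strict_fifo_lock, data):
    # No awaits between touching the state machine and taking the lock,
    # so chunks go out in the same order the state machine produced them.
    state_machine.do_something(data)
    if state_machine.has_data_to_send():
        chunk = state_machine.get_data_to_send()
        async with strict_fifo_lock:
            await stream.send_all(chunk)

async def main():
    state_machine = ToyStateMachine()
    strict_fifo_lock = trio.StrictFIFOLock()
    send_stream, receive_stream = trio.testing.memory_stream_pair()
    async with trio.open_nursery() as nursery:
        for data in [b"one ", b"two ", b"three "]:
            nursery.start_soon(handle, state_machine, send_stream,
                               strict_fifo_lock, data)
    # Prints whatever has arrived on the other end so far.
    print(await receive_stream.receive_some())

trio.run(main)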

class trio.Condition(lock=None)

基类:AsyncContextManagerMixin

A classic condition variable, similar to threading.Condition.

A Condition object can be used as an async context manager to acquire the underlying lock; it blocks on entry but not on exit.

参数:

lock (Lock) -- the lock object to use. If given, must be a trio.Lock. If None, a new Lock will be allocated and used.

await acquire()

Acquire the underlying lock, blocking if necessary.

抛出:

BrokenResourceError -- if the owner of the underlying lock exits without releasing.

返回类型:

None

acquire_nowait()

Attempt to acquire the underlying lock, without blocking.

抛出:

WouldBlock -- if the lock is currently held.

返回类型:

None

locked()

Check whether the underlying lock is currently held.

返回:

True if the lock is held, False otherwise.

返回类型:

bool

notify(n=1)

Wake one or more tasks that are blocked in wait().

参数:

n (int) -- The number of tasks to wake.

抛出:

RuntimeError -- if the calling task does not hold the lock.

返回类型:

None

notify_all()

Wake all tasks that are currently blocked in wait().

抛出:

RuntimeError -- if the calling task does not hold the lock.

返回类型:

None

release()

Release the underlying lock.

返回类型:

None

statistics()

Return an object containing debugging information.

返回类型:

ConditionStatistics

Currently the following fields are defined:

  • tasks_waiting: The number of tasks blocked on this condition's wait() method.

  • lock_statistics: The result of calling the underlying Lock's statistics() method.

await wait()

Wait for another task to call notify() or notify_all().

When calling this method, you must hold the lock. It releases the lock while waiting, and then re-acquires it before waking up.

There is a subtlety with how this method interacts with cancellation: when cancelled it will block to re-acquire the lock before raising Cancelled. This may cause cancellation to be less prompt than expected. The advantage is that it makes code like this work:

async with condition:
    await condition.wait()

If we didn't re-acquire the lock before waking up, and wait() were cancelled here, then we'd crash in condition.__aexit__ when we tried to release the lock we no longer held.

抛出:
  • RuntimeError -- if the calling task does not hold the lock.

  • BrokenResourceError -- if the owner of the lock exits without releasing, when attempting to re-acquire.

返回类型:

None
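
Putting wait() and notify_all() together, here is a minimal sketch of the classic pattern (the shared dict and the 0.1-second delay are invented for the example):

import trio

async def waiter(cond, shared):
    async with cond:
        # Must hold the lock around wait(); it is released while waiting
        # and re-acquired before wait() returns.
        while not shared["ready"]:
            await cond.wait()
        print("waiter saw:", shared["value"])

async def notifier(cond, shared):
    await trio.sleep(0.1)
    async with cond:
        shared["value"] = 42
        shared["ready"] = True
        cond.notify_all()

async def main():
    cond = trio.Condition()
    shared = {"ready": False, "value": None}
    async with trio.open_nursery() as nursery:
        nursery.start_soon(waiter, cond, shared)
        nursery.start_soon(notifier, cond, shared)

trio.run(main)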

These primitives return statistics objects that can be inspected.

class trio.CapacityLimiterStatistics(borrowed_tokens, total_tokens, borrowers, tasks_waiting)

基类:object

An object containing debugging information.

Currently the following fields are defined:

  • borrowed_tokens (int): The number of tokens currently borrowed from the sack.

  • total_tokens (int|float): The total number of tokens in the sack.

  • borrowers (list): A list of all tasks and other entities that currently hold a token.

  • tasks_waiting (int): The number of tasks blocked on this CapacityLimiter's acquire() or acquire_on_behalf_of() methods.

class trio.LockStatistics(locked, owner, tasks_waiting)

基类:object

An object containing debugging information for a Lock.

Currently the following fields are defined:

  • locked (boolean): indicating whether the lock is held.

  • owner: the trio.lowlevel.Task currently holding the lock, or None if the lock is not held.

  • tasks_waiting (int): The number of tasks blocked on this lock's trio.Lock.acquire() method.

class trio.ConditionStatistics(tasks_waiting, lock_statistics)

基类:object

An object containing debugging information for a Condition.

Currently the following fields are defined:

  • tasks_waiting (int): The number of tasks blocked on this condition's wait() method.

  • lock_statistics (LockStatistics): The result of calling the underlying Lock's statistics() method.