Synchronizing and communicating between tasks
Trio provides a standard set of synchronization and inter-task communication primitives. These objects' APIs are generally modelled off of the analogous classes in the standard library, but with some differences.
Blocking and non-blocking methods
The standard library synchronization primitives have a variety of mechanisms for specifying timeouts and blocking behavior, and of signaling whether an operation returned due to success versus a timeout.

In Trio, we standardize on the following conventions:

- We don't provide timeout arguments. If you want a timeout, then use a cancel scope.
- For operations that have a non-blocking variant, the blocking and non-blocking variants are different methods with names like X and X_nowait, respectively. (This is similar to queue.Queue, but unlike most of the classes in threading.) We like this approach because it allows us to make the blocking version async and the non-blocking version sync.
- When a non-blocking method cannot succeed (the channel is empty, the lock is already held, etc.), then it raises trio.WouldBlock. There's no equivalent to the queue.Empty versus queue.Full distinction – we just have the one exception that we use consistently.
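To make that concrete, here's a minimal sketch of the naming convention and the WouldBlock exception, using a memory channel (introduced later in this section):

import trio

async def main():
    send_channel, receive_channel = trio.open_memory_channel(0)
    try:
        # Non-blocking variant: synchronous, raises instead of waiting
        receive_channel.receive_nowait()
    except trio.WouldBlock:
        print("Nothing to receive yet, so receive_nowait() raised WouldBlock")
    # The blocking variant would be: value = await receive_channel.receive()

trio.run(main)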
Fairness
These classes are all guaranteed to be "fair", meaning that when it comes time to choose who will be next to acquire a lock, get an item from a queue, etc., then it always goes to the task which has been waiting longest. It's not entirely clear whether this is the best choice (see issue #54), but for now that's how it works.

As an example of what this means, here's a small program in which two tasks compete for a lock. Notice that the task which releases the lock always immediately attempts to re-acquire it, before the other task has a chance to run. (And remember that we're doing cooperative multi-tasking here, so it's actually deterministic that the task releasing the lock will call acquire() before the other task wakes up; in Trio releasing a lock is not a checkpoint.) With an unfair lock, this would result in the same task holding the lock forever and the other task being starved out. But if you run this, you'll see that the two tasks politely take turns:
# fairness-demo.py

import trio

async def loopy_child(number, lock):
    while True:
        async with lock:
            print(f"Child {number} has the lock!")
            await trio.sleep(0.5)

async def main():
    async with trio.open_nursery() as nursery:
        lock = trio.Lock()
        nursery.start_soon(loopy_child, 1, lock)
        nursery.start_soon(loopy_child, 2, lock)

trio.run(main)
Broadcasting an event with Event
- class trio.Event

  Bases: object

  A waitable boolean value useful for inter-task synchronization, inspired by threading.Event.

  An event object has an internal boolean flag, representing whether the event has happened yet. The flag is initially False, and the wait() method waits until the flag is True. If the flag is already True, then wait() returns immediately. (If the event has already happened, there's nothing to wait for.) The set() method sets the flag to True, and wakes up any waiters.

  This behavior is useful because it helps avoid race conditions and lost wakeups: it doesn't matter whether set() gets called just before or after wait(). If you want a lower-level wakeup primitive that doesn't have this protection, consider Condition or trio.lowlevel.ParkingLot.

  Note

  Unlike threading.Event, trio.Event has no clear method. In Trio, once an Event has happened, it cannot un-happen. If you need to represent a series of events, consider creating a new Event object for each one (they're cheap!), or other synchronization methods like channels or trio.lowlevel.ParkingLot.

  - statistics()

    Return an object containing debugging information.

    Currently the following fields are defined:

    - tasks_waiting: The number of tasks blocked on this event's wait() method.

    - Return type:
      EventStatistics
- class trio.EventStatistics(tasks_waiting)

  Bases: object

  An object containing debugging information.

  Currently the following fields are defined:

  - tasks_waiting: The number of tasks blocked on this event's trio.Event.wait() method.
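As a quick illustration of the broadcast behavior described above, here's a small sketch in which one task sets an Event and both waiting tasks wake up:

import trio

async def waiter(name, event):
    print(f"{name} is waiting for the event")
    await event.wait()   # blocks until someone calls event.set()
    print(f"{name} woke up")

async def main():
    event = trio.Event()
    async with trio.open_nursery() as nursery:
        nursery.start_soon(waiter, "waiter 1", event)
        nursery.start_soon(waiter, "waiter 2", event)
        await trio.sleep(1)   # pretend to do some work first
        event.set()           # both waiters wake up

trio.run(main)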
Using channels to pass values between tasks
Channels allow you to safely and conveniently send objects between different tasks. They're particularly useful for implementing producer/consumer patterns.

The core channel API is defined by the abstract base classes trio.abc.SendChannel and trio.abc.ReceiveChannel. You can use these to implement your own custom channels, that do things like pass objects between processes or over the network. But in many cases, you just want to pass objects between different tasks inside a single process, and for that you can use trio.open_memory_channel():
- trio.open_memory_channel(max_buffer_size)

  Open a channel for passing objects between tasks within a process.

  Memory channels are lightweight, cheap to allocate, and entirely in-memory. They don't involve any operating-system resources, or any kind of serialization. They just pass Python objects directly between tasks (with a possible stop in an internal buffer along the way).

  Channel objects can be closed by calling aclose or using async with. They are not automatically closed when garbage collected. Closing memory channels isn't mandatory, but it is generally a good idea, because it helps avoid situations where tasks get stuck waiting on a channel when there's no-one on the other side. See Clean shutdown with channels for details.

  Memory channel operations are all atomic with respect to cancellation: either receive will successfully return an object, or it will raise Cancelled while leaving the channel unchanged.

  - Parameters:
    max_buffer_size (int or math.inf) -- The maximum number of items that can be buffered in the channel before send() blocks. Choosing a sensible value here is important to ensure that backpressure is communicated promptly and avoid unnecessary latency; see Buffering in channels for more details. If in doubt, use 0.
  - Return type:
    tuple[MemorySendChannel[T], MemoryReceiveChannel[T]]
  - Returns:
    A pair (send_channel, receive_channel). If you have trouble remembering which order these go in, remember: data flows from left → right.

  In addition to the standard channel methods, all memory channel objects provide a statistics() method, which returns an object with the following fields:

  - current_buffer_used: The number of items currently stored in the channel buffer.
  - max_buffer_size: The maximum number of items allowed in the buffer, as passed to open_memory_channel().
  - open_send_channels: The number of open MemorySendChannel endpoints pointing to this channel. Initially 1, but can be increased by MemorySendChannel.clone().
  - open_receive_channels: Likewise, but for open MemoryReceiveChannel endpoints.
  - tasks_waiting_send: The number of tasks blocked in send on this channel (summing over all clones).
  - tasks_waiting_receive: The number of tasks blocked in receive on this channel (summing over all clones).
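For example, here's a small sketch that inspects a few of these statistics fields on a freshly used channel:

import trio

async def main():
    send_channel, receive_channel = trio.open_memory_channel(10)
    await send_channel.send("hello")    # buffered, so this returns immediately
    stats = send_channel.statistics()
    print(stats.current_buffer_used)    # 1
    print(stats.max_buffer_size)        # 10
    print(stats.open_send_channels)     # 1, since we haven't made any clones
    print(stats.tasks_waiting_receive)  # 0

trio.run(main)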
Note

If you've used the threading or asyncio modules, you may be familiar with queue.Queue or asyncio.Queue. In Trio, open_memory_channel() is what you use when you're looking for a queue. The main difference is that Trio splits the classic queue interface up into two objects. The advantage of this is that it makes it possible to put the two ends in different processes without rewriting your code, and that we can close the two sides separately.
MemorySendChannel and MemoryReceiveChannel also expose several more features beyond the core channel interface:
- class trio.MemorySendChannel(state, closed=False, tasks=NOTHING)

  Bases: SendChannel[SendType]

  - await aclose()

    Close this send channel object asynchronously.

    See MemorySendChannel.close.

    - Return type:
      None

  - clone()

    Clone this send channel object.

    This returns a new MemorySendChannel object, which acts as a duplicate of the original: sending on the new object does exactly the same thing as sending on the old object. (If you're familiar with os.dup, then this is a similar idea.)

    However, closing one of the objects does not close the other, and receivers don't get EndOfChannel until all clones have been closed.

    This is useful for communication patterns that involve multiple producers all sending objects to the same destination. If you give each producer its own clone of the MemorySendChannel, and then make sure to close each MemorySendChannel when it's finished, receivers will automatically get notified when all producers are finished. See Managing multiple producers and/or multiple consumers for examples.

    - Raises:
      trio.ClosedResourceError -- if you already closed this MemorySendChannel object.
    - Return type:
      MemorySendChannel[SendType]

  - close()

    Close this send channel object synchronously.

    All channel objects have an asynchronous aclose method. Memory channels can also be closed synchronously. This has the same effect on the channel and other tasks using it, but close is not a trio checkpoint. This simplifies cleaning up in cancelled tasks.

    Using with send_channel: will close the channel object on leaving the with block.

    - Return type:
      None

  - await send(value)

    See trio.abc.SendChannel.send.

    Memory channels allow multiple tasks to call send at the same time.

    - Return type:
      None

  - send_nowait(value)

    Like trio.abc.SendChannel.send, but if the channel's buffer is full, raises WouldBlock instead of blocking.

    - Return type:
      None

  - statistics()

    Returns a MemoryChannelStatistics for the memory channel this is associated with.

    - Return type:
      MemoryChannelStatistics
- class trio.MemoryReceiveChannel(state, closed=False, tasks=NOTHING)

  Bases: ReceiveChannel[ReceiveType]

  - await aclose()

    Close this receive channel object asynchronously.

    See MemoryReceiveChannel.close.

    - Return type:
      None

  - clone()

    Clone this receive channel object.

    This returns a new MemoryReceiveChannel object, which acts as a duplicate of the original: receiving on the new object does exactly the same thing as receiving on the old object.

    However, closing one of the objects does not close the other, and the underlying channel is not closed until all clones are closed. (If you're familiar with os.dup, then this is a similar idea.)

    This is useful for communication patterns that involve multiple consumers all receiving objects from the same underlying channel. See Managing multiple producers and/or multiple consumers for examples.

    Warning

    The clones all share the same underlying channel. Whenever a clone receive()s a value, it is removed from the channel and the other clones do not receive that value. If you want to send multiple copies of the same stream of values to multiple destinations, like itertools.tee(), then you need to find some other solution; this method does not do that.

    - Raises:
      trio.ClosedResourceError -- if you already closed this MemoryReceiveChannel object.
    - Return type:
      MemoryReceiveChannel[ReceiveType]

  - close()

    Close this receive channel object synchronously.

    All channel objects have an asynchronous aclose method. Memory channels can also be closed synchronously. This has the same effect on the channel and other tasks using it, but close is not a trio checkpoint. This simplifies cleaning up in cancelled tasks.

    Using with receive_channel: will close the channel object on leaving the with block.

    - Return type:
      None

  - await receive()

    See trio.abc.ReceiveChannel.receive.

    Memory channels allow multiple tasks to call receive at the same time. The first task will get the first item sent, the second task will get the second item sent, and so on.

    - Return type:
      ReceiveType

  - receive_nowait()

    Like trio.abc.ReceiveChannel.receive, but if there's nothing ready to receive, raises WouldBlock instead of blocking.

    - Return type:
      ReceiveType

  - statistics()

    Returns a MemoryChannelStatistics for the memory channel this is associated with.

    - Return type:
      MemoryChannelStatistics

- class trio.MemoryChannelStatistics(current_buffer_used, max_buffer_size, open_send_channels, open_receive_channels, tasks_waiting_send, tasks_waiting_receive)

  Bases: object
A simple channel example
Here's a simple example of how to use memory channels:
import trio

async def main():
    async with trio.open_nursery() as nursery:
        # Open a channel:
        send_channel, receive_channel = trio.open_memory_channel(0)
        # Start a producer and a consumer, passing one end of the channel to
        # each of them:
        nursery.start_soon(producer, send_channel)
        nursery.start_soon(consumer, receive_channel)

async def producer(send_channel):
    # Producer sends 3 messages
    for i in range(3):
        # The producer sends using 'await send_channel.send(...)'
        await send_channel.send(f"message {i}")

async def consumer(receive_channel):
    # The consumer uses an 'async for' loop to receive the values:
    async for value in receive_channel:
        print(f"got value {value!r}")

trio.run(main)
If you run this, it prints:

got value 'message 0'
got value 'message 1'
got value 'message 2'

And then it hangs forever. (Use control-C to quit.)
Clean shutdown with channels
Of course we don't generally like it when programs hang. What happened? The problem is that the producer sent 3 messages and then exited, but the consumer has no way to tell that the producer is gone: for all it knows, another message might be coming along any moment. So it hangs forever waiting for the 4th message.
Here's a new version that fixes this: it produces the same output as the previous version, and then exits cleanly. The only change is the addition of async with blocks inside the producer and consumer:
import trio

async def main():
    async with trio.open_nursery() as nursery:
        send_channel, receive_channel = trio.open_memory_channel(0)
        nursery.start_soon(producer, send_channel)
        nursery.start_soon(consumer, receive_channel)

async def producer(send_channel):
    async with send_channel:
        for i in range(3):
            await send_channel.send(f"message {i}")

async def consumer(receive_channel):
    async with receive_channel:
        async for value in receive_channel:
            print(f"got value {value!r}")

trio.run(main)
The really important thing here is the producer's async with. When the producer exits, this closes the send_channel, and that tells the consumer that no more messages are coming, so it can cleanly exit its async for loop. Then the program shuts down because both tasks have exited.

We also added an async with to the consumer. This isn't as important, but it can help us catch mistakes or other problems. For example, suppose that the consumer exited early for some reason – maybe because of a bug. Then the producer would be sending messages into the void, and might get stuck indefinitely. But, if the consumer closes its receive_channel, then the producer will get a BrokenResourceError to alert it that it should stop sending messages because no-one is listening.

If you want to see the effect of the consumer exiting early, try adding a break statement to the async for loop – you should see a BrokenResourceError from the producer.
Managing multiple producers and/or multiple consumers
You can also have multiple producers, and multiple consumers, all sharing the same channel. However, this makes shutdown a little more complicated.
For example, consider this naive extension of our previous example, now with two producers and two consumers:
# This example usually crashes!

import trio
import random

async def main():
    async with trio.open_nursery() as nursery:
        send_channel, receive_channel = trio.open_memory_channel(0)
        # Start two producers
        nursery.start_soon(producer, "A", send_channel)
        nursery.start_soon(producer, "B", send_channel)
        # And two consumers
        nursery.start_soon(consumer, "X", receive_channel)
        nursery.start_soon(consumer, "Y", receive_channel)

async def producer(name, send_channel):
    async with send_channel:
        for i in range(3):
            await send_channel.send(f"{i} from producer {name}")
            # Random sleeps help trigger the problem more reliably
            await trio.sleep(random.random())

async def consumer(name, receive_channel):
    async with receive_channel:
        async for value in receive_channel:
            print(f"consumer {name} got value {value!r}")
            # Random sleeps help trigger the problem more reliably
            await trio.sleep(random.random())

trio.run(main)
The two producers, A and B, send 3 messages apiece. These are then randomly distributed between the two consumers, X and Y. So we're hoping to see some output like:
consumer Y got value '0 from producer B'
consumer X got value '0 from producer A'
consumer Y got value '1 from producer A'
consumer Y got value '1 from producer B'
consumer X got value '2 from producer B'
consumer X got value '2 from producer A'
However, on most runs, that's not what happens – the first part of the output is OK, and then when we get to the end the program crashes with ClosedResourceError. If you run the program a few times, you'll see that sometimes the traceback shows send crashing, and other times it shows receive crashing, and you might even find that on some runs it doesn't crash at all.

Here's what's happening: suppose that producer A finishes first. It exits, and its async with block closes the send_channel. But wait! Producer B was still using that send_channel... so the next time B calls send, it gets a ClosedResourceError.

Sometimes, though, if we're lucky, the two producers might finish at the same time (or close enough), so they both make their last send before either of them closes the send_channel.

But, even if that happens, we're not out of the woods yet! After the producers exit, the two consumers race to be the first to notice that the send_channel has closed. Suppose that X wins the race. It exits its async for loop, then exits the async with block... and closes the receive_channel, while Y is still using it. Again, this causes a crash.
We could avoid this by using some complicated bookkeeping to make sure that only the last producer and the last consumer close their channel endpoints... but that would be tiresome and fragile. Fortunately, there's a better way! Here's a fixed version of our program above:
import trio
import random

async def main():
    async with trio.open_nursery() as nursery:
        send_channel, receive_channel = trio.open_memory_channel(0)
        async with send_channel, receive_channel:
            # Start two producers, giving each its own private clone
            nursery.start_soon(producer, "A", send_channel.clone())
            nursery.start_soon(producer, "B", send_channel.clone())
            # And two consumers, giving each its own private clone
            nursery.start_soon(consumer, "X", receive_channel.clone())
            nursery.start_soon(consumer, "Y", receive_channel.clone())

async def producer(name, send_channel):
    async with send_channel:
        for i in range(3):
            await send_channel.send(f"{i} from producer {name}")
            # Random sleeps help trigger the problem more reliably
            await trio.sleep(random.random())

async def consumer(name, receive_channel):
    async with receive_channel:
        async for value in receive_channel:
            print(f"consumer {name} got value {value!r}")
            # Random sleeps help trigger the problem more reliably
            await trio.sleep(random.random())

trio.run(main)
This example demonstrates using the MemorySendChannel.clone and MemoryReceiveChannel.clone methods. What these do is create copies of our endpoints, that act just like the original – except that they can be closed independently. And the underlying channel is only closed after all the clones have been closed. So this completely solves our problem with shutdown, and if you run this program, you'll see it print its six lines of output and then exit cleanly.
Notice a small trick we use: the code in main creates clone objects to pass into all the child tasks, and then closes the original objects using async with. Another option is to pass clones into all-but-one of the child tasks, and then pass the original object into the last task, like:
# Also works, but is more finicky:
send_channel, receive_channel = trio.open_memory_channel(0)
nursery.start_soon(producer, "A", send_channel.clone())
nursery.start_soon(producer, "B", send_channel)
nursery.start_soon(consumer, "X", receive_channel.clone())
nursery.start_soon(consumer, "Y", receive_channel)
But this is more error-prone, especially if you use a loop to spawn the producers/consumers.
Just make sure that you don't write:
# Broken, will cause program to hang:
send_channel, receive_channel = trio.open_memory_channel(0)
nursery.start_soon(producer, "A", send_channel.clone())
nursery.start_soon(producer, "B", send_channel.clone())
nursery.start_soon(consumer, "X", receive_channel.clone())
nursery.start_soon(consumer, "Y", receive_channel.clone())
Here we pass clones into the tasks, but never close the original objects. That means we have 3 send channel objects (the original + two clones), but we only close 2 of them, so the consumers will hang around forever waiting for that last one to be closed.
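If you do use a loop to spawn them, the clone-then-close-the-originals pattern from the fixed version above extends naturally. A sketch, reusing the producer and consumer functions from before:

async def main():
    async with trio.open_nursery() as nursery:
        send_channel, receive_channel = trio.open_memory_channel(0)
        async with send_channel, receive_channel:
            # Every task gets its own clone; the originals are closed by
            # the 'async with' once all the tasks have been started.
            for name in ["A", "B", "C"]:
                nursery.start_soon(producer, name, send_channel.clone())
            for name in ["X", "Y"]:
                nursery.start_soon(consumer, name, receive_channel.clone())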
Buffering in channels
When you call open_memory_channel(), you have to specify how many values can be buffered internally in the channel. If the buffer is full, then any task that calls send() will stop and wait for another task to call receive(). This is useful because it produces backpressure: if the channel producers are running faster than the consumers, then it forces the producers to slow down.

You can disable buffering entirely, by doing open_memory_channel(0). In that case any task that calls send() will wait until another task calls receive(), and vice versa. This is similar to how channels work in the classic Communicating Sequential Processes model, and is a reasonable default if you aren't sure what size buffer to use. (That's why we used it in the examples above.)

At the other extreme, you can make the buffer unbounded by using open_memory_channel(math.inf). In this case, send() always returns immediately. Normally, this is a bad idea. To see why, consider a program where the producer runs more quickly than the consumer:
# Simulate a producer that generates values 10x faster than the
# consumer can handle them.

import trio
import math

async def producer(send_channel):
    count = 0
    while True:
        # Pretend that we have to do some work to create this message, and it
        # takes 0.1 seconds:
        await trio.sleep(0.1)
        await send_channel.send(count)
        print("Sent message:", count)
        count += 1

async def consumer(receive_channel):
    async for value in receive_channel:
        print("Received message:", value)
        # Pretend that we have to do some work to handle this message, and it
        # takes 1 second
        await trio.sleep(1)

async def main():
    send_channel, receive_channel = trio.open_memory_channel(math.inf)
    async with trio.open_nursery() as nursery:
        nursery.start_soon(producer, send_channel)
        nursery.start_soon(consumer, receive_channel)

trio.run(main)
If you run this program, you'll see output like:
Sent message: 0
Received message: 0
Sent message: 1
Sent message: 2
Sent message: 3
Sent message: 4
Sent message: 5
Sent message: 6
Sent message: 7
Sent message: 8
Sent message: 9
Received message: 1
Sent message: 10
Sent message: 11
Sent message: 12
...
On average, the producer sends ten messages per second, but the consumer only calls receive once per second. That means that each second, the channel's internal buffer has to grow to hold an extra nine items. After a minute, the buffer will have ~540 items in it; after an hour, that grows to ~32,400. Eventually, the program will run out of memory. And well before we run out of memory, our latency on handling individual messages will become abysmal. For example, at the one minute mark, the producer is sending message ~600, but the consumer is still processing message ~60. Message 600 will have to sit in the channel for ~9 minutes before the consumer catches up and processes it.
Now try replacing open_memory_channel(math.inf) with open_memory_channel(0), and run it again. We get output like:
Sent message: 0
Received message: 0
Received message: 1
Sent message: 1
Received message: 2
Sent message: 2
Sent message: 3
Received message: 3
...
Now the send calls wait for the receive calls to finish, which forces the producer to slow down to match the consumer's speed. (It might look strange that some values are reported as "Received" before they're reported as "Sent"; this happens because the actual send/receive happen at the same time, so which line gets printed first is random.)

Now, let's try setting a small but nonzero buffer size, like open_memory_channel(3). What do you think will happen?
I get:
Sent message: 0
Received message: 0
Sent message: 1
Sent message: 2
Sent message: 3
Received message: 1
Sent message: 4
Received message: 2
Sent message: 5
...
So you can see that the producer runs ahead by 3 messages, and then stops to wait: when the consumer reads message 1, it sends message 4, then when the consumer reads message 2, it sends message 5, and so on. Once it reaches the steady state, this version acts just like our previous version where we set the buffer size to 0, except that it uses a bit more memory and each message sits in the buffer for a bit longer before being processed (i.e., the message latency is higher).
Of course real producers and consumers are usually more complicated than this, and in some situations, a modest amount of buffering might improve throughput. But too much buffering wastes memory and increases latency, so if you want to tune your application you should experiment to see what value works best for you.
Why do we even support unbounded buffers then? Good question! Despite everything we saw above, there are times when you actually do need an unbounded buffer. For example, consider a web crawler that uses a channel to keep track of all the URLs it still wants to crawl. Each crawler runs a loop where it takes a URL from the channel, fetches it, checks the HTML for outgoing links, and then adds the new URLs to the channel. This creates a circular flow, where each consumer is also a producer. In this case, if your channel buffer gets full, then the crawlers will block when they try to add new URLs to the channel, and if all the crawlers got blocked, then they aren't taking any URLs out of the channel, so they're stuck forever in a deadlock. Using an unbounded channel avoids this, because it means that send() never blocks.
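Here's a rough sketch of that crawler pattern, where fetch_and_extract_links is a hypothetical stand-in for real HTTP fetching and HTML parsing:

import math
import trio

async def crawler(receive_channel, send_channel):
    async for url in receive_channel:
        # fetch_and_extract_links() is hypothetical: fetch the page and
        # return the URLs it links to.
        for link in await fetch_and_extract_links(url):
            # The unbounded buffer means this never blocks, so the
            # crawlers can't deadlock each other:
            await send_channel.send(link)

async def main():
    send_channel, receive_channel = trio.open_memory_channel(math.inf)
    await send_channel.send("https://example.com")
    async with trio.open_nursery() as nursery:
        for _ in range(10):
            nursery.start_soon(crawler, receive_channel, send_channel)

trio.run(main)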
Lower-level synchronization primitives
Personally, I find that events and channels are usually enough to implement most things I care about, and lead to easier to read code than the lower-level primitives discussed in this section. But if you need them, they're here. (If you find yourself reaching for these because you're trying to implement a new higher-level synchronization primitive, then you might also want to check out the facilities in trio.lowlevel for a more direct exposure of Trio's underlying synchronization logic. All of the classes discussed in this section are implemented on top of the public APIs in trio.lowlevel; they don't have any special access to Trio's internals.)
- class trio.CapacityLimiter(total_tokens)

  Bases: AsyncContextManagerMixin

  An object for controlling access to a resource with limited capacity.

  Sometimes you need to put a limit on how many tasks can do something at the same time. For example, you might want to use some threads to run multiple blocking I/O operations in parallel... but if you use too many threads at once, then your system can become overloaded and it'll actually make things slower. One popular solution is to impose a policy like "run up to 40 threads at the same time, but no more". But how do you implement a policy like this?

  That's what CapacityLimiter is for. You can think of a CapacityLimiter object as a sack that starts out holding some fixed number of tokens:

  limit = trio.CapacityLimiter(40)

  Then tasks can come along and borrow a token out of the sack:

  # Borrow a token:
  async with limit:
      # We are holding a token!
      await perform_expensive_operation()
  # Exiting the 'async with' block puts the token back into the sack

  And crucially, if you try to borrow a token but the sack is empty, then you have to wait for another task to finish what it's doing and put its token back first before you can take it and continue.

  Another way to think of it: a CapacityLimiter is like a sofa with a fixed number of seats, and if they're all taken then you have to wait for someone to get up before you can sit down.

  By default, trio.to_thread.run_sync() uses a CapacityLimiter to limit the number of threads running at once; see trio.to_thread.current_default_thread_limiter for details.

  If you're familiar with semaphores, then you can think of this as a restricted semaphore that's specialized for one common use case, with additional error checking. For a more traditional semaphore, see Semaphore.

  Note

  Don't confuse this with the "leaky bucket" or "token bucket" algorithms used to limit bandwidth usage on networks. The basic idea of using tokens to track a resource limit is similar, but this is a very simple sack where tokens aren't automatically created or destroyed over time; they're just borrowed and then put back.
  - await acquire()

    Borrow a token from the sack, blocking if necessary.

    - Raises:
      RuntimeError -- if the current task already holds one of this sack's tokens.
    - Return type:
      None

  - acquire_nowait()

    Borrow a token from the sack, without blocking.

    - Raises:
      - WouldBlock -- if no tokens are available.
      - RuntimeError -- if the current task already holds one of this sack's tokens.
    - Return type:
      None

  - await acquire_on_behalf_of(borrower)

    Borrow a token from the sack on behalf of borrower, blocking if necessary.

    - Parameters:
      borrower (Task | object) -- A trio.lowlevel.Task or arbitrary opaque object used to record who is borrowing this token; see acquire_on_behalf_of_nowait() for details.
    - Raises:
      RuntimeError -- if the borrower task already holds one of this sack's tokens.
    - Return type:
      None

  - acquire_on_behalf_of_nowait(borrower)

    Borrow a token from the sack on behalf of borrower, without blocking.

    - Parameters:
      borrower (Task | object) -- A trio.lowlevel.Task or arbitrary opaque object used to record who is borrowing this token. This is used by trio.to_thread.run_sync() to allow threads to "hold tokens", with the intention in the future of using it to allow deadlock detection and other useful things.
    - Raises:
      - WouldBlock -- if no tokens are available.
      - RuntimeError -- if borrower already holds one of this sack's tokens.
    - Return type:
      None

  - release()

    Put a token back into the sack.

    - Raises:
      RuntimeError -- if the current task has not acquired one of this sack's tokens.
    - Return type:
      None

  - release_on_behalf_of(borrower)

    Put a token back into the sack on behalf of borrower.

    - Raises:
      RuntimeError -- if the given borrower has not acquired one of this sack's tokens.
    - Return type:
      None

  - statistics()

    Return an object containing debugging information.

    Currently the following fields are defined:

    - borrowed_tokens: The number of tokens currently borrowed from the sack.
    - total_tokens: The total number of tokens in the sack. Usually this will be larger than borrowed_tokens, but it's possible for it to be smaller if total_tokens was recently decreased.
    - borrowers: A list of all tasks or other entities that currently hold a token.
    - tasks_waiting: The number of tasks blocked on this CapacityLimiter's acquire() or acquire_on_behalf_of() methods.

    - Return type:
      CapacityLimiterStatistics

  - property total_tokens: int | float

    The total capacity available.

    You can change total_tokens by assigning to this attribute. If you make it larger, then the appropriate number of waiting tasks will be woken immediately to take the new tokens. If you decrease total_tokens below the number of tasks that are currently using the resource, then all current tasks will be allowed to finish as normal, but no new tasks will be allowed in until the total number of tasks drops below the new total_tokens.
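Putting it together, here's a sketch of the "at most 40 at a time" policy described above, using a CapacityLimiter as an async context manager:

import trio

async def worker(i, limit):
    async with limit:            # borrow a token; waits if all are taken
        print(f"worker {i} is one of at most 40 running now")
        await trio.sleep(1)      # pretend to do some expensive work

async def main():
    limit = trio.CapacityLimiter(40)
    async with trio.open_nursery() as nursery:
        for i in range(100):
            nursery.start_soon(worker, i, limit)

trio.run(main)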
- class trio.Semaphore(initial_value, *, max_value=None)

  Bases: AsyncContextManagerMixin

  A semaphore.

  A semaphore holds an integer value, which can be incremented by calling release() and decremented by calling acquire() – but the value is never allowed to drop below zero. If the value is zero, then acquire() will block until someone calls release().

  If you're looking for a Semaphore to limit the number of tasks that can access some resource simultaneously, then consider using a CapacityLimiter instead.

  This object's interface is similar to, but different from, that of threading.Semaphore.

  A Semaphore object can be used as an async context manager; it blocks on entry but not on exit.

  - Parameters:
    - initial_value (int) -- A non-negative integer giving the semaphore's initial value.
    - max_value (int or None) -- If given, makes it an error to call release() if that would cause the value to exceed max_value.

  - await acquire()

    Decrement the semaphore value, blocking if necessary to avoid letting it drop below zero.

    - Return type:
      None

  - acquire_nowait()

    Attempt to decrement the semaphore value, without blocking.

    - Raises:
      WouldBlock -- if the value is zero.
    - Return type:
      None

  - release()

    Increment the semaphore value, possibly waking a task blocked in acquire().

    - Raises:
      ValueError -- if incrementing the value would cause it to exceed max_value.
    - Return type:
      None

  - statistics()

    Return an object containing debugging information.

    Currently the following fields are defined:

    - tasks_waiting: The number of tasks blocked on this semaphore's acquire() method.

    - Return type:
      ParkingLotStatistics
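A small sketch of the value arithmetic described above, including the max_value error check:

import trio

async def main():
    sem = trio.Semaphore(2, max_value=2)
    async with sem:          # entry acquires: value drops from 2 to 1
        await sem.acquire()  # value drops to 0; a third acquire would block
        sem.release()        # back to 1
    # Exiting the 'async with' released again, so the value is 2 here;
    # one more release() would raise ValueError because of max_value=2.

trio.run(main)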
- class trio.Lock

  Bases: _LockImpl

  A classic mutex.

  This is a non-reentrant, single-owner lock. Unlike threading.Lock, only the owner of the lock is allowed to release it.

  A Lock object can be used as an async context manager; it blocks on entry but not on exit.

  - await acquire()

    Acquire the lock, blocking if necessary.

    - Raises:
      BrokenResourceError -- if the owner of the lock exits without releasing.
    - Return type:
      None

  - acquire_nowait()

    Attempt to acquire the lock, without blocking.

    - Raises:
      WouldBlock -- if the lock is held.
    - Return type:
      None

  - locked()

    Check whether the lock is currently held.

    - Returns:
      True if the lock is held, False otherwise.
    - Return type:
      bool

  - release()

    Release the lock.

    - Raises:
      RuntimeError -- if the calling task does not hold the lock.
    - Return type:
      None

  - statistics()

    Return an object containing debugging information.

    Currently the following fields are defined:

    - locked: boolean indicating whether the lock is held.
    - owner: the trio.lowlevel.Task currently holding the lock, or None if the lock is not held.
    - tasks_waiting: The number of tasks blocked on this lock's acquire() method.

    - Return type:
      LockStatistics
- class trio.StrictFIFOLock

  Bases: _LockImpl

  A variant of Lock where tasks are guaranteed to acquire the lock in strict first-come-first-served order.

  An example of when this is useful is if you're implementing something like trio.SSLStream or an HTTP/2 server using h2, where you have multiple concurrent tasks that are interacting with a shared state machine, and at unpredictable moments the state machine requests that a chunk of data be sent over the network. (For example, when using h2 simply reading incoming data can occasionally create outgoing data to send.) The challenge is to make sure that these chunks are sent in the correct order, without being garbled.

  One option would be to use a regular Lock, and wrap it around every interaction with the state machine:

  # This approach is sometimes workable but often sub-optimal; see below
  async with lock:
      state_machine.do_something()
      if state_machine.has_data_to_send():
          await conn.sendall(state_machine.get_data_to_send())

  But this can be problematic. If you're using h2 then usually reading incoming data doesn't create the need to send any data, so we don't want to force every task that tries to read from the network to sit and wait a potentially long time for sendall to finish. And in some situations this could even potentially cause a deadlock, if the remote peer is waiting for you to read some data before it accepts the data you're sending.

  StrictFIFOLock provides an alternative. We can rewrite our example like:

  # Note: no awaits between when we start using the state machine and
  # when we block to take the lock!
  state_machine.do_something()
  if state_machine.has_data_to_send():
      # Notice that we fetch the data to send out of the state machine
      # *before* sleeping, so that other tasks won't see it.
      chunk = state_machine.get_data_to_send()
      async with strict_fifo_lock:
          await conn.sendall(chunk)

  First we do all our interaction with the state machine in a single scheduling quantum (notice there are no awaits in there), so it's automatically atomic with respect to other tasks. And then if and only if we have data to send, we get in line to send it – and StrictFIFOLock guarantees that each task will send its data in the same order that the state machine generated it.

  Currently, StrictFIFOLock is identical to Lock, but (a) this may not always be true in the future, especially if Trio ever implements more sophisticated scheduling policies, and (b) the above code is relying on a pretty subtle property of its lock. Using a StrictFIFOLock acts as an executable reminder that you're relying on this property.
- class trio.Condition(lock=None)

  Bases: AsyncContextManagerMixin

  A classic condition variable, similar to threading.Condition.

  A Condition object can be used as an async context manager to acquire the underlying lock; it blocks on entry but not on exit.

  - Parameters:
    lock (Lock) -- the lock object to use. If given, must be a trio.Lock. If None, a new Lock will be allocated and used.

  - await acquire()

    Acquire the underlying lock, blocking if necessary.

    - Raises:
      BrokenResourceError -- if the owner of the underlying lock exits without releasing.
    - Return type:
      None

  - acquire_nowait()

    Attempt to acquire the underlying lock, without blocking.

    - Raises:
      WouldBlock -- if the lock is currently held.
    - Return type:
      None

  - locked()

    Check whether the underlying lock is currently held.

    - Returns:
      True if the lock is held, False otherwise.
    - Return type:
      bool

  - notify(n=1)

    Wake one or more tasks that are blocked in wait().

    - Parameters:
      n (int) -- The number of tasks to wake.
    - Raises:
      RuntimeError -- if the calling task does not hold the lock.
    - Return type:
      None

  - notify_all()

    Wake all tasks that are currently blocked in wait().

    - Raises:
      RuntimeError -- if the calling task does not hold the lock.
    - Return type:
      None

  - statistics()

    Return an object containing debugging information.

    Currently the following fields are defined:

    - tasks_waiting: The number of tasks blocked on this condition's wait() method.
    - lock_statistics: The result of calling the underlying Lock's statistics() method.

    - Return type:
      ConditionStatistics

  - await wait()

    Wait for another task to call notify() or notify_all().

    When calling this method, you must hold the lock. It releases the lock while waiting, and then re-acquires it before waking up.

    There is a subtlety with how this method interacts with cancellation: when cancelled it will block to re-acquire the lock before raising Cancelled. This may cause cancellation to be less prompt than expected. The advantage is that it makes code like this work:

    async with condition:
        await condition.wait()

    If we didn't re-acquire the lock before waking up, and wait() were cancelled here, then we'd crash in condition.__aexit__ when we tried to release the lock we no longer held.

    - Raises:
      - RuntimeError -- if the calling task does not hold the lock.
      - BrokenResourceError -- if the owner of the lock exits without releasing, when attempting to re-acquire.
    - Return type:
      None
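To tie the pieces together, here's a sketch of the classic wait/notify pattern with trio.Condition, guarding a bit of shared state:

import trio

async def waiter(condition, shared):
    async with condition:              # acquire the underlying lock
        while not shared["ready"]:
            await condition.wait()     # releases the lock while waiting
        print("shared state is ready")

async def main():
    condition = trio.Condition()
    shared = {"ready": False}
    async with trio.open_nursery() as nursery:
        nursery.start_soon(waiter, condition, shared)
        await trio.sleep(1)            # pretend to prepare the state
        async with condition:
            shared["ready"] = True
            condition.notify()         # wake one waiting task

trio.run(main)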
These primitives return statistics objects that can be inspected.
- class trio.CapacityLimiterStatistics(borrowed_tokens, total_tokens, borrowers, tasks_waiting)

  Bases: object

  An object containing debugging information.

  Currently the following fields are defined:

  - borrowed_tokens: The number of tokens currently borrowed from the sack.
  - total_tokens: The total number of tokens in the sack. Usually this will be larger than borrowed_tokens, but it's possible for it to be smaller if trio.CapacityLimiter.total_tokens was recently decreased.
  - borrowers: A list of all tasks or other entities that currently hold a token.
  - tasks_waiting: The number of tasks blocked on this CapacityLimiter's trio.CapacityLimiter.acquire() or trio.CapacityLimiter.acquire_on_behalf_of() methods.
- class trio.LockStatistics(locked, owner, tasks_waiting)

  Bases: object

  An object containing debugging information for a Lock.

  Currently the following fields are defined:

  - locked (boolean): indicating whether the lock is held.
  - owner: the trio.lowlevel.Task currently holding the lock, or None if the lock is not held.
  - tasks_waiting (int): The number of tasks blocked on this lock's trio.Lock.acquire() method.
- class trio.ConditionStatistics(tasks_waiting, lock_statistics)

  Bases: object

  An object containing debugging information for a Condition.

  Currently the following fields are defined:

  - tasks_waiting (int): The number of tasks blocked on this condition's trio.Condition.wait() method.
  - lock_statistics: The result of calling the underlying Lock's statistics() method.
Personally, I find that events and channels are usually enough to
implement most things I care about, and lead to easier to read code
than the lower-level primitives discussed in this section. But if you
need them, they're here. (If you find yourself reaching for these
because you're trying to implement a new higher-level synchronization
primitive, then you might also want to check out the facilities in
trio.lowlevel
for a more direct exposure of Trio's underlying
synchronization logic. All of the classes discussed in this section are
implemented on top of the public APIs in trio.lowlevel; they
don't have any special access to Trio's internals.)
- class trio.CapacityLimiter(total_tokens)
Bases:
AsyncContextManagerMixin
An object for controlling access to a resource with limited capacity.
Sometimes you need to put a limit on how many tasks can do something at the same time. For example, you might want to use some threads to run multiple blocking I/O operations in parallel... but if you use too many threads at once, then your system can become overloaded and it'll actually make things slower. One popular solution is to impose a policy like "run up to 40 threads at the same time, but no more". But how do you implement a policy like this?
That's what CapacityLimiter is for. You can think of a CapacityLimiter object as a sack that starts out holding some fixed number of tokens:

limit = trio.CapacityLimiter(40)
Then tasks can come along and borrow a token out of the sack:
# Borrow a token:
async with limit:
    # We are holding a token!
    await perform_expensive_operation()
# Exiting the 'async with' block puts the token back into the sack
And crucially, if you try to borrow a token but the sack is empty, then you have to wait for another task to finish what it's doing and put its token back first before you can take it and continue.
Another way to think of it: a CapacityLimiter is like a sofa with a fixed number of seats, and if they're all taken then you have to wait for someone to get up before you can sit down.
By default, trio.to_thread.run_sync() uses a CapacityLimiter to limit the number of threads running at once; see trio.to_thread.current_default_thread_limiter for details.
If you're familiar with semaphores, then you can think of this as a restricted semaphore that's specialized for one common use case, with additional error checking. For a more traditional semaphore, see Semaphore.
Note
Don't confuse this with the "leaky bucket" or "token bucket" algorithms used to limit bandwidth usage on networks. The basic idea of using tokens to track a resource limit is similar, but this is a very simple sack where tokens aren't automatically created or destroyed over time; they're just borrowed and then put back.
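For example, here's a minimal sketch (not from the original reference) of ten tasks sharing a three-token limiter, so at most three of them are ever inside the async with block at once:

import trio

async def worker(i, limit):
    async with limit:  # borrow a token; waits if all three are out
        print(f"worker {i} holds a token")
        await trio.sleep(0.5)
    # exiting the block puts the token back into the sack

async def main():
    limit = trio.CapacityLimiter(3)
    async with trio.open_nursery() as nursery:
        for i in range(10):
            nursery.start_soon(worker, i, limit)

trio.run(main)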
- await acquire()
Borrow a token from the sack, blocking if necessary.
- Raises:
RuntimeError -- if the current task already holds one of this sack's tokens.
- Return type:
None
- acquire_nowait()
Borrow a token from the sack, without blocking.
- Raises:
WouldBlock -- if no tokens are available.
RuntimeError -- if the current task already holds one of this sack's tokens.
- Return type:
None
- await acquire_on_behalf_of(borrower)
Borrow a token from the sack on behalf of borrower, blocking if necessary.
- Parameters:
borrower (Task | object) -- A trio.lowlevel.Task or arbitrary opaque object used to record who is borrowing this token; see acquire_on_behalf_of_nowait() for details.
- Raises:
RuntimeError -- if the borrower task already holds one of this sack's tokens.
- Return type:
None
- acquire_on_behalf_of_nowait(borrower)
Borrow a token from the sack on behalf of borrower, without blocking.
- Parameters:
borrower (Task | object) -- A trio.lowlevel.Task or arbitrary opaque object used to record who is borrowing this token. This is used by trio.to_thread.run_sync() to allow threads to "hold tokens", with the intention in the future of using it to allow deadlock detection and other useful things.
- Raises:
WouldBlock -- if no tokens are available.
RuntimeError -- if borrower already holds one of this sack's tokens.
- Return type:
None
- property borrowed_tokens: int
The amount of capacity that's currently in use.
- release()
Put a token back into the sack.
- Raises:
RuntimeError -- if the current task has not acquired one of this sack's tokens.
- Return type:
None
- release_on_behalf_of(borrower)
Put a token back into the sack on behalf of borrower.
- Raises:
RuntimeError -- if the given borrower has not acquired one of this sack's tokens.
- Return type:
None
- statistics()
Return an object containing debugging information.
Currently the following fields are defined:
borrowed_tokens: The number of tokens currently borrowed from the sack.
total_tokens: The total number of tokens in the sack. Usually this will be larger than borrowed_tokens, but it's possible for it to be smaller if total_tokens was recently decreased.
borrowers: A list of all tasks or other entities that currently hold a token.
tasks_waiting: The number of tasks blocked on this CapacityLimiter's acquire() or acquire_on_behalf_of() methods.
- Return type:
CapacityLimiterStatistics
- property total_tokens: int | float
The total capacity available.
You can change total_tokens by assigning to this attribute. If you make it larger, then the appropriate number of waiting tasks will be woken immediately to take the new tokens. If you decrease total_tokens below the number of tasks that are currently using the resource, then all current tasks will be allowed to finish as normal, but no new tasks will be allowed in until the total number of tasks drops below the new total_tokens.
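As a sketch of the behaviors described above (the "worker-A" borrower key is an arbitrary illustration, not a Trio convention): tokens can be held on behalf of an opaque object, and total_tokens can be re-assigned while tokens are outstanding:

import trio

async def main():
    limit = trio.CapacityLimiter(2)

    # Borrow keyed by an opaque object rather than the current task:
    await limit.acquire_on_behalf_of("worker-A")
    print(limit.borrowed_tokens, "of", limit.total_tokens)  # 1 of 2

    limit.total_tokens = 4  # growing wakes waiting tasks immediately
    limit.total_tokens = 1  # shrinking: current borrowers finish as
                            # normal; new borrows wait until the count
                            # drops below the new limit

    limit.release_on_behalf_of("worker-A")

trio.run(main)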
- class trio.Semaphore(initial_value, *, max_value=None)
Bases:
AsyncContextManagerMixin
A semaphore.
A semaphore holds an integer value, which can be incremented by calling release() and decremented by calling acquire() -- but the value is never allowed to drop below zero. If the value is zero, then acquire() will block until someone calls release().
If you're looking for a Semaphore to limit the number of tasks that can access some resource simultaneously, then consider using a CapacityLimiter instead.
This object's interface is similar to, but different from, that of threading.Semaphore.
A Semaphore object can be used as an async context manager; it blocks on entry but not on exit.
- Parameters:
initial_value (int) -- The semaphore's initial value (must be non-negative).
max_value (int or None) -- If given, calling release() raises ValueError when it would push the value above max_value.
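A minimal usage sketch (values chosen just for illustration), showing the async context manager together with the non-blocking variant:

import trio

async def main():
    sem = trio.Semaphore(2)
    async with sem:               # acquire: value 2 -> 1
        sem.acquire_nowait()      # value 1 -> 0
        try:
            sem.acquire_nowait()  # value is 0, so this raises
        except trio.WouldBlock:
            print("no permits left")
        sem.release()             # value 0 -> 1
    print(sem.value)              # back to 2 after the block exits

trio.run(main)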
- await acquire()
Decrement the semaphore value, blocking if necessary to avoid letting it drop below zero.
- Return type:
None
- acquire_nowait()
Attempt to decrement the semaphore value, without blocking.
- Raises:
WouldBlock -- if the value is zero.
- Return type:
None
- release()
Increment the semaphore value, possibly waking a task blocked in acquire().
- Raises:
ValueError -- if incrementing the value would cause it to exceed max_value.
- Return type:
None
- statistics()
Return an object containing debugging information.
Currently the following fields are defined:
tasks_waiting: The number of tasks blocked on this semaphore's acquire() method.
- Return type:
ParkingLotStatistics
- property value: int
The current value of the semaphore.
- class trio.Lock
Bases:
_LockImpl
A classic mutex.
This is a non-reentrant, single-owner lock. Unlike threading.Lock, only the owner of the lock is allowed to release it.
A Lock object can be used as an async context manager; it blocks on entry but not on exit.
- await acquire()
Acquire the lock, blocking if necessary.
- Raises:
BrokenResourceError -- if the owner of the lock exits without releasing.
- Return type:
None
- acquire_nowait()
Attempt to acquire the lock, without blocking.
- Raises:
WouldBlock -- if the lock is held.
- Return type:
None
- locked()
Check whether the lock is currently held.
- Returns:
True if the lock is held, False otherwise.
- Return type:
bool
- release()
Release the lock.
- Raises:
RuntimeError -- if the calling task does not hold the lock.
- Return type:
None
- statistics()
Return an object containing debugging information.
Currently the following fields are defined:
locked: boolean indicating whether the lock is held.
owner: the trio.lowlevel.Task currently holding the lock, or None if the lock is not held.
tasks_waiting: The number of tasks blocked on this lock's acquire() method.
- Return type:
LockStatistics
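For instance, acquire_nowait() supports a "try-lock" pattern; in this sketch (the do_if_free helper is hypothetical, not part of Trio) we skip the work instead of waiting when the lock is busy:

import trio

async def do_if_free(lock):
    try:
        lock.acquire_nowait()
    except trio.WouldBlock:
        print("lock is busy; skipping")
        return
    try:
        print("got the lock without blocking")
    finally:
        lock.release()

trio.run(do_if_free, trio.Lock())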
- class trio.StrictFIFOLock
Bases:
_LockImpl
A variant of Lock where tasks are guaranteed to acquire the lock in strict first-come-first-served order.
An example of when this is useful is if you're implementing something like trio.SSLStream or an HTTP/2 server using h2, where you have multiple concurrent tasks that are interacting with a shared state machine, and at unpredictable moments the state machine requests that a chunk of data be sent over the network. (For example, when using h2 simply reading incoming data can occasionally create outgoing data to send.) The challenge is to make sure that these chunks are sent in the correct order, without being garbled.
One option would be to use a regular Lock, and wrap it around every interaction with the state machine:

# This approach is sometimes workable but often sub-optimal; see below
async with lock:
    state_machine.do_something()
    if state_machine.has_data_to_send():
        await conn.sendall(state_machine.get_data_to_send())
But this can be problematic. If you're using h2 then usually reading incoming data doesn't create the need to send any data, so we don't want to force every task that tries to read from the network to sit and wait a potentially long time for sendall to finish. And in some situations this could even potentially cause a deadlock, if the remote peer is waiting for you to read some data before it accepts the data you're sending.
StrictFIFOLock provides an alternative. We can rewrite our example like:

# Note: no awaits between when we start using the state machine and
# when we block to take the lock!
state_machine.do_something()
if state_machine.has_data_to_send():
    # Notice that we fetch the data to send out of the state machine
    # *before* sleeping, so that other tasks won't see it.
    chunk = state_machine.get_data_to_send()
    async with strict_fifo_lock:
        await conn.sendall(chunk)
First we do all our interaction with the state machine in a single scheduling quantum (notice there are no awaits in there), so it's automatically atomic with respect to other tasks. And then if and only if we have data to send, we get in line to send it -- and StrictFIFOLock guarantees that each task will send its data in the same order that the state machine generated it.
Currently, StrictFIFOLock is identical to Lock, but (a) this may not always be true in the future, especially if Trio ever implements more sophisticated scheduling policies, and (b) the above code is relying on a pretty subtle property of its lock. Using a StrictFIFOLock acts as an executable reminder that you're relying on this property.
- class trio.Condition(lock=None)
Bases:
AsyncContextManagerMixin
A classic condition variable, similar to threading.Condition.
A Condition object can be used as an async context manager to acquire the underlying lock; it blocks on entry but not on exit.
- Parameters:
lock (Lock) -- the lock object to use. If given, must be a trio.Lock. If None, a new Lock will be allocated and used.
- await acquire()
Acquire the underlying lock, blocking if necessary.
- Raises:
BrokenResourceError -- if the owner of the underlying lock exits without releasing.
- Return type:
None
- acquire_nowait()
Attempt to acquire the underlying lock, without blocking.
- Raises:
WouldBlock -- if the lock is currently held.
- Return type:
None
- locked()
Check whether the underlying lock is currently held.
- Returns:
True if the lock is held, False otherwise.
- Return type:
bool
- notify(n=1)
Wake one or more tasks that are blocked in wait().
- Parameters:
n (int) -- The number of tasks to wake.
- Raises:
RuntimeError -- if the calling task does not hold the lock.
- Return type:
None
- notify_all()
Wake all tasks that are currently blocked in wait().
- Raises:
RuntimeError -- if the calling task does not hold the lock.
- Return type:
None
- release()
Release the underlying lock.
- Return type:
None
- statistics()
Return an object containing debugging information.
Currently the following fields are defined:
tasks_waiting: The number of tasks blocked on this condition's wait() method.
lock_statistics: The result of calling the underlying Lock's statistics() method.
- Return type:
ConditionStatistics
- await wait()
Wait for another task to call notify() or notify_all().
When calling this method, you must hold the lock. It releases the lock while waiting, and then re-acquires it before waking up.
There is a subtlety with how this method interacts with cancellation: when cancelled, it will block to re-acquire the lock before raising Cancelled. This may cause cancellation to be less prompt than expected. The advantage is that it makes code like this work:

async with condition:
    await condition.wait()

If we didn't re-acquire the lock before waking up, and wait() were cancelled here, then we'd crash in condition.__aexit__ when we tried to release the lock we no longer held.
- Raises:
RuntimeError -- if the calling task does not hold the lock.
BrokenResourceError -- if the owner of the lock exits without releasing, when attempting to re-acquire.
- Return type:
None
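Putting the pieces together, here's a sketch (hypothetical names, using the usual wait-loop pattern) of one task waiting on a condition while another updates shared state and notifies:

import trio

async def waiter(cond, state):
    async with cond:
        while not state["ready"]:  # always re-check the predicate
            await cond.wait()
        print("state is ready!")

async def notifier(cond, state):
    await trio.sleep(1)
    async with cond:
        state["ready"] = True
        cond.notify_all()

async def main():
    cond = trio.Condition()
    state = {"ready": False}
    async with trio.open_nursery() as nursery:
        nursery.start_soon(waiter, cond, state)
        nursery.start_soon(notifier, cond, state)

trio.run(main)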
These primitives return statistics objects that can be inspected.
- class trio.CapacityLimiterStatistics(borrowed_tokens, total_tokens, borrowers, tasks_waiting)
Bases:
object
An object containing debugging information.
Currently the following fields are defined:
borrowed_tokens: The number of tokens currently borrowed from the sack.
total_tokens: The total number of tokens in the sack. Usually this will be larger than borrowed_tokens, but it's possible for it to be smaller if trio.CapacityLimiter.total_tokens was recently decreased.
borrowers: A list of all tasks or other entities that currently hold a token.
tasks_waiting: The number of tasks blocked on this CapacityLimiter's trio.CapacityLimiter.acquire() or trio.CapacityLimiter.acquire_on_behalf_of() methods.
- class trio.LockStatistics(locked, owner, tasks_waiting)
Bases:
object
An object containing debugging information for a Lock.
Currently the following fields are defined:
locked (boolean): indicating whether the lock is held.
owner: the trio.lowlevel.Task currently holding the lock, or None if the lock is not held.
tasks_waiting (int): The number of tasks blocked on this lock's trio.Lock.acquire() method.
- class trio.ConditionStatistics(tasks_waiting, lock_statistics)
Bases:
object
An object containing debugging information for a Condition.
Currently the following fields are defined:
tasks_waiting (int): The number of tasks blocked on this condition's trio.Condition.wait() method.
lock_statistics: The result of calling the underlying Lock's statistics() method.
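As a quick illustration of how these objects might be inspected (a sketch, not from the original reference):

import trio

async def main():
    lock = trio.Lock()
    async with lock:
        stats = lock.statistics()
        print(stats.locked)         # True
        print(stats.owner)          # the trio.lowlevel.Task holding it
        print(stats.tasks_waiting)  # 0

trio.run(main)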