API Reference

aio_pika.AMQPException

Alias of AMQPError

class aio_pika.Channel(connection: AbstractConnection, channel_number: int | None = None, publisher_confirms: bool = True, on_return_raises: bool = False)[source]

Channel abstraction

Parameters:
  • connection -- aio_pika.adapter.AsyncioConnection instance

  • loop -- Event loop (asyncio.get_event_loop() when None)

  • future_store -- aio_pika.common.FutureStore instance

  • publisher_confirms -- False if you don't need delivery confirmations (in pursuit of performance)

EXCHANGE_CLASS

Alias of Exchange

QUEUE_CLASS

Alias of Queue

async declare_exchange(name: str, type: ExchangeType | str = ExchangeType.DIRECT, *, durable: bool = False, auto_delete: bool = False, internal: bool = False, passive: bool = False, arguments: Dict[str, FieldValue] | None = None, timeout: float | int | None = None) AbstractExchange[source]

Declare an exchange.

Parameters:
  • name -- string with exchange name or aio_pika.exchange.Exchange instance

  • type -- Exchange type. Enum ExchangeType value or string. String values must be one of 'fanout', 'direct', 'topic', 'headers', 'x-delayed-message', 'x-consistent-hash'.

  • durable -- Durability (the exchange survives a broker restart)

  • auto_delete -- Delete the exchange automatically when it is no longer in use.

  • internal -- Do not send it to the broker; just create an object

  • passive -- Do not fail when the entity was declared previously with different parameters. Raises aio_pika.exceptions.ChannelClosed when the exchange doesn't exist.

  • arguments -- additional arguments

  • timeout -- execution timeout

Returns:

aio_pika.exchange.Exchange instance
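
The string values accepted for type can be checked before a declaration is sent to the broker. The following is a minimal sketch, assuming a hypothetical helper named check_exchange_type (it is not part of aio-pika) and using only the six strings listed in the parameter description:

```python
# Hypothetical helper, not part of aio-pika: validates an exchange type
# string against the values listed for declare_exchange().
VALID_EXCHANGE_TYPES = frozenset({
    "fanout",
    "direct",
    "topic",
    "headers",
    "x-delayed-message",
    "x-consistent-hash",
})


def check_exchange_type(value: str) -> str:
    """Return `value` unchanged if it is a listed type, else raise ValueError."""
    if value not in VALID_EXCHANGE_TYPES:
        raise ValueError(f"unsupported exchange type: {value!r}")
    return value
```

The returned string can then be passed as the type argument of declare_exchange().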

async declare_queue(name: str | None = None, *, durable: bool = False, exclusive: bool = False, passive: bool = False, auto_delete: bool = False, arguments: Dict[str, FieldValue] | None = None, timeout: float | int | None = None) AbstractQueue[source]

Parameters:
  • name -- queue name

  • durable -- Durability (the queue survives a broker restart)

  • exclusive -- Makes this queue exclusive. Exclusive queues may only be accessed by the current connection, and are deleted when that connection closes. Passive declaration of an exclusive queue by other connections is not allowed.

  • passive -- Do not fail when the entity was declared previously with different parameters. Raises aio_pika.exceptions.ChannelClosed when the queue doesn't exist.

  • auto_delete -- Delete the queue when the channel is closed.

  • arguments -- additional arguments

  • timeout -- execution timeout

Returns:

aio_pika.queue.Queue instance

Raises:

aio_pika.exceptions.ChannelClosed instance

async get_exchange(name: str, *, ensure: bool = True) AbstractExchange[source]

With ensure=True, it's a shortcut for .declare_exchange(..., passive=True); otherwise, it returns an exchange instance without checking its existence.

If ensure=True and the exchange does not exist, aio_pika.exceptions.ChannelClosed is raised.

Use this method in a separate channel (or as soon as the channel is created). This is only a way to get an exchange without declaring a new one.

Parameters:
  • name -- exchange name

  • ensure -- ensure that the exchange exists

Returns:

aio_pika.exchange.Exchange instance

Raises:

aio_pika.exceptions.ChannelClosed instance

async get_queue(name: str, *, ensure: bool = True) AbstractQueue[source]

With ensure=True, it's a shortcut for .declare_queue(..., passive=True); otherwise, it returns a queue instance without checking its existence.

If ensure=True and the queue does not exist, aio_pika.exceptions.ChannelClosed is raised.

Use this method in a separate channel (or as soon as the channel is created). This is only a way to get a queue without declaring a new one.

Parameters:
  • name -- queue name

  • ensure -- ensure that the queue exists

Returns:

aio_pika.queue.Queue instance

Raises:

aio_pika.exceptions.ChannelClosed instance

property is_closed: bool

Returns True when the channel has been closed from the broker side or after the close() method has been called.

property is_initialized: bool

Returns True when the channel has been opened and is ready for interaction.

class aio_pika.Connection(url: URL, loop: AbstractEventLoop | None = None, ssl_context: SSLContext | None = None, **kwargs: Any)[source]

Connection abstraction

CHANNEL_CLASS

Alias of Channel

channel(channel_number: int | None = None, publisher_confirms: bool = True, on_return_raises: bool = False) AbstractChannel[source]

Coroutine which returns new instance of Channel.

Example:

import aio_pika

async def main(loop):
    connection = await aio_pika.connect(
        "amqp://guest:guest@127.0.0.1/"
    )

    channel1 = connection.channel()
    await channel1.close()

    # Creates channel with specific channel number
    channel42 = connection.channel(42)
    await channel42.close()

    # For working with transactions
    channel_no_confirms = await connection.channel(
        publisher_confirms=False
    )
    await channel_no_confirms.close()

Also available as an asynchronous context manager:

import aio_pika

async def main(loop):
    connection = await aio_pika.connect(
        "amqp://guest:guest@127.0.0.1/"
    )

    async with connection.channel() as channel:
        # channel is open and available
        pass

    # channel is now closed

Parameters:
  • channel_number -- specify the channel number explicitly

  • publisher_confirms -- if True, the aio_pika.Exchange.publish() method returns the broker's confirmation after the publish completes. Otherwise, the aio_pika.Exchange.publish() method returns None

  • on_return_raises -- raise an aio_pika.exceptions.DeliveryError when a mandatory message is returned

async connect(timeout: float | int | None = None) None[source]

Connect to the AMQP server. This method should be called after aio_pika.connection.Connection.__init__()

Note

This method is called by connect(). You shouldn't call it explicitly.

class aio_pika.DeliveryMode(value)[source]

An enumeration.

class aio_pika.Exchange(channel: AbstractChannel, name: str, type: ExchangeType | str = ExchangeType.DIRECT, *, auto_delete: bool = False, durable: bool = False, internal: bool = False, passive: bool = False, arguments: Dict[str, FieldValue] | None = None)[source]

Exchange abstraction

async bind(exchange: AbstractExchange | str, routing_key: str = '', *, arguments: Dict[str, FieldValue] | None = None, timeout: float | int | None = None) BindOk[source]

A binding can also be a relationship between two exchanges. This can be simply read as: this exchange is interested in messages from another exchange.

Bindings can take an extra routing_key parameter. To avoid the confusion with a basic_publish parameter we're going to call it a binding key.

client = await connect()

routing_key = 'simple_routing_key'
src_exchange_name = "source_exchange"
dest_exchange_name = "destination_exchange"

channel = await client.channel()
src_exchange = await channel.declare_exchange(
    src_exchange_name, auto_delete=True
)
dest_exchange = await channel.declare_exchange(
    dest_exchange_name, auto_delete=True
)
queue = await channel.declare_queue(auto_delete=True)

await queue.bind(dest_exchange, routing_key)
await dest_exchange.bind(src_exchange, routing_key)

Parameters:
  • exchange -- aio_pika.exchange.Exchange instance

  • routing_key -- routing key

  • arguments -- additional arguments

  • timeout -- execution timeout

Returns:

None
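
Exchange-to-exchange bindings can be chained, so that each exchange receives messages from the one declared before it. The sketch below assumes a hypothetical helper named chain_exchanges; the channel argument is expected to behave like aio_pika.Channel, and only declare_exchange() and bind() as documented here are called:

```python
async def chain_exchanges(channel, names, routing_key=""):
    """Declare exchanges in order and bind each one to its predecessor.

    `channel` is assumed to behave like aio_pika.Channel. This helper is
    illustrative and not part of aio-pika.
    """
    exchanges = []
    for name in names:
        exchange = await channel.declare_exchange(name, auto_delete=True)
        if exchanges:
            # The new exchange is the destination; the previous one is the source.
            await exchange.bind(exchanges[-1], routing_key)
        exchanges.append(exchange)
    return exchanges
```

A message published to the first exchange with a matching routing key would then be forwarded along the chain.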

async delete(if_unused: bool = False, timeout: float | int | None = None) DeleteOk[source]

Delete the exchange.

Parameters:
  • timeout -- operation timeout

  • if_unused -- perform the deletion only when the exchange has no bindings.

async publish(message: AbstractMessage, routing_key: str, *, mandatory: bool = True, immediate: bool = False, timeout: float | int | None = None) Ack | Nack | Reject | None[source]

Publish the message to the exchange. aio-pika uses the publisher confirms extension for message delivery.

async unbind(exchange: AbstractExchange | str, routing_key: str = '', arguments: Dict[str, FieldValue] | None = None, timeout: float | int | None = None) UnbindOk[source]

Remove exchange-to-exchange binding for this Exchange instance

Parameters:
  • exchange -- aio_pika.exchange.Exchange instance

  • routing_key -- routing key

  • arguments -- additional arguments

  • timeout -- execution timeout

Returns:

None

class aio_pika.ExchangeType(value)[source]

An enumeration.

class aio_pika.IncomingMessage(message: DeliveredMessage, no_ack: bool = False)[source]

An incoming message is similar to Message but has additional methods for message acknowledgement.

Depending on the acknowledgement mode used, RabbitMQ can consider a message to be successfully delivered either immediately after it is sent out (written to a TCP socket) or when an explicit ("manual") client acknowledgement is received. Manually sent acknowledgements can be positive or negative and use one of the following protocol methods:

  • basic.ack is used for positive acknowledgements

  • basic.nack is used for negative acknowledgements (note: this is a RabbitMQ extension to AMQP 0-9-1)

  • basic.reject is used for negative acknowledgements but has one limitation compared to basic.nack

Positive acknowledgements simply instruct RabbitMQ to record a message as delivered. Negative acknowledgements with basic.reject have the same effect. The difference is primarily in the semantics: positive acknowledgements assume a message was successfully processed while their negative counterpart suggests that a delivery wasn't processed but still should be deleted.

Create an instance of IncomingMessage

async ack(multiple: bool = False) None[source]

Send basic.ack, which is used for positive acknowledgements.

Note

This method looks like a blocking method, but actually it just sends bytes to the socket and doesn't require any response from the broker.

Parameters:

multiple -- If set to True, the message's delivery tag is treated as "up to and including", so that multiple messages can be acknowledged with a single method. If set to False, the ack refers to a single message.

Returns:

None
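
The "up to and including" rule for multiple can be illustrated with a small model. This is only a sketch of the rule described above, not aio-pika or broker code; the function name acknowledged_tags and the use of plain integers as delivery tags are assumptions made for the example:

```python
def acknowledged_tags(outstanding, delivery_tag, multiple=False):
    """Model which unacknowledged delivery tags a single ack settles.

    `outstanding` is an iterable of unacknowledged delivery tags (integers).
    Illustrative only; this is not aio-pika code.
    """
    if multiple:
        # Every outstanding tag up to and including delivery_tag is settled.
        return sorted(tag for tag in outstanding if tag <= delivery_tag)
    # Only the single referenced message is settled.
    return [delivery_tag] if delivery_tag in outstanding else []
```

With outstanding tags 1 to 4, acknowledging tag 3 with multiple=True settles tags 1, 2 and 3, while multiple=False settles only tag 3.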

info() MessageInfo[source]

Returns a dict representation of the message.

process(requeue: bool = False, reject_on_redelivered: bool = False, ignore_processed: bool = False) AbstractProcessContext[source]

Context manager for processing the message

>>> async def on_message_received(message: IncomingMessage):
...    async with message.process():
...        # When exception will be raised
...        # the message will be rejected
...        print(message.body)

Example with ignore_processed=True

>>> async def on_message_received(message: IncomingMessage):
...    async with message.process(ignore_processed=True):
...        # Now (with ignore_processed=True) you may reject
...        # (or ack) message manually too
...        if True:  # some reasonable condition here
...            await message.reject()
...        print(message.body)

Parameters:
  • requeue -- Requeue the message when an exception is raised.

  • reject_on_redelivered -- When True, the message is rejected only when it was redelivered.

  • ignore_processed -- Do nothing if the message has already been processed

async reject(requeue: bool = False) None[source]

When requeue=True the message will be returned to queue. Otherwise, message will be dropped.

Note

This method looks like a blocking method, but actually it just sends bytes to the socket and doesn't require any response from the broker.

Parameters:

requeue -- bool

class aio_pika.Message(body: bytes, *, headers: Dict[str, bool | bytes | bytearray | Decimal | List[bool | bytes | bytearray | Decimal | List[FieldValue] | Dict[str, FieldValue] | float | int | None | str | datetime] | Dict[str, bool | bytes | bytearray | Decimal | List[FieldValue] | Dict[str, FieldValue] | float | int | None | str | datetime] | float | int | None | str | datetime] | None = None, content_type: str | None = None, content_encoding: str | None = None, delivery_mode: DeliveryMode | int | None = None, priority: int | None = None, correlation_id: str | None = None, reply_to: str | None = None, expiration: int | datetime | float | timedelta | None = None, message_id: str | None = None, timestamp: int | datetime | float | timedelta | None = None, type: str | None = None, user_id: str | None = None, app_id: str | None = None)[source]

AMQP message abstraction

Creates a new instance of Message

Parameters:
  • body -- message body

  • headers -- message headers

  • content_type -- content type

  • content_encoding -- content encoding

  • delivery_mode -- delivery mode

  • priority -- priority

  • correlation_id -- correlation id

  • reply_to -- reply to

  • expiration -- expiration in seconds (or datetime or timedelta)

  • message_id -- message id

  • timestamp -- timestamp

  • type -- type

  • user_id -- user id

  • app_id -- app id

lock() None[source]

Set lock flag to True

property locked: bool

Whether the message is locked.

Returns:

bool

property properties: Properties

Build aiormq.spec.Basic.Properties object

exception aio_pika.MessageProcessError[source]

class aio_pika.Queue(channel: AbstractChannel, name: str | None, durable: bool, exclusive: bool, auto_delete: bool, arguments: Dict[str, FieldValue] | None, passive: bool = False)[source]

AMQP queue abstraction

async bind(exchange: AbstractExchange | str, routing_key: str | None = None, *, arguments: Dict[str, FieldValue] | None = None, timeout: float | int | None = None) BindOk[source]

A binding is a relationship between an exchange and a queue. This can be simply read as: the queue is interested in messages from this exchange.

Bindings can take an extra routing_key parameter. To avoid the confusion with a basic_publish parameter we're going to call it a binding key.

Parameters:
  • exchange -- aio_pika.exchange.Exchange instance

  • routing_key -- routing key

  • arguments -- additional arguments

  • timeout -- execution timeout

Raises:

asyncio.TimeoutError -- when the binding timeout period has elapsed.

Returns:

None

async cancel(consumer_tag: str, timeout: float | int | None = None, nowait: bool = False) CancelOk[source]

This method cancels a consumer. This does not affect already delivered messages, but it does mean the server will not send any more messages for that consumer. The client may receive an arbitrary number of messages in between sending the cancel method and receiving the cancel-ok reply. It may also be sent from the server to the client in the event of the consumer being unexpectedly cancelled (i.e. cancelled for any reason other than the server receiving the corresponding basic.cancel from the client). This allows clients to be notified of the loss of consumers due to events such as queue deletion.

Parameters:
  • consumer_tag -- consumer tag returned by consume()

  • timeout -- execution timeout

  • nowait (bool) -- Do not expect a Basic.CancelOk response

Returns:

Basic.CancelOk when operation completed successfully

async consume(callback: Callable[[AbstractIncomingMessage], Awaitable[Any]], no_ack: bool = False, exclusive: bool = False, arguments: Dict[str, FieldValue] | None = None, consumer_tag: str | None = None, timeout: float | int | None = None) str[source]

Start consuming the queue.

Parameters:
  • timeout -- asyncio.TimeoutError is raised when the Future has not finished after this time.

  • callback -- Consuming callback. Should be a coroutine function.

  • no_ack -- if True you don't need to call aio_pika.message.IncomingMessage.ack()

  • exclusive -- Makes this queue exclusive. Exclusive queues may only be accessed by the current connection, and are deleted when that connection closes. Passive declaration of an exclusive queue by other connections is not allowed.

  • arguments -- additional arguments

  • consumer_tag -- optional consumer tag

Raises:

asyncio.TimeoutError -- when the consuming timeout period has elapsed.

Returns:

consumer tag (str)
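
The consumer tag returned by consume() is what cancel() expects later. A minimal sketch of that round trip, assuming a hypothetical helper named collect_bodies and a queue object that behaves like aio_pika.Queue (only consume(), cancel(), message.body and message.ack() are used):

```python
import asyncio


async def collect_bodies(queue, count, timeout=5.0):
    """Consume until `count` messages have arrived, then cancel the consumer.

    Illustrative helper, not part of aio-pika.
    """
    bodies = []
    done = asyncio.Event()

    async def on_message(message):
        bodies.append(message.body)
        await message.ack()  # needed because no_ack is left at its default (False)
        if len(bodies) >= count:
            done.set()

    consumer_tag = await queue.consume(on_message)
    try:
        await asyncio.wait_for(done.wait(), timeout)
    finally:
        # Stop further deliveries even if the wait timed out.
        await queue.cancel(consumer_tag)
    return bodies
```

Because deliveries may still arrive between sending the cancel and receiving the reply, the returned list can contain more than count bodies.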

async declare(timeout: float | int | None = None) DeclareOk[source]

Declare queue.

Parameters:
  • timeout -- execution timeout

  • passive -- Only check to see if the queue exists.

Returns:

None

async delete(*, if_unused: bool = True, if_empty: bool = True, timeout: float | int | None = None) DeleteOk[source]

Delete the queue.

Parameters:
  • if_unused -- Perform delete only when unused

  • if_empty -- Perform delete only when empty

  • timeout -- execution timeout

Returns:

None

async get(*, no_ack: bool = False, fail: Literal[True] = True, timeout: float | int | None = 5) IncomingMessage[source]
async get(*, no_ack: bool = False, fail: Literal[False] = True, timeout: float | int | None = 5) IncomingMessage | None

Get message from the queue.

Parameters:
  • no_ack -- if True you don't need to call aio_pika.message.IncomingMessage.ack()

  • timeout -- execution timeout

  • fail -- When False, return None instead of raising aio_pika.exceptions.QueueEmpty when the queue is empty.

Returns:

aio_pika.message.IncomingMessage
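
With fail=False, an empty queue is signalled by a None return value, which makes polling loops straightforward. The sketch below assumes a hypothetical helper named drain and a queue object that behaves like aio_pika.Queue:

```python
async def drain(queue, limit):
    """Fetch up to `limit` messages with get(fail=False) and ack each one.

    Illustrative helper, not part of aio-pika.
    """
    bodies = []
    for _ in range(limit):
        message = await queue.get(fail=False)
        if message is None:
            # The queue is empty; no exception is raised because fail=False.
            break
        await message.ack()
        bodies.append(message.body)
    return bodies
```

The loop stops at the first None, so it returns fewer than limit bodies when the queue runs dry.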

iterator(**kwargs: Any) AbstractQueueIterator[source]

Returns an iterator for use in an async for expression.

Full example:

import aio_pika

async def main():
    connection = await aio_pika.connect()

    async with connection:
        channel = await connection.channel()

        queue = await channel.declare_queue('test')

        async with queue.iterator() as q:
            async for message in q:
                print(message.body)

When your program runs with run_forever, the iterator will be closed in the background. In this case the context manager for the iterator may be skipped, and the queue may be used in the "async for" expression directly.

import aio_pika

async def main():
    connection = await aio_pika.connect()

    async with connection:
        channel = await connection.channel()

        queue = await channel.declare_queue('test')

        async for message in queue:
            print(message.body)

Returns:

QueueIterator
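
An iterator can also be left early, for example after a fixed number of messages. A minimal sketch, assuming a hypothetical helper named take and a queue object that behaves like aio_pika.Queue; it combines iterator() with message.process() as documented for IncomingMessage:

```python
async def take(queue, count):
    """Return the bodies of the first `count` messages from the queue.

    Illustrative helper, not part of aio-pika. It waits for at least one
    message, so `count` should be 1 or more.
    """
    bodies = []
    async with queue.iterator() as messages:
        async for message in messages:
            # process() acknowledges the message when the block succeeds.
            async with message.process():
                bodies.append(message.body)
            if len(bodies) >= count:
                break
    return bodies
```

Leaving the async with block closes the iterator, so no further messages are consumed by it.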

async purge(no_wait: bool = False, timeout: float | int | None = None) PurgeOk[source]

Purge all messages from the queue.

Parameters:
  • no_wait -- do not wait for a response

  • timeout -- execution timeout

Returns:

None

async unbind(exchange: AbstractExchange | str, routing_key: str | None = None, arguments: Dict[str, FieldValue] | None = None, timeout: float | int | None = None) UnbindOk[source]

Remove binding from exchange for this Queue instance

Parameters:
  • exchange -- aio_pika.exchange.Exchange instance

  • routing_key -- routing key

  • arguments -- additional arguments

  • timeout -- execution timeout

Raises:

asyncio.TimeoutError -- when the unbinding timeout period has elapsed.

Returns:

None

class aio_pika.RobustChannel(connection: AbstractConnection, channel_number: int | None = None, publisher_confirms: bool = True, on_return_raises: bool = False)[source]

Channel abstraction

Parameters:
  • connection -- aio_pika.adapter.AsyncioConnection instance

  • loop -- Event loop (asyncio.get_event_loop() when None)

  • future_store -- aio_pika.common.FutureStore instance

  • publisher_confirms -- False if you don't need delivery confirmations (in pursuit of performance)

EXCHANGE_CLASS

Alias of RobustExchange

QUEUE_CLASS

Alias of RobustQueue

async declare_exchange(name: str, type: ExchangeType | str = ExchangeType.DIRECT, durable: bool = False, auto_delete: bool = False, internal: bool = False, passive: bool = False, arguments: Dict[str, Any] | None = None, timeout: float | int | None = None, robust: bool = True) AbstractRobustExchange[source]

Declare an exchange.

Parameters:
  • name -- string with exchange name or aio_pika.exchange.Exchange instance

  • type -- Exchange type. Enum ExchangeType value or string. String values must be one of 'fanout', 'direct', 'topic', 'headers', 'x-delayed-message', 'x-consistent-hash'.

  • durable -- Durability (the exchange survives a broker restart)

  • auto_delete -- Delete the exchange automatically when it is no longer in use.

  • internal -- Do not send it to the broker; just create an object

  • passive -- Do not fail when the entity was declared previously with different parameters. Raises aio_pika.exceptions.ChannelClosed when the exchange doesn't exist.

  • arguments -- additional arguments

  • timeout -- execution timeout

Returns:

aio_pika.exchange.Exchange instance

async declare_queue(name: str | None = None, *, durable: bool = False, exclusive: bool = False, passive: bool = False, auto_delete: bool = False, arguments: Dict[str, Any] | None = None, timeout: float | int | None = None, robust: bool = True) AbstractRobustQueue[source]

Parameters:
  • name -- queue name

  • durable -- Durability (the queue survives a broker restart)

  • exclusive -- Makes this queue exclusive. Exclusive queues may only be accessed by the current connection, and are deleted when that connection closes. Passive declaration of an exclusive queue by other connections is not allowed.

  • passive -- Do not fail when the entity was declared previously with different parameters. Raises aio_pika.exceptions.ChannelClosed when the queue doesn't exist.

  • auto_delete -- Delete the queue when the channel is closed.

  • arguments -- additional arguments

  • timeout -- execution timeout

Returns:

aio_pika.queue.Queue instance

Raises:

aio_pika.exceptions.ChannelClosed instance

class aio_pika.RobustConnection(url: URL, loop: AbstractEventLoop | None = None, **kwargs: Any)[source]

Robust connection

CHANNEL_CLASS

Alias of RobustChannel

channel(channel_number: int | None = None, publisher_confirms: bool = True, on_return_raises: bool = False) AbstractRobustChannel[source]

Coroutine which returns new instance of Channel.

Example:

import aio_pika

async def main(loop):
    connection = await aio_pika.connect(
        "amqp://guest:guest@127.0.0.1/"
    )

    channel1 = connection.channel()
    await channel1.close()

    # Creates channel with specific channel number
    channel42 = connection.channel(42)
    await channel42.close()

    # For working with transactions
    channel_no_confirms = await connection.channel(
        publisher_confirms=False
    )
    await channel_no_confirms.close()

Also available as an asynchronous context manager:

import aio_pika

async def main(loop):
    connection = await aio_pika.connect(
        "amqp://guest:guest@127.0.0.1/"
    )

    async with connection.channel() as channel:
        # channel is open and available
        pass

    # channel is now closed

Parameters:
  • channel_number -- specify the channel number explicitly

  • publisher_confirms -- if True, the aio_pika.Exchange.publish() method returns the broker's confirmation after the publish completes. Otherwise, the aio_pika.Exchange.publish() method returns None

  • on_return_raises -- raise an aio_pika.exceptions.DeliveryError when a mandatory message is returned

async connect(timeout: float | int | None = None) None[source]

Connect to the AMQP server. This method should be called after aio_pika.connection.Connection.__init__()

Note

This method is called by connect(). You shouldn't call it explicitly.

class aio_pika.RobustExchange(channel: AbstractChannel, name: str, type: ExchangeType | str = ExchangeType.DIRECT, *, auto_delete: bool = False, durable: bool = False, internal: bool = False, passive: bool = False, arguments: Dict[str, FieldValue] | None = None)[source]

Exchange abstraction

async bind(exchange: AbstractExchange | str, routing_key: str = '', *, arguments: Dict[str, FieldValue] | None = None, timeout: float | int | None = None, robust: bool = True) BindOk[source]

A binding can also be a relationship between two exchanges. This can be simply read as: this exchange is interested in messages from another exchange.

Bindings can take an extra routing_key parameter. To avoid the confusion with a basic_publish parameter we're going to call it a binding key.

client = await connect()

routing_key = 'simple_routing_key'
src_exchange_name = "source_exchange"
dest_exchange_name = "destination_exchange"

channel = await client.channel()
src_exchange = await channel.declare_exchange(
    src_exchange_name, auto_delete=True
)
dest_exchange = await channel.declare_exchange(
    dest_exchange_name, auto_delete=True
)
queue = await channel.declare_queue(auto_delete=True)

await queue.bind(dest_exchange, routing_key)
await dest_exchange.bind(src_exchange, routing_key)

Parameters:
  • exchange -- aio_pika.exchange.Exchange instance

  • routing_key -- routing key

  • arguments -- additional arguments

  • timeout -- execution timeout

Returns:

None

async unbind(exchange: AbstractExchange | str, routing_key: str = '', arguments: Dict[str, FieldValue] | None = None, timeout: float | int | None = None) UnbindOk[source]

Remove exchange-to-exchange binding for this Exchange instance

Parameters:
  • exchange -- aio_pika.exchange.Exchange instance

  • routing_key -- routing key

  • arguments -- additional arguments

  • timeout -- execution timeout

Returns:

None

class aio_pika.RobustQueue(channel: AbstractChannel, name: str | None, durable: bool = False, exclusive: bool = False, auto_delete: bool = False, arguments: Dict[str, FieldValue] | None = None, passive: bool = False)[source]

async bind(exchange: AbstractExchange | str, routing_key: str | None = None, *, arguments: Dict[str, FieldValue] | None = None, timeout: float | int | None = None, robust: bool = True) BindOk[source]

A binding is a relationship between an exchange and a queue. This can be simply read as: the queue is interested in messages from this exchange.

Bindings can take an extra routing_key parameter. To avoid the confusion with a basic_publish parameter we're going to call it a binding key.

Parameters:
  • exchange -- aio_pika.exchange.Exchange instance

  • routing_key -- routing key

  • arguments -- additional arguments

  • timeout -- execution timeout

Raises:

asyncio.TimeoutError -- when the binding timeout period has elapsed.

Returns:

None

async cancel(consumer_tag: str, timeout: float | int | None = None, nowait: bool = False) CancelOk[source]

This method cancels a consumer. This does not affect already delivered messages, but it does mean the server will not send any more messages for that consumer. The client may receive an arbitrary number of messages in between sending the cancel method and receiving the cancel-ok reply. It may also be sent from the server to the client in the event of the consumer being unexpectedly cancelled (i.e. cancelled for any reason other than the server receiving the corresponding basic.cancel from the client). This allows clients to be notified of the loss of consumers due to events such as queue deletion.

Parameters:
  • consumer_tag -- consumer tag returned by consume()

  • timeout -- execution timeout

  • nowait (bool) -- Do not expect a Basic.CancelOk response

Returns:

Basic.CancelOk when operation completed successfully

async consume(callback: Callable[[AbstractIncomingMessage], Awaitable[Any]], no_ack: bool = False, exclusive: bool = False, arguments: Dict[str, FieldValue] | None = None, consumer_tag: str | None = None, timeout: float | int | None = None, robust: bool = True) str[source]

Start consuming the queue.

Parameters:
  • timeout -- asyncio.TimeoutError is raised when the Future has not finished after this time.

  • callback -- Consuming callback. Should be a coroutine function.

  • no_ack -- if True you don't need to call aio_pika.message.IncomingMessage.ack()

  • exclusive -- Makes this queue exclusive. Exclusive queues may only be accessed by the current connection, and are deleted when that connection closes. Passive declaration of an exclusive queue by other connections is not allowed.

  • arguments -- additional arguments

  • consumer_tag -- optional consumer tag

Raises:

asyncio.TimeoutError -- when the consuming timeout period has elapsed.

Returns:

consumer tag (str)

iterator(**kwargs: Any) AbstractQueueIterator[source]

Returns an iterator for use in an async for expression.

Full example:

import aio_pika

async def main():
    connection = await aio_pika.connect()

    async with connection:
        channel = await connection.channel()

        queue = await channel.declare_queue('test')

        async with queue.iterator() as q:
            async for message in q:
                print(message.body)

When your program runs with run_forever, the iterator will be closed in the background. In this case the context manager for the iterator may be skipped, and the queue may be used in the "async for" expression directly.

import aio_pika

async def main():
    connection = await aio_pika.connect()

    async with connection:
        channel = await connection.channel()

        queue = await channel.declare_queue('test')

        async for message in queue:
            print(message.body)

Returns:

QueueIterator

async unbind(exchange: AbstractExchange | str, routing_key: str | None = None, arguments: Dict[str, FieldValue] | None = None, timeout: float | int | None = None) UnbindOk[source]

Remove binding from exchange for this Queue instance

Parameters:
  • exchange -- aio_pika.exchange.Exchange instance

  • routing_key -- routing key

  • arguments -- additional arguments

  • timeout -- execution timeout

Raises:

asyncio.TimeoutError -- when the unbinding timeout period has elapsed.

Returns:

None

async aio_pika.connect(url: str | ~yarl.URL | None = None, *, host: str = 'localhost', port: int = 5672, login: str = 'guest', password: str = 'guest', virtualhost: str = '/', ssl: bool = False, loop: ~asyncio.events.AbstractEventLoop | None = None, ssl_options: ~aio_pika.abc.SSLOptions | None = None, ssl_context: ~ssl.SSLContext | None = None, timeout: float | int | None = None, client_properties: ~typing.Dict[str, FieldValue] | None = None, connection_class: ~typing.Type[~aio_pika.abc.AbstractConnection] = <class 'aio_pika.connection.Connection'>, **kwargs: ~typing.Any) AbstractConnection[source]

Make connection to the broker.

Example:

import aio_pika

async def main():
    connection = await aio_pika.connect(
        "amqp://guest:guest@127.0.0.1/"
    )

Connect to localhost with default credentials:

import aio_pika

async def main():
    connection = await aio_pika.connect()

Note

The available keys for ssl_options parameter are:
  • cert_reqs

  • certfile

  • keyfile

  • ssl_version

For information on the values ssl_options can be set to, refer to the official Python documentation.

Set connection name for RabbitMQ admin panel:

# As URL parameter method
read_connection = await connect(
    "amqp://guest:guest@localhost/?name=Read%20connection"
)

# keyword method
write_connection = await connect(
    client_properties={
        'connection_name': 'Write connection'
    }
)

The URL string might contain ssl parameters, e.g. amqps://user:pass@host//?ca_certs=ca.pem&certfile=crt.pem&keyfile=key.pem
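
When the connection name is passed as a URL parameter, it has to be percent-encoded, as in Read%20connection above. A minimal sketch using only the standard library, assuming a hypothetical helper named with_connection_name (it is not part of aio-pika):

```python
from urllib.parse import parse_qsl, quote, urlencode, urlsplit, urlunsplit


def with_connection_name(url: str, name: str) -> str:
    """Return `url` with a percent-encoded `name` query parameter.

    Illustrative helper, not part of aio-pika. An existing `name`
    parameter is replaced; other query parameters are kept.
    """
    parts = urlsplit(url)
    query = [(k, v) for k, v in parse_qsl(parts.query) if k != "name"]
    query.append(("name", name))
    # quote_via=quote encodes spaces as %20 instead of "+".
    encoded = urlencode(query, quote_via=quote)
    return urlunsplit(parts._replace(query=encoded))
```

The resulting string can be passed as the url argument of connect().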

Parameters:
  • client_properties -- add custom client capability.

  • url -- RFC3986 formatted broker address. When None, the keyword arguments are used.

  • host -- hostname of the broker

  • port -- broker port 5672 by default

  • login -- username string. 'guest' by default.

  • password -- password string. 'guest' by default.

  • virtualhost -- virtualhost parameter. '/' by default

  • ssl -- use SSL for connection. Should be used with additional kwargs.

  • ssl_options -- A dict of values for the SSL connection.

  • timeout -- connection timeout in seconds

  • loop -- Event loop (asyncio.get_event_loop() when None)

  • ssl_context -- ssl.SSLContext instance

  • connection_class -- Factory of a new connection

  • kwargs -- additional parameters which will be passed to the connection.

Returns:

aio_pika.connection.Connection

async aio_pika.connect_robust(url: str | ~yarl.URL | None = None, *, host: str = 'localhost', port: int = 5672, login: str = 'guest', password: str = 'guest', virtualhost: str = '/', ssl: bool = False, loop: ~asyncio.events.AbstractEventLoop | None = None, ssl_options: ~aio_pika.abc.SSLOptions | None = None, ssl_context: ~ssl.SSLContext | None = None, timeout: float | int | None = None, client_properties: ~typing.Dict[str, FieldValue] | None = None, connection_class: ~typing.Type[~aio_pika.abc.AbstractRobustConnection] = <class 'aio_pika.robust_connection.RobustConnection'>, **kwargs: ~typing.Any) AbstractRobustConnection[source]

Make connection to the broker.

Example:

import aio_pika

async def main():
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/"
    )

Connect to localhost with default credentials:

import aio_pika

async def main():
    connection = await aio_pika.connect_robust()

Note

The available keys for the ssl_options parameter are:
  • cert_reqs

  • certfile

  • keyfile

  • ssl_version

For information on the values these ssl_options keys accept, refer to the official Python documentation.

Set connection name for RabbitMQ admin panel:

# URL parameter method
read_connection = await connect_robust(
    "amqp://guest:guest@localhost/?name=Read%20connection"
)

# Keyword method
write_connection = await connect_robust(
    client_properties={
        'connection_name': 'Write connection'
    }
)

URL string might contain ssl parameters e.g. amqps://user:pass@host//?ca_certs=ca.pem&certfile=crt.pem&keyfile=key.pem

Parameters:
  • client_properties -- custom client properties (e.g. connection_name).

  • url -- RFC3986 formatted broker address. When None, the keyword arguments are used.

  • host -- hostname of the broker

  • port -- broker port, 5672 by default

  • login -- username string. 'guest' by default.

  • password -- password string. 'guest' by default.

  • virtualhost -- virtualhost parameter. '/' by default

  • ssl -- use SSL for the connection. Should be used with additional kwargs.

  • ssl_options -- A dict of values for the SSL connection.

  • timeout -- connection timeout in seconds

  • loop -- Event loop (asyncio.get_event_loop() when None)

  • ssl_context -- ssl.SSLContext instance

  • connection_class -- Factory of a new connection

  • kwargs -- additional parameters which will be passed to the connection.

Returns:

aio_pika.robust_connection.RobustConnection
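
As an alternative to ssl_options, the ssl_context parameter accepts a ready-made ssl.SSLContext. The sketch below builds one with the standard library only; the certificate file names and the broker URL in the comments are placeholders, and the commented connect_robust() call is an assumed usage rather than a tested one.

```python
import ssl

# Default client-side context: it verifies the broker certificate and hostname.
context = ssl.create_default_context()

# Placeholder file names; uncomment once the files actually exist.
# context.load_verify_locations(cafile="ca.pem")
# context.load_cert_chain(certfile="crt.pem", keyfile="key.pem")

# Assumed usage inside a coroutine:
#     connection = await aio_pika.connect_robust(
#         "amqps://user:pass@host/", ssl_context=context
#     )
```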

aio_pika.patterns.base

alias of the module aio_pika.patterns.base

class aio_pika.patterns.Master(channel: AbstractChannel, requeue: bool = True, reject_on_redelivered: bool = False)[source]

Implements Master/Worker pattern. Usage example:

worker.py

# Worker functions are coroutines and receive keyword arguments only
async def on_task(*, x):
    print(x)

master = Master(channel)
worker = await master.create_worker('test_worker', on_task)

master.py

master = Master(channel)
await master.proxy.test_worker(x='foo')

Creates a new Master instance.

Parameters:

channel -- Initialized instance of aio_pika.Channel

async create_task(channel_name: str, kwargs: Mapping[str, Any] = mappingproxy({}), **message_kwargs: Any) Ack | Nack | Reject | None[source]

Creates a new task for the worker.

async create_worker(queue_name: str, func: Callable[[...], Awaitable[T]], **kwargs: Any) Worker[source]

Creates a new Worker instance.

deserialize(data: bytes) Any[source]

Deserialize data from bytes. Uses pickle by default. Override this method to change the serializer.

Parameters:

data -- Data which will be deserialized

Returns:

Any

serialize(data: Any) bytes[source]

Serialize data to bytes. Uses pickle by default. Override this method to change the serializer.

Parameters:

data -- Data which will be serialized

Returns:

bytes
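
Overriding the two methods above as a pair might look like the following sketch, which swaps pickle for JSON. The mixin name JsonSerializerMixin and the subclass shown in the comment are hypothetical; only the serialize/deserialize signatures come from this reference.

```python
import json
from typing import Any


class JsonSerializerMixin:
    """Replaces the default pickle-based methods with JSON equivalents."""

    def serialize(self, data: Any) -> bytes:
        # JSON text is encoded to bytes, matching the documented return type.
        return json.dumps(data).encode("utf-8")

    def deserialize(self, data: bytes) -> Any:
        return json.loads(data.decode("utf-8"))


# Assumed usage: list the mixin first so its methods take precedence.
#     class MyJsonMaster(JsonSerializerMixin, Master): ...

codec = JsonSerializerMixin()
payload = codec.serialize({"x": 2, "y": 3})
restored = codec.deserialize(payload)
```

Both sides of the queue need the same serializer, so a worker and the process creating tasks would have to use the same subclass.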

class aio_pika.patterns.Worker(queue: AbstractQueue, consumer_tag: str, loop: AbstractEventLoop)[source]

close() Awaitable[None][source]

Cancel subscription to the channel

Returns:

asyncio.Task

class aio_pika.patterns.RPC(channel: AbstractChannel, host_exceptions: bool = False)[source]

Remote Procedure Call helper.

Create an instance

rpc = await RPC.create(channel, host_exceptions=False)

Register a Python function

# RPC instance passes only keyword arguments
async def multiply(*, x, y):
    return x * y

await rpc.register("multiply", multiply)

Call the function through the proxy

assert await rpc.proxy.multiply(x=2, y=3) == 6

Call the function explicitly

assert await rpc.call('multiply', dict(x=2, y=3)) == 6

Show exceptions on remote side

rpc = await RPC.create(channel, host_exceptions=True)

async call(method_name: str, kwargs: Dict[str, Any] | None = None, *, expiration: int | None = None, priority: int = 5, delivery_mode: DeliveryMode = DeliveryMode.NOT_PERSISTENT) Any[source]

Call a remote method and await the result.

Parameters:
  • method_name -- Name of the method

  • kwargs -- Method kwargs

  • expiration -- If not None, messages that stay in the queue longer than this are returned and asyncio.TimeoutError is raised.

  • priority -- Message priority

  • delivery_mode -- Call message delivery mode

Raises:
  • asyncio.TimeoutError -- when the message expired

  • CancelledError -- when RPC.cancel() was called

  • RuntimeError -- internal error

async classmethod create(channel: AbstractChannel, **kwargs: Any) RPC[source]

Creates a new instance of aio_pika.patterns.RPC. Use this method instead of __init__(), because create() is a coroutine and performs the asynchronous initialization.

Parameters:

channel -- initialized instance of aio_pika.Channel

Returns:

RPC

async execute(func: Callable[[...], Awaitable[T]], payload: Dict[str, Any]) T[source]

Executes an RPC call. May be overridden.

async register(method_name: str, func: Callable[[...], Awaitable[T]], **kwargs: Any) Any[source]

Creates a queue whose name equals the method_name argument, then subscribes to this queue.

Parameters:
  • method_name -- Method name

  • func -- target function. Function MUST accept only keyword arguments.

  • kwargs -- arguments which will be passed to queue_declare

Raises:

RuntimeError -- Function already registered in this RPC instance or method_name already used.
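
The registration contract described above (keyword-only target functions, RuntimeError on a reused name or function) can be illustrated without a broker. The LocalRegistry class below is a hypothetical in-process stand-in written for this sketch; it is not how aio_pika implements RPC and it declares no queues.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, Optional

Handler = Callable[..., Awaitable[Any]]


class LocalRegistry:
    """In-process stand-in for the register/call contract; no broker involved."""

    def __init__(self) -> None:
        self._routes: Dict[str, Handler] = {}

    def register(self, method_name: str, func: Handler) -> None:
        # Reject a reused name or a function that is already registered.
        if method_name in self._routes or func in self._routes.values():
            raise RuntimeError(f"{method_name!r} or its function is already registered")
        self._routes[method_name] = func

    async def call(self, method_name: str, kwargs: Optional[Dict[str, Any]] = None) -> Any:
        # Arguments are always passed by keyword, never positionally.
        return await self._routes[method_name](**(kwargs or {}))


async def multiply(*, x: int, y: int) -> int:
    return x * y


async def demo() -> tuple:
    registry = LocalRegistry()
    registry.register("multiply", multiply)
    try:
        registry.register("multiply", multiply)
        duplicate_rejected = False
    except RuntimeError:
        duplicate_rejected = True
    return await registry.call("multiply", {"x": 2, "y": 3}), duplicate_rejected


result, duplicate_rejected = asyncio.run(demo())
```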

serialize_exception(exception: Exception) Any[source]

Make a Python exception serializable.
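
When the serializer is a text format such as JSON, an exception object has to be reduced to plain data before it can be sent back to the caller. The sketch below shows one possible reduction; the function name exception_to_dict and the field layout are assumptions made for illustration, not the format aio_pika uses.

```python
import json
from typing import Any, Dict


def exception_to_dict(exception: Exception) -> Dict[str, Any]:
    """Reduce an exception to JSON-friendly fields (an assumed layout)."""
    return {
        "type": type(exception).__name__,
        "message": str(exception),
        # repr() keeps non-JSON argument types representable as strings.
        "args": [repr(arg) for arg in exception.args],
    }


encoded = json.dumps(exception_to_dict(ValueError("bad input")))
decoded = json.loads(encoded)
```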

async unregister(func: Callable[[...], Awaitable[T]]) None[source]

Cancels the subscription to the method queue.

Parameters:

func -- Function