Sending/Receiving Messages (Kombu integration).

AMQP

class celery.app.amqp.AMQP(app)[源代码]

App AMQP API: app.amqp.

Connection

Broker connection class used. Default is kombu.Connection.

Consumer

Base Consumer class used. Default is kombu.Consumer.

Producer

Base Producer class used. Default is kombu.Producer.

queues

All currently defined task queues (a Queues instance).

argsrepr_maxsize

Max size of positional argument representation used for logging purposes. Default is 1024.

kwargsrepr_maxsize

Max size of keyword argument representation used for logging purposes. Default is 1024.

Queues(queues, create_missing=None, autoexchange=None, max_priority=None)[源代码]
Router(queues=None, create_missing=None)[源代码]

Return the current task router.

flush_routes()[源代码]
create_task_message
send_task_message
default_queue
default_exchange
producer_pool
router
routes

Queues

class celery.app.amqp.Queues(queues=None, default_exchange=None, create_missing=True, autoexchange=None, max_priority=None, default_routing_key=None)[源代码]

Queue name⇒ declaration mapping.

参数:
  • queues (Iterable) -- Initial list/tuple or dict of queues.

  • create_missing (bool) -- By default any unknown queues will be added automatically, but if this flag is disabled the occurrence of unknown queues in wanted will raise KeyError.

  • max_priority (int) -- Default x-max-priority for queues with none set.

add(queue, **kwargs)[源代码]

Add new queue.

The first argument can either be a kombu.Queue instance, or the name of a queue. If the former the rest of the keyword arguments are ignored, and options are simply taken from the queue instance.

参数:
  • queue (kombu.Queue, str) -- Queue to add.

  • exchange (kombu.Exchange, str) -- if queue is str, specifies exchange name.

  • routing_key (str) -- if queue is str, specifies binding key.

  • exchange_type (str) -- if queue is str, specifies type of exchange.

  • **options (Any) -- Additional declaration options used when queue is a str.

add_compat(name, **options)[源代码]
property consume_from
deselect(exclude)[源代码]

Deselect queues so that they won't be consumed from.

参数:

exclude (Sequence[str], str) -- Names of queues to avoid consuming from.

format(indent=0, indent_first=True)[源代码]

Format routing table into string for log dumps.

new_missing(name)[源代码]
select(include)[源代码]

Select a subset of currently defined queues to consume from.

参数:

include (Sequence[str], str) -- Names of queues to consume from.

select_add(queue, **kwargs)[源代码]

Add new task queue that'll be consumed from.

The queue will be active even when a subset has been selected using the celery worker -Q option.