celery.worker.consumer

Worker consumer.

class celery.worker.consumer.Agent(c, **kwargs)

Agent starts cell actors (https://pypi.org/project/cell/).

conditional = True
create(c)

Create the step.

name = 'celery.worker.consumer.agent.Agent'
requires = (step:celery.worker.consumer.connection.Connection{()},)
class celery.worker.consumer.Connection(c, **kwargs)

Service managing the consumer broker connection.

info(c)
name = 'celery.worker.consumer.connection.Connection'
shutdown(c)
start(c)
class celery.worker.consumer.Consumer(on_task_request, init_callback=<function noop>, hostname=None, pool=None, app=None, timer=None, controller=None, hub=None, amqheartbeat=None, worker_options=None, disable_rate_limits=False, initial_prefetch_count=2, prefetch_multiplier=1, **kwargs)

Consumer blueprint.

class Blueprint(steps=None, name=None, on_start=None, on_close=None, on_stopped=None)

Consumer blueprint.

default_steps = ['celery.worker.consumer.connection:Connection', 'celery.worker.consumer.mingle:Mingle', 'celery.worker.consumer.events:Events', 'celery.worker.consumer.gossip:Gossip', 'celery.worker.consumer.heart:Heart', 'celery.worker.consumer.control:Control', 'celery.worker.consumer.tasks:Tasks', 'celery.worker.consumer.delayed_delivery:DelayedDelivery', 'celery.worker.consumer.consumer:Evloop', 'celery.worker.consumer.agent:Agent']
name = 'Consumer'
shutdown(parent)
Strategies

alias of dict

add_task_queue(queue, exchange=None, exchange_type=None, routing_key=None, **options)
apply_eta_task(task)

Method called by the timer to apply a task with an ETA/countdown.

bucket_for_task(type)
call_soon(p, *args, **kwargs)
cancel_all_unacked_requests()

Cancel all active requests that either do not require late acknowledgments or, if they do, have not been acknowledged yet.
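
The selection rule above can be read as a simple predicate. The following is a minimal sketch, not Celery's implementation: `FakeRequest`, its `acks_late` and `acknowledged` fields, and `should_cancel` are hypothetical names chosen for illustration.

```python
from dataclasses import dataclass


@dataclass
class FakeRequest:
    """Hypothetical stand-in for an active task request."""
    task_id: str
    acks_late: bool      # task is configured for late acknowledgment
    acknowledged: bool   # message has already been acked to the broker


def should_cancel(request: FakeRequest) -> bool:
    # Cancel when late acks are not required, or when they are required
    # but the acknowledgment has not happened yet.
    return (not request.acks_late) or (not request.acknowledged)


active = [
    FakeRequest("a", acks_late=False, acknowledged=True),
    FakeRequest("b", acks_late=True, acknowledged=False),
    FakeRequest("c", acks_late=True, acknowledged=True),
]
to_cancel = [r.task_id for r in active if should_cancel(r)]
```

In this sketch only request "c" is left alone: it requires late acknowledgment and has already been acknowledged.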

cancel_task_queue(queue)
connect()

Establish the broker connection used for consuming tasks.

Retries establishing the connection if the broker_connection_retry setting is enabled.
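
To illustrate retry behavior that is gated by a setting, here is a minimal sketch. It is not how Celery connects to a broker: `connect_with_retry`, `open_connection`, `retry_enabled`, `max_retries`, and the `make_flaky` stub are all hypothetical, with `retry_enabled` standing in for the role of broker_connection_retry.

```python
import time


def connect_with_retry(open_connection, retry_enabled, max_retries=3, delay=0.0):
    """Call open_connection(); retry on ConnectionError only if enabled."""
    attempts = 0
    while True:
        attempts += 1
        try:
            return open_connection(), attempts
        except ConnectionError:
            # With retries disabled, or the retry budget exhausted, re-raise.
            if not retry_enabled or attempts > max_retries:
                raise
            time.sleep(delay)


def make_flaky(failures):
    """Hypothetical broker stub that fails `failures` times, then succeeds."""
    state = {"left": failures}

    def open_connection():
        if state["left"] > 0:
            state["left"] -= 1
            raise ConnectionError("broker unavailable")
        return "connected"

    return open_connection


result, attempts = connect_with_retry(make_flaky(2), retry_enabled=True)
```

With retries disabled, the first ConnectionError propagates to the caller instead of being retried.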

connection_for_read(heartbeat=None)
connection_for_write(url=None, heartbeat=None)
create_task_handler(promise=<class 'vine.promises.promise'>)
ensure_connected(conn)
first_connection_attempt = True

This flag will be turned off after the first failed connection attempt.

init_callback = None

Optional callback called the first time the worker is ready to receive tasks.

loop_args()
property max_prefetch_count
on_close()
on_connection_error_after_connected(exc)
on_connection_error_before_connected(exc)
on_decode_error(message, exc)

Callback called if an error occurs while decoding a message.

Simply logs the error and acknowledges the message so it doesn't enter a loop.

Parameters:
  • message (kombu.Message) -- The message received.

  • exc (Exception) -- The exception being handled.
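
The log-then-acknowledge pattern described for on_decode_error can be sketched as follows. This is an illustration under stated assumptions, not the library's code: `FakeMessage` is a hypothetical stand-in for a broker message exposing an `ack()` method, and `handle_decode_error` is a hypothetical handler name.

```python
import logging

logger = logging.getLogger("decode_error_sketch")


class FakeMessage:
    """Hypothetical stand-in for a broker message with an ack() method."""

    def __init__(self, body):
        self.body = body
        self.acknowledged = False

    def ack(self):
        self.acknowledged = True


def handle_decode_error(message, exc):
    # Log the failure, then acknowledge so the broker does not redeliver
    # the same undecodable message over and over.
    logger.error("Can't decode message body: %r (body=%r)", exc, message.body)
    message.ack()


msg = FakeMessage(b"\xff not valid utf-8")
try:
    msg.body.decode("utf-8")
except UnicodeDecodeError as exc:
    handle_decode_error(msg, exc)
```

Acknowledging a message that can never be decoded trades the loss of that one message for avoiding an endless redelivery loop.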

on_invalid_task(body, message, exc)
on_ready()
on_send_event_buffered()
on_unknown_message(body, message)
on_unknown_task(body, message, exc)
perform_pending_operations()
pool = None

The current worker pool instance.

register_with_event_loop(hub)
reset_rate_limits()
restart_count = -1
shutdown()
start()
stop()
timer = None

A timer used for high-priority internal tasks, such as sending heartbeats.

update_strategies()
class celery.worker.consumer.Control(c, **kwargs)

Remote control command service.

include_if(c)

Return true if the bootstep should be included.

You can define this as an optional predicate that decides whether this step should be created.

name = 'celery.worker.consumer.control.Control'
requires = (step:celery.worker.consumer.tasks.Tasks{(step:celery.worker.consumer.mingle.Mingle{(step:celery.worker.consumer.events.Events{(step:celery.worker.consumer.connection.Connection{()},)},)},)},)
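
The nested `requires` values on this page describe a dependency graph between steps. As a rough sketch of how such a graph implies a start order, the example below uses the standard library's `graphlib` (Python 3.9+). The `requires` dictionary is transcribed from the attributes listed on this page; using `TopologicalSorter` is an assumption made for illustration and is not claimed to be how Celery resolves step order.

```python
from graphlib import TopologicalSorter

# Direct dependencies, transcribed from the `requires` attributes
# documented for each step (step -> steps it requires).
requires = {
    "Connection": set(),
    "Agent": {"Connection"},
    "Events": {"Connection"},
    "Heart": {"Events"},
    "Mingle": {"Events"},
    "Gossip": {"Mingle"},
    "Tasks": {"Mingle"},
    "Control": {"Tasks"},
}

# static_order() yields each step only after all of its requirements.
order = list(TopologicalSorter(requires).static_order())
position = {name: index for index, name in enumerate(order)}
```

Any order produced this way starts with Connection and places Control after Tasks, Mingle, and Events, which matches the nesting shown in the `requires` value above.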
class celery.worker.consumer.Events(c, task_events=True, without_heartbeat=False, without_gossip=False, **kwargs)

Service used for sending monitoring events.

name = 'celery.worker.consumer.events.Events'
requires = (step:celery.worker.consumer.connection.Connection{()},)
shutdown(c)
start(c)
stop(c)
class celery.worker.consumer.Gossip(c, without_gossip=False, interval=5.0, heartbeat_interval=2.0, **kwargs)

Bootstep consuming events from other workers.

This keeps the logical clock value up to date.

call_task(task)
compatible_transport(app)
compatible_transports = {'amqp', 'redis'}
election(id, topic, action=None)
get_consumers(channel)
label = 'Gossip'
name = 'celery.worker.consumer.gossip.Gossip'
on_elect(event)
on_elect_ack(event)
on_message(prepare, message)
on_node_join(worker)
on_node_leave(worker)
on_node_lost(worker)
periodic()
register_timer()
requires = (step:celery.worker.consumer.mingle.Mingle{(step:celery.worker.consumer.events.Events{(step:celery.worker.consumer.connection.Connection{()},)},)},)
start(c)
class celery.worker.consumer.Heart(c, without_heartbeat=False, heartbeat_interval=None, **kwargs)

Bootstep sending event heartbeats.

This service sends a worker-heartbeat message every n seconds.

Note

Not to be confused with AMQP protocol level heartbeats.
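
A periodic "every n seconds" event can be sketched with the standard library's `sched` module driven by a fake clock, so the example runs instantly. Everything here is hypothetical and for illustration only: `FakeClock`, `run_heartbeats`, and the tuple format are not part of Celery, and the worker uses its own timer rather than `sched`.

```python
import sched


class FakeClock:
    """Hypothetical clock that advances only when asked to sleep."""

    def __init__(self):
        self.now = 0.0

    def time(self):
        return self.now

    def sleep(self, seconds):
        self.now += seconds


def run_heartbeats(interval, count, clock):
    """Record `count` heartbeat events spaced `interval` seconds apart."""
    sent = []
    scheduler = sched.scheduler(clock.time, clock.sleep)

    def beat(remaining):
        sent.append(("worker-heartbeat", clock.time()))
        if remaining > 1:
            # Re-arm the timer for the next heartbeat.
            scheduler.enter(interval, 1, beat, (remaining - 1,))

    scheduler.enter(interval, 1, beat, (count,))
    scheduler.run()
    return sent


beats = run_heartbeats(interval=2.0, count=3, clock=FakeClock())
```

Each callback re-schedules the next one, which is a common way to build a repeating timer on top of one-shot timer entries.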

name = 'celery.worker.consumer.heart.Heart'
requires = (step:celery.worker.consumer.events.Events{(step:celery.worker.consumer.connection.Connection{()},)},)
shutdown(c)
start(c)
stop(c)
class celery.worker.consumer.Mingle(c, without_mingle=False, **kwargs)

Bootstep syncing state with neighbor workers.

At startup, or upon consumer restart, this will:

  • Sync logical clocks.

  • Sync revoked tasks.
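
The two sync steps above can be sketched as a merge of one neighbor's reply into local state. This is a minimal illustration that assumes a Lamport-style logical clock and a set of revoked task ids; `sync_with_neighbor` and its parameters are hypothetical names, not the Mingle API.

```python
def sync_with_neighbor(local_clock, local_revoked, reply_clock, reply_revoked):
    """Merge one neighbor's reply into local state (Lamport-style sketch)."""
    # Lamport rule: move past the largest clock value seen so far.
    new_clock = max(local_clock, reply_clock) + 1
    # Revoked task ids are merged as a set union.
    new_revoked = set(local_revoked) | set(reply_revoked)
    return new_clock, new_revoked


clock, revoked = sync_with_neighbor(
    local_clock=3,
    local_revoked={"task-1"},
    reply_clock=10,
    reply_revoked={"task-2"},
)
```

After the merge, the local clock is ahead of every value it has observed, and a task revoked on either worker is treated as revoked locally.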

compatible_transport(app)
compatible_transports = {'amqp', 'gcpubsub', 'redis'}
label = 'Mingle'
name = 'celery.worker.consumer.mingle.Mingle'
on_clock_event(c, clock)
on_node_reply(c, nodename, reply)
on_revoked_received(c, revoked)
requires = (step:celery.worker.consumer.events.Events{(step:celery.worker.consumer.connection.Connection{()},)},)
send_hello(c)
start(c)
sync(c)
sync_with_node(c, clock=None, revoked=None, **kwargs)
class celery.worker.consumer.Tasks(c, **kwargs)

Bootstep starting the task message consumer.

info(c)

Return task consumer info.

name = 'celery.worker.consumer.tasks.Tasks'
qos_global(c) → bool

Determine if global QoS should be applied.

Additional information:

https://www.rabbitmq.com/docs/consumer-prefetch
https://www.rabbitmq.com/docs/quorum-queues#global-qos

requires = (step:celery.worker.consumer.mingle.Mingle{(step:celery.worker.consumer.events.Events{(step:celery.worker.consumer.connection.Connection{()},)},)},)
shutdown(c)

Shutdown task consumer.

start(c)

Start task consumer.

stop(c)

Stop task consumer.