Signals
Signals allow decoupled applications to receive notifications when certain actions occur elsewhere in the application.
Celery ships with many signals that your application can hook into to augment behavior of certain actions.
Basics
Several kinds of events trigger signals; you can connect to these signals to perform actions as they trigger.
Example connecting to the after_task_publish signal:
from celery.signals import after_task_publish

@after_task_publish.connect
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
    # information about the task is located in headers for task messages
    # using the task protocol version 2.
    info = headers if 'task' in headers else body
    print('after_task_publish for task id {info[id]}'.format(
        info=info,
    ))
Some signals also have a sender you can filter by. For example, the after_task_publish signal uses the task name as a sender, so by providing the sender argument to connect you can connect your handler to be called every time a task named "proj.tasks.add" is published:
@after_task_publish.connect(sender='proj.tasks.add')
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
    # information about the task is located in headers for task messages
    # using the task protocol version 2.
    info = headers if 'task' in headers else body
    print('after_task_publish for task id {info[id]}'.format(
        info=info,
    ))
Signals use the same implementation as django.core.dispatch. As a result, other keyword parameters (e.g., signal) are passed to all signal handlers by default.
The best practice for signal handlers is to accept arbitrary keyword
arguments (i.e., **kwargs). That way new Celery versions can add additional
arguments without breaking user code.
Signals
Task Signals
before_task_publish
Added in version 3.1.
Dispatched before a task is published. Note that this is executed in the process sending the task.
Sender is the name of the task being sent.
Provides arguments:
body: Task message body. This is a mapping containing the task message fields; see Version 2 and Version 1 for a reference of possible fields that can be defined.
exchange: Name of the exchange to send to, or an Exchange object.
routing_key: Routing key to use when sending the message.
headers: Application headers mapping (can be modified).
properties: Message properties (can be modified).
declare: List of entities (Exchange, Queue, or binding) to declare before publishing the message. Can be modified.
retry_policy: Mapping of retry options. Can be any argument to kombu.Connection.ensure() and can be modified.
after_task_publish
Dispatched when a task has been sent to the broker. Note that this is executed in the process that sent the task.
Sender is the name of the task being sent.
Provides arguments:
headers: The task message headers; see Version 2 and Version 1 for a reference of possible fields.
body: The task message body.
exchange: Name of the exchange or Exchange object used.
routing_key: Routing key used.
task_prerun
Dispatched before a task is executed.
Sender is the task object being executed.
Provides arguments:
task_id: Id of the task to be executed.
task: The task being executed.
args: The task's positional arguments.
kwargs: The task's keyword arguments.
task_postrun
Dispatched after a task has been executed.
Sender is the task object executed.
Provides arguments:
task_id: Id of the executed task.
task: The task that was executed.
args: The task's positional arguments.
kwargs: The task's keyword arguments.
retval: The return value of the task.
state: Name of the resulting state.
task_retry
Dispatched when a task will be retried.
Sender is the task object.
Provides arguments:
request: The current task request.
reason: Reason for the retry (usually an exception instance, but can always be coerced to str).
einfo: Detailed exception information, including traceback (a billiard.einfo.ExceptionInfo object).
task_success
Dispatched when a task succeeds.
Sender is the task object executed.
Provides arguments:
result: Return value of the task.
task_failure
Dispatched when a task fails.
Sender is the task object executed.
Provides arguments:
task_id: Id of the task.
exception: Exception instance raised.
args: Positional arguments the task was called with.
kwargs: Keyword arguments the task was called with.
traceback: Stack trace object.
einfo: The billiard.einfo.ExceptionInfo instance.
task_internal_error
Dispatched when an internal Celery error occurs while executing the task.
Sender is the task object executed.
Provides arguments:
task_id: Id of the task.
args: Positional arguments the task was called with.
kwargs: Keyword arguments the task was called with.
request: The original request dictionary. This is provided because task.request may not be ready by the time the exception is raised.
exception: Exception instance raised.
traceback: Stack trace object.
einfo: The billiard.einfo.ExceptionInfo instance.
task_received
Dispatched when a task is received from the broker and is ready for execution.
Sender is the consumer object.
Provides arguments:
request: This is a Request instance, and not task.request. When using the prefork pool this signal is dispatched in the parent process, so task.request isn't available and shouldn't be used. Use this object instead, as they share many of the same fields.
task_revoked
Dispatched when a task is revoked/terminated by the worker.
Sender is the task object revoked/terminated.
Provides arguments:
request: This is a Context instance, and not task.request. When using the prefork pool this signal is dispatched in the parent process, so task.request isn't available and shouldn't be used. Use this object instead, as they share many of the same fields.
terminated: Set to True if the task was terminated.
signum: Signal number used to terminate the task. If this is None and terminated is True, then TERM should be assumed.
expired: Set to True if the task expired.
task_unknown
Dispatched when a worker receives a message for a task that's not registered.
Sender is the worker Consumer.
Provides arguments:
name: Name of the task not found in the registry.
id: The task id found in the message.
message: Raw message object.
exc: The error that occurred.
task_rejected
Dispatched when a worker receives a message of an unknown type on one of its task queues.
Sender is the worker Consumer.
Provides arguments:
message: Raw message object.
exc: The error that occurred (if any).
App Signals
import_modules
Worker Signals
celeryd_after_setup
This signal is sent after the worker instance is set up, but before it calls run. This means that any queues from the celery worker -Q option are enabled, logging has been set up, and so on.
It can be used to add custom queues that should always be consumed from, disregarding the celery worker -Q option. Here's an example that sets up a direct queue for each worker; these queues can then be used to route a task to any specific worker:
from celery.signals import celeryd_after_setup

@celeryd_after_setup.connect
def setup_direct_queue(sender, instance, **kwargs):
    queue_name = '{0}.dq'.format(sender)  # sender is the nodename of the worker
    instance.app.amqp.queues.select_add(queue_name)
Provides arguments:
sender: Node name of the worker.
instance: This is the celery.apps.worker.Worker instance to be initialized. Note that only the app and hostname (nodename) attributes have been set so far, and the rest of __init__ hasn't been executed.
conf: The configuration of the current app.
celeryd_init
This is the first signal sent when celery worker starts up.
The sender is the host name of the worker, so this signal can be used to set up worker-specific configuration:
from celery.signals import celeryd_init

@celeryd_init.connect(sender='worker12@example.com')
def configure_worker12(conf=None, **kwargs):
    conf.task_default_rate_limit = '10/m'
or to set up configuration for multiple workers you can omit specifying a sender when you connect:
from celery.signals import celeryd_init

@celeryd_init.connect
def configure_workers(sender=None, conf=None, **kwargs):
    if sender in ('worker1@example.com', 'worker2@example.com'):
        conf.task_default_rate_limit = '10/m'
    if sender == 'worker3@example.com':
        conf.worker_prefetch_multiplier = 0
Provides arguments:
sender: Node name of the worker.
instance: This is the celery.apps.worker.Worker instance to be initialized. Note that only the app and hostname (nodename) attributes have been set so far, and the rest of __init__ hasn't been executed.
conf: The configuration of the current app.
options: Options passed to the worker from command-line arguments (including defaults).
worker_init
Dispatched before the worker is started.
worker_before_create_process
Dispatched in the parent process, just before a new child process is created in the prefork pool. It can be used to clean up instances that don't behave well when forking.
from celery import signals

@signals.worker_before_create_process.connect
def clean_channels(**kwargs):
    grpc_singleton.clean_channel()
worker_ready
Dispatched when the worker is ready to accept work.
heartbeat_sent
Dispatched when Celery sends a worker heartbeat.
Sender is the celery.worker.heartbeat.Heart instance.
worker_shutting_down
Dispatched when the worker begins the shutdown process.
Provides arguments:
sig: The POSIX signal that was received.
how: The shutdown method, warm or cold.
exitcode: The exitcode that will be used when the main process exits.
worker_process_init
Dispatched in all pool child processes when they start.
Note that handlers attached to this signal must not block for more than 4 seconds, or the process will be killed on the assumption that it failed to start.
worker_process_shutdown
Dispatched in all pool child processes just before they exit.
Note: there's no guarantee that this signal will be dispatched. As with finally blocks, it's impossible to guarantee that handlers will be called at shutdown, and if they are called they may be interrupted partway through.
Provides arguments:
pid: The pid of the child process that's about to shut down.
exitcode: The exitcode that will be used when the child process exits.
worker_shutdown
Dispatched when the worker is about to shut down.
Beat Signals
beat_init
Dispatched when celery beat starts (either standalone or embedded).
Sender is the celery.beat.Service instance.
beat_embedded_init
Dispatched in addition to the beat_init signal when celery beat is started as an embedded process.
Sender is the celery.beat.Service instance.
Eventlet Signals
eventlet_pool_started
Sent when the eventlet pool has been started.
Sender is the celery.concurrency.eventlet.TaskPool instance.
eventlet_pool_preshutdown
Sent when the worker shuts down, just before the eventlet pool is requested to wait for remaining workers.
Sender is the celery.concurrency.eventlet.TaskPool instance.
eventlet_pool_postshutdown
Sent when the pool has been joined and the worker is ready to shut down.
Sender is the celery.concurrency.eventlet.TaskPool instance.
eventlet_pool_apply
Sent whenever a task is applied to the pool.
Sender is the celery.concurrency.eventlet.TaskPool instance.
Provides arguments:
target: The target function.
args: Positional arguments.
kwargs: Keyword arguments.
Logging Signals
setup_logging
Celery won't configure the loggers if this signal is connected, so you can use this to completely override the logging configuration with your own.
If you'd like to augment the logging configuration set up by Celery, then you can use the after_setup_logger and after_setup_task_logger signals.
Provides arguments:
loglevel: The level of the logging object.
logfile: The name of the logfile.
format: The log format string.
colorize: Specify if log messages are colored or not.
after_setup_logger
Sent after the setup of every global logger (not task loggers). Used to augment logging configuration.
Provides arguments:
logger: The logger object.
loglevel: The level of the logging object.
logfile: The name of the logfile.
format: The log format string.
colorize: Specify if log messages are colored or not.
after_setup_task_logger
Sent after the setup of every single task logger. Used to augment logging configuration.
Provides arguments:
logger: The logger object.
loglevel: The level of the logging object.
logfile: The name of the logfile.
format: The log format string.
colorize: Specify if log messages are colored or not.
Command signals
user_preload_options
This signal is sent after any of the Celery command line programs are finished parsing the user preload options.
It can be used to add additional command-line arguments to the celery umbrella command:
from celery import Celery
from celery import signals
from celery.bin.base import Option

app = Celery()
app.user_options['preload'].add(Option(
    '--monitoring', action='store_true',
    help='Enable our external monitoring utility, blahblah',
))

@signals.user_preload_options.connect
def handle_preload_options(options, **kwargs):
    if options['monitoring']:
        enable_monitoring()
Sender is the Command instance, and the value depends on the program that was called (e.g., for the umbrella command it'll be a CeleryCommand object).
Provides arguments:
app: The app instance.
options: Mapping of the parsed user preload options (with default values).
Deprecated Signals
task_sent
This signal is deprecated, please use after_task_publish instead.