Tasks¶
Tasks are the building blocks of Celery applications.
A task is a class that can be created out of any callable. It performs dual roles in that it defines both what happens when a task is called (sends a message), and what happens when a worker receives that message.
Every task class has a unique name, and this name is referenced in messages so the worker can find the right function to execute.
A task message is not removed from the queue until that message has been acknowledged by a worker. A worker can reserve many messages in advance and even if the worker is killed -- by power failure or some other reason -- the message will be redelivered to another worker.
Ideally task functions should be idempotent: meaning the function won't cause unintended effects even if called multiple times with the same arguments. Since the worker cannot detect if your tasks are idempotent, the default behavior is to acknowledge the message in advance, just before it's executed, so that a task invocation that already started is never executed again.
If your task is idempotent you can set the acks_late option to have the worker acknowledge the message after the task returns instead. See also the FAQ entry Should I use retry or acks_late?.
Note that the worker will acknowledge the message if the child process executing the task is terminated (either by the task calling sys.exit(), or by signal) even when acks_late is enabled. This behavior is intentional as...
- We don't want to rerun tasks that force the kernel to send a SIGSEGV (segmentation fault) or similar signals to the process.
- We assume that a system administrator deliberately killing the task does not want it to automatically restart.
- A task that allocates too much memory is in danger of triggering the kernel OOM killer, the same may happen again.
- A task that always fails when redelivered may cause a high-frequency message loop taking down the system.
If you really want a task to be redelivered in these scenarios you should
consider enabling the task_reject_on_worker_lost
setting.
Warning
A task that blocks indefinitely may eventually stop the worker instance from doing any other work.
If your task does I/O then make sure you add timeouts to these operations, like adding a timeout to a web request using the https://pypi.org/project/requests/ library:
import requests

connect_timeout, read_timeout = 5.0, 30.0
response = requests.get(URL, timeout=(connect_timeout, read_timeout))
Time limits are convenient for making sure all tasks return in a timely manner, but a time limit event will actually kill the process by force so only use them to detect cases where you haven't used manual timeouts yet.
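For reference, a minimal sketch of pairing a soft and a hard time limit on a single task (the task name and the limit values here are illustrative only):
from celery.exceptions import SoftTimeLimitExceeded

@app.task(soft_time_limit=50, time_limit=60)
def process_report(report_id):
    try:
        ...  # long-running work goes here
    except SoftTimeLimitExceeded:
        ...  # the soft limit fired: clean up before the hard limit kills the process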
In previous versions, the default prefork pool scheduler was not friendly
to long-running tasks, so if you had tasks that ran for minutes/hours, it
was advised to enable the -Ofair
command-line
argument to the celery worker. However, as of version 4.0,
-Ofair is now the default scheduling strategy. See Prefetch Limits for more information, and for the best performance route long-running and short-running tasks to dedicated workers (Automatic routing).
If your worker hangs then please investigate what tasks are running before submitting an issue, as most likely the hanging is caused by one or more tasks hanging on a network operation.
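A quick way to check this is the worker inspect API; a minimal sketch, assuming app is your Celery application instance:
active = app.control.inspect().active()
print(active)  # mapping of worker node name -> list of tasks currently being executed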
Basics¶
You can easily create a task from any callable by using
the app.task()
decorator:
from .models import User
@app.task
def create_user(username, password):
User.objects.create(username=username, password=password)
There are also many options that can be set for the task, these can be specified as arguments to the decorator:
@app.task(serializer='json')
def create_user(username, password):
User.objects.create(username=username, password=password)
How do I import the task decorator?¶
The task decorator is available on your Celery
application instance,
if you don't know what this is then please read First Steps with Celery.
If you're using Django (see First steps with Django), or you're the author
of a library then you probably want to use the shared_task()
decorator:
from celery import shared_task
@shared_task
def add(x, y):
return x + y
Multiple decorators¶
When using multiple decorators in combination with the task decorator you must make sure that the task decorator is applied last (oddly, in Python this means it must be first in the list):
@app.task
@decorator2
@decorator1
def add(x, y):
return x + y
Bound tasks¶
A task being bound means the first argument to the task will always
be the task instance (self
), just like Python bound methods:
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)
@app.task(bind=True)
def add(self, x, y):
logger.info(self.request.id)
Bound tasks are needed for retries (using app.Task.retry()
),
for accessing information about the current task request, and for any
additional functionality you add to custom task base classes.
Task inheritance¶
The base
argument to the task decorator specifies the base class of the task:
import celery
class MyTask(celery.Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
print('{0!r} failed: {1!r}'.format(task_id, exc))
@app.task(base=MyTask)
def add(x, y):
raise KeyError()
Names¶
Every task must have a unique name.
If no explicit name is provided the task decorator will generate one for you, and this name will be based on 1) the module the task is defined in, and 2) the name of the task function.
Example setting explicit name:
>>> @app.task(name='sum-of-two-numbers')
>>> def add(x, y):
... return x + y
>>> add.name
'sum-of-two-numbers'
A best practice is to use the module name as a name-space, this way names won't collide if there's already a task with that name defined in another module.
>>> @app.task(name='tasks.add')
>>> def add(x, y):
... return x + y
You can tell the name of the task by investigating its .name
attribute:
>>> add.name
'tasks.add'
The name we specified here (tasks.add
) is exactly the name that would've
been automatically generated for us if the task was defined in a module
named tasks.py:
tasks.py:
@app.task
def add(x, y):
return x + y
>>> from tasks import add
>>> add.name
'tasks.add'
Note
You can use the inspect command in a worker to view the names of all registered tasks. See the inspect registered command in the Management Command-line Utilities (inspect/control) section of the User Guide.
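A minimal sketch of doing the same from Python, assuming app is your Celery application instance:
registered = app.control.inspect().registered()
print(registered)  # mapping of worker node name -> list of registered task names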
Changing the automatic naming behavior¶
Added in version 4.0.
There are some cases when the default automatic naming isn't suitable. Consider having many tasks within many different modules:
project/
/__init__.py
/celery.py
/moduleA/
/__init__.py
/tasks.py
/moduleB/
/__init__.py
/tasks.py
Using the default automatic naming, each task will have a generated name
like moduleA.tasks.taskA, moduleA.tasks.taskB, moduleB.tasks.test,
and so on. You may want to get rid of having tasks in all task names.
As pointed above, you can explicitly give names for all tasks, or you
can change the automatic naming behavior by overriding
app.gen_task_name()
. Continuing with the example, celery.py
may contain:
from celery import Celery
class MyCelery(Celery):
def gen_task_name(self, name, module):
if module.endswith('.tasks'):
module = module[:-6]
return super().gen_task_name(name, module)
app = MyCelery('main')
So each task will have a name like moduleA.taskA, moduleA.taskB and moduleB.test.
Warning
Make sure that your app.gen_task_name()
is a pure function: meaning
that for the same input it must always return the same output.
Task Request¶
app.Task.request
contains information and state
related to the currently executing task.
The request defines the following attributes:
- id:
The unique id of the executing task.
- group:
The unique id of the task's group, if this task is a member.
- chord:
The unique id of the chord this task belongs to (if the task is part of the header).
- correlation_id:
Custom ID used for things like de-duplication.
- args:
Positional arguments.
- kwargs:
Keyword arguments.
- origin:
Name of host that sent this task.
- retries:
How many times the current task has been retried. An integer starting at 0.
- is_eager:
Set to True if the task is executed locally in the client, not by a worker.
- eta:
The original ETA of the task (if any). This is in UTC time (depending on the enable_utc setting).
- expires:
The original expiry time of the task (if any). This is in UTC time (depending on the enable_utc setting).
- hostname:
Node name of the worker instance executing the task.
- delivery_info:
Additional message delivery information. This is a mapping containing the exchange and routing key used to deliver this task. Used by, for example, app.Task.retry() to resend the task to the same destination queue. Availability of keys in this dict depends on the message broker used.
- reply-to:
Name of queue to send replies back to (used with the RPC result backend, for example).
- called_directly:
This flag is set to true if the task wasn't executed by the worker.
- timelimit:
A tuple of the current (soft, hard) time limits active for this task (if any).
- callbacks:
A list of signatures to be called if this task returns successfully.
- errbacks:
A list of signatures to be called if this task fails.
- utc:
Set to true if the caller has UTC enabled (enable_utc).
Added in version 3.1.
- headers:
Mapping of message headers sent with this task message (may be None).
- reply_to:
Where to send reply to (queue name).
- correlation_id:
Usually the same as the task id, often used in amqp to keep track of what a reply is for.
Added in version 4.0.
- root_id:
The unique id of the first task in the workflow this task is part of (if any).
- parent_id:
The unique id of the task that called this task (if any).
- chain:
Reversed list of tasks that form a chain (if any). The last item in this list will be the next task to succeed the current task. If using version one of the task protocol the chain tasks will be in request.callbacks instead.
Added in version 5.2.
- properties:
Mapping of message properties received with this task message (may be None or {}).
- replaced_task_nesting:
How many times the task was replaced, if at all (may be 0).
Example¶
An example task accessing information in the context is:
@app.task(bind=True)
def dump_context(self, x, y):
print('Executing task id {0.id}, args: {0.args!r} kwargs: {0.kwargs!r}'.format(
self.request))
The bind
argument means that the function will be a "bound method" so
that you can access attributes and methods on the task type instance.
Logging¶
The worker will automatically set up logging for you, or you can configure logging manually.
A special logger is available named "celery.task", you can inherit from this logger to automatically get the task name and unique id as part of the logs.
The best practice is to create a common logger for all of your tasks at the top of your module:
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@app.task
def add(x, y):
logger.info('Adding {0} + {1}'.format(x, y))
return x + y
Celery uses the standard Python logging library, and its documentation can be found in the standard library's logging documentation.
You can also use print()
, as anything written to standard
out/-err will be redirected to the logging system (you can disable this,
see worker_redirect_stdouts
).
Note
The worker won't update the redirection if you create a logger instance somewhere in your task or task module.
If you want to redirect sys.stdout
and sys.stderr
to a custom
logger you have to enable this manually, for example:
import sys

from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)
@app.task(bind=True)
def add(self, x, y):
old_outs = sys.stdout, sys.stderr
rlevel = self.app.conf.worker_redirect_stdouts_level
try:
self.app.log.redirect_stdouts_to_logger(logger, rlevel)
print('Adding {0} + {1}'.format(x, y))
return x + y
finally:
sys.stdout, sys.stderr = old_outs
Note
If a specific Celery logger you need is not emitting logs, you should check that the logger is propagating properly. In this example "celery.app.trace" is enabled so that "succeeded in" logs are emitted:
import celery
import logging
@celery.signals.after_setup_logger.connect
def on_after_setup_logger(**kwargs):
logger = logging.getLogger('celery')
logger.propagate = True
logger = logging.getLogger('celery.app.trace')
logger.propagate = True
Note
If you want to completely disable Celery logging configuration,
use the setup_logging
signal:
import celery
@celery.signals.setup_logging.connect
def on_setup_logging(**kwargs):
pass
Argument checking¶
Added in version 4.0.
Celery will verify the arguments passed when you call the task, just like Python does when calling a normal function:
>>> @app.task
... def add(x, y):
... return x + y
# Calling the task with two arguments works:
>>> add.delay(8, 8)
<AsyncResult: f59d71ca-1549-43e0-be41-4e8821a83c0c>
# Calling the task with only one argument fails:
>>> add.delay(8)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "celery/app/task.py", line 376, in delay
return self.apply_async(args, kwargs)
File "celery/app/task.py", line 485, in apply_async
check_arguments(*(args or ()), **(kwargs or {}))
TypeError: add() takes exactly 2 arguments (1 given)
You can disable the argument checking for any task by setting its
typing
attribute to False
:
>>> @app.task(typing=False)
... def add(x, y):
... return x + y
# Works locally, but the worker receiving the task will raise an error.
>>> add.delay(8)
<AsyncResult: f59d71ca-1549-43e0-be41-4e8821a83c0c>
Hiding sensitive information in arguments¶
Added in version 4.0.
When using task_protocol
2 or higher (default since 4.0), you can
override how positional arguments and keyword arguments are represented in logs
and monitoring events using the argsrepr
and kwargsrepr
calling
arguments:
>>> add.apply_async((2, 3), argsrepr='(<secret-x>, <secret-y>)')
>>> charge.s(account, card='1234 5678 1234 5678').set(
... kwargsrepr=repr({'card': '**** **** **** 5678'})
... ).delay()
Warning
Sensitive information will still be accessible to anyone able to read your task message from the broker, or otherwise able to intercept it.
For this reason you should probably encrypt your message if it contains sensitive information, or in this example with a credit card number the actual number could be stored encrypted in a secure store that you retrieve and decrypt in the task itself.
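As a rough sketch of that idea (not part of the Celery API; FERNET_KEY and the charge task body are hypothetical, and the key would live in a secure secret store):
from cryptography.fernet import Fernet

fernet = Fernet(FERNET_KEY)  # FERNET_KEY is a hypothetical value loaded from a secret store

@app.task
def charge(account, card_token):
    card_number = fernet.decrypt(card_token.encode()).decode()  # decrypt only inside the task
    ...  # perform the charge with card_number

# Caller side: send only the encrypted token and mask it in logs/monitoring events.
token = fernet.encrypt(b'1234 5678 1234 5678').decode()
charge.s(account, card_token=token).set(
    kwargsrepr=repr({'card_token': '<encrypted>'})
).delay()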
Retrying¶
app.Task.retry()
can be used to re-execute the task,
for example in the event of recoverable errors.
When you call retry
it'll send a new message, using the same
task-id, and it'll take care to make sure the message is delivered
to the same queue as the originating task.
When a task is retried this is also recorded as a task state, so that you can track the progress of the task using the result instance (see States).
Here's an example using retry
:
@app.task(bind=True)
def send_twitter_status(self, oauth, tweet):
try:
twitter = Twitter(oauth)
twitter.update_status(tweet)
except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
raise self.retry(exc=exc)
Note
The app.Task.retry()
call will raise an exception so any
code after the retry won't be reached. This is the Retry
exception, it isn't handled as an error but rather as a semi-predicate
to signify to the worker that the task is to be retried,
so that it can store the correct state when a result backend is enabled.
This is normal operation and always happens unless the
throw
argument to retry is set to False
.
The bind argument to the task decorator will give access to self
(the
task type instance).
The exc
argument is used to pass exception information that's
used in logs, and when storing task results.
Both the exception and the traceback will
be available in the task state (if a result backend is enabled).
If the task has a max_retries
value the current exception
will be re-raised if the max number of retries has been exceeded,
but this won't happen if:
- An exc argument wasn't given.
  In this case the MaxRetriesExceededError exception will be raised.
- There's no current exception.
  If there's no original exception to re-raise the exc argument will be used instead, so:
  self.retry(exc=Twitter.LoginError())
  will raise the exc argument given.
Using a custom retry delay¶
When a task is to be retried, it can wait for a given amount of time
before doing so, and the default delay is defined by the
default_retry_delay
attribute. By default this is set to 3 minutes. Note that the
unit for setting the delay is in seconds (int or float).
You can also provide the countdown argument to retry()
to
override this default.
@app.task(bind=True, default_retry_delay=30 * 60) # retry in 30 minutes.
def add(self, x, y):
try:
something_raising()
except Exception as exc:
# overrides the default delay to retry after 1 minute
raise self.retry(exc=exc, countdown=60)
Automatic retry for known exceptions¶
Added in version 4.0.
Sometimes you just want to retry a task whenever a particular exception is raised.
Fortunately, you can tell Celery to automatically retry a task using
autoretry_for argument in the app.task()
decorator:
from twitter.exceptions import FailWhaleError
@app.task(autoretry_for=(FailWhaleError,))
def refresh_timeline(user):
return twitter.refresh_timeline(user)
If you want to specify custom arguments for an internal retry()
call, pass retry_kwargs argument to app.task()
decorator:
@app.task(autoretry_for=(FailWhaleError,),
retry_kwargs={'max_retries': 5})
def refresh_timeline(user):
return twitter.refresh_timeline(user)
This is provided as an alternative to manually handling the exceptions,
and the example above will do the same as wrapping the task body
in a try
... except
statement:
@app.task
def refresh_timeline(user):
try:
twitter.refresh_timeline(user)
except FailWhaleError as exc:
raise refresh_timeline.retry(exc=exc, max_retries=5)
If you want to automatically retry on any error, simply use:
@app.task(autoretry_for=(Exception,))
def x():
...
Added in version 4.2.
If your tasks depend on another service, like making a request to an API,
then it's a good idea to use exponential backoff to avoid overwhelming the
service with your requests. Fortunately, Celery's automatic retry support
makes it easy. Just specify the retry_backoff
argument, like this:
from requests.exceptions import RequestException
@app.task(autoretry_for=(RequestException,), retry_backoff=True)
def x():
...
By default, this exponential backoff will also introduce random jitter to avoid having all the tasks run at the same moment. It will also cap the maximum backoff delay to 10 minutes. All these settings can be customized via options documented below.
Added in version 4.4.
You can also set autoretry_for, max_retries, retry_backoff, retry_backoff_max and retry_jitter options in class-based tasks:
class BaseTaskWithRetry(Task):
autoretry_for = (TypeError,)
max_retries = 5
retry_backoff = True
retry_backoff_max = 700
retry_jitter = False
- Task.autoretry_for
A list/tuple of exception classes. If any of these exceptions are raised during the execution of the task, the task will automatically be retried. By default, no exceptions will be autoretried.
- Task.max_retries
A number. Maximum number of retries before giving up. A value of None means the task will retry forever. By default, this option is set to 3.
- Task.retry_backoff
A boolean, or a number. If this option is set to True, autoretries will be delayed following the rules of exponential backoff. The first retry will have a delay of 1 second, the second retry will have a delay of 2 seconds, the third will delay 4 seconds, the fourth will delay 8 seconds, and so on. (However, this delay value is modified by retry_jitter, if it is enabled.) If this option is set to a number, it is used as a delay factor. For example, if this option is set to 3, the first retry will delay 3 seconds, the second will delay 6 seconds, the third will delay 12 seconds, the fourth will delay 24 seconds, and so on. By default, this option is set to False, and autoretries will not be delayed.
- Task.retry_backoff_max
A number. If retry_backoff is enabled, this option will set a maximum delay in seconds between task autoretries. By default, this option is set to 600, which is 10 minutes.
- Task.retry_jitter
A boolean. Jitter is used to introduce randomness into exponential backoff delays, to prevent all tasks in the queue from being executed simultaneously. If this option is set to True, the delay value calculated by retry_backoff is treated as a maximum, and the actual delay value will be a random number between zero and that maximum. By default, this option is set to True.
Added in version 5.3.0.
- Task.dont_autoretry_for
A list/tuple of exception classes. These exceptions won't be autoretried. This allows excluding some exceptions that match autoretry_for but for which you don't want a retry.
Argument validation with Pydantic¶
Added in version 5.5.0.
You can use Pydantic to validate and convert arguments as well as serializing
results based on typehints by passing pydantic=True
.
Note
Argument validation only covers arguments/return values on the task side. You still have to serialize arguments yourself when invoking a task with delay() or apply_async().
For example:
from pydantic import BaseModel
class ArgModel(BaseModel):
value: int
class ReturnModel(BaseModel):
value: str
@app.task(pydantic=True)
def x(arg: ArgModel) -> ReturnModel:
# args/kwargs type hinted as Pydantic model will be converted
assert isinstance(arg, ArgModel)
# The returned model will be converted to a dict automatically
return ReturnModel(value=f"example: {arg.value}")
The task can then be called using a dict matching the model, and you'll receive
the returned model "dumped" (serialized using BaseModel.model_dump()
):
>>> result = x.delay({'value': 1})
>>> result.get(timeout=1)
{'value': 'example: 1'}
Union types, arguments to generics¶
Union types (e.g. Union[SomeModel, OtherModel]
) or arguments to generics (e.g.
list[SomeModel]
) are not supported.
In case you want to support a list or similar types, it is recommended to use
pydantic.RootModel
.
Optional parameters/return values¶
Optional parameters or return values are also handled properly. For example, given this task:
from typing import Optional
# models are the same as above
@app.task(pydantic=True)
def x(arg: Optional[ArgModel] = None) -> Optional[ReturnModel]:
if arg is None:
return None
return ReturnModel(value=f"example: {arg.value}")
You'll get the following behavior:
>>> result = x.delay()
>>> result.get(timeout=1) is None
True
>>> result = x.delay({'value': 1})
>>> result.get(timeout=1)
{'value': 'example: 1'}
Return value handling¶
Return values will only be serialized if the returned model matches the annotation. If you pass a
model instance of a different type, it will not be serialized. mypy
should already catch such
errors and you should fix your typehints then.
Pydantic parameters¶
There are a few more options influencing Pydantic behavior:
- Task.pydantic_strict
By default, strict mode is disabled. You can pass
True
to enable strict model validation.
- Task.pydantic_context
Pass additional validation context during Pydantic model validation. The context already includes the application object as celery_app and the task name as celery_task_name by default.
- Task.pydantic_dump_kwargs
When serializing a result, pass these additional arguments to dump_kwargs(). By default, only mode='json' is passed.
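A minimal sketch of passing these options through the decorator, reusing the ArgModel/ReturnModel from the earlier example (the context key shown is hypothetical):
@app.task(
    pydantic=True,
    pydantic_strict=True,                     # enable Pydantic strict mode
    pydantic_context={'tenant': 'example'},   # extra validation context (hypothetical key)
    pydantic_dump_kwargs={'mode': 'json', 'exclude_none': True},
)
def x(arg: ArgModel) -> ReturnModel:
    return ReturnModel(value=f"example: {arg.value}")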
List of Options¶
The task decorator can take a number of options that change the way
the task behaves, for example you can set the rate limit for a task
using the rate_limit
option.
Any keyword argument passed to the task decorator will actually be set as an attribute of the resulting task class, and this is a list of the built-in attributes.
General¶
- Task.name
The name the task is registered as.
You can set this name manually, or a name will be automatically generated using the module and class name.
See also Names.
- Task.request
If the task is being executed this will contain information about the current request. Thread local storage is used.
See Task Request.
- Task.max_retries
Only applies if the task calls self.retry or if the task is decorated with the autoretry_for argument.
The maximum number of attempted retries before giving up. If the number of retries exceeds this value a MaxRetriesExceededError exception will be raised.
Note
You have to call retry() manually, as it won't automatically retry on exception.
The default is 3. A value of None will disable the retry limit and the task will retry forever until it succeeds.
- Task.throws
Optional tuple of expected error classes that shouldn't be regarded as an actual error.
Errors in this list will be reported as a failure to the result backend, but the worker won't log the event as an error, and no traceback will be included.
Example:
@task(throws=(KeyError, HttpNotFound))
def get_foo():
    something()
Error types:
- Expected errors (in Task.throws)
  Logged with severity INFO, traceback excluded.
- Unexpected errors
  Logged with severity ERROR, with traceback included.
- Task.default_retry_delay
Default time in seconds before a retry of the task should be executed. Can be either an int or a float. The default is a three-minute delay (see Using a custom retry delay above).
- Task.rate_limit
Set the rate limit for this task type (limits the number of tasks that can be run in a given time frame). Tasks will still complete when a rate limit is in effect, but it may take some time before it's allowed to start.
If this is None no rate limit is in effect. If it is an integer or float, it is interpreted as "tasks per second".
The rate limits can be specified in seconds, minutes or hours by appending "/s", "/m" or "/h" to the value. Tasks will be evenly distributed over the specified time frame.
Example: "100/m" (a hundred tasks a minute). This will enforce a minimum delay of 600ms between starting two tasks on the same worker instance.
Default is the task_default_rate_limit setting: if not specified, rate limiting for tasks is disabled by default.
Note that this is a per worker instance rate limit, and not a global rate limit. To enforce a global rate limit (e.g., for an API with a maximum number of requests per second), you must restrict to a given queue.
- Task.time_limit
The hard time limit, in seconds, for this task. When not set the workers default is used.
- Task.soft_time_limit
The soft time limit for this task. When not set the workers default is used.
- Task.ignore_result
Don't store task state. Note that this means you can't use
AsyncResult
to check if the task is ready, or get its return value.
Note: Certain features will not work if task results are disabled. For more details check the Canvas documentation.
- Task.store_errors_even_if_ignored
If
True
, errors will be stored even if the task is configured to ignore results.
- Task.serializer
A string identifying the default serialization method to use. Defaults to the task_serializer setting. Can be pickle, json, yaml, or any custom serialization methods that have been registered with kombu.serialization.registry.
Please see Serializers for more information.
- Task.compression
A string identifying the default compression scheme to use.
Defaults to the task_compression setting. Can be gzip, or bzip2, or any custom compression schemes that have been registered with the kombu.compression registry.
Please see Compression for more information.
- Task.backend
The result store backend to use for this task. An instance of one of the backend classes in celery.backends. Defaults to app.backend, defined by the
result_backend
setting.
- Task.acks_late
If set to
True
messages for this task will be acknowledged after the task has been executed, not just before (the default behavior).
Note: This means the task may be executed multiple times should the worker crash in the middle of execution. Make sure your tasks are idempotent.
The global default can be overridden by the
task_acks_late
setting.
- Task.track_started
If True the task will report its status as "started" when the task is executed by a worker. The default value is False as the normal behavior is to not report that level of granularity. Tasks are either pending, finished, or waiting to be retried. Having a "started" status can be useful for when there are long running tasks and there's a need to report what task is currently running.
The host name and process id of the worker executing the task will be available in the state meta-data (e.g., result.info['pid']).
The global default can be overridden by the
task_track_started
setting.
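As an illustration only, a sketch combining a few of the options above in the task decorator (the option values are arbitrary):
@app.task(rate_limit='100/m', acks_late=True, ignore_result=True, serializer='json')
def sync_external_data(source_id):
    ...  # hypothetical task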
See also
The API reference for Task.
States¶
Celery can keep track of the tasks current state. The state also contains the result of a successful task, or the exception and traceback information of a failed task.
There are several result backends to choose from, and they all have different strengths and weaknesses (see Result Backends).
During its lifetime a task will transition through several possible states,
and each state may have arbitrary meta-data attached to it. When a task
moves into a new state the previous state is
forgotten about, but some transitions can be deduced, (e.g., a task now
in the FAILED
state, is implied to have been in the
STARTED
state at some point).
There are also sets of states, like the set of
FAILURE_STATES
, and the set of READY_STATES
.
The client uses the membership of these sets to decide whether
the exception should be re-raised (PROPAGATE_STATES
), or whether
the state can be cached (it can if the task is ready).
You can also define Custom states.
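A minimal client-side sketch of using these sets, assuming result is an AsyncResult obtained from delay():
from celery import states

if result.state in states.READY_STATES:
    ...  # the task has finished; its state/result can safely be cached
if result.state in states.PROPAGATE_STATES:
    ...  # FAILURE or REVOKED: result.get() would re-raise the stored exception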
Result Backends¶
If you want to keep track of tasks or need the return values, then Celery
must store or send the states somewhere so that they can be retrieved later.
There are several built-in result backends to choose from: SQLAlchemy/Django ORM,
Memcached, RabbitMQ/QPid (rpc
), and Redis -- or you can define your own.
No backend works well for every use case. You should read about the strengths and weaknesses of each backend, and choose the most appropriate for your needs.
Warning
Backends use resources to store and transmit results. To ensure
that resources are released, you must eventually call
get()
or forget()
on
EVERY AsyncResult
instance returned after calling
a task.
RPC Result Backend (RabbitMQ/QPid)¶
The RPC result backend (rpc://) is special as it doesn't actually store the states, but rather sends them as messages. This is an important difference as it means that a result can only be retrieved once, and only by the client that initiated the task. Two different processes can't wait for the same result.
Even with that limitation, it is an excellent choice if you need to receive state changes in real-time. Using messaging means the client doesn't have to poll for new states.
The messages are transient (non-persistent) by default, so the results will
disappear if the broker restarts. You can configure the result backend to send
persistent messages using the result_persistent
setting.
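A minimal configuration sketch (the broker URL is a placeholder):
from celery import Celery

app = Celery('tasks', broker='amqp://localhost', backend='rpc://')
app.conf.result_persistent = True  # send persistent result messages so they survive a broker restart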
Database Result Backend¶
Keeping state in the database can be convenient for many, especially for web applications with a database already in place, but it also comes with limitations.
Polling the database for new states is expensive, and so you should increase the polling intervals of operations, such as result.get().
Some databases use a default transaction isolation level that isn't suitable for polling tables for changes.
In MySQL the default transaction isolation level is REPEATABLE-READ: meaning the transaction won't see changes made by other transactions until the current transaction is committed.
Changing that to the READ-COMMITTED isolation level is recommended.
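A configuration sketch, assuming a MySQL database accessed through SQLAlchemy (the connection string is a placeholder):
app.conf.result_backend = 'db+mysql://user:password@localhost/celery_results'

# On the MySQL side, the recommended isolation level would be set with:
# SET GLOBAL TRANSACTION ISOLATION LEVEL READ COMMITTED;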
Built-in States¶
PENDING¶
Task is waiting for execution or unknown. Any task id that's not known is implied to be in the pending state.
STARTED¶
Task has been started.
Not reported by default, to enable please see app.Task.track_started
.
- meta-data:
pid and hostname of the worker process executing the task.
SUCCESS¶
Task has been successfully executed.
- meta-data:
result contains the return value of the task.
- propagates:
Yes
- ready:
Yes
FAILURE¶
Task execution resulted in failure.
- meta-data:
result contains the exception that occurred, and traceback contains the backtrace of the stack at the point when the exception was raised.
- propagates:
Yes
RETRY¶
Task is being retried.
- meta-data:
result contains the exception that caused the retry, and traceback contains the backtrace of the stack at the point when the exception was raised.
- propagates:
No
REVOKED¶
Task has been revoked.
- propagates:
Yes
Custom states¶
You can easily define your own states, all you need is a unique name.
The name of the state is usually an uppercase string. As an example
you could have a look at the abortable tasks
which defines a custom ABORTED
state.
Use update_state() to update a task's state:
@app.task(bind=True)
def upload_files(self, filenames):
for i, file in enumerate(filenames):
if not self.request.called_directly:
self.update_state(state='PROGRESS',
meta={'current': i, 'total': len(filenames)})
Here I created the state "PROGRESS", telling any application aware of this state that the task is currently in progress, and also where it is in the process by having current and total counts as part of the state meta-data. This can then be used to create progress bars for example.
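A minimal sketch of reading that custom state from the calling side (filenames, the polling interval, and the formatting are arbitrary):
import time

result = upload_files.delay(filenames)
while not result.ready():
    if result.state == 'PROGRESS':
        meta = result.info  # the meta dict passed to update_state()
        print(f"{meta['current']}/{meta['total']} files uploaded")
    time.sleep(1)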
Creating pickleable exceptions¶
A rarely known Python fact is that exceptions must conform to some simple rules to support being serialized by the pickle module.
Tasks that raise exceptions that aren't pickleable won't work properly when Pickle is used as the serializer.
To make sure that your exceptions are pickleable the exception
MUST provide the original arguments it was instantiated
with in its .args
attribute. The simplest way
to ensure this is to have the exception call Exception.__init__
.
Let's look at some examples that work, and one that doesn't:
# OK:
class HttpError(Exception):
pass
# BAD:
class HttpError(Exception):
def __init__(self, status_code):
self.status_code = status_code
# OK:
class HttpError(Exception):
def __init__(self, status_code):
self.status_code = status_code
Exception.__init__(self, status_code) # <-- REQUIRED
So the rule is:
For any exception that supports custom arguments *args
,
Exception.__init__(self, *args)
must be used.
There's no special support for keyword arguments, so if you want to preserve keyword arguments when the exception is unpickled you have to pass them as regular args:
class HttpError(Exception):
def __init__(self, status_code, headers=None, body=None):
self.status_code = status_code
self.headers = headers
self.body = body
super(HttpError, self).__init__(status_code, headers, body)
Semipredicates¶
The worker wraps the task in a tracing function that records the final state of the task. There are a number of exceptions that can be used to signal this function to change how it treats the return of the task.
Ignore¶
The task may raise Ignore
to force the worker to ignore the
task. This means that no state will be recorded for the task, but the
message is still acknowledged (removed from queue).
This can be used if you want to implement custom revoke-like functionality, or manually store the result of a task.
Example keeping revoked tasks in a Redis set:
from celery.exceptions import Ignore
@app.task(bind=True)
def some_task(self):
if redis.ismember('tasks.revoked', self.request.id):
raise Ignore()
Example that stores results manually:
from celery import states
from celery.exceptions import Ignore
@app.task(bind=True)
def get_tweets(self, user):
timeline = twitter.get_timeline(user)
if not self.request.called_directly:
self.update_state(state=states.SUCCESS, meta=timeline)
raise Ignore()
Reject¶
The task may raise Reject
to reject the task message using
AMQPs basic_reject
method. This won't have any effect unless
Task.acks_late
is enabled.
Rejecting a message has the same effect as acking it, but some brokers may implement additional functionality that can be used. For example RabbitMQ supports the concept of Dead Letter Exchanges where a queue can be configured to use a dead letter exchange that rejected messages are redelivered to.
Reject can also be used to re-queue messages, but please be very careful when using this as it can easily result in an infinite message loop.
Example using reject when a task causes an out of memory condition:
import errno
from celery.exceptions import Reject
@app.task(bind=True, acks_late=True)
def render_scene(self, path):
file = get_file(path)
try:
renderer.render_scene(file)
# if the file is too big to fit in memory
# we reject it so that it's redelivered to the dead letter exchange
# and we can manually inspect the situation.
except MemoryError as exc:
raise Reject(exc, requeue=False)
except OSError as exc:
if exc.errno == errno.ENOMEM:
raise Reject(exc, requeue=False)
# For any other error we retry after 10 seconds.
except Exception as exc:
        raise self.retry(exc=exc, countdown=10)
Example re-queuing the message:
from celery.exceptions import Reject
@app.task(bind=True, acks_late=True)
def requeues(self):
if not self.request.delivery_info['redelivered']:
raise Reject('no reason', requeue=True)
print('received two times')
Consult your broker documentation for more details about the basic_reject
method.
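The render_scene example above relies on the queue it consumes from being configured with a dead-letter exchange. As a rough sketch of that configuration (not part of the original example; the queue, exchange, and routing-key names are made up), the work queue can declare a dead-letter exchange through kombu's queue_arguments:
from kombu import Exchange, Queue

# Hypothetical names: 'render' is the work queue, 'render.dlx' the dead-letter
# exchange, and 'render.dead' the queue rejected messages end up in.
app.conf.task_queues = [
    Queue(
        'render',
        Exchange('render'),
        routing_key='render',
        queue_arguments={
            'x-dead-letter-exchange': 'render.dlx',
            'x-dead-letter-routing-key': 'render.dead',
        },
    ),
    Queue('render.dead', Exchange('render.dlx'), routing_key='render.dead'),
]
With such a setup, Reject(exc, requeue=False) on the 'render' queue makes RabbitMQ redeliver the message to 'render.dead', where it can be inspected manually.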
重试¶
Retry
任务可以抛出 Retry 异常(通常通过调用 retry() 来完成),以告知 worker 该任务正在被重试。
The task may raise Retry (typically by calling retry()) to tell the worker that the task is being retried.
自定义任务类¶
Custom task classes
实例化¶
Instantiation
任务在每次请求时 不会 重新实例化,而是以全局实例的形式注册在任务注册表中。
这意味着 __init__
构造函数在每个进程中只会被调用一次,并且任务类在语义上更接近于一个 Actor 模型。
如果你定义了如下任务:
from celery import Task
class NaiveAuthenticateServer(Task):
def __init__(self):
self.users = {'george': 'password'}
def run(self, username, password):
try:
return self.users[username] == password
except KeyError:
return False
并且将每次请求都路由到同一个进程中执行,那么它将在请求之间保持状态。
这对于缓存资源也很有用,例如以下用于缓存数据库连接的 Task 基类:
from celery import Task
class DatabaseTask(Task):
_db = None
@property
def db(self):
if self._db is None:
self._db = Database.connect()
return self._db
A task is not instantiated for every request, but is registered in the task registry as a global instance.
This means that the __init__
constructor will only be called
once per process, and that the task class is semantically closer to an
Actor.
If you have a task,
from celery import Task
class NaiveAuthenticateServer(Task):
def __init__(self):
self.users = {'george': 'password'}
def run(self, username, password):
try:
return self.users[username] == password
except KeyError:
return False
And you route every request to the same process, then it will keep state between requests.
This can also be useful to cache resources. For example, a base Task class that caches a database connection:
from celery import Task
class DatabaseTask(Task):
_db = None
@property
def db(self):
if self._db is None:
self._db = Database.connect()
return self._db
每个任务的使用¶
Per task usage
可以像下面这样将上述基类应用于每个任务中:
from celery.app import task
@app.task(base=DatabaseTask, bind=True)
def process_rows(self: task):
for row in self.db.table.all():
process_row(row)
这样, process_rows
任务中的 db
属性在每个进程中将始终保持一致。
The above can be added to each task like this:
from celery.app import task
@app.task(base=DatabaseTask, bind=True)
def process_rows(self: task):
for row in self.db.table.all():
process_row(row)
The db
attribute of the process_rows
task will then
always stay the same in each process.
App范围的使用¶
App-wide usage
你还可以通过在实例化应用时传入 task_cls
参数,指定整个 Celery 应用使用你自定义的任务类。
该参数可以是表示任务类的 Python 路径字符串,或是类对象本身:
from celery import Celery
app = Celery('tasks', task_cls='your.module.path:DatabaseTask')
这样,所有使用装饰器语法声明的任务都将使用你自定义的 DatabaseTask
类,并且都将具备一个 db
属性。
默认值是 Celery 提供的类: 'celery.app.task:Task'
。
You can also use your custom class in your whole Celery app by passing it as
the task_cls
argument when instantiating the app. This argument should be
either a string giving the Python path to your Task class or the class itself:
from celery import Celery
app = Celery('tasks', task_cls='your.module.path:DatabaseTask')
This will make all your tasks declared using the decorator syntax within your app use your DatabaseTask class, and they will all have a db attribute.
The default value is the class provided by Celery: 'celery.app.task:Task'
.
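Equivalently (a small sketch, assuming DatabaseTask is importable from that module path), the class object itself can be passed instead of the dotted path:
from celery import Celery

from your.module.path import DatabaseTask

app = Celery('tasks', task_cls=DatabaseTask)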
处理程序¶
Handlers
- before_start(self, task_id, args, kwargs)¶
任务在开始执行前由 Worker 调用。
Added in version 5.2.
- 参数:
task_id -- 待执行任务的唯一标识符。
args -- 待执行任务的原始位置参数。
kwargs -- 待执行任务的原始关键字参数。
此处理器的返回值将被忽略。
- after_return(self, status, retval, task_id, args, kwargs, einfo)¶
任务返回后调用的处理器。
- 参数:
status -- 当前任务的状态。
retval -- 任务的返回值或异常对象。
task_id -- 任务的唯一标识符。
args -- 已返回任务的原始位置参数。
kwargs -- 已返回任务的原始关键字参数。
- 关键字参数:
einfo --
ExceptionInfo
实例,包含 traceback(若存在)。
此处理器的返回值将被忽略。
- on_failure(self, exc, task_id, args, kwargs, einfo)¶
当任务失败时由 Worker 调用。
- 参数:
exc -- 任务抛出的异常。
task_id -- 失败任务的唯一标识符。
args -- 失败任务的原始位置参数。
kwargs -- 失败任务的原始关键字参数。
- 关键字参数:
einfo --
ExceptionInfo
实例,包含 traceback。
此处理器的返回值将被忽略。
- on_retry(self, exc, task_id, args, kwargs, einfo)¶
当任务即将重试时由 Worker 调用。
- 参数:
exc -- 传递给
retry()
的异常。
task_id -- 被重试任务的唯一标识符。
args -- 被重试任务的原始位置参数。
kwargs -- 被重试任务的原始关键字参数。
- 关键字参数:
einfo --
ExceptionInfo
实例,包含 traceback。
此处理器的返回值将被忽略。
- on_success(self, retval, task_id, args, kwargs)¶
当任务成功执行时由 Worker 调用。
- 参数:
retval -- 任务的返回值。
task_id -- 已执行任务的唯一标识符。
args -- 已执行任务的原始位置参数。
kwargs -- 已执行任务的原始关键字参数。
此处理器的返回值将被忽略。
- before_start(self, task_id, args, kwargs)
Run by the worker before the task starts executing.
Added in version 5.2.
- 参数:
task_id -- Unique id of the task to execute.
args -- Original arguments for the task to execute.
kwargs -- Original keyword arguments for the task to execute.
The return value of this handler is ignored.
- after_return(self, status, retval, task_id, args, kwargs, einfo)
Handler called after the task returns.
- 参数:
status -- Current task state.
retval -- Task return value/exception.
task_id -- Unique id of the task.
args -- Original arguments for the task that returned.
kwargs -- Original keyword arguments for the task that returned.
- 关键字参数:
einfo --
ExceptionInfo
instance, containing the traceback (if any).
The return value of this handler is ignored.
- on_failure(self, exc, task_id, args, kwargs, einfo)
This is run by the worker when the task fails.
- 参数:
exc -- The exception raised by the task.
task_id -- Unique id of the failed task.
args -- Original arguments for the task that failed.
kwargs -- Original keyword arguments for the task that failed.
- 关键字参数:
einfo --
ExceptionInfo
instance, containing the traceback.
The return value of this handler is ignored.
- on_retry(self, exc, task_id, args, kwargs, einfo)
This is run by the worker when the task is to be retried.
- 参数:
exc -- The exception sent to
retry()
.
task_id -- Unique id of the retried task.
args -- Original arguments for the retried task.
kwargs -- Original keyword arguments for the retried task.
- 关键字参数:
einfo --
ExceptionInfo
instance, containing the traceback.
The return value of this handler is ignored.
- on_success(self, retval, task_id, args, kwargs)
Run by the worker if the task executes successfully.
- 参数:
retval -- The return value of the task.
task_id -- Unique id of the executed task.
args -- Original arguments for the executed task.
kwargs -- Original keyword arguments for the executed task.
The return value of this handler is ignored.
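Since the handlers above are plain methods on the task class, a common pattern is to override them on a custom base class. The following is a minimal sketch (the class and task names are made up, and an existing app instance is assumed):
import logging

from celery import Task

logger = logging.getLogger(__name__)

class AuditedTask(Task):
    """Hypothetical base class that logs the task lifecycle via the handlers."""

    def before_start(self, task_id, args, kwargs):
        logger.info('task %s starting', task_id)

    def on_success(self, retval, task_id, args, kwargs):
        logger.info('task %s succeeded with %r', task_id, retval)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        logger.error('task %s failed: %r', task_id, exc)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        logger.info('task %s finished in state %s', task_id, status)

@app.task(base=AuditedTask, bind=True)
def divide(self, x, y):
    return x / y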
请求和自定义请求¶
Requests and custom requests
在收到执行任务的消息时, worker 会创建一个
request
来表示这种需求。
自定义任务类可以通过更改属性 celery.app.task.Task.Request
来重写使用的请求类。你可以直接分配自定义请求类本身,或者
它的完全限定名称。
请求有多个职责。自定义请求类应覆盖所有这些职责——它们负责实际
执行和跟踪任务。我们强烈推荐继承 celery.worker.request.Request
。
当使用 pre-forking worker 时,on_timeout()
和
on_failure()
方法将在主工作进程中执行。
应用程序可以利用这一功能来检测 celery.app.task.Task.on_failure()
未检测到的失败。
例如,以下自定义请求类会检测并记录硬时间限制和其他失败。
import logging
from celery import Task
from celery.worker.request import Request
logger = logging.getLogger('my.package')
class MyRequest(Request):
'一个最小的自定义请求,用于记录失败和硬时间限制。'
def on_timeout(self, soft, timeout):
super(MyRequest, self).on_timeout(soft, timeout)
if not soft:
logger.warning(
'任务 %s 强制执行了硬超时',
self.task.name
)
def on_failure(self, exc_info, send_failed_event=True, return_ok=False):
super().on_failure(
exc_info,
send_failed_event=send_failed_event,
return_ok=return_ok
)
logger.warning(
'任务 %s 检测到失败',
self.task.name
)
class MyTask(Task):
Request = MyRequest # 你可以使用 FQN 'my.package:MyRequest'
@app.task(base=MyTask)
def some_longrunning_task():
# 发挥你的想象力
Upon receiving a message to run a task, the worker
creates a request
to represent such
demand.
Custom task classes may override which request class to use by changing the
attribute celery.app.task.Task.Request
. You may either assign the
custom request class itself, or its fully qualified name.
The request has several responsibilities. Custom request classes should cover
them all -- they are responsible for actually running and tracing the task. We
strongly recommend inheriting from celery.worker.request.Request
.
When using the pre-forking worker, the methods
on_timeout()
and
on_failure()
are executed in the main
worker process. An application may leverage such facility to detect failures
which are not detected using celery.app.task.Task.on_failure()
.
As an example, the following custom request detects and logs hard time limits, and other failures.
import logging
from celery import Task
from celery.worker.request import Request
logger = logging.getLogger('my.package')
class MyRequest(Request):
'A minimal custom request to log failures and hard time limits.'
def on_timeout(self, soft, timeout):
super(MyRequest, self).on_timeout(soft, timeout)
if not soft:
logger.warning(
'A hard timeout was enforced for task %s',
self.task.name
)
def on_failure(self, exc_info, send_failed_event=True, return_ok=False):
super().on_failure(
exc_info,
send_failed_event=send_failed_event,
return_ok=return_ok
)
logger.warning(
'Failure detected for task %s',
self.task.name
)
class MyTask(Task):
Request = MyRequest # you can use a FQN 'my.package:MyRequest'
@app.task(base=MyTask)
def some_longrunning_task():
# use your imagination
工作原理¶
How it works
以下是技术细节。虽然这部分内容不是你必须了解的,但你可能会感兴趣。
所有定义的任务都会列在一个注册表中。注册表包含任务名称和对应的任务类的列表。 你可以自己检查这个注册表:
>>> from proj.celery import app
>>> app.tasks
{'celery.chord_unlock':
<@task: celery.chord_unlock>,
'celery.backend_cleanup':
<@task: celery.backend_cleanup>,
'celery.chord':
<@task: celery.chord>}
这是 Celery 内置任务的列表。请注意,只有在定义任务的模块被导入时, 这些任务才会被注册。
默认加载器会导入 imports
配置项中列出的所有模块。
app.task()
装饰器负责将你的任务注册到应用程序的任务注册表中。
当任务被发送时,实际上并不会发送函数代码,只会发送任务名称以便执行。 当 Worker 收到消息时,它可以在任务注册表中查找该名称,找到执行代码。
这意味着你的 Worker 应该始终与客户端使用相同的软件版本。这是一个缺点, 但替代方案是一个尚未解决的技术挑战。
Here come the technical details. This part isn't something you need to know, but you may be interested.
All defined tasks are listed in a registry. The registry contains a list of task names and their task classes. You can investigate this registry yourself:
>>> from proj.celery import app
>>> app.tasks
{'celery.chord_unlock':
<@task: celery.chord_unlock>,
'celery.backend_cleanup':
<@task: celery.backend_cleanup>,
'celery.chord':
<@task: celery.chord>}
This is the list of tasks built into Celery. Note that tasks will only be registered when the module they're defined in is imported.
The default loader imports any modules listed in the
imports
setting.
The app.task()
decorator is responsible for registering your task
in the application's task registry.
When tasks are sent, no actual function code is sent with it, just the name of the task to execute. When the worker then receives the message it can look up the name in its task registry to find the execution code.
This means that your workers should always be updated with the same software as the client. This is a drawback, but the alternative is a technical challenge that's yet to be solved.
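As a small illustration (assuming the add task below lives in a module named tasks), the registered name is all that travels in the message, and the worker resolves it through the same registry:
@app.task
def add(x, y):
    return x + y

print(add.name)             # e.g. 'tasks.add', derived from the module and function name
print(app.tasks[add.name])  # the worker performs an equivalent lookup to find the code to run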
技巧和最佳实践¶
Tips and Best Practices
忽略不需要的结果¶
Ignore results you don't want
如果你不关心任务的结果,务必设置 ignore_result
选项,
因为存储结果会浪费时间和资源。
@app.task(ignore_result=True)
def mytask():
something()
也可以使用 task_ignore_result
配置项全局禁用结果存储。
通过在调用 apply_async
时传递 ignore_result
布尔参数,可以在每次执行时启用/禁用结果存储。
@app.task
def mytask(x, y):
return x + y
# 不会存储结果
result = mytask.apply_async((1, 2), ignore_result=True)
print(result.get()) # -> None
# 会存储结果
result = mytask.apply_async((1, 2), ignore_result=False)
print(result.get()) # -> 3
默认情况下,当配置了结果后端时,任务不会忽略结果(ignore_result=False)。
选项优先级顺序如下:
全局 task_ignore_result 配置项
任务级 ignore_result 选项
任务执行选项 ignore_result
If you don't care about the results of a task, be sure to set the
ignore_result
option, as storing results
wastes time and resources.
@app.task(ignore_result=True)
def mytask():
something()
Results can even be disabled globally using the task_ignore_result
setting.
Results can be enabled/disabled on a per-execution basis, by passing the ignore_result
boolean parameter,
when calling apply_async
.
@app.task
def mytask(x, y):
return x + y
# No result will be stored
result = mytask.apply_async((1, 2), ignore_result=True)
print(result.get()) # -> None
# Result will be stored
result = mytask.apply_async((1, 2), ignore_result=False)
print(result.get()) # -> 3
By default tasks will not ignore results (ignore_result=False
) when a result backend is configured.
The option precedence order is the following:
Global task_ignore_result
ignore_result option
Task execution option ignore_result
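A small sketch of how these levels interact (assuming an existing app instance): the global default can be overridden per task, which in turn can be overridden per call:
app.conf.task_ignore_result = True   # global default: do not store results

@app.task(ignore_result=False)       # this task stores results despite the global default
def audited(x):
    return x

# A single call can still opt out again:
audited.apply_async((1,), ignore_result=True)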
更多优化技巧¶
More optimization tips
您可以在 优化指南 中找到更多优化技巧。
You can find additional optimization tips in the Optimizing Guide.
避免启动同步子任务¶
Avoid launching synchronous subtasks
让一个任务等待另一个任务的结果是非常低效的, 如果工作进程池耗尽,甚至可能导致死锁。
相反,应该使你的设计异步化,例如使用 回调。
不推荐的做法:
@app.task
def update_page_info(url):
page = fetch_page.delay(url).get()
info = parse_page.delay(page).get()
store_page_info.delay(url, info)
@app.task
def fetch_page(url):
return myhttplib.get(url)
@app.task
def parse_page(page):
return myparser.parse_document(page)
@app.task
def store_page_info(url, info):
return PageInfo.objects.create(url, info)
推荐的做法:
def update_page_info(url):
# fetch_page -> parse_page -> store_page
chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)
chain()
@app.task()
def fetch_page(url):
return myhttplib.get(url)
@app.task()
def parse_page(page):
return myparser.parse_document(page)
@app.task(ignore_result=True)
def store_page_info(info, url):
PageInfo.objects.create(url=url, info=info)
在这里,我通过将不同的 signature()
任务链接起来创建了任务链。
你可以在 Canvas: 设计工作流 中阅读有关任务链和其他强大构造的信息。
默认情况下,Celery 不允许在任务内同步运行子任务, 但在极少数或极端情况下,你可能需要这样做。 警告: 启用子任务同步执行不推荐!
@app.task
def update_page_info(url):
page = fetch_page.delay(url).get(disable_sync_subtasks=False)
info = parse_page.delay(page).get(disable_sync_subtasks=False)
store_page_info.delay(url, info)
@app.task
def fetch_page(url):
return myhttplib.get(url)
@app.task
def parse_page(page):
return myparser.parse_document(page)
@app.task
def store_page_info(url, info):
return PageInfo.objects.create(url, info)
Having a task wait for the result of another task is really inefficient, and may even cause a deadlock if the worker pool is exhausted.
Make your design asynchronous instead, for example by using callbacks.
Bad:
@app.task
def update_page_info(url):
page = fetch_page.delay(url).get()
info = parse_page.delay(page).get()
store_page_info.delay(url, info)
@app.task
def fetch_page(url):
return myhttplib.get(url)
@app.task
def parse_page(page):
return myparser.parse_document(page)
@app.task
def store_page_info(url, info):
return PageInfo.objects.create(url, info)
Good:
def update_page_info(url):
# fetch_page -> parse_page -> store_page
chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)
chain()
@app.task()
def fetch_page(url):
return myhttplib.get(url)
@app.task()
def parse_page(page):
return myparser.parse_document(page)
@app.task(ignore_result=True)
def store_page_info(info, url):
PageInfo.objects.create(url=url, info=info)
Here I instead created a chain of tasks by linking together
different signature()
's.
You can read about chains and other powerful constructs
at Canvas: 设计工作流.
By default Celery will not allow you to run subtasks synchronously within a task, but in rare or extreme cases you might need to do so. WARNING: enabling subtasks to run synchronously is not recommended!
@app.task
def update_page_info(url):
page = fetch_page.delay(url).get(disable_sync_subtasks=False)
info = parse_page.delay(page).get(disable_sync_subtasks=False)
store_page_info.delay(url, info)
@app.task
def fetch_page(url):
return myhttplib.get(url)
@app.task
def parse_page(page):
return myparser.parse_document(page)
@app.task
def store_page_info(url, info):
return PageInfo.objects.create(url, info)
性能和策略¶
Performance and Strategies
粒度¶
Granularity
任务粒度是每个子任务所需的计算量。 通常来说,将问题拆分成许多小任务比将其拆分成少数几个长时间运行的任务要好。
使用较小的任务,你可以并行处理更多的任务,并且这些任务的运行时间不会长到阻塞工作进程,进而影响其它等待任务的处理。
然而,执行任务确实会有开销。需要发送消息,数据可能不在本地等等。所以,如果任务过于细粒度,添加的开销可能会消除任何潜在的好处。
The task granularity is the amount of computation needed by each subtask. In general it is better to split the problem up into many small tasks rather than have a few long running tasks.
With smaller tasks you can process more tasks in parallel and the tasks won't run long enough to block the worker from processing other waiting tasks.
However, executing a task does have overhead. A message needs to be sent, data may not be local, etc. So if the tasks are too fine-grained the overhead added probably removes any benefit.
参见
The book Art of Concurrency has a section dedicated to the topic of task granularity [AOC1].
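As a rough sketch of the trade-off (process_item and items are hypothetical), one task per item maximizes parallelism, while batching very small items, for example with chunks, keeps the per-message overhead reasonable:
from celery import group

# One task per item: maximum parallelism, one message per item.
group(process_item.s(item) for item in items)()

# Ten items per task: fewer messages, less overhead per item.
process_item.chunks(((item,) for item in items), 10)()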
数据本地性¶
Data locality
处理任务的工作进程应该尽可能接近数据。最好的情况是数据存在内存中,最坏的情况是需要从另一个大陆进行完全传输。
如果数据距离较远,可以尝试在该位置运行另一个工作进程,或者如果不可能的话,可以缓存经常使用的数据,或者预加载你知道将会使用的数据。
在工作进程之间共享数据的最简单方法是使用分布式缓存系统,比如 memcached。
参见
Jim Gray 的论文 分布式计算经济 是关于数据本地化主题的极好介绍。
The worker processing the task should be as close to the data as possible. The best would be to have a copy in memory, the worst would be a full transfer from another continent.
If the data is far away, you could try to run another worker at location, or if that's not possible - cache often used data, or preload data you know is going to be used.
The easiest way to share data between workers is to use a distributed cache system, like memcached.
参见
The paper Distributed Computing Economics by Jim Gray is an excellent introduction to the topic of data locality.
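A minimal sketch of the caching idea (the memcached client, key layout, and helper functions are assumptions; any shared cache works the same way): look the data up in the shared cache first and only fall back to the slow source on a miss:
import memcache  # python-memcached; any shared cache client would do

cache = memcache.Client(['127.0.0.1:11211'])

@app.task
def summarize(dataset_id):
    key = 'dataset:{0}'.format(dataset_id)
    data = cache.get(key)
    if data is None:
        data = load_dataset_from_warehouse(dataset_id)  # hypothetical slow fetch
        cache.set(key, data, time=300)  # share with other workers for 5 minutes
    return compute_summary(data)  # hypothetical pure function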
状态¶
State
由于 Celery 是一个分布式系统,你无法知道任务将在哪个进程或机器上执行。你甚至无法知道任务是否会及时运行。
古老的异步格言告诉我们:“确认世界的状态是任务的责任”。这意味着,世界观可能在任务请求之后发生变化,因此任务有责任确保世界是应该有的样子;如果你有一个重新索引搜索引擎的任务,并且该搜索引擎应该每隔最多 5 分钟才重新索引一次,那么它必须由任务来确认,而不是调用者。
另一个问题是 Django 模型对象。它们不应该作为任务的参数传递。通常最好在任务运行时重新从数据库中获取对象,因为使用旧数据可能会导致竞争条件。
设想以下场景,你有一篇文章和一个任务,该任务自动扩展其中的一些缩写:
class Article(models.Model):
title = models.CharField()
body = models.TextField()
@app.task
def expand_abbreviations(article):
    article.body = article.body.replace('MyCorp', 'My Corporation')
article.save()
首先,一个作者创建并保存了一篇文章,然后该作者点击按钮,启动了缩写扩展任务:
>>> article = Article.objects.get(id=102)
>>> expand_abbreviations.delay(article)
现在,队列非常忙碌,因此任务将在 2 分钟后才执行。 与此同时,另一个作者对文章进行了修改,因此,当任务最终执行时,文章的正文被恢复到了旧版本,因为任务参数中的文章正文是旧的。
解决竞争条件很容易,只需改用文章的 ID,并在任务体内重新获取文章:
@app.task
def expand_abbreviations(article_id):
article = Article.objects.get(id=article_id)
    article.body = article.body.replace('MyCorp', 'My Corporation')
article.save()
>>> expand_abbreviations.delay(article_id)
这种方法甚至可能带来性能上的好处,因为发送大消息可能非常昂贵。
Since Celery is a distributed system, you can't know which process, or on what machine the task will be executed. You can't even know if the task will run in a timely manner.
The ancient async sayings tell us that “asserting the world is the responsibility of the task”. What this means is that the world view may have changed since the task was requested, so the task is responsible for making sure the world is how it should be; if you have a task that re-indexes a search engine, and the search engine should only be re-indexed at most every 5 minutes, then it must be the task's responsibility to assert that, not the caller's.
Another gotcha is Django model objects. They shouldn't be passed on as arguments to tasks. It's almost always better to re-fetch the object from the database when the task is running instead, as using old data may lead to race conditions.
Imagine the following scenario where you have an article and a task that automatically expands some abbreviations in it:
class Article(models.Model):
title = models.CharField()
body = models.TextField()
@app.task
def expand_abbreviations(article):
    article.body = article.body.replace('MyCorp', 'My Corporation')
article.save()
First, an author creates an article and saves it, then the author clicks on a button that initiates the abbreviation task:
>>> article = Article.objects.get(id=102)
>>> expand_abbreviations.delay(article)
Now, the queue is very busy, so the task won't be run for another 2 minutes. In the meantime another author makes changes to the article, so when the task is finally run, the body of the article is reverted to the old version because the task had the old body in its argument.
Fixing the race condition is easy, just use the article id instead, and re-fetch the article in the task body:
@app.task
def expand_abbreviations(article_id):
article = Article.objects.get(id=article_id)
    article.body = article.body.replace('MyCorp', 'My Corporation')
article.save()
>>> expand_abbreviations.delay(article_id)
There might even be performance benefits to this approach, as sending large messages may be expensive.
数据库事务¶
Database transactions
我们来看另一个例子:
from django.db import transaction
from django.http import HttpResponseRedirect
@transaction.atomic
def create_article(request):
article = Article.objects.create()
expand_abbreviations.delay(article.pk)
return HttpResponseRedirect('/articles/')
这是一个 Django 视图,它在数据库中创建了一个文章对象, 然后将主键传递给任务。它使用了 transaction.atomic 装饰器,当视图返回时,事务会被提交,或者如果视图抛出异常,事务会被回滚。
这里有一个竞争条件,因为事务是原子的。这意味着文章对象在视图函数返回响应之前不会持久化到数据库。如果异步任务在事务提交之前开始执行,它可能会尝试在文章对象不存在时查询该对象。为了解决这个问题,我们需要确保在触发任务之前提交事务。
解决方法是使用 delay_on_commit()
:
from django.db import transaction
from django.http import HttpResponseRedirect
@transaction.atomic
def create_article(request):
article = Article.objects.create()
expand_abbreviations.delay_on_commit(article.pk)
return HttpResponseRedirect('/articles/')
该方法在 Celery 5.4 中添加。它是一个快捷方式,使用 Django 的 on_commit
回调,在所有事务成功提交后启动你的 Celery 任务。
Let's have a look at another example:
from django.db import transaction
from django.http import HttpResponseRedirect
@transaction.atomic
def create_article(request):
article = Article.objects.create()
expand_abbreviations.delay(article.pk)
return HttpResponseRedirect('/articles/')
This is a Django view creating an article object in the database, then passing the primary key to a task. It uses the transaction.atomic decorator, that will commit the transaction when the view returns, or roll back if the view raises an exception.
There is a race condition because transactions are atomic. This means the article object is not persisted to the database until after the view function returns a response. If the asynchronous task starts executing before the transaction is committed, it may attempt to query the article object before it exists. To prevent this, we need to ensure that the transaction is committed before triggering the task.
The solution is to use delay_on_commit()
instead:
from django.db import transaction
from django.http import HttpResponseRedirect
@transaction.atomic
def create_article(request):
article = Article.objects.create()
expand_abbreviations.delay_on_commit(article.pk)
return HttpResponseRedirect('/articles/')
This method was added in Celery 5.4. It's a shortcut that uses Django's
on_commit
callback to launch your Celery task once all transactions
have been committed successfully.
使用低于 5.4 版本的 Celery¶
With Celery <5.4
如果你使用的是较旧版本的 Celery,可以直接使用 Django 的回调来复制这种行为,如下所示:
import functools
from django.db import transaction
from django.http import HttpResponseRedirect
@transaction.atomic
def create_article(request):
article = Article.objects.create()
transaction.on_commit(
functools.partial(expand_abbreviations.delay, article.pk)
)
return HttpResponseRedirect('/articles/')
备注
on_commit
在 Django 1.9 及以上版本中可用。如果你使用的是更早版本,则可以通过 django-transaction-hooks 库来添加对其的支持。
If you're using an older version of Celery, you can replicate this behaviour using the Django callback directly as follows:
import functools
from django.db import transaction
from django.http import HttpResponseRedirect
@transaction.atomic
def create_article(request):
article = Article.objects.create()
transaction.on_commit(
functools.partial(expand_abbreviations.delay, article.pk)
)
return HttpResponseRedirect('/articles/')
备注
on_commit
is available in Django 1.9 and above, if you are using a
version prior to that then the django-transaction-hooks library
adds support for this.
示例¶
Example
让我们看一个实际的例子:一个博客,评论需要进行垃圾邮件过滤。当评论被创建时,垃圾邮件过滤器在后台运行,因此用户无需等待其完成。
我有一个 Django 博客应用,允许对博客文章进行评论。下面我将介绍这个应用中部分模型、视图和任务的实现。
Let's take a real world example: a blog where comments posted need to be filtered for spam. When the comment is created, the spam filter runs in the background, so the user doesn't have to wait for it to finish.
I have a Django blog application allowing comments on blog posts. I'll describe parts of the models/views and tasks for this application.
blog/models.py
¶
评论模型如下所示:
from django.db import models
from django.utils.translation import ugettext_lazy as _
class Comment(models.Model):
name = models.CharField(_('name'), max_length=64)
email_address = models.EmailField(_('email address'))
homepage = models.URLField(_('home page'),
blank=True, verify_exists=False)
comment = models.TextField(_('comment'))
pub_date = models.DateTimeField(_('Published date'),
                                    editable=False, auto_now_add=True)
is_spam = models.BooleanField(_('spam?'),
default=False, editable=False)
class Meta:
verbose_name = _('comment')
verbose_name_plural = _('comments')
在发表评论的视图中,我首先将评论写入数据库,然后在后台启动垃圾邮件过滤任务。
The comment model looks like this:
from django.db import models
from django.utils.translation import ugettext_lazy as _
class Comment(models.Model):
name = models.CharField(_('name'), max_length=64)
email_address = models.EmailField(_('email address'))
homepage = models.URLField(_('home page'),
blank=True, verify_exists=False)
comment = models.TextField(_('comment'))
pub_date = models.DateTimeField(_('Published date'),
                                    editable=False, auto_now_add=True)
is_spam = models.BooleanField(_('spam?'),
default=False, editable=False)
class Meta:
verbose_name = _('comment')
verbose_name_plural = _('comments')
In the view where the comment is posted, I first write the comment to the database, then I launch the spam filter task in the background.
blog/views.py
¶
from django import forms
from django.http import HttpResponseRedirect
from django.template.context import RequestContext
from django.shortcuts import get_object_or_404, render_to_response
from blog import tasks
from blog.models import Comment
class CommentForm(forms.ModelForm):
class Meta:
model = Comment
def add_comment(request, slug, template_name='comments/create.html'):
post = get_object_or_404(Entry, slug=slug)
remote_addr = request.META.get('REMOTE_ADDR')
    if request.method == 'POST':
form = CommentForm(request.POST, request.FILES)
if form.is_valid():
comment = form.save()
# 异步检查垃圾邮件。
tasks.spam_filter.delay(comment_id=comment.id,
remote_addr=remote_addr)
return HttpResponseRedirect(post.get_absolute_url())
else:
form = CommentForm()
context = RequestContext(request, {'form': form})
return render_to_response(template_name, context_instance=context)
为了过滤评论中的垃圾邮件,我使用了 Akismet ,这是一个用于过滤 Wordpress 免费博客平台中发布评论的垃圾邮件服务。 Akismet 对个人使用是免费的,但商业使用需要付费。你需要注册该服务以获得 API 密钥。
为了调用 Akismet 的 API,我使用了 Michael Foord 编写的 akismet.py 库。
from django import forms
from django.http import HttpResponseRedirect
from django.template.context import RequestContext
from django.shortcuts import get_object_or_404, render_to_response
from blog import tasks
from blog.models import Comment
class CommentForm(forms.ModelForm):
class Meta:
model = Comment
def add_comment(request, slug, template_name='comments/create.html'):
post = get_object_or_404(Entry, slug=slug)
remote_addr = request.META.get('REMOTE_ADDR')
    if request.method == 'POST':
form = CommentForm(request.POST, request.FILES)
if form.is_valid():
comment = form.save()
# Check spam asynchronously.
tasks.spam_filter.delay(comment_id=comment.id,
remote_addr=remote_addr)
return HttpResponseRedirect(post.get_absolute_url())
else:
form = CommentForm()
context = RequestContext(request, {'form': form})
return render_to_response(template_name, context_instance=context)
To filter spam in comments I use Akismet, the service used to filter spam in comments posted to the free blog platform Wordpress. Akismet is free for personal use, but for commercial use you need to pay. You have to sign up to their service to get an API key.
To make API calls to Akismet I use the akismet.py library written by Michael Foord.
blog/tasks.py
¶
from celery import Celery
from akismet import Akismet
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from django.contrib.sites.models import Site
from blog.models import Comment
app = Celery(broker='amqp://')
@app.task
def spam_filter(comment_id, remote_addr=None):
logger = spam_filter.get_logger()
logger.info('Running spam filter for comment %s', comment_id)
comment = Comment.objects.get(pk=comment_id)
current_domain = Site.objects.get_current().domain
    akismet = Akismet(settings.AKISMET_KEY, 'http://{0}'.format(current_domain))
if not akismet.verify_key():
raise ImproperlyConfigured('Invalid AKISMET_KEY')
is_spam = akismet.comment_check(user_ip=remote_addr,
comment_content=comment.comment,
comment_author=comment.name,
comment_author_email=comment.email_address)
if is_spam:
comment.is_spam = True
comment.save()
return is_spam