消息协议¶
Message Protocol
Task 消息¶
Task messages
版本 2¶
Version 2
定义¶
Definition
properties = {
'correlation_id': uuid task_id,
'content_type': string mimetype,
'content_encoding': string encoding,
# optional
'reply_to': string queue_or_url,
}
headers = {
'lang': string 'py'
'task': string task,
'id': uuid task_id,
'root_id': uuid root_id,
'parent_id': uuid parent_id,
'group': uuid group_id,
# optional
'meth': string method_name,
'shadow': string alias_name,
'eta': iso8601 ETA,
'expires': iso8601 expires,
'retries': int retries,
'timelimit': (soft, hard),
'argsrepr': str repr(args),
'kwargsrepr': str repr(kwargs),
'origin': str nodename,
'replaced_task_nesting': int
}
body = (
object[] args,
Mapping kwargs,
Mapping embed {
'callbacks': Signature[] callbacks,
'errbacks': Signature[] errbacks,
'chain': Signature[] chain,
'chord': Signature chord_callback,
}
)
示例¶
Example
此示例使用协议版本 2 发送任务消息:
This example sends a task message using version 2 of the protocol:
# chain: add(add(add(2, 2), 4), 8) == 2 + 2 + 4 + 8
import json
import os
import socket
task_id = uuid()
args = (2, 2)
kwargs = {}
basic_publish(
message=json.dumps((args, kwargs, None)),
application_headers={
'lang': 'py',
'task': 'proj.tasks.add',
'argsrepr': repr(args),
'kwargsrepr': repr(kwargs),
'origin': '@'.join([os.getpid(), socket.gethostname()])
}
properties={
'correlation_id': task_id,
'content_type': 'application/json',
'content_encoding': 'utf-8',
}
)
与版本 1 的变更¶
Changes from version 1
通过
task
消息头检测协议版本。通过
lang
消息头支持多语言。 Worker 可以将消息重定向给支持该语言的其他 worker。元数据迁移至消息头(headers)。 这意味着 worker 或中间件可以在不解码消息主体(payload)的情况下 基于消息头内容做出决策,例如 payload 使用了 Python 特有的 pickle 序列化方式。
始终使用 UTC 时间 不再使用
utc
标志,因此任何缺失时区信息的时间值将被视为 UTC 时间。消息体(body)仅用于语言相关的数据。 - Python 将 args/kwargs 和嵌入的签名存储于消息体中。 - 如果消息使用原始编码(raw encoding),则原始数据会作为单一参数传递给函数。 - Java/C 等语言可以使用 Thrift/protobuf 文档作为消息体内容。
origin
表示发送任务的节点名称。基于
task
与meth
消息头分发给对应 actor。meth
在 Python 中未使用,但将来可能用于指定类+方法对。Chain(链式调用)获得了专用字段。 之前将 chain 递归地嵌套在
callbacks
参数中, 会在递归层级过深时导致问题。新消息协议通过指定一个签名列表来解决此问题, 每个任务在发送下一个消息时从列表中弹出一个签名:
execute_task(message) chain = embed['chain'] if chain: sig = maybe_signature(chain.pop()) sig.apply_async(chain=chain)
correlation_id
替代了原先的task_id
字段。root_id
和parent_id
字段用于追踪完整的工作流链路。shadow
允许为日志指定不同的任务名称, 监控工具可用于表示某些以函数作为参数调用的任务概念:from celery.utils.imports import qualname class PickleTask(Task): def unpack_args(self, fun, args=()): return fun, args def apply_async(self, args, kwargs, **options): fun, real_args = self.unpack_args(*args) return super().apply_async( (fun, real_args, kwargs), shadow=qualname(fun), **options ) @app.task(base=PickleTask) def call(fun, args, kwargs): return fun(*args, **kwargs)
Protocol version detected by the presence of a
task
message header.- Support for multiple languages via the
lang
header. Worker may redirect the message to a worker that supports the language.
- Support for multiple languages via the
- Meta-data moved to headers.
This means that workers/intermediates can inspect the message and make decisions based on the headers without decoding the payload (that may be language specific, for example serialized by the Python specific pickle serializer).
- Always UTC
There's no
utc
flag anymore, so any time information missing timezone will be expected to be in UTC time.
- Body is only for language specific data.
Python stores args/kwargs and embedded signatures in body.
If a message uses raw encoding then the raw data
will be passed as a single argument to the function.
Java/C, etc. can use a Thrift/protobuf document as the body
origin
is the name of the node sending the task.- Dispatches to actor based on
task
,meth
headers meth
is unused by Python, but may be used in the future to specify class+method pairs.
- Dispatches to actor based on
- Chain gains a dedicated field.
Reducing the chain into a recursive
callbacks
argument causes problems when the recursion limit is exceeded.This is fixed in the new message protocol by specifying a list of signatures, each task will then pop a task off the list when sending the next message:
execute_task(message) chain = embed['chain'] if chain: sig = maybe_signature(chain.pop()) sig.apply_async(chain=chain)
correlation_id
replacestask_id
field.root_id
andparent_id
fields helps keep track of work-flows.shadow
lets you specify a different name for logs, monitors can be used for concepts like tasks that calls a function specified as argument:from celery.utils.imports import qualname class PickleTask(Task): def unpack_args(self, fun, args=()): return fun, args def apply_async(self, args, kwargs, **options): fun, real_args = self.unpack_args(*args) return super().apply_async( (fun, real_args, kwargs), shadow=qualname(fun), **options ) @app.task(base=PickleTask) def call(fun, args, kwargs): return fun(*args, **kwargs)
版本 1¶
Version 1
在协议的第 1 版中,所有字段都存储在消息体中: 这意味着 worker 和中间消费者必须反序列化 payload 才能读取字段内容。
In version 1 of the protocol all fields are stored in the message body: meaning workers and intermediate consumers must deserialize the payload to read the fields.
消息正文¶
Message body
task
- string:
任务名称。必需
id
- string:
任务的唯一标识(UUID)。必需
args
- list:
参数列表。如果未提供则为一个空列表。
kwargs
- dictionary:
关键字参数字典。如果未提供则为一个空字典。
retries
- int:
当前任务已重试的次数。如果未指定则默认为 0。
eta
- string (ISO 8601):
预计到达时间,采用 ISO 8601 格式的日期和时间。 如果未提供,该消息不会被调度,而是尽快执行。
expires
- string (ISO 8601):
Added in version 2.0.2.
过期时间,采用 ISO 8601 格式的日期和时间。 如果未提供,该消息将永不过期。 当消息被接收时若已超出过期时间,则该消息将被视为已过期。
taskset
- string:
此任务所属的任务组(如果有)。
chord
- Signature:
Added in version 2.3.
表示该任务是 chord 的 header 部分之一。 该字段的值是 chord 的 body, 它将在所有 header 中的任务完成后被执行。
utc
- bool:
Added in version 2.5.
若为 true,则时间使用 UTC 时区;否则应使用当前本地时区。
callbacks
- <list>Signature:
Added in version 3.0.
当任务成功完成时将调用的签名列表。
errbacks
- <list>Signature:
Added in version 3.0.
当任务执行出错时将调用的签名列表。
timelimit
- <tuple>(float, float):
Added in version 3.1.
任务执行时间限制配置。此字段是一个包含软/硬超时的元组 (可为 int/float,或使用
None
表示不限制)。以下示例表示软时间限制为 3 秒,硬时间限制为 10 秒:
{'timelimit': (3.0, 10.0)}
task
- string:
Name of the task. required
id
- string:
Unique id of the task (UUID). required
args
- list:
List of arguments. Will be an empty list if not provided.
kwargs
- dictionary:
Dictionary of keyword arguments. Will be an empty dictionary if not provided.
retries
- int:
Current number of times this task has been retried. Defaults to 0 if not specified.
eta
- string (ISO 8601):
Estimated time of arrival. This is the date and time in ISO 8601 format. If not provided the message isn't scheduled, but will be executed asap.
expires
- string (ISO 8601):
Added in version 2.0.2.
Expiration date. This is the date and time in ISO 8601 format. If not provided the message will never expire. The message will be expired when the message is received and the expiration date has been exceeded.
taskset
- string:
The group this task is part of (if any).
chord
- Signature:
Added in version 2.3.
Signifies that this task is one of the header parts of a chord. The value of this key is the body of the cord that should be executed when all of the tasks in the header has returned.
utc
- bool:
Added in version 2.5.
If true time uses the UTC timezone, if not the current local timezone should be used.
callbacks
- <list>Signature:
Added in version 3.0.
A list of signatures to call if the task exited successfully.
errbacks
- <list>Signature:
Added in version 3.0.
A list of signatures to call if an error occurs while executing the task.
timelimit
- <tuple>(float, float):
Added in version 3.1.
Task execution time limit settings. This is a tuple of hard and soft time limit value (int/float or
None
for no limit).Example value specifying a soft time limit of 3 seconds, and a hard time limit of 10 seconds:
{'timelimit': (3.0, 10.0)}
示例消息¶
Example message
以下是以 JSON 格式调用 celery.task.ping 任务的示例:
{"id": "4cc7438e-afd4-4f8f-a2f3-f46567e7ca77",
"task": "celery.task.PingTask",
"args": [],
"kwargs": {},
"retries": 0,
"eta": "2009-11-17T12:30:56.527191"}
This is an example invocation of a celery.task.ping task in json format:
{"id": "4cc7438e-afd4-4f8f-a2f3-f46567e7ca77",
"task": "celery.task.PingTask",
"args": [],
"kwargs": {},
"retries": 0,
"eta": "2009-11-17T12:30:56.527191"}
任务序列化¶
Task Serialization
使用 content_type 消息头支持多种类型的序列化格式。
默认支持的 MIME 类型如下表所示:
Scheme |
MIME 类型 |
---|---|
json |
application/json |
yaml |
application/x-yaml |
pickle |
application/x-python-serialize |
msgpack |
application/x-msgpack |
Several types of serialization formats are supported using the content_type message header.
The MIME-types supported by default are shown in the following table.
Scheme |
MIME Type |
---|---|
json |
application/json |
yaml |
application/x-yaml |
pickle |
application/x-python-serialize |
msgpack |
application/x-msgpack |
Event 消息¶
Event Messages
事件消息始终使用 JSON 进行序列化,并可包含任意的消息体字段。
自 4.0 版本起,消息体可以是单个映射(单个事件),也可以是映射列表(多个事件)。
此外,事件消息中还必须包含一些标准字段:
Event messages are always JSON serialized and can contain arbitrary message body fields.
Since version 4.0. the body can consist of either a single mapping (one event), or a list of mappings (multiple events).
There are also standard fields that must always be present in an event message:
标准正文字段¶
Standard body fields
- string
type
事件类型。该字段是一个字符串,包含 类别 与 动作, 两者之间以连字符分隔(例如:
task-succeeded
)。
- string
- string
hostname
发生事件的完整主机名。
- string
- unsigned long long
clock
此事件的逻辑时钟值(Lamport 时间戳)。
- unsigned long long
- float
timestamp
事件发生时对应的 UNIX 时间戳。
- float
- signed short
utcoffset
描述事件源主机的时区。该字段表示相对于 UTC 的时差小时数 (例如:-2 或 +1)。
- signed short
- unsigned long long
pid
事件来源进程的进程 ID。
- unsigned long long
- string
type
The type of event. This is a string containing the category and action separated by a dash delimiter (e.g.,
task-succeeded
).
- string
- string
hostname
The fully qualified hostname of where the event occurred at.
- string
- unsigned long long
clock
The logical clock value for this event (Lamport time-stamp).
- unsigned long long
- float
timestamp
The UNIX time-stamp corresponding to the time of when the event occurred.
- float
- signed short
utcoffset
This field describes the timezone of the originating host, and is specified as the number of hours ahead of/behind UTC (e.g., -2 or +1).
- signed short
- unsigned long long
pid
The process id of the process the event originated in.
- unsigned long long
标准事件类型¶
Standard event types
示例消息¶
Example message
以下是 task-succeeded
事件的消息字段示例:
properties = {
'routing_key': 'task.succeeded',
'exchange': 'celeryev',
'content_type': 'application/json',
'content_encoding': 'utf-8',
'delivery_mode': 1,
}
headers = {
'hostname': 'worker1@george.vandelay.com',
}
body = {
'type': 'task-succeeded',
'hostname': 'worker1@george.vandelay.com',
'pid': 6335,
'clock': 393912923921,
'timestamp': 1401717709.101747,
'utcoffset': -1,
'uuid': '9011d855-fdd1-4f8f-adb3-a413b499eafb',
'retval': '4',
'runtime': 0.0003212,
}
This is the message fields for a task-succeeded
event:
properties = {
'routing_key': 'task.succeeded',
'exchange': 'celeryev',
'content_type': 'application/json',
'content_encoding': 'utf-8',
'delivery_mode': 1,
}
headers = {
'hostname': 'worker1@george.vandelay.com',
}
body = {
'type': 'task-succeeded',
'hostname': 'worker1@george.vandelay.com',
'pid': 6335,
'clock': 393912923921,
'timestamp': 1401717709.101747,
'utcoffset': -1,
'uuid': '9011d855-fdd1-4f8f-adb3-a413b499eafb',
'retval': '4',
'runtime': 0.0003212,
)