消息协议

Message Protocol

Task 消息

Task messages

版本 2

Version 2

定义

Definition

properties = {
    'correlation_id': uuid task_id,
    'content_type': string mimetype,
    'content_encoding': string encoding,

    # optional
    'reply_to': string queue_or_url,
}
headers = {
    'lang': string 'py'
    'task': string task,
    'id': uuid task_id,
    'root_id': uuid root_id,
    'parent_id': uuid parent_id,
    'group': uuid group_id,

    # optional
    'meth': string method_name,
    'shadow': string alias_name,
    'eta': iso8601 ETA,
    'expires': iso8601 expires,
    'retries': int retries,
    'timelimit': (soft, hard),
    'argsrepr': str repr(args),
    'kwargsrepr': str repr(kwargs),
    'origin': str nodename,
    'replaced_task_nesting': int
}

body = (
    object[] args,
    Mapping kwargs,
    Mapping embed {
        'callbacks': Signature[] callbacks,
        'errbacks': Signature[] errbacks,
        'chain': Signature[] chain,
        'chord': Signature chord_callback,
    }
)

示例

Example

此示例使用协议版本 2 发送任务消息:

This example sends a task message using version 2 of the protocol:

# chain: add(add(add(2, 2), 4), 8) == 2 + 2 + 4 + 8

import json
import os
import socket

task_id = uuid()
args = (2, 2)
kwargs = {}
basic_publish(
    message=json.dumps((args, kwargs, None)),
    application_headers={
        'lang': 'py',
        'task': 'proj.tasks.add',
        'argsrepr': repr(args),
        'kwargsrepr': repr(kwargs),
        'origin': '@'.join([os.getpid(), socket.gethostname()])
    }
    properties={
        'correlation_id': task_id,
        'content_type': 'application/json',
        'content_encoding': 'utf-8',
    }
)

与版本 1 的变更

Changes from version 1

  • 通过 task 消息头检测协议版本。

  • 通过 lang 消息头支持多语言。 Worker 可以将消息重定向给支持该语言的其他 worker。

  • 元数据迁移至消息头(headers)。 这意味着 worker 或中间件可以在不解码消息主体(payload)的情况下 基于消息头内容做出决策,例如 payload 使用了 Python 特有的 pickle 序列化方式。

  • 始终使用 UTC 时间 不再使用 utc 标志,因此任何缺失时区信息的时间值将被视为 UTC 时间。

  • 消息体(body)仅用于语言相关的数据。 - Python 将 args/kwargs 和嵌入的签名存储于消息体中。 - 如果消息使用原始编码(raw encoding),则原始数据会作为单一参数传递给函数。 - Java/C 等语言可以使用 Thrift/protobuf 文档作为消息体内容。

  • origin 表示发送任务的节点名称。

  • 基于 taskmeth 消息头分发给对应 actor。 meth 在 Python 中未使用,但将来可能用于指定类+方法对。

  • Chain(链式调用)获得了专用字段。 之前将 chain 递归地嵌套在 callbacks 参数中, 会在递归层级过深时导致问题。

    新消息协议通过指定一个签名列表来解决此问题, 每个任务在发送下一个消息时从列表中弹出一个签名:

    execute_task(message)
    chain = embed['chain']
    if chain:
        sig = maybe_signature(chain.pop())
        sig.apply_async(chain=chain)
    
  • correlation_id 替代了原先的 task_id 字段。

  • root_idparent_id 字段用于追踪完整的工作流链路。

  • shadow 允许为日志指定不同的任务名称, 监控工具可用于表示某些以函数作为参数调用的任务概念:

    from celery.utils.imports import qualname
    
    class PickleTask(Task):
    
        def unpack_args(self, fun, args=()):
            return fun, args
    
        def apply_async(self, args, kwargs, **options):
            fun, real_args = self.unpack_args(*args)
            return super().apply_async(
                (fun, real_args, kwargs), shadow=qualname(fun), **options
            )
    
    @app.task(base=PickleTask)
    def call(fun, args, kwargs):
        return fun(*args, **kwargs)
    
  • Protocol version detected by the presence of a task message header.

  • Support for multiple languages via the lang header.

    Worker may redirect the message to a worker that supports the language.

  • Meta-data moved to headers.

    This means that workers/intermediates can inspect the message and make decisions based on the headers without decoding the payload (that may be language specific, for example serialized by the Python specific pickle serializer).

  • Always UTC

    There's no utc flag anymore, so any time information missing timezone will be expected to be in UTC time.

  • Body is only for language specific data.
    • Python stores args/kwargs and embedded signatures in body.

    • If a message uses raw encoding then the raw data

    will be passed as a single argument to the function.

    • Java/C, etc. can use a Thrift/protobuf document as the body

  • origin is the name of the node sending the task.

  • Dispatches to actor based on task, meth headers

    meth is unused by Python, but may be used in the future to specify class+method pairs.

  • Chain gains a dedicated field.

    Reducing the chain into a recursive callbacks argument causes problems when the recursion limit is exceeded.

    This is fixed in the new message protocol by specifying a list of signatures, each task will then pop a task off the list when sending the next message:

    execute_task(message)
    chain = embed['chain']
    if chain:
        sig = maybe_signature(chain.pop())
        sig.apply_async(chain=chain)
    
  • correlation_id replaces task_id field.

  • root_id and parent_id fields helps keep track of work-flows.

  • shadow lets you specify a different name for logs, monitors can be used for concepts like tasks that calls a function specified as argument:
    from celery.utils.imports import qualname
    
    class PickleTask(Task):
    
        def unpack_args(self, fun, args=()):
            return fun, args
    
        def apply_async(self, args, kwargs, **options):
            fun, real_args = self.unpack_args(*args)
            return super().apply_async(
                (fun, real_args, kwargs), shadow=qualname(fun), **options
            )
    
    @app.task(base=PickleTask)
    def call(fun, args, kwargs):
        return fun(*args, **kwargs)
    

版本 1

Version 1

在协议的第 1 版中,所有字段都存储在消息体中: 这意味着 worker 和中间消费者必须反序列化 payload 才能读取字段内容。

In version 1 of the protocol all fields are stored in the message body: meaning workers and intermediate consumers must deserialize the payload to read the fields.

消息正文

Message body

  • task
    string:

    任务名称。必需

  • id
    string:

    任务的唯一标识(UUID)。必需

  • args
    list:

    参数列表。如果未提供则为一个空列表。

  • kwargs
    dictionary:

    关键字参数字典。如果未提供则为一个空字典。

  • retries
    int:

    当前任务已重试的次数。如果未指定则默认为 0

  • eta
    string (ISO 8601):

    预计到达时间,采用 ISO 8601 格式的日期和时间。 如果未提供,该消息不会被调度,而是尽快执行。

  • expires
    string (ISO 8601):

    Added in version 2.0.2.

    过期时间,采用 ISO 8601 格式的日期和时间。 如果未提供,该消息将永不过期。 当消息被接收时若已超出过期时间,则该消息将被视为已过期。

  • taskset
    string:

    此任务所属的任务组(如果有)。

  • chord
    Signature:

    Added in version 2.3.

    表示该任务是 chord 的 header 部分之一。 该字段的值是 chord 的 body, 它将在所有 header 中的任务完成后被执行。

  • utc
    bool:

    Added in version 2.5.

    若为 true,则时间使用 UTC 时区;否则应使用当前本地时区。

  • callbacks
    <list>Signature:

    Added in version 3.0.

    当任务成功完成时将调用的签名列表。

  • errbacks
    <list>Signature:

    Added in version 3.0.

    当任务执行出错时将调用的签名列表。

  • timelimit
    <tuple>(float, float):

    Added in version 3.1.

    任务执行时间限制配置。此字段是一个包含软/硬超时的元组 (可为 int/float,或使用 None 表示不限制)。

    以下示例表示软时间限制为 3 秒,硬时间限制为 10 秒:

    {'timelimit': (3.0, 10.0)}
    
  • task
    string:

    Name of the task. required

  • id
    string:

    Unique id of the task (UUID). required

  • args
    list:

    List of arguments. Will be an empty list if not provided.

  • kwargs
    dictionary:

    Dictionary of keyword arguments. Will be an empty dictionary if not provided.

  • retries
    int:

    Current number of times this task has been retried. Defaults to 0 if not specified.

  • eta
    string (ISO 8601):

    Estimated time of arrival. This is the date and time in ISO 8601 format. If not provided the message isn't scheduled, but will be executed asap.

  • expires
    string (ISO 8601):

    Added in version 2.0.2.

    Expiration date. This is the date and time in ISO 8601 format. If not provided the message will never expire. The message will be expired when the message is received and the expiration date has been exceeded.

  • taskset
    string:

    The group this task is part of (if any).

  • chord
    Signature:

    Added in version 2.3.

    Signifies that this task is one of the header parts of a chord. The value of this key is the body of the cord that should be executed when all of the tasks in the header has returned.

  • utc
    bool:

    Added in version 2.5.

    If true time uses the UTC timezone, if not the current local timezone should be used.

  • callbacks
    <list>Signature:

    Added in version 3.0.

    A list of signatures to call if the task exited successfully.

  • errbacks
    <list>Signature:

    Added in version 3.0.

    A list of signatures to call if an error occurs while executing the task.

  • timelimit
    <tuple>(float, float):

    Added in version 3.1.

    Task execution time limit settings. This is a tuple of hard and soft time limit value (int/float or None for no limit).

    Example value specifying a soft time limit of 3 seconds, and a hard time limit of 10 seconds:

    {'timelimit': (3.0, 10.0)}
    

示例消息

Example message

以下是以 JSON 格式调用 celery.task.ping 任务的示例:

{"id": "4cc7438e-afd4-4f8f-a2f3-f46567e7ca77",
 "task": "celery.task.PingTask",
 "args": [],
 "kwargs": {},
 "retries": 0,
 "eta": "2009-11-17T12:30:56.527191"}

This is an example invocation of a celery.task.ping task in json format:

{"id": "4cc7438e-afd4-4f8f-a2f3-f46567e7ca77",
 "task": "celery.task.PingTask",
 "args": [],
 "kwargs": {},
 "retries": 0,
 "eta": "2009-11-17T12:30:56.527191"}

任务序列化

Task Serialization

使用 content_type 消息头支持多种类型的序列化格式。

默认支持的 MIME 类型如下表所示:

Scheme

MIME 类型

json

application/json

yaml

application/x-yaml

pickle

application/x-python-serialize

msgpack

application/x-msgpack

Several types of serialization formats are supported using the content_type message header.

The MIME-types supported by default are shown in the following table.

Scheme

MIME Type

json

application/json

yaml

application/x-yaml

pickle

application/x-python-serialize

msgpack

application/x-msgpack

Event 消息

Event Messages

事件消息始终使用 JSON 进行序列化,并可包含任意的消息体字段。

自 4.0 版本起,消息体可以是单个映射(单个事件),也可以是映射列表(多个事件)。

此外,事件消息中还必须包含一些标准字段:

Event messages are always JSON serialized and can contain arbitrary message body fields.

Since version 4.0. the body can consist of either a single mapping (one event), or a list of mappings (multiple events).

There are also standard fields that must always be present in an event message:

标准正文字段

Standard body fields

  • string type

    事件类型。该字段是一个字符串,包含 类别动作, 两者之间以连字符分隔(例如:task-succeeded)。

  • string hostname

    发生事件的完整主机名。

  • unsigned long long clock

    此事件的逻辑时钟值(Lamport 时间戳)。

  • float timestamp

    事件发生时对应的 UNIX 时间戳。

  • signed short utcoffset

    描述事件源主机的时区。该字段表示相对于 UTC 的时差小时数 (例如:-2 或 +1)。

  • unsigned long long pid

    事件来源进程的进程 ID。

  • string type

    The type of event. This is a string containing the category and action separated by a dash delimiter (e.g., task-succeeded).

  • string hostname

    The fully qualified hostname of where the event occurred at.

  • unsigned long long clock

    The logical clock value for this event (Lamport time-stamp).

  • float timestamp

    The UNIX time-stamp corresponding to the time of when the event occurred.

  • signed short utcoffset

    This field describes the timezone of the originating host, and is specified as the number of hours ahead of/behind UTC (e.g., -2 or +1).

  • unsigned long long pid

    The process id of the process the event originated in.

标准事件类型

Standard event types

有关标准事件类型及其字段的完整列表,请参阅 事件参考

For a list of standard event types and their fields see the 事件参考.

示例消息

Example message

以下是 task-succeeded 事件的消息字段示例:

properties = {
    'routing_key': 'task.succeeded',
    'exchange': 'celeryev',
    'content_type': 'application/json',
    'content_encoding': 'utf-8',
    'delivery_mode': 1,
}
headers = {
    'hostname': 'worker1@george.vandelay.com',
}
body = {
    'type': 'task-succeeded',
    'hostname': 'worker1@george.vandelay.com',
    'pid': 6335,
    'clock': 393912923921,
    'timestamp': 1401717709.101747,
    'utcoffset': -1,
    'uuid': '9011d855-fdd1-4f8f-adb3-a413b499eafb',
    'retval': '4',
    'runtime': 0.0003212,
}

This is the message fields for a task-succeeded event:

properties = {
    'routing_key': 'task.succeeded',
    'exchange': 'celeryev',
    'content_type': 'application/json',
    'content_encoding': 'utf-8',
    'delivery_mode': 1,
}
headers = {
    'hostname': 'worker1@george.vandelay.com',
}
body = {
    'type': 'task-succeeded',
    'hostname': 'worker1@george.vandelay.com',
    'pid': 6335,
    'clock': 393912923921,
    'timestamp': 1401717709.101747,
    'utcoffset': -1,
    'uuid': '9011d855-fdd1-4f8f-adb3-a413b499eafb',
    'retval': '4',
    'runtime': 0.0003212,
)