"""Registry of available tasks."""importinspectfromimportlibimportimport_modulefromcelery._stateimportget_current_appfromcelery.app.autoretryimportadd_autoretry_behaviourfromcelery.exceptionsimportInvalidTaskError,NotRegistered__all__=('TaskRegistry',)
class TaskRegistry(dict):
    """Map of registered tasks, keyed by task name.

    Subscript lookup of a name that is not present raises
    :attr:`NotRegistered` (via :meth:`__missing__`).
    """

    # Exception class raised by ``__missing__`` and ``unregister`` for
    # unknown names; always accessed through ``self.NotRegistered``.
    NotRegistered = NotRegistered

    def __missing__(self, key):
        """Raise :attr:`NotRegistered` when ``self[key]`` finds no entry."""
        raise self.NotRegistered(key)

    def register(self, task):
        """Register a task in the task registry.

        The task will be automatically instantiated if not already an
        instance.  Name must be configured prior to registration.

        Arguments:
            task: a task class or task instance exposing a ``name``
                attribute.

        Raises:
            InvalidTaskError: if ``task.name`` is None.
        """
        is_class = inspect.isclass(task)
        if task.name is None:
            # When a class is passed, ``type(task)`` would be its metaclass,
            # so name the class itself in the error message.
            task_cls = task if is_class else type(task)
            raise InvalidTaskError(
                'Task class {!r} must specify .name attribute'.format(
                    task_cls.__name__))
        # Explicit branch instead of ``is_class and task() or task``: the
        # and/or form silently falls back to the class object whenever the
        # freshly created instance happens to be falsy.
        if is_class:
            task = task()
        add_autoretry_behaviour(task)
        self[task.name] = task

    def unregister(self, name):
        """Unregister task by name.

        Arguments:
            name (str): name of the task to unregister, or a
                :class:`celery.app.task.Task` with a valid `name` attribute.

        Raises:
            celery.exceptions.NotRegistered: if the task is not registered.
        """
        try:
            # Accept either a plain name or an object carrying ``.name``.
            self.pop(getattr(name, 'name', name))
        except KeyError:
            raise self.NotRegistered(name)
# -- these methods are irrelevant now and will be removed in 4.0