"""Actual App instance implementation."""importfunctoolsimportimportlibimportinspectimportosimportsysimportthreadingimporttypingimportwarningsfromcollectionsimportUserDict,defaultdict,dequefromdatetimeimportdatetimefromdatetimeimporttimezoneasdatetime_timezonefromoperatorimportattrgetterfromclick.exceptionsimportExitfromdateutil.parserimportisoparsefromkombuimportExchange,poolsfromkombu.clocksimportLamportClockfromkombu.commonimportoid_fromfromkombu.transport.native_delayed_deliveryimportcalculate_routing_keyfromkombu.utils.compatimportregister_after_forkfromkombu.utils.objectsimportcached_propertyfromkombu.utils.uuidimportuuidfromvineimportstarpromisefromceleryimportplatforms,signalsfromcelery._stateimport(_announce_app_finalized,_deregister_app,_register_app,_set_current_app,_task_stack,connect_on_app_finalize,get_current_app,get_current_worker_task,set_default_app)fromcelery.exceptionsimportAlwaysEagerIgnored,ImproperlyConfiguredfromcelery.loadersimportget_loader_clsfromcelery.localimportPromiseProxy,maybe_evaluatefromcelery.utilsimportabstractfromcelery.utils.collectionsimportAttributeDictMixinfromcelery.utils.dispatchimportSignalfromcelery.utils.functionalimportfirst,head_from_fun,maybe_listfromcelery.utils.importsimportgen_task_name,instantiate,symbol_by_namefromcelery.utils.logimportget_loggerfromcelery.utils.objectsimportFallbackContext,mro_lookupfromcelery.utils.timeimportmaybe_make_aware,timezone,to_utcfrom..utils.annotationsimportannotation_is_class,annotation_issubclass,get_optional_argfrom..utils.quorum_queuesimportdetect_quorum_queues# Load all builtin tasksfrom.importbackends,builtins# noqafrom.annotationsimportprepareasprepare_annotationsfrom.autoretryimportadd_autoretry_behaviourfrom.defaultsimportDEFAULT_SECURITY_DIGEST,find_deprecated_settingsfrom.registryimportTaskRegistryfrom.utilsimport(AppPickler,Settings,_new_key_to_old,_old_key_to_new,_unpickle_app,_unpickle_app_v2,appstr,bugreport,detect_settings)iftyping.TYPE_CHECKING:# pragma: no cover # codecov does not capture this# flake8 marks the BaseModel import as unused, because the actual typehint is quoted.frompydanticimportBaseModel# noqa: F401__all__=('Celery',)logger=get_logger(__name__)BUILTIN_FIXUPS={'celery.fixups.django:fixup',}USING_EXECV=os.environ.get('FORKED_BY_MULTIPROCESSING')ERR_ENVVAR_NOT_SET="""The environment variable {0!r} is not set,and as such the configuration could not be loaded.Please set this variable and make sure it points toa valid configuration module.Example:{0}="proj.celeryconfig""""defapp_has_custom(app,attr):"""Return true if app has customized method `attr`. Note: This is used for optimizations in cases where we know how the default behavior works, but need to account for someone using inheritance to override a method/property. """returnmro_lookup(app.__class__,attr,stop={Celery,object},monkey_patched=[__name__])def_unpickle_appattr(reverse_name,args):"""Unpickle app."""# Given an attribute name and a list of args, gets# the attribute from the current app and calls it.returnget_current_app()._rgetattr(reverse_name)(*args)def_after_fork_cleanup_app(app):# This is used with multiprocessing.register_after_fork,# so need to be at module level.try:app._after_fork()exceptExceptionasexc:# pylint: disable=broad-exceptlogger.info('after forker raised exception: %r',exc,exc_info=1)defpydantic_wrapper(app:"Celery",task_fun:typing.Callable[...,typing.Any],task_name:str,strict:bool=True,context:typing.Optional[typing.Dict[str,typing.Any]]=None,dump_kwargs:typing.Optional[typing.Dict[str,typing.Any]]=None):"""Wrapper to validate arguments and serialize return values using Pydantic."""try:pydantic=importlib.import_module('pydantic')exceptModuleNotFoundErrorasex:raiseImproperlyConfigured('You need to install pydantic to use pydantic model serialization.')fromexBaseModel:typing.Type['BaseModel']=pydantic.BaseModel# noqa: F811 # only defined when type checkingifcontextisNone:context={}ifdump_kwargsisNone:dump_kwargs={}dump_kwargs.setdefault('mode','json')task_signature=inspect.signature(task_fun)@functools.wraps(task_fun)defwrapper(*task_args,**task_kwargs):# Validate task parameters if type hinted as BaseModelbound_args=task_signature.bind(*task_args,**task_kwargs)forarg_name,arg_valueinbound_args.arguments.items():arg_annotation=task_signature.parameters[arg_name].annotationoptional_arg=get_optional_arg(arg_annotation)ifoptional_argisnotNoneandarg_valueisnotNone:arg_annotation=optional_argifannotation_issubclass(arg_annotation,BaseModel):bound_args.arguments[arg_name]=arg_annotation.model_validate(arg_value,strict=strict,context={**context,'celery_app':app,'celery_task_name':task_name},)# Call the task with (potentially) converted argumentsreturned_value=task_fun(*bound_args.args,**bound_args.kwargs)# Dump Pydantic model if the returned value is an instance of pydantic.BaseModel *and* its# class matches the typehintreturn_annotation=task_signature.return_annotationoptional_return_annotation=get_optional_arg(return_annotation)ifoptional_return_annotationisnotNone:return_annotation=optional_return_annotationif(annotation_is_class(return_annotation)andisinstance(returned_value,BaseModel)andisinstance(returned_value,return_annotation)):returnreturned_value.model_dump(**dump_kwargs)returnreturned_valuereturnwrapperclassPendingConfiguration(UserDict,AttributeDictMixin):# `app.conf` will be of this type before being explicitly configured,# meaning the app can keep any configuration set directly# on `app.conf` before the `app.config_from_object` call.## accessing any key will finalize the configuration,# replacing `app.conf` with a concrete settings object.callback=None_data=Nonedef__init__(self,conf,callback):object.__setattr__(self,'_data',conf)object.__setattr__(self,'callback',callback)def__setitem__(self,key,value):self._data[key]=valuedefclear(self):self._data.clear()defupdate(self,*args,**kwargs):self._data.update(*args,**kwargs)defsetdefault(self,*args,**kwargs):returnself._data.setdefault(*args,**kwargs)def__contains__(self,key):# XXX will not show finalized configuration# setdefault will cause `key in d` to happen,# so for setdefault to be lazy, so does contains.returnkeyinself._datadef__len__(self):returnlen(self.data)def__repr__(self):returnrepr(self.data)@cached_propertydefdata(self):returnself.callback()
[文档]classCelery:"""Celery application. Arguments: main (str): Name of the main module if running as `__main__`. This is used as the prefix for auto-generated task names. Keyword Arguments: broker (str): URL of the default broker used. backend (Union[str, Type[celery.backends.base.Backend]]): The result store backend class, or the name of the backend class to use. Default is the value of the :setting:`result_backend` setting. autofinalize (bool): If set to False a :exc:`RuntimeError` will be raised if the task registry or tasks are used before the app is finalized. set_as_current (bool): Make this the global current app. include (List[str]): List of modules every worker should import. amqp (Union[str, Type[AMQP]]): AMQP object or class name. events (Union[str, Type[celery.app.events.Events]]): Events object or class name. log (Union[str, Type[Logging]]): Log object or class name. control (Union[str, Type[celery.app.control.Control]]): Control object or class name. tasks (Union[str, Type[TaskRegistry]]): A task registry, or the name of a registry class. fixups (List[str]): List of fix-up plug-ins (e.g., see :mod:`celery.fixups.django`). config_source (Union[str, class]): Take configuration from a class, or object. Attributes may include any settings described in the documentation. task_cls (Union[str, Type[celery.app.task.Task]]): base task class to use. See :ref:`this section <custom-task-cls-app-wide>` for usage. """#: This is deprecated, use :meth:`reduce_keys` insteadPickler=AppPicklerSYSTEM=platforms.SYSTEMIS_macOS,IS_WINDOWS=platforms.IS_macOS,platforms.IS_WINDOWS#: Name of the `__main__` module. Required for standalone scripts.#:#: If set this will be used instead of `__main__` when automatically#: generating task names.main=None#: Custom options for command-line programs.#: See :ref:`extending-commandoptions`user_options=None#: Custom bootsteps to extend and modify the worker.#: See :ref:`extending-bootsteps`.steps=Nonebuiltin_fixups=BUILTIN_FIXUPSamqp_cls='celery.app.amqp:AMQP'backend_cls=Noneevents_cls='celery.app.events:Events'loader_cls=Nonelog_cls='celery.app.log:Logging'control_cls='celery.app.control:Control'task_cls='celery.app.task:Task'registry_cls='celery.app.registry:TaskRegistry'#: Thread local storage._local=None_fixups=None_pool=None_conf=None_after_fork_registered=False#: Signal sent when app is loading configuration.on_configure=None#: Signal sent after app has prepared the configuration.on_after_configure=None#: Signal sent after app has been finalized.on_after_finalize=None#: Signal sent by every new process after fork.on_after_fork=Nonedef__init__(self,main=None,loader=None,backend=None,amqp=None,events=None,log=None,control=None,set_as_current=True,tasks=None,broker=None,include=None,changes=None,config_source=None,fixups=None,task_cls=None,autofinalize=True,namespace=None,strict_typing=True,**kwargs):self._local=threading.local()self._backend_cache=Noneself.clock=LamportClock()self.main=mainself.amqp_cls=amqporself.amqp_clsself.events_cls=eventsorself.events_clsself.loader_cls=loaderorself._get_default_loader()self.log_cls=logorself.log_clsself.control_cls=controlorself.control_clsself._custom_task_cls_used=(# Custom task class provided as argumentbool(task_cls)# subclass of Celery with a task_cls attributeorself.__class__isnotCeleryandhasattr(self.__class__,'task_cls'))self.task_cls=task_clsorself.task_clsself.set_as_current=set_as_currentself.registry_cls=symbol_by_name(self.registry_cls)self.user_options=defaultdict(set)self.steps=defaultdict(set)self.autofinalize=autofinalizeself.namespace=namespaceself.strict_typing=strict_typingself.configured=Falseself._config_source=config_sourceself._pending_defaults=deque()self._pending_periodic_tasks=deque()self.finalized=Falseself._finalize_mutex=threading.RLock()self._pending=deque()self._tasks=tasksifnotisinstance(self._tasks,TaskRegistry):self._tasks=self.registry_cls(self._tasksor{})# If the class defines a custom __reduce_args__ we need to use# the old way of pickling apps: pickling a list of# args instead of the new way that pickles a dict of keywords.self._using_v1_reduce=app_has_custom(self,'__reduce_args__')# these options are moved to the config to# simplify pickling of the app object.self._preconf=changesor{}self._preconf_set_by_auto=set()self.__autoset('broker_url',broker)self.__autoset('result_backend',backend)self.__autoset('include',include)forkey,valueinkwargs.items():self.__autoset(key,value)self._conf=Settings(PendingConfiguration(self._preconf,self._finalize_pending_conf),prefix=self.namespace,keys=(_old_key_to_new,_new_key_to_old),)# - Apply fix-ups.self.fixups=set(self.builtin_fixups)iffixupsisNoneelsefixups# ...store fixup instances in _fixups to keep weakrefs alive.self._fixups=[symbol_by_name(fixup)(self)forfixupinself.fixups]ifself.set_as_current:self.set_current()# Signalsifself.on_configureisNone:# used to be a method pre 4.0self.on_configure=Signal(name='app.on_configure')self.on_after_configure=Signal(name='app.on_after_configure',providing_args={'source'},)self.on_after_finalize=Signal(name='app.on_after_finalize')self.on_after_fork=Signal(name='app.on_after_fork')# Boolean signalling, whether fast_trace_task are enabled.# this attribute is set in celery.worker.trace and checked by celery.worker.requestself.use_fast_trace_task=Falseself.on_init()_register_app(self)def_get_default_loader(self):# the --loader command-line argument sets the environment variable.return(os.environ.get('CELERY_LOADER')orself.loader_clsor'celery.loaders.app:AppLoader')
[文档]defon_init(self):"""Optional callback called at init."""
[文档]defclose(self):"""Clean up after the application. Only necessary for dynamically created apps, and you should probably use the :keyword:`with` statement instead. Example: >>> with Celery(set_as_current=False) as app: ... with app.connection_for_write() as conn: ... pass """self._pool=None_deregister_app(self)
defstart(self,argv=None):"""Run :program:`celery` using `argv`. Uses :data:`sys.argv` if `argv` is not specified. """fromcelery.bin.celeryimportcelerycelery.params[0].default=selfifargvisNone:argv=sys.argvtry:celery.main(args=argv,standalone_mode=False)exceptExitase:returne.exit_codefinally:celery.params[0].default=Nonedefworker_main(self,argv=None):"""Run :program:`celery worker` using `argv`. Uses :data:`sys.argv` if `argv` is not specified. """ifargvisNone:argv=sys.argvif'worker'notinargv:raiseValueError("The worker sub-command must be specified in argv.\n""Use app.start() to programmatically start other commands.")self.start(argv=argv)
[文档]deftask(self,*args,**opts):"""Decorator to create a task class out of any callable. See :ref:`Task options<task-options>` for a list of the arguments that can be passed to this decorator. Examples: .. code-block:: python @app.task def refresh_feed(url): store_feed(feedparser.parse(url)) with setting extra options: .. code-block:: python @app.task(exchange='feeds') def refresh_feed(url): return store_feed(feedparser.parse(url)) Note: App Binding: For custom apps the task decorator will return a proxy object, so that the act of creating the task is not performed until the task is used or the task registry is accessed. If you're depending on binding to be deferred, then you must not access any attributes on the returned object until the application is fully set up (finalized). """ifUSING_EXECVandopts.get('lazy',True):# When using execv the task in the original module will point to a# different app, so doing things like 'add.request' will point to# a different task instance. This makes sure it will always use# the task instance from the current app.# Really need a better solution for this :(from.importshared_taskreturnshared_task(*args,lazy=False,**opts)definner_create_task_cls(shared=True,filter=None,lazy=True,**opts):_filt=filterdef_create_task_cls(fun):ifshared:defcons(app):returnapp._task_from_fun(fun,**opts)cons.__name__=fun.__name__connect_on_app_finalize(cons)ifnotlazyorself.finalized:ret=self._task_from_fun(fun,**opts)else:# return a proxy object that evaluates on first useret=PromiseProxy(self._task_from_fun,(fun,),opts,__doc__=fun.__doc__)self._pending.append(ret)if_filt:return_filt(ret)returnretreturn_create_task_clsiflen(args)==1:ifcallable(args[0]):returninner_create_task_cls(**opts)(*args)raiseTypeError('argument 1 to @task() must be a callable')ifargs:raiseTypeError('@task() takes exactly 1 argument ({} given)'.format(sum([len(args),len(opts)])))returninner_create_task_cls(**opts)
deftype_checker(self,fun,bound=False):returnstaticmethod(head_from_fun(fun,bound=bound))def_task_from_fun(self,fun,name=None,base=None,bind=False,pydantic:bool=False,pydantic_strict:bool=False,pydantic_context:typing.Optional[typing.Dict[str,typing.Any]]=None,pydantic_dump_kwargs:typing.Optional[typing.Dict[str,typing.Any]]=None,**options,):ifnotself.finalizedandnotself.autofinalize:raiseRuntimeError('Contract breach: app not finalized')name=nameorself.gen_task_name(fun.__name__,fun.__module__)base=baseorself.Taskifnamenotinself._tasks:ifpydanticisTrue:fun=pydantic_wrapper(self,fun,name,pydantic_strict,pydantic_context,pydantic_dump_kwargs)run=funifbindelsestaticmethod(fun)task=type(fun.__name__,(base,),dict({'app':self,'name':name,'run':run,'_decorated':True,'__doc__':fun.__doc__,'__module__':fun.__module__,'__annotations__':fun.__annotations__,'__header__':self.type_checker(fun,bound=bind),'__wrapped__':run},**options))()# for some reason __qualname__ cannot be set in type()# so we have to set it here.try:task.__qualname__=fun.__qualname__exceptAttributeError:passself._tasks[task.name]=tasktask.bind(self)# connects task to this appadd_autoretry_behaviour(task,**options)else:task=self._tasks[name]returntaskdefregister_task(self,task,**options):"""Utility for registering a task-based class. Note: This is here for compatibility with old Celery 1.0 style task classes, you should not need to use this for new projects. """task=inspect.isclass(task)andtask()ortaskifnottask.name:task_cls=type(task)task.name=self.gen_task_name(task_cls.__name__,task_cls.__module__)add_autoretry_behaviour(task,**options)self.tasks[task.name]=tasktask._app=selftask.bind(self)returntask
[文档]deffinalize(self,auto=False):"""Finalize the app. This loads built-in tasks, evaluates pending task decorators, reads configuration, etc. """withself._finalize_mutex:ifnotself.finalized:ifautoandnotself.autofinalize:raiseRuntimeError('Contract breach: app not finalized')self.finalized=True_announce_app_finalized(self)pending=self._pendingwhilepending:maybe_evaluate(pending.popleft())fortaskinself._tasks.values():task.bind(self)self.on_after_finalize.send(sender=self)
[文档]defadd_defaults(self,fun):"""Add default configuration from dict ``d``. If the argument is a callable function then it will be regarded as a promise, and it won't be loaded until the configuration is actually needed. This method can be compared to: .. code-block:: pycon >>> celery.conf.update(d) with a difference that 1) no copy will be made and 2) the dict will not be transferred when the worker spawns child processes, so it's important that the same configuration happens at import time when pickle restores the object on the other side. """ifnotcallable(fun):d,fun=fun,lambda:difself.configured:returnself._conf.add_defaults(fun())self._pending_defaults.append(fun)
[文档]defconfig_from_object(self,obj,silent=False,force=False,namespace=None):"""Read configuration from object. Object is either an actual object or the name of a module to import. Example: >>> celery.config_from_object('myapp.celeryconfig') >>> from myapp import celeryconfig >>> celery.config_from_object(celeryconfig) Arguments: silent (bool): If true then import errors will be ignored. force (bool): Force reading configuration immediately. By default the configuration will be read only when required. """self._config_source=objself.namespace=namespaceorself.namespaceifforceorself.configured:self._conf=Noneifself.loader.config_from_object(obj,silent=silent):returnself.conf
[文档]defconfig_from_envvar(self,variable_name,silent=False,force=False):"""Read configuration from environment variable. The value of the environment variable must be the name of a module to import. Example: >>> os.environ['CELERY_CONFIG_MODULE'] = 'myapp.celeryconfig' >>> celery.config_from_envvar('CELERY_CONFIG_MODULE') """module_name=os.environ.get(variable_name)ifnotmodule_name:ifsilent:returnFalseraiseImproperlyConfigured(ERR_ENVVAR_NOT_SET.strip().format(variable_name))returnself.config_from_object(module_name,silent=silent,force=force)
[文档]defsetup_security(self,allowed_serializers=None,key=None,key_password=None,cert=None,store=None,digest=DEFAULT_SECURITY_DIGEST,serializer='json'):"""Setup the message-signing serializer. This will affect all application instances (a global operation). Disables untrusted serializers and if configured to use the ``auth`` serializer will register the ``auth`` serializer with the provided settings into the Kombu serializer registry. Arguments: allowed_serializers (Set[str]): List of serializer names, or content_types that should be exempt from being disabled. key (str): Name of private key file to use. Defaults to the :setting:`security_key` setting. key_password (bytes): Password to decrypt the private key. Defaults to the :setting:`security_key_password` setting. cert (str): Name of certificate file to use. Defaults to the :setting:`security_certificate` setting. store (str): Directory containing certificates. Defaults to the :setting:`security_cert_store` setting. digest (str): Digest algorithm used when signing messages. Default is ``sha256``. serializer (str): Serializer used to encode messages after they've been signed. See :setting:`task_serializer` for the serializers supported. Default is ``json``. """fromcelery.securityimportsetup_securityreturnsetup_security(allowed_serializers,key,key_password,cert,store,digest,serializer,app=self)
[文档]defautodiscover_tasks(self,packages=None,related_name='tasks',force=False):"""Auto-discover task modules. Searches a list of packages for a "tasks.py" module (or use related_name argument). If the name is empty, this will be delegated to fix-ups (e.g., Django). For example if you have a directory layout like this: .. code-block:: text foo/__init__.py tasks.py models.py bar/__init__.py tasks.py models.py baz/__init__.py models.py Then calling ``app.autodiscover_tasks(['foo', 'bar', 'baz'])`` will result in the modules ``foo.tasks`` and ``bar.tasks`` being imported. Arguments: packages (List[str]): List of packages to search. This argument may also be a callable, in which case the value returned is used (for lazy evaluation). related_name (Optional[str]): The name of the module to find. Defaults to "tasks": meaning "look for 'module.tasks' for every module in ``packages``.". If ``None`` will only try to import the package, i.e. "look for 'module'". force (bool): By default this call is lazy so that the actual auto-discovery won't happen until an application imports the default modules. Forcing will cause the auto-discovery to happen immediately. """ifforce:returnself._autodiscover_tasks(packages,related_name)signals.import_modules.connect(starpromise(self._autodiscover_tasks,packages,related_name,),weak=False,sender=self)
def_autodiscover_tasks(self,packages,related_name,**kwargs):ifpackages:returnself._autodiscover_tasks_from_names(packages,related_name)returnself._autodiscover_tasks_from_fixups(related_name)def_autodiscover_tasks_from_names(self,packages,related_name):# packages argument can be lazyreturnself.loader.autodiscover_tasks(packages()ifcallable(packages)elsepackages,related_name,)def_autodiscover_tasks_from_fixups(self,related_name):returnself._autodiscover_tasks_from_names([pkgforfixupinself._fixupsifhasattr(fixup,'autodiscover_tasks')forpkginfixup.autodiscover_tasks()],related_name=related_name)
[文档]defsend_task(self,name,args=None,kwargs=None,countdown=None,eta=None,task_id=None,producer=None,connection=None,router=None,result_cls=None,expires=None,publisher=None,link=None,link_error=None,add_to_parent=True,group_id=None,group_index=None,retries=0,chord=None,reply_to=None,time_limit=None,soft_time_limit=None,root_id=None,parent_id=None,route_name=None,shadow=None,chain=None,task_type=None,replaced_task_nesting=0,**options):"""Send task by name. Supports the same arguments as :meth:`@-Task.apply_async`. Arguments: name (str): Name of task to call (e.g., `"tasks.add"`). result_cls (AsyncResult): Specify custom result class. """parent=have_parent=Noneamqp=self.amqptask_id=task_idoruuid()producer=producerorpublisher# XXX compatrouter=routeroramqp.routerconf=self.confifconf.task_always_eager:# pragma: no coverwarnings.warn(AlwaysEagerIgnored('task_always_eager has no effect on send_task',),stacklevel=2)ignore_result=options.pop('ignore_result',False)options=router.route(options,route_nameorname,args,kwargs,task_type)driver_type=self.producer_pool.connections.connection.transport.driver_typeis_native_delayed_delivery=detect_quorum_queues(self,driver_type)[0]ifis_native_delayed_deliveryandoptions['queue'].exchange.type!='direct':ifeta:ifisinstance(eta,str):eta=isoparse(eta)countdown=(maybe_make_aware(eta)-self.now()).total_seconds()ifcountdown:ifcountdown>0:routing_key=calculate_routing_key(int(countdown),options["queue"].routing_key)exchange=Exchange('celery_delayed_27',type='topic',)deloptions['queue']options['routing_key']=routing_keyoptions['exchange']=exchangeelifis_native_delayed_deliveryandoptions['queue'].exchange.type=='direct':logger.warning('Direct exchanges are not supported with native delayed delivery.\n'f'{options["queue"].exchange.name} is a direct exchange but should be a topic exchange or ''a fanout exchange in order for native delayed delivery to work properly.\n''If quorum queues are used, this task may block the worker process until the ETA arrives.')ifexpiresisnotNone:ifisinstance(expires,datetime):expires_s=(maybe_make_aware(expires)-self.now()).total_seconds()elifisinstance(expires,str):expires_s=(maybe_make_aware(isoparse(expires))-self.now()).total_seconds()else:expires_s=expiresifexpires_s<0:logger.warning(f"{task_id} has an expiration date in the past ({-expires_s}s ago).\n""We assume this is intended and so we have set the ""expiration date to 0 instead.\n""According to RabbitMQ's documentation:\n""\"Setting the TTL to 0 causes messages to be expired upon ""reaching a queue unless they can be delivered to a ""consumer immediately.\"\n""If this was unintended, please check the code which ""published this task.")expires_s=0options["expiration"]=expires_sifnotroot_idornotparent_id:parent=self.current_worker_taskifparent:ifnotroot_id:root_id=parent.request.root_idorparent.request.idifnotparent_id:parent_id=parent.request.idifconf.task_inherit_parent_priority:options.setdefault('priority',parent.request.delivery_info.get('priority'))# alias for 'task_as_v2'message=amqp.create_task_message(task_id,name,args,kwargs,countdown,eta,group_id,group_index,expires,retries,chord,maybe_list(link),maybe_list(link_error),reply_toorself.thread_oid,time_limit,soft_time_limit,self.conf.task_send_sent_event,root_id,parent_id,shadow,chain,ignore_result=ignore_result,replaced_task_nesting=replaced_task_nesting,**options)stamped_headers=options.pop('stamped_headers',[])forstampinstamped_headers:options.pop(stamp)ifconnection:producer=amqp.Producer(connection,auto_declare=False)withself.producer_or_acquire(producer)asP:withP.connection._reraise_as_library_errors():ifnotignore_result:self.backend.on_task_call(P,task_id)amqp.send_task_message(P,name,message,**options)result=(result_clsorself.AsyncResult)(task_id)# We avoid using the constructor since a custom result class# can be used, in which case the constructor may still use# the old signature.result.ignored=ignore_resultifadd_to_parent:ifnothave_parent:parent,have_parent=self.current_worker_task,Trueifparent:parent.add_trail(result)returnresult
[文档]defconnection_for_read(self,url=None,**kwargs):"""Establish connection used for consuming. See Also: :meth:`connection` for supported arguments. """returnself._connection(urlorself.conf.broker_read_url,**kwargs)
[文档]defconnection_for_write(self,url=None,**kwargs):"""Establish connection used for producing. See Also: :meth:`connection` for supported arguments. """returnself._connection(urlorself.conf.broker_write_url,**kwargs)
[文档]defconnection(self,hostname=None,userid=None,password=None,virtual_host=None,port=None,ssl=None,connect_timeout=None,transport=None,transport_options=None,heartbeat=None,login_method=None,failover_strategy=None,**kwargs):"""Establish a connection to the message broker. Please use :meth:`connection_for_read` and :meth:`connection_for_write` instead, to convey the intent of use for this connection. Arguments: url: Either the URL or the hostname of the broker to use. hostname (str): URL, Hostname/IP-address of the broker. If a URL is used, then the other argument below will be taken from the URL instead. userid (str): Username to authenticate as. password (str): Password to authenticate with virtual_host (str): Virtual host to use (domain). port (int): Port to connect to. ssl (bool, Dict): Defaults to the :setting:`broker_use_ssl` setting. transport (str): defaults to the :setting:`broker_transport` setting. transport_options (Dict): Dictionary of transport specific options. heartbeat (int): AMQP Heartbeat in seconds (``pyamqp`` only). login_method (str): Custom login method to use (AMQP only). failover_strategy (str, Callable): Custom failover strategy. **kwargs: Additional arguments to :class:`kombu.Connection`. Returns: kombu.Connection: the lazy connection instance. """returnself.connection_for_write(hostnameorself.conf.broker_write_url,userid=userid,password=password,virtual_host=virtual_host,port=port,ssl=ssl,connect_timeout=connect_timeout,transport=transport,transport_options=transport_options,heartbeat=heartbeat,login_method=login_method,failover_strategy=failover_strategy,**kwargs)
def_connection(self,url,userid=None,password=None,virtual_host=None,port=None,ssl=None,connect_timeout=None,transport=None,transport_options=None,heartbeat=None,login_method=None,failover_strategy=None,**kwargs):conf=self.confreturnself.amqp.Connection(url,useridorconf.broker_user,passwordorconf.broker_password,virtual_hostorconf.broker_vhost,portorconf.broker_port,transport=transportorconf.broker_transport,ssl=self.either('broker_use_ssl',ssl),heartbeat=heartbeat,login_method=login_methodorconf.broker_login_method,failover_strategy=(failover_strategyorconf.broker_failover_strategy),transport_options=dict(conf.broker_transport_options,**transport_optionsor{}),connect_timeout=self.either('broker_connection_timeout',connect_timeout),)broker_connection=connectiondef_acquire_connection(self,pool=True):"""Helper for :meth:`connection_or_acquire`."""ifpool:returnself.pool.acquire(block=True)returnself.connection_for_write()
[文档]defconnection_or_acquire(self,connection=None,pool=True,*_,**__):"""Context used to acquire a connection from the pool. For use within a :keyword:`with` statement to get a connection from the pool if one is not already provided. Arguments: connection (kombu.Connection): If not provided, a connection will be acquired from the connection pool. """returnFallbackContext(connection,self._acquire_connection,pool=pool)
[文档]defproducer_or_acquire(self,producer=None):"""Context used to acquire a producer from the pool. For use within a :keyword:`with` statement to get a producer from the pool if one is not already provided Arguments: producer (kombu.Producer): If not provided, a producer will be acquired from the producer pool. """returnFallbackContext(producer,self.producer_pool.acquire,block=True,)
default_producer=producer_or_acquire# XXX compat
[文档]defprepare_config(self,c):"""Prepare configuration before it is merged with the defaults."""returnfind_deprecated_settings(c)
[文档]defnow(self):"""Return the current time and date as a datetime."""now_in_utc=to_utc(datetime.now(datetime_timezone.utc))returnnow_in_utc.astimezone(self.timezone)
[文档]defselect_queues(self,queues=None):"""Select subset of queues. Arguments: queues (Sequence[str]): a list of queue names to keep. """returnself.amqp.queues.select(queues)
defeither(self,default_key,*defaults):"""Get key from configuration or use default values. Fallback to the value of a configuration key if none of the `*values` are true. """returnfirst(None,[first(None,defaults),starpromise(self.conf.get,default_key),])
[文档]defbugreport(self):"""Return information useful in bug reports."""returnbugreport(self)
def_get_backend(self):backend,url=backends.by_url(self.backend_clsorself.conf.result_backend,self.loader)returnbackend(app=self,url=url)def_finalize_pending_conf(self):"""Get config value by key and finalize loading the configuration. Note: This is used by PendingConfiguration: as soon as you access a key the configuration is read. """try:conf=self._conf=self._load_config()exceptAttributeErroraserr:# AttributeError is not propagated, it is "handled" by# PendingConfiguration parent class. This causes# confusing RecursionError.raiseModuleNotFoundError(*err.args)fromerrreturnconfdef_load_config(self):ifisinstance(self.on_configure,Signal):self.on_configure.send(sender=self)else:# used to be a method pre 4.0self.on_configure()ifself._config_source:self.loader.config_from_object(self._config_source)self.configured=Truesettings=detect_settings(self.prepare_config(self.loader.conf),self._preconf,ignore_keys=self._preconf_set_by_auto,prefix=self.namespace,)ifself._confisnotNone:# replace in place, as someone may have referenced app.conf,# done some changes, accessed a key, and then try to make more# changes to the reference and not the finalized value.self._conf.swap_with(settings)else:self._conf=settings# load lazy config dict initializers.pending_def=self._pending_defaultswhilepending_def:self._conf.add_defaults(maybe_evaluate(pending_def.popleft()()))# load lazy periodic taskspending_beat=self._pending_periodic_taskswhilepending_beat:periodic_task_args,periodic_task_kwargs=pending_beat.popleft()self._add_periodic_task(*periodic_task_args,**periodic_task_kwargs)self.on_after_configure.send(sender=self,source=self._conf)returnself._confdef_after_fork(self):self._pool=Nonetry:self.__dict__['amqp']._producer_pool=Noneexcept(AttributeError,KeyError):passself.on_after_fork.send(sender=self)
[文档]defsignature(self,*args,**kwargs):"""Return a new :class:`~celery.Signature` bound to this app."""kwargs['app']=selfreturnself._canvas.signature(*args,**kwargs)
[文档]defadd_periodic_task(self,schedule,sig,args=(),kwargs=(),name=None,**opts):""" Add a periodic task to beat schedule. Celery beat store tasks based on `sig` or `name` if provided. Adding the same signature twice make the second task override the first one. To avoid the override, use distinct `name` for them. """key,entry=self._sig_to_periodic_task_entry(schedule,sig,args,kwargs,name,**opts)ifself.configured:self._add_periodic_task(key,entry,name=name)else:self._pending_periodic_tasks.append([(key,entry),{"name":name}])returnkey
def_sig_to_periodic_task_entry(self,schedule,sig,args=(),kwargs=None,name=None,**opts):kwargs={}ifnotkwargselsekwargssig=(sig.clone(args,kwargs)ifisinstance(sig,abstract.CallableSignature)elseself.signature(sig.name,args,kwargs))returnnameorrepr(sig),{'schedule':schedule,'task':sig.name,'args':sig.args,'kwargs':sig.kwargs,'options':dict(sig.options,**opts),}def_add_periodic_task(self,key,entry,name=None):ifnameisNoneandkeyinself._conf.beat_schedule:logger.warning(f"Periodic task key='{key}' shadowed a previous unnamed periodic task."" Pass a name kwarg to add_periodic_task to silence this warning.")self._conf.beat_schedule[key]=entrydefcreate_task_cls(self):"""Create a base task class bound to this app."""returnself.subclass_with_self(self.task_cls,name='Task',attribute='_app',keep_reduce=True,abstract=True,)defsubclass_with_self(self,Class,name=None,attribute='app',reverse=None,keep_reduce=False,**kw):"""Subclass an app-compatible class. App-compatible means that the class has a class attribute that provides the default app it should use, for example: ``class Foo: app = None``. Arguments: Class (type): The app-compatible class to subclass. name (str): Custom name for the target class. attribute (str): Name of the attribute holding the app, Default is 'app'. reverse (str): Reverse path to this object used for pickling purposes. For example, to get ``app.AsyncResult``, use ``"AsyncResult"``. keep_reduce (bool): If enabled a custom ``__reduce__`` implementation won't be provided. """Class=symbol_by_name(Class)reverse=reverseifreverseelseClass.__name__def__reduce__(self):return_unpickle_appattr,(reverse,self.__reduce_args__())attrs=dict({attribute:self},__module__=Class.__module__,__doc__=Class.__doc__,**kw)ifnotkeep_reduce:attrs['__reduce__']=__reduce__returntype(nameorClass.__name__,(Class,),attrs)def_rgetattr(self,path):returnattrgetter(path)(self)def__enter__(self):returnselfdef__exit__(self,*exc_info):self.close()def__repr__(self):returnf'<{type(self).__name__}{appstr(self)}>'def__reduce__(self):ifself._using_v1_reduce:returnself.__reduce_v1__()return(_unpickle_app_v2,(self.__class__,self.__reduce_keys__()))def__reduce_v1__(self):# Reduce only pickles the configuration changes,# so the default configuration doesn't have to be passed# between processes.return(_unpickle_app,(self.__class__,self.Pickler)+self.__reduce_args__(),)def__reduce_keys__(self):"""Keyword arguments used to reconstruct the object when unpickling."""return{'main':self.main,'changes':self._conf.changesifself.configuredelseself._preconf,'loader':self.loader_cls,'backend':self.backend_cls,'amqp':self.amqp_cls,'events':self.events_cls,'log':self.log_cls,'control':self.control_cls,'fixups':self.fixups,'config_source':self._config_source,'task_cls':self.task_cls,'namespace':self.namespace,}def__reduce_args__(self):"""Deprecated method, please use :meth:`__reduce_keys__` instead."""return(self.main,self._conf.changesifself.configuredelse{},self.loader_cls,self.backend_cls,self.amqp_cls,self.events_cls,self.log_cls,self.control_cls,False,self._config_source)@cached_propertydefWorker(self):"""Worker application. See Also: :class:`~@Worker`. """returnself.subclass_with_self('celery.apps.worker:Worker')@cached_propertydefWorkController(self,**kwargs):"""Embeddable worker. See Also: :class:`~@WorkController`. """returnself.subclass_with_self('celery.worker:WorkController')@cached_propertydefBeat(self,**kwargs):""":program:`celery beat` scheduler application. See Also: :class:`~@Beat`. """returnself.subclass_with_self('celery.apps.beat:Beat')@cached_propertydefTask(self):"""Base task class for this app."""returnself.create_task_cls()@cached_propertydefannotations(self):returnprepare_annotations(self.conf.task_annotations)@cached_propertydefAsyncResult(self):"""Create new result instance. See Also: :class:`celery.result.AsyncResult`. """returnself.subclass_with_self('celery.result:AsyncResult')@cached_propertydefResultSet(self):returnself.subclass_with_self('celery.result:ResultSet')@cached_propertydefGroupResult(self):"""Create new group result instance. See Also: :class:`celery.result.GroupResult`. """returnself.subclass_with_self('celery.result:GroupResult')@propertydefpool(self):"""Broker connection pool: :class:`~@pool`. Note: This attribute is not related to the workers concurrency pool. """ifself._poolisNone:self._ensure_after_fork()limit=self.conf.broker_pool_limitpools.set_limit(limit)self._pool=pools.connections[self.connection_for_write()]returnself._pool@propertydefcurrent_task(self):"""Instance of task being executed, or :const:`None`."""return_task_stack.top@propertydefcurrent_worker_task(self):"""The task currently being executed by a worker or :const:`None`. Differs from :data:`current_task` in that it's not affected by tasks calling other tasks directly, or eagerly. """returnget_current_worker_task()@cached_propertydefoid(self):"""Universally unique identifier for this app."""# since 4.0: thread.get_ident() is not included when# generating the process id. This is due to how the RPC# backend now dedicates a single thread to receive results,# which would not work if each thread has a separate id.returnoid_from(self,threads=False)@propertydefthread_oid(self):"""Per-thread unique identifier for this app."""try:returnself._local.oidexceptAttributeError:self._local.oid=new_oid=oid_from(self,threads=True)returnnew_oid@cached_propertydefamqp(self):"""AMQP related functionality: :class:`~@amqp`."""returninstantiate(self.amqp_cls,app=self)@propertydef_backend(self):"""A reference to the backend object Uses self._backend_cache if it is thread safe. Otherwise, use self._local """ifself._backend_cacheisnotNone:returnself._backend_cachereturngetattr(self._local,"backend",None)@_backend.setterdef_backend(self,backend):"""Set the backend object on the app"""ifbackend.thread_safe:self._backend_cache=backendelse:self._local.backend=backend@propertydefbackend(self):"""Current backend instance."""ifself._backendisNone:self._backend=self._get_backend()returnself._backend@propertydefconf(self):"""Current configuration."""ifself._confisNone:self._conf=self._load_config()returnself._conf@conf.setterdefconf(self,d):self._conf=d@cached_propertydefcontrol(self):"""Remote control: :class:`~@control`."""returninstantiate(self.control_cls,app=self)@cached_propertydefevents(self):"""Consuming and sending events: :class:`~@events`."""returninstantiate(self.events_cls,app=self)@cached_propertydefloader(self):"""Current loader instance."""returnget_loader_cls(self.loader_cls)(app=self)@cached_propertydeflog(self):"""Logging: :class:`~@log`."""returninstantiate(self.log_cls,app=self)@cached_propertydef_canvas(self):fromceleryimportcanvasreturncanvas@cached_propertydeftasks(self):"""Task registry. Warning: Accessing this attribute will also auto-finalize the app. """self.finalize(auto=True)returnself._tasks@propertydefproducer_pool(self):returnself.amqp.producer_pooldefuses_utc_timezone(self):"""Check if the application uses the UTC timezone."""returnself.timezone==timezone.utc@cached_propertydeftimezone(self):"""Current timezone for this app. This is a cached property taking the time zone from the :setting:`timezone` setting. """conf=self.confifnotconf.timezone:ifconf.enable_utc:returntimezone.utcelse:returntimezone.localreturntimezone.get_timezone(conf.timezone)