"""Internal state.This is an internal module containing thread statelike the ``current_app``, and ``current_task``.This module shouldn't be used directly."""importosimportsysimportthreadingimportweakreffromcelery.localimportProxyfromcelery.utils.threadsimportLocalStack__all__=('set_default_app','get_current_app','get_current_task','get_current_worker_task','current_app','current_task','connect_on_app_finalize',)#: Global default app used when no current app.default_app=None#: Function returning the app provided or the default app if none.#:#: The environment variable :envvar:`CELERY_TRACE_APP` is used to#: trace app leaks. When enabled an exception is raised if there#: is no active app.app_or_default=None#: List of all app instances (weakrefs), mustn't be used directly._apps=weakref.WeakSet()#: Global set of functions to call whenever a new app is finalized.#: Shared tasks, and built-in tasks are created by adding callbacks here._on_app_finalizers=set()_task_join_will_block=False
def connect_on_app_finalize(callback):
    """Connect callback to be called when any app is finalized.

    The callback is added to the module-level ``_on_app_finalizers`` set
    and returned unchanged, so this function can also be applied as a
    decorator.
    """
    _on_app_finalizers.add(callback)
    return callback
def _announce_app_finalized(app):
    """Call every registered finalize callback with ``app``."""
    # Iterate over a snapshot so that the registry may be modified while
    # the callbacks are running without breaking the iteration.
    callbacks = set(_on_app_finalizers)
    for callback in callbacks:
        callback(app)


def _set_task_join_will_block(blocks):
    """Store ``blocks`` as the module-wide ``_task_join_will_block`` flag."""
    global _task_join_will_block
    _task_join_will_block = blocks


def task_join_will_block():
    """Return the flag last stored by ``_set_task_join_will_block()``."""
    return _task_join_will_block


class _TLS(threading.local):
    """Thread-local container for the current app."""

    #: Apps with the :attr:`~celery.app.base.BaseApp.set_as_current` attribute
    #: sets this, so it will always contain the last instantiated app,
    #: and is the default app returned by :func:`app_or_default`.
    current_app = None


_tls = _TLS()

_task_stack = LocalStack()


#: Function used to push a task to the thread local stack
#: keeping track of the currently executing task.
#: You must remember to pop the task after.
push_current_task = _task_stack.push

#: Function used to pop a task from the thread local stack
#: keeping track of the currently executing task.
pop_current_task = _task_stack.pop
# NOTE(review): in this copy of the file the ``C_WARN_APP`` branch body,
# the ``else`` branch, ``set_default_app`` and ``get_current_task`` were
# missing even though they are listed in ``__all__`` and referenced by
# other code in the module.  They are restored below to match upstream
# Celery -- confirm against the installed Celery version.


def set_default_app(app):
    """Set default app."""
    global default_app
    default_app = app


def _get_current_app():
    """Return the current thread's app, falling back to the default app.

    When no default app has been set yet, a fallback ``Celery`` instance
    named ``'default'`` is created first and installed with
    ``set_default_app()``.
    """
    if default_app is None:
        #: creates the global fallback app instance.
        # Imported here rather than at module level.
        # NOTE(review): presumably to avoid a circular import -- confirm.
        from celery.app.base import Celery
        set_default_app(Celery(
            'default', fixups=[], set_as_current=False,
            loader=os.environ.get('CELERY_LOADER') or 'default',
        ))
    return _tls.current_app or default_app


def _set_current_app(app):
    """Make ``app`` the current app for the calling thread."""
    _tls.current_app = app


# ``get_current_app`` is chosen once, at import time, from the environment:
# ``C_STRICT_APP`` makes every use raise, ``C_WARN_APP`` reports every use
# on stderr, otherwise it is simply ``_get_current_app``.
if os.environ.get('C_STRICT_APP'):  # pragma: no cover
    def get_current_app():
        """Return the current app."""
        raise RuntimeError('USES CURRENT APP')
elif os.environ.get('C_WARN_APP'):  # pragma: no cover
    def get_current_app():  # noqa
        """Return the current app, printing a stack trace to stderr."""
        import traceback
        print('-- USES CURRENT_APP', file=sys.stderr)  # +
        traceback.print_stack(file=sys.stderr)
        return _get_current_app()
else:
    get_current_app = _get_current_app


def get_current_task():
    """Currently executing task."""
    return _task_stack.top
def get_current_worker_task():
    """Currently executing task, that was applied by the worker.

    This is used to differentiate between the actual task
    executed by the worker and any task that was called within
    a task (using ``task.__call__`` or ``task.apply``)

    Returns:
        The most recently pushed task on the task stack whose
        ``request.called_directly`` is false, or ``None`` if there is
        no such task.
    """
    # Walk from the top of the stack down, skipping directly-called tasks.
    for task in reversed(_task_stack.stack):
        if not task.request.called_directly:
            return task
    return None
#: Proxy to current app.
current_app = Proxy(get_current_app)

#: Proxy to current task.
current_task = Proxy(get_current_task)


def _register_app(app):
    """Add ``app`` to the set of known app instances."""
    _apps.add(app)


def _deregister_app(app):
    """Remove ``app`` from the set of known app instances, if present."""
    _apps.discard(app)


def _get_active_apps():
    """Return the set of registered app instances."""
    return _apps


def _app_or_default(app=None):
    """Return ``app``, or the current app when ``app`` is None."""
    if app is None:
        return get_current_app()
    return app


def _app_or_default_trace(app=None):  # pragma: no cover
    """Tracing variant of ``_app_or_default``.

    Returns ``app`` when one is given.  Otherwise prints a marker and a
    stack trace before returning the thread's current app or, failing
    that, the default app.

    Raises:
        Exception: with message ``'DEFAULT APP'`` when there is no
            current app and either ``billiard`` cannot be imported or
            the current process is named ``'MainProcess'``.
    """
    from traceback import print_stack
    try:
        from billiard.process import current_process
    except ImportError:
        current_process = None
    if app is None:
        if getattr(_tls, 'current_app', None):
            print('-- RETURNING TO CURRENT APP --')  # +
            print_stack()
            return _tls.current_app
        if not current_process or current_process()._name == 'MainProcess':
            raise Exception('DEFAULT APP')
        print('-- RETURNING TO DEFAULT APP --')      # +
        print_stack()
        return default_app
    return app
def enable_trace():
    """Enable tracing of app instances.

    Rebinds the module-level ``app_or_default`` to the tracing
    implementation ``_app_or_default_trace``.
    """
    global app_or_default
    app_or_default = _app_or_default_trace
def disable_trace():
    """Disable tracing of app instances.

    Rebinds the module-level ``app_or_default`` to the plain
    implementation ``_app_or_default``.
    """
    global app_or_default
    app_or_default = _app_or_default
# Select the ``app_or_default`` implementation once, at import time, based
# on the ``CELERY_TRACE_APP`` environment variable.
if os.environ.get('CELERY_TRACE_APP'):  # pragma: no cover
    enable_trace()
else:
    disable_trace()