"""A directed acyclic graph of reusable components."""fromcollectionsimportdequefromthreadingimportEventfromkombu.commonimportignore_errorsfromkombu.utils.encodingimportbytes_to_strfromkombu.utils.importsimportsymbol_by_namefrom.utils.graphimportDependencyGraph,GraphFormatterfrom.utils.importsimportinstantiate,qualnamefrom.utils.logimportget_loggertry:fromgreenletimportGreenletExitexceptImportError:IGNORE_ERRORS=()else:IGNORE_ERRORS=(GreenletExit,)__all__=('Blueprint','Step','StartStopStep','ConsumerStep')#: StatesRUN=0x1CLOSE=0x2TERMINATE=0x3logger=get_logger(__name__)def_pre(ns,fmt):returnf'| {ns.alias}: {fmt}'def_label(s):returns.name.rsplit('.',1)[-1]classStepFormatter(GraphFormatter):"""Graph formatter for :class:`Blueprint`."""blueprint_prefix='⧉'conditional_prefix='∘'blueprint_scheme={'shape':'parallelogram','color':'slategray4','fillcolor':'slategray3',}deflabel(self,step):returnstepand'{}{}'.format(self._get_prefix(step),bytes_to_str((step.labelor_label(step)).encode('utf-8','ignore')),)def_get_prefix(self,step):ifstep.last:returnself.blueprint_prefixifstep.conditional:returnself.conditional_prefixreturn''defnode(self,obj,**attrs):scheme=self.blueprint_schemeifobj.lastelseself.node_schemereturnself.draw_node(obj,scheme,attrs)defedge(self,a,b,**attrs):ifa.last:attrs.update(arrowhead='none',color='darkseagreen3')returnself.draw_edge(a,b,self.edge_scheme,attrs)
[文档]classBlueprint:"""Blueprint containing bootsteps that can be applied to objects. Arguments: steps Sequence[Union[str, Step]]: List of steps. name (str): Set explicit name for this blueprint. on_start (Callable): Optional callback applied after blueprint start. on_close (Callable): Optional callback applied before blueprint close. on_stopped (Callable): Optional callback applied after blueprint stopped. """GraphFormatter=StepFormattername=Nonestate=Nonestarted=0default_steps=set()state_to_name={0:'initializing',RUN:'running',CLOSE:'closing',TERMINATE:'terminating',}def__init__(self,steps=None,name=None,on_start=None,on_close=None,on_stopped=None):self.name=nameorself.nameorqualname(type(self))self.types=set(stepsor[])|set(self.default_steps)self.on_start=on_startself.on_close=on_closeself.on_stopped=on_stoppedself.shutdown_complete=Event()self.steps={}
[文档]defsend_all(self,parent,method,description=None,reverse=True,propagate=True,args=()):description=descriptionormethod.replace('_',' ')steps=reversed(parent.steps)ifreverseelseparent.stepsforstepinsteps:ifstep:fun=getattr(step,method,None)iffunisnotNone:self._debug('%s%s...',description.capitalize(),step.alias)try:fun(parent,*args)exceptExceptionasexc:# pylint: disable=broad-exceptifpropagate:raiselogger.exception('Error on %s%s: %r',description,step.alias,exc)
[文档]defstop(self,parent,close=True,terminate=False):what='terminating'ifterminateelse'stopping'ifself.statein(CLOSE,TERMINATE):returnifself.state!=RUNorself.started!=len(parent.steps):# Not fully started, can safely exit.self.state=TERMINATEself.shutdown_complete.set()returnself.close(parent)self.state=CLOSEself.restart(parent,'terminate'ifterminateelse'stop',description=what,propagate=False,)ifself.on_stopped:self.on_stopped()self.state=TERMINATEself.shutdown_complete.set()
[文档]defjoin(self,timeout=None):try:# Will only get here if running green,# makes sure all greenthreads have exited.self.shutdown_complete.wait(timeout=timeout)exceptIGNORE_ERRORS:pass
[文档]defapply(self,parent,**kwargs):"""Apply the steps in this blueprint to an object. This will apply the ``__init__`` and ``include`` methods of each step, with the object as argument:: step = Step(obj) ... step.include(obj) For :class:`StartStopStep` the services created will also be added to the objects ``steps`` attribute. """self._debug('Preparing bootsteps.')order=self.order=[]steps=self.steps=self.claim_steps()self._debug('Building graph...')forSinself._finalize_steps(steps):step=S(parent,**kwargs)steps[step.name]=steporder.append(step)self._debug('New boot order: {%s}',', '.join(s.aliasforsinself.order))forstepinorder:step.include(parent)returnself
classStepType(type):"""Meta-class for steps."""name=Nonerequires=Nonedef__new__(cls,name,bases,attrs):module=attrs.get('__module__')qname=f'{module}.{name}'ifmoduleelsenameattrs.update(__qualname__=qname,name=attrs.get('name')orqname,)returnsuper().__new__(cls,name,bases,attrs)def__str__(cls):returncls.namedef__repr__(cls):return'step:{0.name}{{{0.requires!r}}}'.format(cls)
[文档]classStep(metaclass=StepType):"""A Bootstep. The :meth:`__init__` method is called when the step is bound to a parent object, and can as such be used to initialize attributes in the parent object at parent instantiation-time. """#: Optional step name, will use ``qualname`` if not specified.name=None#: Optional short name used for graph outputs and in logs.label=None#: Set this to true if the step is enabled based on some condition.conditional=False#: List of other steps that that must be started before this step.#: Note that all dependencies must be in the same blueprint.requires=()#: This flag is reserved for the workers Consumer,#: since it is required to always be started last.#: There can only be one object marked last#: in every blueprint.last=False#: This provides the default for :meth:`include_if`.enabled=Truedef__init__(self,parent,**kwargs):pass
[文档]definclude_if(self,parent):"""Return true if bootstep should be included. You can define this as an optional predicate that decides whether this step should be created. """returnself.enabled
[文档]classStartStopStep(Step):"""Bootstep that must be started and stopped in order."""#: Optional obj created by the :meth:`create` method.#: This is used by :class:`StartStopStep` to keep the#: original service object.obj=None