def apply_target(target, args=(), kwargs=None, callback=None,
                 accept_callback=None, pid=None, getpid=os.getpid,
                 propagate=(), monotonic=time.monotonic, **_):
    """Apply function within pool context.

    Runs ``target(*args, **kwargs)`` and reports the outcome to ``callback``.

    Arguments:
        target: Callable to execute.
        args: Positional arguments passed to ``target``.
        kwargs: Keyword arguments passed to ``target``; any falsy value
            is treated as "no keyword arguments".
        callback: Optional callable.  Receives the return value of
            ``target`` on success, or an ``ExceptionInfo()`` when the
            call was aborted by a non-``Exception`` error (see below).
        accept_callback: Optional callable invoked as
            ``accept_callback(pid, timestamp)`` before ``target`` runs.
        pid: Value reported to ``accept_callback``; when falsy the
            result of ``getpid()`` is used instead.
        getpid: Zero-argument callable providing the fallback pid.
        propagate: Exception class (or tuple of classes) that is
            re-raised untouched.
        monotonic: Zero-argument callable providing the timestamp
            handed to ``accept_callback``.
        **_: Extra keyword arguments are accepted and ignored.

    Raises:
        Exception: Anything matching ``propagate``, any ordinary
            ``Exception`` and ``WorkerShutdown``/``WorkerTerminate``
            raised by ``target`` are re-raised unchanged.
        WorkerLostError: When ``target`` raised some other
            ``BaseException`` and no ``callback`` was supplied to
            receive the failure.
    """
    kwargs = kwargs or {}
    if accept_callback:
        accept_callback(pid or getpid(), monotonic())
    try:
        ret = target(*args, **kwargs)
    except propagate:
        raise
    except Exception:
        raise
    except (WorkerShutdown, WorkerTerminate):
        raise
    except BaseException as exc:
        # Any remaining BaseException is converted into a WorkerLostError
        # carrying the original traceback, so that ExceptionInfo() below
        # captures the converted error.
        try:
            reraise(WorkerLostError, WorkerLostError(repr(exc)),
                    sys.exc_info()[2])
        except WorkerLostError:
            if callback is None:
                # Nobody to report to: surface the failure instead of
                # hiding it behind a TypeError from calling ``None``.
                raise
            callback(ExceptionInfo())
    else:
        # ``callback`` is optional (defaults to None), so only call it
        # when one was actually supplied.
        if callback is not None:
            callback(ret)
class BasePool:
    """Task pool."""

    # Numeric state codes; ``active`` compares ``_state`` against ``RUN``.
    RUN = 0x1
    CLOSE = 0x2
    TERMINATE = 0x3

    Timer = timer2.Timer

    #: set to true if the pool can be shutdown from within
    #: a signal handler.
    signal_safe = True

    #: set to true if pool uses greenlets.
    is_green = False

    _state = None
    _pool = None

    # When true, ``apply_async`` emits a debug log line for every call.
    _does_debug = True

    #: only used by multiprocessing pool
    uses_semaphore = False

    task_join_will_block = True
    body_can_be_buffer = False

    def __init__(self, limit=None, putlocks=True, forking_enable=True,
                 callbacks_propagate=(), app=None, **options):
        """Store the pool configuration on the instance.

        Arguments:
            limit: Kept as ``self.limit``; reported as ``max-concurrency``
                by ``_get_info`` and returned by ``num_processes``.
            putlocks: Kept as ``self.putlocks``; forwarded as
                ``waitforslot`` by ``apply_async``.
            forking_enable: Kept as ``self.forking_enable``.
            callbacks_propagate: Kept as ``self.callbacks_propagate``;
                forwarded by ``apply_async``.
            app: Kept as ``self.app``.
            **options: Remaining keyword arguments, kept as a dict in
                ``self.options``.
        """
        self.limit = limit
        self.putlocks = putlocks
        self.options = options
        self.forking_enable = forking_enable
        self.callbacks_propagate = callbacks_propagate
        self.app = app

    def apply_async(self, target, args=None, kwargs=None, **options):
        """Equivalent of the :func:`apply` built-in function.

        Hands ``target`` and its arguments to ``self.on_apply`` together
        with the pool's ``putlocks`` (as ``waitforslot``) and
        ``callbacks_propagate`` settings, and returns whatever
        ``on_apply`` returns.  Falsy ``args``/``kwargs`` are replaced by
        an empty list/dict.

        Callbacks should optimally return as soon as possible since
        otherwise the thread which handles the result will get blocked.
        """
        kwargs = kwargs or {}
        args = args or []
        if self._does_debug:
            logger.debug(
                'TaskPool: Apply %s (args:%s kwargs:%s)',
                target,
                truncate(safe_repr(args), 1024),
                truncate(safe_repr(kwargs), 1024),
            )

        return self.on_apply(
            target, args, kwargs,
            waitforslot=self.putlocks,
            callbacks_propagate=self.callbacks_propagate,
            **options
        )

    def _get_info(self) -> Dict[str, Any]:
        """
        Return configuration and statistics information.

        The base implementation reports the pool class as
        ``module:ClassName`` and the configured limit.  Subclasses
        should augment the data as required.

        :return: The returned value must be JSON-friendly.
        """
        pool_cls = self.__class__
        implementation = ':'.join((pool_cls.__module__, pool_cls.__name__))
        return {
            'implementation': implementation,
            'max-concurrency': self.limit,
        }

    @property
    def info(self):
        """Result of ``_get_info()``."""
        return self._get_info()

    @property
    def active(self):
        """Whether ``_state`` currently equals ``RUN``."""
        return self._state == self.RUN

    @property
    def num_processes(self):
        """The configured ``limit``."""
        return self.limit