"""Utilities for debugging memory usage, blocking calls, etc."""importosimportsysimporttracebackfromcontextlibimportcontextmanagerfromfunctoolsimportpartialfrompprintimportpprintfromcelery.platformsimportsignalsfromcelery.utils.textimportWhateverIOtry:frompsutilimportProcessexceptImportError:Process=None__all__=('blockdetection','sample_mem','memdump','sample','humanbytes','mem_rss','ps','cry',)UNITS=((2**40.0,'TB'),(2**30.0,'GB'),(2**20.0,'MB'),(2**10.0,'KB'),(0.0,'b'),)_process=None_mem_sample=[]def_on_blocking(signum,frame):importinspectraiseRuntimeError(f'Blocking detection timed-out at: {inspect.getframeinfo(frame)}')@contextmanagerdefblockdetection(timeout):"""Context that raises an exception if process is blocking. Uses ``SIGALRM`` to detect blocking functions. """ifnottimeout:yieldelse:old_handler=signals['ALRM']old_handler=Noneifold_handler==_on_blockingelseold_handlersignals['ALRM']=_on_blockingtry:yieldsignals.arm_alarm(timeout)finally:ifold_handler:signals['ALRM']=old_handlersignals.reset_alarm()
[文档]defsample_mem():"""Sample RSS memory usage. Statistics can then be output by calling :func:`memdump`. """current_rss=mem_rss()_mem_sample.append(current_rss)returncurrent_rss
def_memdump(samples=10):# pragma: no coverS=_mem_sampleprev=list(S)iflen(S)<=sampleselsesample(S,samples)_mem_sample[:]=[]importgcgc.collect()after_collect=mem_rss()returnprev,after_collect
[文档]defmemdump(samples=10,file=None):# pragma: no cover"""Dump memory statistics. Will print a sample of all RSS memory samples added by calling :func:`sample_mem`, and in addition print used RSS memory after :func:`gc.collect`. """say=partial(print,file=file)ifps()isNone:say('- rss: (psutil not installed).')returnprev,after_collect=_memdump(samples)ifprev:say('- rss (sample):')formeminprev:say(f'- > {mem},')say(f'- rss (end): {after_collect}.')
[文档]defsample(x,n,k=0):"""Given a list `x` a sample of length ``n`` of that list is returned. For example, if `n` is 10, and `x` has 100 items, a list of every tenth. item is returned. ``k`` can be used as offset. """j=len(x)//nfor_inrange(n):try:yieldx[k]exceptIndexError:breakk+=j
defhfloat(f,p=5):"""Convert float to value suitable for humans. Arguments: f (float): The floating point number. p (int): Floating point precision (default is 5). """i=int(f)returniifi==felse'{0:.{p}}'.format(f,p=p)defhumanbytes(s):"""Convert bytes to human-readable form (e.g., KB, MB)."""returnnext(f'{hfloat(s/divifdivelses)}{unit}'fordiv,unitinUNITSifs>=div)
[文档]defmem_rss():"""Return RSS memory usage as a humanized string."""p=ps()ifpisnotNone:returnhumanbytes(_process_memory_info(p).rss)
[文档]defps():# pragma: no cover"""Return the global :class:`psutil.Process` instance. Note: Returns :const:`None` if :pypi:`psutil` is not installed. """global_processif_processisNoneandProcessisnotNone:_process=Process(os.getpid())return_process
def_process_memory_info(process):try:returnprocess.memory_info()exceptAttributeError:returnprocess.get_memory_info()defcry(out=None,sepchr='=',seplen=49):# pragma: no cover"""Return stack-trace of all active threads. See Also: Taken from https://gist.github.com/737056. """importthreadingout=WhateverIO()ifoutisNoneelseoutP=partial(print,file=out)# get a map of threads by their ID so we can print their names# during the traceback dumptmap={t.ident:tfortinthreading.enumerate()}sep=sepchr*seplenfortid,frameinsys._current_frames().items():thread=tmap.get(tid)ifnotthread:# skip old junk (left-overs from a fork)continueP(f'{thread.name}')P(sep)traceback.print_stack(frame,file=out)P(sep)P('LOCAL VARIABLES')P(sep)pprint(frame.f_locals,stream=out)P('\n')returnout.getvalue()