import errno
import numbers
import os
import subprocess
import sys

from itertools import zip_longest

if sys.platform == 'win32':
    try:
        import _winapi  # noqa
    except ImportError:  # pragma: no cover
        from _multiprocessing import win32 as _winapi  # noqa
else:
    _winapi = None  # noqa

try:
    import resource
except ImportError:  # pragma: no cover
    resource = None

from io import UnsupportedOperation

# Exceptions caught by maybe_fileno() when asking an object for its fileno().
FILENO_ERRORS = (AttributeError, ValueError, UnsupportedOperation)


if hasattr(os, 'write'):
    __write__ = os.write

    def send_offset(fd, buf, offset):
        """Write ``buf[offset:]`` to ``fd`` and return what ``os.write`` returns."""
        return __write__(fd, buf[offset:])

else:  # non-posix platform

    def send_offset(fd, buf, offset):  # noqa
        """Always raise: ``os.write`` is not available on this platform."""
        raise NotImplementedError('send_offset')


try:
    fsencode = os.fsencode
    fsdecode = os.fsdecode
except AttributeError:

    def _fscodec():
        """Build ``fsencode``/``fsdecode`` fallbacks bound to the FS encoding."""
        encoding = sys.getfilesystemencoding()
        if encoding == 'mbcs':
            errors = 'strict'
        else:
            errors = 'surrogateescape'

        def fsencode(filename):
            """Encode filename to the filesystem encoding.

            Uses the 'surrogateescape' error handler and returns bytes
            unchanged. On Windows, the 'strict' error handler is used if
            the file system encoding is 'mbcs' (which is the default
            encoding).
            """
            if isinstance(filename, bytes):
                return filename
            elif isinstance(filename, str):
                return filename.encode(encoding, errors)
            else:
                raise TypeError(
                    "expect bytes or str, not %s" % type(filename).__name__
                )

        def fsdecode(filename):
            """Decode filename from the filesystem encoding.

            Uses the 'surrogateescape' error handler and returns str
            unchanged. On Windows, the 'strict' error handler is used if
            the file system encoding is 'mbcs' (which is the default
            encoding).
            """
            if isinstance(filename, str):
                return filename
            elif isinstance(filename, bytes):
                return filename.decode(encoding, errors)
            else:
                raise TypeError(
                    "expect bytes or str, not %s" % type(filename).__name__
                )

        return fsencode, fsdecode

    fsencode, fsdecode = _fscodec()
    del _fscodec


def maybe_fileno(f):
    """Get object fileno, or :const:`None` if not defined.

    Integers are returned unchanged; any other object is asked for
    ``f.fileno()``, and the errors listed in ``FILENO_ERRORS`` are
    translated into a :const:`None` result.
    """
    if isinstance(f, numbers.Integral):
        return f
    try:
        return f.fileno()
    except FILENO_ERRORS:
        return None
def get_fdmax(default=None):
    """Return the maximum number of open file descriptors on this system.

    ``os.sysconf('SC_OPEN_MAX')`` is tried first; if that is unavailable
    the hard ``RLIMIT_NOFILE`` limit from the ``resource`` module is used.

    :keyword default: Value returned if there's no file descriptor limit
        (or no way to query it on this platform).
    """
    try:
        return os.sysconf('SC_OPEN_MAX')
    except (AttributeError, ValueError, OSError):
        # AttributeError: os.sysconf does not exist on this platform.
        # ValueError/OSError: the name is unknown or unsupported here.
        pass
    if resource is None:  # Windows
        return default
    fdmax = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
    if fdmax == resource.RLIM_INFINITY:
        return default
    return fdmax
def uniq(it):
    """Return all unique elements in ``it``, preserving order.

    The result is a lazy generator; elements must be hashable.
    """
    seen = set()
    # ``set.add`` returns None, so ``seen.add(obj) or obj`` records the
    # element and then yields it.
    return (seen.add(obj) or obj for obj in it if obj not in seen)


try:
    closerange = os.closerange
except AttributeError:

    def closerange(fd_low, fd_high):  # noqa
        """Close descriptors in ``[fd_low, fd_high)``, ignoring ``EBADF``."""
        for fd in reversed(range(fd_low, fd_high)):
            try:
                os.close(fd)
            except OSError as exc:
                if exc.errno != errno.EBADF:
                    raise


# NOTE(review): the original ended in an ``else:`` clause whose body is not
# present in the source. ``close_open_fds`` only depends on ``closerange``,
# which exists whichever branch above ran, so it is defined unconditionally.
def close_open_fds(keep=None):
    """Close every file descriptor below the fd limit except those in ``keep``.

    :keyword keep: Iterable of descriptors and/or objects with a
        ``fileno()`` method that must stay open. Entries for which
        ``maybe_fileno`` returns :const:`None` are ignored.
    """
    # must make sure this is 0-inclusive (Issue #celery/1882)
    keep = list(uniq(sorted(
        f for f in map(maybe_fileno, keep or []) if f is not None
    )))
    maxfd = get_fdmax(default=2048)
    # Pair each kept fd (starting from a virtual -1) with the next kept fd
    # (ending at maxfd) and close the gap between each pair.
    kL, kH = iter([-1] + keep), iter(keep + [maxfd])
    for low, high in zip_longest(kL, kH):
        if low + 1 != high:
            closerange(low + 1, high)
def get_errno(exc):
    """Return ``exc.errno``, or ``0`` if the exception has no such attribute.

    :exc:`socket.error` and :exc:`IOError` first got the ``.errno``
    attribute in Py2.7.
    """
    try:
        return exc.errno
    except AttributeError:
        return 0


try:
    import _posixsubprocess
except ImportError:

    def spawnv_passfds(path, args, passfds):
        """Fork and exec ``path`` with ``args``, keeping only ``passfds`` open.

        Returns the value of ``os.fork()`` in the parent (the child pid).
        """
        if sys.platform != 'win32':
            # when not using _posixsubprocess (on earlier python) and not on
            # windows, we want to keep stdout/stderr open...
            passfds = list(passfds) + [
                maybe_fileno(sys.stdout),
                maybe_fileno(sys.stderr),
            ]
        pid = os.fork()
        if not pid:
            # Filter on ``is not None`` rather than truthiness so that
            # descriptor 0 is not silently dropped from the keep-list.
            close_open_fds(keep=sorted(f for f in passfds if f is not None))
            os.execv(fsencode(path), args)
        return pid

else:

    def spawnv_passfds(path, args, passfds):
        """Spawn ``path`` with ``args`` via ``_posixsubprocess.fork_exec``.

        ``passfds`` are the descriptors to keep open in the child. Returns
        whatever ``fork_exec`` returns.
        """
        passfds = sorted(passfds)
        errpipe_read, errpipe_write = os.pipe()
        try:
            # The positional argument list of fork_exec differs between
            # Python versions, so it is assembled step by step.
            args = [
                args, [fsencode(path)], True, tuple(passfds), None, None,
                -1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write,
                False, False,
            ]
            if sys.version_info >= (3, 11):
                args.append(-1)  # process_group
            if sys.version_info >= (3, 9):
                # group, extra_groups, user, umask
                args.extend((None, None, None, -1))
            args.append(None)  # preexec_fn
            if sys.version_info >= (3, 11):
                args.append(subprocess._USE_VFORK)
            return _posixsubprocess.fork_exec(*args)
        finally:
            os.close(errpipe_read)
            os.close(errpipe_write)


if sys.platform == 'win32':

    def setblocking(handle, blocking):
        """Not supported on win32; always raises."""
        raise NotImplementedError('setblocking not implemented on win32')

    def isblocking(handle):
        """Not supported on win32; always raises."""
        raise NotImplementedError('isblocking not implemented on win32')

else:
    from os import O_NONBLOCK
    from fcntl import fcntl, F_GETFL, F_SETFL

    def isblocking(handle):  # noqa
        """Return True if ``O_NONBLOCK`` is not set on ``handle``."""
        return not (fcntl(handle, F_GETFL) & O_NONBLOCK)

    def setblocking(handle, blocking):  # noqa
        """Clear ``O_NONBLOCK`` on ``handle`` if ``blocking``, else set it."""
        flags = fcntl(handle, F_GETFL, 0)
        fcntl(
            handle, F_SETFL,
            flags & (~O_NONBLOCK) if blocking else flags | O_NONBLOCK,
        )


E_PSUTIL_MISSING = """
On Windows, the ability to inspect memory usage requires the psutil library.

You can install it using pip:

    $ pip install psutil
"""


E_RESOURCE_MISSING = """
Your platform ({0}) does not seem to have the `resource.getrusage' function.

Please open an issue so that we can add support for this platform.
"""


if sys.platform == 'win32':

    try:
        import psutil
    except ImportError:  # pragma: no cover
        psutil = None    # noqa

    def mem_rss():
        # type: () -> int
        """Return the first ``memory_info()`` field of this process / 1024.

        :raises ImportError: if psutil is not installed.
        """
        if psutil is None:
            raise ImportError(E_PSUTIL_MISSING.strip())
        return int(psutil.Process(os.getpid()).memory_info()[0] / 1024.0)

else:
    try:
        from resource import getrusage, RUSAGE_SELF
    except ImportError:  # pragma: no cover
        getrusage = RUSAGE_SELF = None  # noqa

    if 'bsd' in sys.platform or sys.platform == 'darwin':
        # On BSD platforms :man:`getrusage(2)` ru_maxrss field is in bytes.

        def maxrss_to_kb(v):
            # type: (SupportsInt) -> float
            """Convert a ``ru_maxrss`` value in bytes to kilobytes."""
            return int(v) / 1024.0

    else:
        # On Linux it's kilobytes.

        def maxrss_to_kb(v):
            # type: (SupportsInt) -> int
            """Return a ``ru_maxrss`` value that is already in kilobytes."""
            return int(v)

    def mem_rss():
        # type: () -> int
        """Return ``ru_maxrss`` of the current process converted to kilobytes.

        :raises ImportError: if ``resource.getrusage`` is unavailable.
        """
        # Guard on the name that is actually called below.
        if getrusage is None:
            raise ImportError(E_RESOURCE_MISSING.strip().format(sys.platform))
        return maxrss_to_kb(getrusage(RUSAGE_SELF).ru_maxrss)