"""Remote Debugger.Introduction============This is a remote debugger for Celery tasks running in multiprocessingpool workers. Inspired by a lost post on dzone.com.Usage-----.. code-block:: python from celery.contrib import rdb from celery import task @task() def add(x, y): result = x + y rdb.set_trace() return resultEnvironment Variables=====================.. envvar:: CELERY_RDB_HOST``CELERY_RDB_HOST``------------------- Hostname to bind to. Default is '127.0.0.1' (only accessible from localhost)... envvar:: CELERY_RDB_PORT``CELERY_RDB_PORT``------------------- Base port to bind to. Default is 6899. The debugger will try to find an available port starting from the base port. The selected port will be logged by the worker."""importerrnoimportosimportsocketimportsysfrompdbimportPdbfrombilliard.processimportcurrent_process__all__=('CELERY_RDB_HOST','CELERY_RDB_PORT','DEFAULT_PORT','Rdb','debugger','set_trace',)DEFAULT_PORT=6899CELERY_RDB_HOST=os.environ.get('CELERY_RDB_HOST')or'127.0.0.1'CELERY_RDB_PORT=int(os.environ.get('CELERY_RDB_PORT')orDEFAULT_PORT)#: Holds the currently active debugger._current=[None]_frame=getattr(sys,'_getframe')NO_AVAILABLE_PORT="""\{self.ident}: Couldn't find an available port.Please specify one using the CELERY_RDB_PORT environment variable."""BANNER="""\{self.ident}: Ready to connect: telnet {self.host}{self.port}Type `exit` in session to continue.{self.ident}: Waiting for client..."""SESSION_STARTED='{self.ident}: Now in session with {self.remote_addr}.'SESSION_ENDED='{self.ident}: Session with {self.remote_addr} ended.'
class Rdb(Pdb):
    """Remote debugger.

    A :class:`pdb.Pdb` subclass whose input and output are a TCP client
    connection instead of the process's own terminal.  Creating an
    instance binds a listening socket, announces the address through
    :meth:`say`, and blocks until a client connects.

    The instance can be used as a context manager; leaving the ``with``
    block closes the session.
    """

    me = 'Remote Debugger'
    #: Identity used in status messages.  ``__init__`` replaces it with
    #: ``'<me>:<port>'`` once a port is bound; the class-level default
    #: lets the "no available port" message be formatted before that.
    ident = me
    _prev_outs = None
    _sock = None
    # Defaults so `_close_session` can test these with `is not None`
    # even if construction stopped before they were assigned.
    _client = None
    _handle = None

    def __init__(self, host=CELERY_RDB_HOST, port=CELERY_RDB_PORT,
                 port_search_limit=100, port_skew=+0, out=sys.stdout):
        """Bind a port, wait for a client and attach the debugger to it.

        Arguments:
            host: Address to bind the listening socket to.
            port: Base port at which the port search starts.
            port_search_limit: Number of consecutive ports to try.
            port_skew: Offset added to ``port`` (see
                :meth:`get_avail_port`).
            out: File-like object status messages are printed to.

        Raises:
            Exception: If no port in the searched range could be bound.
            OSError: For socket errors other than "address in use" or
                "invalid argument" while binding.
        """
        self.active = True
        self.out = out

        # Remember the real streams so `_close_session` can restore them.
        self._prev_handles = sys.stdin, sys.stdout

        self._sock, this_port = self.get_avail_port(
            host, port, port_search_limit, port_skew,
        )
        self._sock.setblocking(1)
        self._sock.listen(1)
        self.ident = f'{self.me}:{this_port}'
        self.host = host
        self.port = this_port
        self.say(BANNER.format(self=self))

        # Blocks here until a client connects.
        self._client, address = self._sock.accept()
        self._client.setblocking(1)
        self.remote_addr = ':'.join(str(v) for v in address)
        self.say(SESSION_STARTED.format(self=self))
        # Route the process-wide stdin/stdout through the client socket
        # for the duration of the session.
        self._handle = sys.stdin = sys.stdout = self._client.makefile('rw')
        super().__init__(completekey='tab',
                         stdin=self._handle, stdout=self._handle)

    def get_avail_port(self, host, port, search_limit=100, skew=+0):
        """Return ``(socket, port)`` for the first port that can be bound.

        Ports ``port + skew + i`` are tried for ``i`` in
        ``range(search_limit)``.  If the current process name has the
        form ``'<name>-<int>'`` the integer suffix replaces ``skew``;
        otherwise the ``skew`` argument is used unchanged.

        Raises:
            Exception: If every candidate port was already in use (or
                rejected with ``EINVAL``).
            OSError: For any other bind error.
        """
        try:
            _, suffix = current_process().name.split('-')
            # Convert before assigning so a non-numeric suffix leaves
            # the caller-supplied ``skew`` intact.
            skew = int(suffix)
        except ValueError:
            pass
        for i in range(search_limit):
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            this_port = port + skew + i
            try:
                sock.bind((host, this_port))
            except BaseException as exc:
                # Never leak the socket of a failed attempt.
                sock.close()
                if (isinstance(exc, OSError) and
                        exc.errno in (errno.EADDRINUSE, errno.EINVAL)):
                    continue
                raise
            else:
                return sock, this_port
        raise Exception(NO_AVAILABLE_PORT.format(self=self))

    def say(self, m):
        """Print status message ``m`` to the ``out`` stream."""
        print(m, file=self.out)

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self._close_session()

    def _close_session(self):
        """Restore the original stdin/stdout and close the connection.

        The streams are restored on every call; sockets are closed and
        the "session ended" message is emitted only while the session
        is still active, so repeated calls are harmless.
        """
        self.stdin, self.stdout = sys.stdin, sys.stdout = self._prev_handles
        if self.active:
            if self._handle is not None:
                self._handle.close()
            if self._client is not None:
                self._client.close()
            if self._sock is not None:
                self._sock.close()
            self.active = False
            self.say(SESSION_ENDED.format(self=self))

    def do_continue(self, arg):
        """Close the remote session and resume the program."""
        self._close_session()
        self.set_continue()
        # A true return value ends the interactive command loop.
        return 1
    do_c = do_cont = do_continue

    def do_quit(self, arg):
        """Close the remote session and stop debugging."""
        self._close_session()
        self.set_quit()
        # A true return value ends the interactive command loop.
        return 1
    do_q = do_exit = do_quit

    def set_quit(self):
        """Stop tracing without going through the inherited quit path."""
        # this raises a BdbQuit exception that we're unable to catch.
        sys.settrace(None)
def debugger():
    """Return the active debugger, creating a new one when needed.

    The instance stored in ``_current`` is reused as long as it exists
    and is still flagged active; otherwise a fresh :class:`Rdb` is
    created, stored in its place and returned.
    """
    existing = _current[0]
    if existing is not None and existing.active:
        return existing
    # Construct first, store second: a failed construction leaves the
    # previously stored value untouched.
    replacement = Rdb()
    _current[0] = replacement
    return replacement
def set_trace(frame=None):
    """Break at the caller's location, or at ``frame`` when one is given."""
    # The frame lookup must happen directly in this function so that
    # ``f_back`` refers to whoever called ``set_trace``.
    target = _frame().f_back if frame is None else frame
    active_debugger = debugger()
    return active_debugger.set_trace(target)