[文档]defimport_service(module_name:str):""" 导入基于字符串路径的服务类 """# module[:class_name]parts=module_name.split(":",1)iflen(parts)==1:module_name,obj=module_name,Noneelse:module_name,obj=parts[0],parts[1]try:__import__(module_name)exceptImportErrorasexc:ifmodule_name.endswith(".py")andos.path.exists(module_name):raiseCommandError("Failed to find service, did you mean '{}'?".format(module_name[:-3].replace("/",".")))missing_module_re=MISSING_MODULE_TEMPLATE.format(module_name)# 有没有更好的方法来做到这一点?ifre.match(missing_module_re,str(exc)):raiseCommandError(exc)# 找到模块,但在其他地方导入时引发了导入错误,让它冒泡(导致打印完整的堆栈跟踪)。raisemodule=sys.modules[module_name]# 未指定服务类# 遍历查找服务类以及入口点(RPC远程调用)ifobjisNone:found_services=[]# 查找具有入口点的顶级对象(service)。for_,potential_serviceininspect.getmembers(module,is_type):ifinspect.getmembers(potential_service,is_entrypoint):found_services.append(potential_service)ifnotfound_services:raiseCommandError("Failed to find anything that looks like a service in module ""{!r}".format(module_name))# 指定的服务类不为空else:try:service_cls=getattr(module,obj)exceptAttributeError:raiseCommandError("Failed to find service class {!r} in module {!r}".format(obj,module_name))ifnotisinstance(service_cls,type):raiseCommandError("Service must be a class.")found_services=[service_cls]returnfound_services
[文档]defsetup_backdoor(runner:ServiceRunner,port:int):"""启动 后门"""def_bad_call():raiseRuntimeError("This would kill your service, not close the backdoor. To exit, ""use ctrl-c.")socket=eventlet.listen(("localhost",port))# work around https://github.com/celery/kombu/issues/838socket.settimeout(None)gt=eventlet.spawn(backdoor.backdoor_server,socket,locals={"runner":runner,"quit":_bad_call,"exit":_bad_call,},)returnsocket,gt
[文档]defrun(services:List[Type],config:dict,backdoor_port:Optional[int]=None):""" 基于配置,运行指定的服务类列表 """service_runner=ServiceRunner(config)forservice_clsinservices:service_runner.add_service(service_cls)defshutdown(signum,frame):# 信号处理程序由主循环运行,无法使用 eventlet 原语,因此我们必须在绿色线程中调用 `stop`。eventlet.spawn_n(service_runner.stop)signal.signal(signal.SIGTERM,shutdown)ifbackdoor_portisnotNone:setup_backdoor(service_runner,backdoor_port)service_runner.start()# 如果信号处理程序在 eventlet 正在等待套接字时触发,# `__main__` 绿色线程会收到一个 OSError(4) "Interrupted system call"。# 这是 eventlet hub 机制的副作用。为了保护 nameko 不受该异常的影响,# 我们将 `runner.wait` 调用包装在此处生成的绿色线程中,以便捕获(并静默)该异常。runnlet=eventlet.spawn(service_runner.wait)whileTrue:try:runnlet.wait()exceptOSErrorasexc:ifexc.errno==errno.EINTR:# 这是由信号处理程序引起的 OSError(4)。# 忽略它并返回继续等待 runner。continueraiseexceptKeyboardInterrupt:print()# 在终端中显示类似于 bash 的 ^C 形式,看起来更好。try:service_runner.stop()exceptKeyboardInterrupt:print()# as aboveservice_runner.kill()else:# runner.wait completedbreak# pragma: no cover (coverage problem on py39)