"""SQLAlchemy session."""
import time

from kombu.utils.compat import register_after_fork
from sqlalchemy import create_engine
from sqlalchemy.exc import DatabaseError
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import NullPool

from celery.utils.time import get_exponential_backoff_interval

try:
    from sqlalchemy.orm import declarative_base
except ImportError:
    # TODO: Remove this once we drop support for SQLAlchemy < 1.4.
    from sqlalchemy.ext.declarative import declarative_base

# Declarative base; ``prepare_models`` calls ``create_all`` on its metadata.
ResultModelBase = declarative_base()

__all__ = ('SessionManager',)

# Maximum number of times ``prepare_models`` retries ``create_all`` after a
# ``DatabaseError`` before letting the error propagate.
PREPARE_MODELS_MAX_RETRIES = 10


def _after_fork_cleanup_session(session):
    """Invoke the ``_after_fork`` hook on *session*.

    :param session: any object exposing an ``_after_fork()`` method.
    """
    # NOTE(review): presumably passed to ``register_after_fork`` as the
    # post-fork callback; the registration site is not visible here -- confirm.
    session._after_fork()
def prepare_models(self, engine):
    """Create everything declared on ``ResultModelBase`` using *engine*.

    The work is done at most once per instance: if ``self.prepared`` is
    already true the call returns immediately, otherwise ``self.prepared``
    is set to ``True`` once ``create_all`` has succeeded.

    :param engine: passed straight to
        ``ResultModelBase.metadata.create_all``.
    :raises DatabaseError: if ``create_all`` is still failing after
        ``PREPARE_MODELS_MAX_RETRIES`` retries.
    """
    if self.prepared:
        return

    # SQLAlchemy will check if the items exist before trying to
    # create them, which is a race condition. If it raises an error
    # in one iteration, the next may pass all the existence checks
    # and the call will succeed.
    attempts = 0
    while True:
        try:
            ResultModelBase.metadata.create_all(engine)
        except DatabaseError:
            if attempts >= PREPARE_MODELS_MAX_RETRIES:
                # Retry budget exhausted: surface the last error unchanged.
                raise
            # The helper's result is treated as milliseconds (converted to
            # seconds for ``time.sleep``).
            # NOTE(review): the arguments look like factor=10, cap=1000 and
            # full jitter enabled -- confirm against celery.utils.time.
            delay_ms = get_exponential_backoff_interval(
                10, attempts, 1000, True
            )
            time.sleep(delay_ms / 1000)
            attempts += 1
        else:
            break

    self.prepared = True