"""SQLAlchemy result store backend."""importloggingfromcontextlibimportcontextmanagerfromvine.utilsimportwrapsfromceleryimportstatesfromcelery.backends.baseimportBaseBackendfromcelery.exceptionsimportImproperlyConfiguredfromcelery.utils.timeimportmaybe_timedeltafrom.modelsimportTask,TaskExtended,TaskSetfrom.sessionimportSessionManagertry:fromsqlalchemy.excimportDatabaseError,InvalidRequestErrorfromsqlalchemy.orm.excimportStaleDataErrorexceptImportError:raiseImproperlyConfigured('The database result backend requires SQLAlchemy to be installed.''See https://pypi.org/project/SQLAlchemy/')logger=logging.getLogger(__name__)__all__=('DatabaseBackend',)@contextmanagerdefsession_cleanup(session):try:yieldexceptException:session.rollback()raisefinally:session.close()defretry(fun):@wraps(fun)def_inner(*args,**kwargs):max_retries=kwargs.pop('max_retries',3)forretriesinrange(max_retries):try:returnfun(*args,**kwargs)except(DatabaseError,InvalidRequestError,StaleDataError):logger.warning('Failed operation %s. Retrying %s more times.',fun.__name__,max_retries-retries-1,exc_info=True)ifretries+1>=max_retries:raisereturn_inner
class DatabaseBackend(BaseBackend):
    """The database result backend."""

    # ResultSet.iterate should sleep this much between each poll,
    # to not bombard the database with queries.
    subpolling_interval = 0.5

    # Model classes used for task rows and group rows.  ``task_cls`` is
    # swapped for ``TaskExtended`` per instance when extended results are on.
    task_cls = Task
    taskset_cls = TaskSet

    def __init__(self, dburi=None, engine_options=None, url=None, **kwargs):
        """Configure the backend from arguments and the app configuration.

        Arguments:
            dburi: Connection string, used when ``url`` is not given.
            engine_options: Mapping of engine options; entries from
                ``conf.database_engine_options`` are merged on top of it.
            url: Connection string; takes precedence over ``dburi`` and
                over ``conf.database_url``.
            **kwargs: Forwarded to the base class.  ``short_lived_sessions``
                is additionally read from here, falling back to
                ``conf.database_short_lived_sessions``.

        Raises:
            ImproperlyConfigured: If no connection string can be resolved.
        """
        # The `url` argument was added later and is used by
        # the app to set backend by url (celery.app.backends.by_url)
        super().__init__(expires_type=maybe_timedelta,
                         url=url, **kwargs)
        conf = self.app.conf

        if self.extended_result:
            self.task_cls = TaskExtended

        self.url = url or dburi or conf.database_url
        self.engine_options = dict(
            engine_options or {},
            **(conf.database_engine_options or {}))
        self.short_lived_sessions = kwargs.get(
            'short_lived_sessions',
            conf.database_short_lived_sessions)

        # Optional per-table schema/name overrides, keyed by 'task'/'group'.
        schemas = conf.database_table_schemas or {}
        tablenames = conf.database_table_names or {}
        self.task_cls.configure(
            schema=schemas.get('task'),
            name=tablenames.get('task'))
        self.taskset_cls.configure(
            schema=schemas.get('group'),
            name=tablenames.get('group'))

        if not self.url:
            raise ImproperlyConfigured(
                'Missing connection string! Do you have the'
                ' database_url setting set to a real value?')

        self.session_manager = SessionManager()

        create_tables_at_setup = conf.database_create_tables_at_setup
        # Only an explicit ``True`` triggers eager table creation.
        if create_tables_at_setup is True:
            self._create_tables()

    @property
    def extended_result(self):
        """Value of the ``extended`` key in the ``result`` config namespace."""
        return self.app.conf.find_value_for_key('extended', 'result')

    def _create_tables(self):
        """Create the task and taskset tables."""
        # NOTE(review): ``ResultSession`` is not defined in the visible part
        # of this class; presumably it builds a session through
        # ``self.session_manager`` and table creation happens as a side
        # effect of that -- confirm against the full file.
        self.ResultSession()

    @retry
    def _store_result(self, task_id, result, state, traceback=None,
                      request=None, **kwargs):
        """Store return value and state of an executed task.

        Looks up the row for ``task_id``; when none exists a new one is
        added and flushed first.  The row's columns are then filled by
        :meth:`_update_result` and the transaction is committed.
        """
        session = self.ResultSession()
        with session_cleanup(session):
            # ``first()`` mirrors the lookup in ``_restore_group`` and avoids
            # materialising the whole result set just to take one row.
            task = session.query(self.task_cls).filter(
                self.task_cls.task_id == task_id).first()
            if not task:
                task = self.task_cls(task_id)
                task.task_id = task_id
                session.add(task)
                session.flush()

            self._update_result(task, result, state,
                                traceback=traceback, request=request)
            session.commit()

    def _update_result(self, task, result, state, traceback=None,
                       request=None):
        """Copy the result meta for this call onto the columns of ``task``.

        Every column of ``task_cls`` except ``id`` and ``task_id`` is
        overwritten; columns with no entry in the meta are set to ``None``.
        """
        meta = self._get_result_meta(result=result, state=state,
                                     traceback=traceback, request=request,
                                     format_date=False, encode=True)

        # Exclude the primary key id and task_id columns
        # as we should not set them to None
        columns = [column.name for column in self.task_cls.__table__.columns
                   if column.name not in {'id', 'task_id'}]

        # Iterate through the column names of the table
        # to set the value from meta.
        # If the value is not present in meta, set None
        for column in columns:
            value = meta.get(column)
            setattr(task, column, value)

    @retry
    def _get_task_meta_for(self, task_id):
        """Get task meta-data for a task by id.

        When no row exists, an unsaved placeholder with status
        ``states.PENDING`` and a ``None`` result is used instead.  Stored
        ``args`` and ``kwargs`` are decoded when present.
        """
        session = self.ResultSession()
        with session_cleanup(session):
            # Same single-row lookup style as ``_restore_group``.
            task = session.query(self.task_cls).filter(
                self.task_cls.task_id == task_id).first()
            if not task:
                task = self.task_cls(task_id)
                task.status = states.PENDING
                task.result = None

            data = task.to_dict()
            if data.get('args', None) is not None:
                data['args'] = self.decode(data['args'])
            if data.get('kwargs', None) is not None:
                data['kwargs'] = self.decode(data['kwargs'])
            return self.meta_from_decoded(data)

    @retry
    def _save_group(self, group_id, result):
        """Store the result of an executed group and return ``result``."""
        session = self.ResultSession()
        with session_cleanup(session):
            group = self.taskset_cls(group_id, result)
            session.add(group)
            session.flush()
            session.commit()
            return result

    @retry
    def _restore_group(self, group_id):
        """Get meta-data for group by id.

        Returns the group row as a dict, or ``None`` when no row matches.
        """
        session = self.ResultSession()
        with session_cleanup(session):
            group = session.query(self.taskset_cls).filter(
                self.taskset_cls.taskset_id == group_id).first()
            if group:
                return group.to_dict()

    @retry
    def _delete_group(self, group_id):
        """Delete meta-data for group by id."""
        session = self.ResultSession()
        with session_cleanup(session):
            session.query(self.taskset_cls).filter(
                self.taskset_cls.taskset_id == group_id).delete()
            session.flush()
            session.commit()

    @retry
    def _forget(self, task_id):
        """Forget about result."""
        session = self.ResultSession()
        with session_cleanup(session):
            session.query(self.task_cls).filter(
                self.task_cls.task_id == task_id).delete()
            session.commit()