"""MongoDB result store backend."""
from datetime import datetime, timedelta, timezone

from kombu.exceptions import EncodeError
from kombu.utils.objects import cached_property
from kombu.utils.url import maybe_sanitize_url, urlparse

from celery import states
from celery.exceptions import ImproperlyConfigured

from .base import BaseBackend

# pymongo is optional at import time: the module must stay importable
# without it, and the missing dependency is reported only when the
# backend is instantiated.
try:
    import pymongo
except ImportError:
    pymongo = None

if pymongo:
    # Binary is looked up in ``bson.binary`` first, falling back to
    # ``pymongo.binary`` when that import fails.
    try:
        from bson.binary import Binary
    except ImportError:
        from pymongo.binary import Binary
    from pymongo.errors import InvalidDocument
else:                                       # pragma: no cover
    Binary = None

    # Stand-in so ``except InvalidDocument`` clauses in this module still
    # resolve to a real exception class when pymongo is not installed.
    class InvalidDocument(Exception):
        pass

__all__ = ('MongoBackend',)

# Serializer names whose encoded payload is wrapped in ``Binary`` before
# being written to MongoDB.
BINARY_CODECS = frozenset(['pickle', 'msgpack'])
class MongoBackend(BaseBackend):
    """MongoDB result backend.

    Task results are stored one document per task in
    :attr:`taskmeta_collection` and group results one document per group in
    :attr:`groupmeta_collection`, both keyed by ``_id``.

    Connection parameters are resolved in this order, later sources
    overriding earlier ones: class-level defaults, the backend URL, then the
    ``mongodb_backend_settings`` dict from the app configuration.

    Raises:
        celery.exceptions.ImproperlyConfigured:
            if module :pypi:`pymongo` is not available.
    """

    # Host list taken from the backend URL (or the ``mongo_host`` setting);
    # when set it takes precedence over ``host``/``port``.
    mongo_host = None
    host = 'localhost'
    port = 27017
    user = None
    password = None
    database_name = 'celery'
    taskmeta_collection = 'celery_taskmeta'
    groupmeta_collection = 'celery_groupmeta'
    max_pool_size = 10
    # Extra keyword arguments forwarded to ``MongoClient``; replaced by a
    # fresh per-instance dict in ``__init__``.
    options = None

    supports_autoexpire = False

    # Lazily created ``MongoClient`` (see ``_get_connection``).
    _connection = None

    def __init__(self, app=None, **kwargs):
        """Resolve connection settings from the URL and app configuration.

        Arguments:
            app: Celery application instance, forwarded to the base class.
            **kwargs: Forwarded unchanged to the base class constructor.

        Raises:
            celery.exceptions.ImproperlyConfigured: if :pypi:`pymongo` is
                not installed, or if ``mongodb_backend_settings`` is set
                but is not a dict.
        """
        self.options = {}

        super().__init__(app, **kwargs)

        if not pymongo:
            raise ImproperlyConfigured(
                'You need to install the pymongo library to use the '
                'MongoDB backend.')

        # Set option defaults
        for key, value in self._prepare_client_options().items():
            self.options.setdefault(key, value)

        # update conf with mongo uri data, only if uri was given
        if self.url:
            self.url = self._ensure_mongodb_uri_compliance(self.url)

            uri_data = pymongo.uri_parser.parse_uri(self.url)
            # build the hosts list to create a mongo connection
            hostslist = [
                f'{x[0]}:{x[1]}' for x in uri_data['nodelist']
            ]
            self.user = uri_data['username']
            self.password = uri_data['password']
            self.mongo_host = hostslist
            if uri_data['database']:
                # if no database is provided in the uri, use default
                self.database_name = uri_data['database']

            self.options.update(uri_data['options'])

        # update conf with specific settings
        config = self.app.conf.get('mongodb_backend_settings')
        if config is not None:
            if not isinstance(config, dict):
                raise ImproperlyConfigured(
                    'MongoDB backend settings should be grouped in a dict')
            config = dict(config)  # don't modify original

            if 'host' in config or 'port' in config:
                # these should take over uri conf
                self.mongo_host = None

            self.host = config.pop('host', self.host)
            self.port = config.pop('port', self.port)
            self.mongo_host = config.pop('mongo_host', self.mongo_host)
            self.user = config.pop('user', self.user)
            self.password = config.pop('password', self.password)
            self.database_name = config.pop('database', self.database_name)
            self.taskmeta_collection = config.pop(
                'taskmeta_collection', self.taskmeta_collection,
            )
            self.groupmeta_collection = config.pop(
                'groupmeta_collection', self.groupmeta_collection,
            )

            # Explicit ``options`` first, then every remaining unknown key
            # is also treated as a client option.
            self.options.update(config.pop('options', {}))
            self.options.update(config)

    @staticmethod
    def _ensure_mongodb_uri_compliance(url):
        """Return *url* adjusted so its scheme starts with ``mongodb``.

        A scheme that does not start with ``mongodb`` gets ``mongodb+``
        prepended, and a bare ``mongodb://`` gets ``localhost`` appended.
        """
        parsed_url = urlparse(url)
        if not parsed_url.scheme.startswith('mongodb'):
            url = f'mongodb+{url}'

        if url == 'mongodb://':
            url += 'localhost'

        return url

    def _prepare_client_options(self):
        """Return the default client options for the installed pymongo."""
        if pymongo.version_tuple >= (3,):
            return {'maxPoolSize': self.max_pool_size}
        else:  # pragma: no cover
            return {'max_pool_size': self.max_pool_size,
                    'auto_start_request': False}

    def _get_connection(self):
        """Connect to the MongoDB server.

        The client is created on first use and cached on the instance.
        """
        if self._connection is None:
            from pymongo import MongoClient

            host = self.mongo_host
            if not host:
                # The first pymongo.Connection() argument (host) can be
                # a list of ['host:port'] elements or a mongodb connection
                # URI. If this is the case, don't use self.port
                # but let pymongo get the port(s) from the URI instead.
                # This enables the use of replica sets and sharding.
                # See pymongo.Connection() for more info.
                host = self.host
                # Both URI schemes must be left untouched; wrapping a
                # ``mongodb+srv://`` value would produce an invalid URI.
                if isinstance(host, str) and not host.startswith(
                        ('mongodb://', 'mongodb+srv://')):
                    host = f'mongodb://{host}:{self.port}'
            # don't change self.options
            conf = dict(self.options)
            conf['host'] = host
            if self.user:
                conf['username'] = self.user
            if self.password:
                conf['password'] = self.password

            self._connection = MongoClient(**conf)

        return self._connection

    def encode(self, data):
        """Serialize *data* for storage in a MongoDB document.

        With the ``bson`` serializer the value is returned unchanged;
        otherwise the base-class encoding is applied and, for the codecs in
        ``BINARY_CODECS``, the payload is wrapped in ``Binary``.
        """
        # NOTE(review): no matching ``decode`` override is visible in this
        # class -- confirm ``bson`` results are read back correctly.
        if self.serializer == 'bson':
            # mongodb handles serialization
            return data
        payload = super().encode(data)

        # serializer which are in a unsupported format (pickle/binary)
        if self.serializer in BINARY_CODECS:
            payload = Binary(payload)
        return payload

    def _store_result(self, task_id, result, state,
                      traceback=None, request=None, **kwargs):
        """Store return value and state of an executed task.

        Returns:
            The *result* argument, unchanged.

        Raises:
            kombu.exceptions.EncodeError: if pymongo rejects the document
                with ``InvalidDocument``.
        """
        meta = self._get_result_meta(result=self.encode(result), state=state,
                                     traceback=traceback, request=request,
                                     format_date=False)
        # Add the _id for mongodb
        meta['_id'] = task_id

        try:
            self.collection.replace_one({'_id': task_id}, meta, upsert=True)
        except InvalidDocument as exc:
            # Keep the original pymongo error attached as the cause.
            raise EncodeError(exc) from exc

        return result

    def _get_task_meta_for(self, task_id):
        """Get task meta-data for a task by id.

        Returns:
            dict: the stored meta-data, or a ``PENDING`` placeholder when no
            document exists for *task_id*.
        """
        obj = self.collection.find_one({'_id': task_id})
        if not obj:
            return {'status': states.PENDING, 'result': None}

        meta = {
            'task_id': obj['_id'],
            'status': obj['status'],
            'result': self.decode(obj['result']),
            'date_done': obj['date_done'],
            'traceback': obj['traceback'],
            'children': obj['children'],
        }
        if self.app.conf.find_value_for_key('extended', 'result'):
            # The request fields may be absent from the stored document
            # (e.g. it was written without a request, or before extended
            # results were enabled), so read them with ``.get`` instead of
            # failing the whole lookup with ``KeyError``.
            for field in ('name', 'args', 'kwargs',
                          'queue', 'worker', 'retries'):
                meta[field] = obj.get(field)
        return self.meta_from_decoded(meta)

    def _save_group(self, group_id, result):
        """Save the group result.

        Only the ids of the member results are stored, together with a
        UTC ``date_done`` timestamp.
        """
        meta = {
            '_id': group_id,
            'result': self.encode([i.id for i in result]),
            'date_done': datetime.now(timezone.utc),
        }
        self.group_collection.replace_one(
            {'_id': group_id}, meta, upsert=True)
        return result

    def _restore_group(self, group_id):
        """Get the result for a group by id.

        Returns:
            dict: the group meta-data with ``result`` rebuilt as a list of
            ``AsyncResult`` objects, or ``None`` if the group is unknown.
        """
        obj = self.group_collection.find_one({'_id': group_id})
        if obj:
            return {
                'task_id': obj['_id'],
                'date_done': obj['date_done'],
                'result': [
                    self.app.AsyncResult(task)
                    for task in self.decode(obj['result'])
                ],
            }

    def _delete_group(self, group_id):
        """Delete a group by id."""
        self.group_collection.delete_one({'_id': group_id})

    def _forget(self, task_id):
        """Remove result from MongoDB.

        No error handling is done here, so any exception raised by
        pymongo's ``delete_one`` propagates to the caller.
        """
        self.collection.delete_one({'_id': task_id})

    def __reduce__(self, args=(), kwargs=None):
        """Support pickling by carrying ``expires`` and ``url`` along."""
        kwargs = {} if not kwargs else kwargs
        return super().__reduce__(
            args, dict(kwargs, expires=self.expires, url=self.url))

    def _get_database(self):
        """Return the configured database from the client connection."""
        conn = self._get_connection()
        return conn[self.database_name]

    @cached_property
    def database(self):
        """Get database from MongoDB connection.

        The value is computed once and cached on the instance.
        """
        return self._get_database()

    @cached_property
    def collection(self):
        """Get the meta-data task collection."""
        collection = self.database[self.taskmeta_collection]

        # Ensure an index on date_done is there, if not process the index
        # in the background.  Once completed cleanup will be much faster
        collection.create_index('date_done', background=True)
        return collection

    @cached_property
    def group_collection(self):
        """Get the meta-data group collection."""
        collection = self.database[self.groupmeta_collection]

        # Ensure an index on date_done is there, if not process the index
        # in the background.  Once completed cleanup will be much faster
        collection.create_index('date_done', background=True)
        return collection

    @cached_property
    def expires_delta(self):
        """Return ``self.expires`` (seconds) as a ``timedelta``."""
        return timedelta(seconds=self.expires)

    def as_uri(self, include_password=False):
        """Return the backend as an URI.

        Arguments:
            include_password (bool): Password censored if disabled.
        """
        if not self.url:
            return 'mongodb://'
        if include_password:
            return self.url

        if ',' not in self.url:
            return maybe_sanitize_url(self.url)

        # Multi-host URL: only the part before the first comma is passed
        # through the sanitizer; the remaining hosts are appended unchanged.
        uri1, remainder = self.url.split(',', 1)
        return ','.join([maybe_sanitize_url(uri1), remainder])