"""Elasticsearch result store backend."""fromdatetimeimportdatetime,timezonefromkombu.utils.encodingimportbytes_to_strfromkombu.utils.urlimport_parse_urlfromceleryimportstatesfromcelery.exceptionsimportImproperlyConfiguredfrom.baseimportKeyValueStoreBackendtry:importelasticsearchexceptImportError:elasticsearch=Nonetry:importelastic_transportexceptImportError:elastic_transport=None__all__=('ElasticsearchBackend',)E_LIB_MISSING="""\You need to install the elasticsearch library to use the Elasticsearch \result backend.\"""
class ElasticsearchBackend(KeyValueStoreBackend):
    """Elasticsearch Backend.

    Raises:
        celery.exceptions.ImproperlyConfigured:
            if module :pypi:`elasticsearch` is not available.
    """

    # Class-level defaults; each may be overridden per instance by the
    # backend URL or by the app configuration (see ``__init__``).
    index = 'celery'
    doc_type = None
    scheme = 'http'
    host = 'localhost'
    port = 9200
    username = None
    password = None
    es_retry_on_timeout = False
    es_timeout = 10
    es_max_retries = 3

    def __init__(self, url=None, *args, **kwargs):
        """Configure the backend from an optional URL and the app config.

        Arguments:
            url (str): Optional backend URL.  Its scheme, host, port,
                username, password and path override the class defaults.
                The path is interpreted as ``index[/doc_type]``.  A scheme
                of ``elasticsearch`` is treated as "not specified", so the
                class default scheme is used instead.
            *args: Forwarded to the parent class initializer.
            **kwargs: Forwarded to the parent class initializer.

        Raises:
            celery.exceptions.ImproperlyConfigured:
                if the ``elasticsearch`` module could not be imported.
        """
        super().__init__(*args, **kwargs)
        self.url = url
        _get = self.app.conf.get

        if elasticsearch is None:
            raise ImproperlyConfigured(E_LIB_MISSING)

        index = doc_type = scheme = host = port = username = password = None

        if url:
            scheme, host, port, username, password, path, _ = _parse_url(url)
            if scheme == 'elasticsearch':
                scheme = None
            if path:
                path = path.strip('/')
                index, _, doc_type = path.partition('/')

        # Any component missing (or empty) in the URL falls back to the
        # class-level default.
        self.index = index or self.index
        self.doc_type = doc_type or self.doc_type
        self.scheme = scheme or self.scheme
        self.host = host or self.host
        self.port = port or self.port
        self.username = username or self.username
        self.password = password or self.password

        self.es_retry_on_timeout = (
            _get('elasticsearch_retry_on_timeout') or self.es_retry_on_timeout
        )

        # ``is not None`` (rather than truthiness) so that an explicit 0 in
        # the configuration is honoured.
        es_timeout = _get('elasticsearch_timeout')
        if es_timeout is not None:
            self.es_timeout = es_timeout

        es_max_retries = _get('elasticsearch_max_retries')
        if es_max_retries is not None:
            self.es_max_retries = es_max_retries

        self.es_save_meta_as_text = _get(
            'elasticsearch_save_meta_as_text', True)
        # The client is created lazily by the ``server`` property.
        self._server = None

    def _index(self, id, body, **kwargs):
        """Index ``body`` as document ``id`` with ``op_type=create``.

        Arguments:
            id (str | bytes): Document id; converted with ``bytes_to_str``.
            body (dict): Document fields; keys are converted with
                ``bytes_to_str``.
            **kwargs: Extra keyword arguments forwarded to the client's
                ``index`` call.

        Returns:
            The value returned by the client's ``index`` call.
        """
        body = {bytes_to_str(k): v for k, v in body.items()}
        # ``doc_type`` is only sent when one is configured.
        doc_type_kwarg = {'doc_type': self.doc_type} if self.doc_type else {}
        return self.server.index(
            id=bytes_to_str(id),
            index=self.index,
            body=body,
            params={'op_type': 'create'},
            **doc_type_kwarg,
            **kwargs
        )

    def _update(self, id, body, state, **kwargs):
        """Update state in a conflict free manner.

        If state is defined (not None), this will not update ES server if
        either:

        * existing state is success
        * existing state is a ready state and current state in not a ready
          state

        This way, a Retry state cannot override a Success or Failure, and
        chord_unlock will not retry indefinitely.

        Arguments:
            id (str | bytes): Document id; converted with ``bytes_to_str``.
            body (dict): Fields to write; keys are converted with
                ``bytes_to_str``.
            state (str): The state being written, compared against the
                stored state as described above.
            **kwargs: Extra keyword arguments forwarded to the client call.

        Returns:
            dict: ``{'result': 'noop'}`` when the stored state must not be
            overridden; otherwise the response of the ``index`` or
            ``update`` call that was made.

        Raises:
            elasticsearch.exceptions.ConflictError: if the update request
                reports ``noop``, i.e. it did not modify the document.
        """
        body = {bytes_to_str(k): v for k, v in body.items()}

        # NOTE: ``self._get`` is not defined in this section of the file;
        # it is expected to be provided elsewhere on the class.
        try:
            res_get = self._get(key=id)
            if not res_get.get('found'):
                return self._index(id, body, **kwargs)
        # document disappeared between index and get calls.
        except elasticsearch.exceptions.NotFoundError:
            return self._index(id, body, **kwargs)

        try:
            meta_present_on_backend = self.decode_result(
                res_get['_source']['result'])
        except (TypeError, KeyError):
            # Deliberate best effort: if the stored document has no
            # readable result, skip the state checks and attempt the update.
            pass
        else:
            stored_status = meta_present_on_backend['status']
            if stored_status == states.SUCCESS:
                # if stored state is already in success, do nothing
                return {'result': 'noop'}
            if (stored_status in states.READY_STATES
                    and state in states.UNREADY_STATES):
                # if stored state is in ready state and current not,
                # do nothing
                return {'result': 'noop'}

        # get current sequence number and primary term
        # https://www.elastic.co/guide/en/elasticsearch/reference/current/optimistic-concurrency-control.html
        seq_no = res_get.get('_seq_no', 1)
        prim_term = res_get.get('_primary_term', 1)

        # try to update document with current seq_no and primary_term;
        # ``doc_type`` is only sent when one is configured.
        doc_type_kwarg = {'doc_type': self.doc_type} if self.doc_type else {}
        res = self.server.update(
            id=bytes_to_str(id),
            index=self.index,
            body={'doc': body},
            params={'if_primary_term': prim_term, 'if_seq_no': seq_no},
            **doc_type_kwarg,
            **kwargs
        )

        # result is elastic search update query result
        # noop = query did not update any document
        # updated = at least one document got updated
        if res['result'] == 'noop':
            message = "conflicting update occurred concurrently"
            if elastic_transport is None:
                # ``elastic_transport`` is an optional import at module
                # level.  Without it, build the error with the
                # (status_code, error, info) arguments used by pre-8
                # clients instead of failing with AttributeError on None.
                raise elasticsearch.exceptions.ConflictError(
                    409, message, {})
            raise elasticsearch.exceptions.ConflictError(
                message,
                elastic_transport.ApiResponseMeta(
                    409,
                    "HTTP/1.1",
                    elastic_transport.HttpHeaders(),
                    0,
                    elastic_transport.NodeConfig(
                        self.scheme, self.host, self.port),
                ),
                None,
            )
        return res

    def _get_server(self):
        """Connect to the Elasticsearch server."""
        http_auth = None
        # Credentials are only sent when both parts are set.
        if self.username and self.password:
            http_auth = (self.username, self.password)
        return elasticsearch.Elasticsearch(
            f'{self.scheme}://{self.host}:{self.port}',
            retry_on_timeout=self.es_retry_on_timeout,
            max_retries=self.es_max_retries,
            timeout=self.es_timeout,
            http_auth=http_auth,
        )

    @property
    def server(self):
        """Return the client, creating and caching it on first access."""
        if self._server is None:
            self._server = self._get_server()
        return self._server