"""Apache Cassandra result store backend using the DataStax driver."""importthreadingfromceleryimportstatesfromcelery.exceptionsimportImproperlyConfiguredfromcelery.utils.logimportget_loggerfrom.baseimportBaseBackendtry:# pragma: no coverimportcassandraimportcassandra.authimportcassandra.clusterimportcassandra.queryexceptImportError:cassandra=None__all__=('CassandraBackend',)logger=get_logger(__name__)E_NO_CASSANDRA="""You need to install the cassandra-driver library touse the Cassandra backend. See https://github.com/datastax/python-driver"""E_NO_SUCH_CASSANDRA_AUTH_PROVIDER="""CASSANDRA_AUTH_PROVIDER you provided is not a valid auth_provider class.See https://datastax.github.io/python-driver/api/cassandra/auth.html."""E_CASSANDRA_MISCONFIGURED='Cassandra backend improperly configured.'E_CASSANDRA_NOT_CONFIGURED='Cassandra backend not configured.'Q_INSERT_RESULT="""INSERT INTO {table} ( task_id, status, result, date_done, traceback, children) VALUES (%s, %s, %s, %s, %s, %s) {expires};"""Q_SELECT_RESULT="""SELECT status, result, date_done, traceback, childrenFROM {table}WHERE task_id=%sLIMIT 1"""Q_CREATE_RESULT_TABLE="""CREATE TABLE {table} ( task_id text, status text, result blob, date_done timestamp, traceback blob, children blob, PRIMARY KEY ((task_id), date_done)) WITH CLUSTERING ORDER BY (date_done DESC);"""Q_EXPIRES=""" USING TTL {0}"""defbuf_t(x):returnbytes(x,'utf8')
[文档]classCassandraBackend(BaseBackend):"""Cassandra/AstraDB backend utilizing DataStax driver. Raises: celery.exceptions.ImproperlyConfigured: if module :pypi:`cassandra-driver` is not available, or not-exactly-one of the :setting:`cassandra_servers` and the :setting:`cassandra_secure_bundle_path` settings is set. """#: List of Cassandra servers with format: ``hostname``.servers=None#: Location of the secure connect bundle zipfile (absolute path).bundle_path=Nonesupports_autoexpire=True# autoexpire supported via entry_ttldef__init__(self,servers=None,keyspace=None,table=None,entry_ttl=None,port=None,bundle_path=None,**kwargs):super().__init__(**kwargs)ifnotcassandra:raiseImproperlyConfigured(E_NO_CASSANDRA)conf=self.app.confself.servers=serversorconf.get('cassandra_servers',None)self.bundle_path=bundle_pathorconf.get('cassandra_secure_bundle_path',None)self.port=portorconf.get('cassandra_port',None)or9042self.keyspace=keyspaceorconf.get('cassandra_keyspace',None)self.table=tableorconf.get('cassandra_table',None)self.cassandra_options=conf.get('cassandra_options',{})# either servers or bundle path must be provided...db_directions=self.serversorself.bundle_pathifnotdb_directionsornotself.keyspaceornotself.table:raiseImproperlyConfigured(E_CASSANDRA_NOT_CONFIGURED)# ...but not both:ifself.serversandself.bundle_path:raiseImproperlyConfigured(E_CASSANDRA_MISCONFIGURED)expires=entry_ttlorconf.get('cassandra_entry_ttl',None)self.cqlexpires=(Q_EXPIRES.format(expires)ifexpiresisnotNoneelse'')read_cons=conf.get('cassandra_read_consistency')or'LOCAL_QUORUM'write_cons=conf.get('cassandra_write_consistency')or'LOCAL_QUORUM'self.read_consistency=getattr(cassandra.ConsistencyLevel,read_cons,cassandra.ConsistencyLevel.LOCAL_QUORUM)self.write_consistency=getattr(cassandra.ConsistencyLevel,write_cons,cassandra.ConsistencyLevel.LOCAL_QUORUM)self.auth_provider=Noneauth_provider=conf.get('cassandra_auth_provider',None)auth_kwargs=conf.get('cassandra_auth_kwargs',None)ifauth_providerandauth_kwargs:auth_provider_class=getattr(cassandra.auth,auth_provider,None)ifnotauth_provider_class:raiseImproperlyConfigured(E_NO_SUCH_CASSANDRA_AUTH_PROVIDER)self.auth_provider=auth_provider_class(**auth_kwargs)self._cluster=Noneself._session=Noneself._write_stmt=Noneself._read_stmt=Noneself._lock=threading.RLock()def_get_connection(self,write=False):"""Prepare the connection for action. Arguments: write (bool): are we a writer? """ifself._sessionisnotNone:returnself._lock.acquire()try:ifself._sessionisnotNone:return# using either 'servers' or 'bundle_path' here:ifself.servers:self._cluster=cassandra.cluster.Cluster(self.servers,port=self.port,auth_provider=self.auth_provider,**self.cassandra_options)else:# 'bundle_path' is guaranteed to be setself._cluster=cassandra.cluster.Cluster(cloud={'secure_connect_bundle':self.bundle_path,},auth_provider=self.auth_provider,**self.cassandra_options)self._session=self._cluster.connect(self.keyspace)# We're forced to do concatenation below, as formatting would# blow up on superficial %s that'll be processed by Cassandraself._write_stmt=cassandra.query.SimpleStatement(Q_INSERT_RESULT.format(table=self.table,expires=self.cqlexpires),)self._write_stmt.consistency_level=self.write_consistencyself._read_stmt=cassandra.query.SimpleStatement(Q_SELECT_RESULT.format(table=self.table),)self._read_stmt.consistency_level=self.read_consistencyifwrite:# Only possible writers "workers" are allowed to issue# CREATE TABLE. This is to prevent conflicting situations# where both task-creator and task-executor would issue it# at the same time.# Anyway; if you're doing anything critical, you should# have created this table in advance, in which case# this query will be a no-op (AlreadyExists)make_stmt=cassandra.query.SimpleStatement(Q_CREATE_RESULT_TABLE.format(table=self.table),)make_stmt.consistency_level=self.write_consistencytry:self._session.execute(make_stmt)exceptcassandra.AlreadyExists:passexceptcassandra.OperationTimedOut:# a heavily loaded or gone Cassandra cluster failed to respond.# leave this class in a consistent stateifself._clusterisnotNone:self._cluster.shutdown()# also shuts down _sessionself._cluster=Noneself._session=Noneraise# we did fail after all - reraisefinally:self._lock.release()def_store_result(self,task_id,result,state,traceback=None,request=None,**kwargs):"""Store return value and state of an executed task."""self._get_connection(write=True)self._session.execute(self._write_stmt,(task_id,state,buf_t(self.encode(result)),self.app.now(),buf_t(self.encode(traceback)),buf_t(self.encode(self.current_task_children(request)))))
def_get_task_meta_for(self,task_id):"""Get task meta-data for a task by id."""self._get_connection()res=self._session.execute(self._read_stmt,(task_id,)).one()ifnotres:return{'status':states.PENDING,'result':None}status,result,date_done,traceback,children=resreturnself.meta_from_decoded({'task_id':task_id,'status':status,'result':self.decode(result),'date_done':date_done,'traceback':self.decode(traceback),'children':self.decode(children),})def__reduce__(self,args=(),kwargs=None):kwargs={}ifnotkwargselsekwargskwargs.update({'servers':self.servers,'keyspace':self.keyspace,'table':self.table})returnsuper().__reduce__(args,kwargs)