"""Redis result store backend."""importtimefromcontextlibimportcontextmanagerfromfunctoolsimportpartialfromsslimportCERT_NONE,CERT_OPTIONAL,CERT_REQUIREDfromurllib.parseimportunquotefromkombu.utils.functionalimportretry_over_timefromkombu.utils.objectsimportcached_propertyfromkombu.utils.urlimport_parse_url,maybe_sanitize_urlfromceleryimportstatesfromcelery._stateimporttask_join_will_blockfromcelery.canvasimportmaybe_signaturefromcelery.exceptionsimportBackendStoreError,ChordError,ImproperlyConfiguredfromcelery.resultimportGroupResult,allow_join_resultfromcelery.utils.functionalimport_regen,dictfilterfromcelery.utils.logimportget_loggerfromcelery.utils.timeimporthumanize_secondsfrom.asynchronousimportAsyncBackendMixin,BaseResultConsumerfrom.baseimportBaseKeyValueStoreBackendtry:importredis.connectionfromkombu.transport.redisimportget_redis_error_classesexceptImportError:redis=Noneget_redis_error_classes=Nonetry:importredis.sentinelexceptImportError:pass__all__=('RedisBackend','SentinelBackend')E_REDIS_MISSING="""You need to install the redis library in order to use \the Redis result store backend."""E_REDIS_SENTINEL_MISSING="""You need to install the redis library with support of \sentinel in order to use the Redis result store backend."""W_REDIS_SSL_CERT_OPTIONAL="""Setting ssl_cert_reqs=CERT_OPTIONAL when connecting to redis means that \celery might not validate the identity of the redis broker when connecting. \This leaves you vulnerable to man in the middle attacks."""W_REDIS_SSL_CERT_NONE="""Setting ssl_cert_reqs=CERT_NONE when connecting to redis means that celery \will not validate the identity of the redis broker when connecting. This \leaves you vulnerable to man in the middle attacks."""E_REDIS_SSL_PARAMS_AND_SCHEME_MISMATCH="""SSL connection parameters have been provided but the specified URL scheme \is redis://. A Redis SSL connection URL should use the scheme rediss://."""E_REDIS_SSL_CERT_REQS_MISSING_INVALID="""A rediss:// URL must have parameter ssl_cert_reqs and this must be set to \CERT_REQUIRED, CERT_OPTIONAL, or CERT_NONE"""E_LOST='Connection to Redis lost: Retry (%s/%s) %s.'E_RETRY_LIMIT_EXCEEDED="""Retry limit exceeded while trying to reconnect to the Celery redis result \store backend. The Celery application must be restarted."""logger=get_logger(__name__)classResultConsumer(BaseResultConsumer):_pubsub=Nonedef__init__(self,*args,**kwargs):super().__init__(*args,**kwargs)self._get_key_for_task=self.backend.get_key_for_taskself._decode_result=self.backend.decode_resultself._ensure=self.backend.ensureself._connection_errors=self.backend.connection_errorsself.subscribed_to=set()defon_after_fork(self):try:self.backend.client.connection_pool.reset()ifself._pubsubisnotNone:self._pubsub.close()exceptKeyErrorase:logger.warning(str(e))super().on_after_fork()def_reconnect_pubsub(self):self._pubsub=Noneself.backend.client.connection_pool.reset()# task state might have changed when the connection was down so we# retrieve meta for all subscribed tasks before going into pubsub modeifself.subscribed_to:metas=self.backend.client.mget(self.subscribed_to)metas=[metaformetainmetasifmeta]formetainmetas:self.on_state_change(self._decode_result(meta),None)self._pubsub=self.backend.client.pubsub(ignore_subscribe_messages=True,)# subscribed_to maybe empty after on_state_changeifself.subscribed_to:self._pubsub.subscribe(*self.subscribed_to)else:self._pubsub.connection=self._pubsub.connection_pool.get_connection('pubsub',self._pubsub.shard_hint)# even if there is nothing to subscribe, we should not lose the callback after connecting.# The on_connect callback will re-subscribe to any channels we previously subscribed to.self._pubsub.connection.register_connect_callback(self._pubsub.on_connect)@contextmanagerdefreconnect_on_error(self):try:yieldexceptself._connection_errors:try:self._ensure(self._reconnect_pubsub,())exceptself._connection_errors:logger.critical(E_RETRY_LIMIT_EXCEEDED)raisedef_maybe_cancel_ready_task(self,meta):ifmeta['status']instates.READY_STATES:self.cancel_for(meta['task_id'])defon_state_change(self,meta,message):super().on_state_change(meta,message)self._maybe_cancel_ready_task(meta)defstart(self,initial_task_id,**kwargs):self._pubsub=self.backend.client.pubsub(ignore_subscribe_messages=True,)self._consume_from(initial_task_id)defon_wait_for_pending(self,result,**kwargs):formetainresult._iter_meta(**kwargs):ifmetaisnotNone:self.on_state_change(meta,None)defstop(self):ifself._pubsubisnotNone:self._pubsub.close()defdrain_events(self,timeout=None):ifself._pubsub:withself.reconnect_on_error():message=self._pubsub.get_message(timeout=timeout)ifmessageandmessage['type']=='message':self.on_state_change(self._decode_result(message['data']),message)eliftimeout:time.sleep(timeout)defconsume_from(self,task_id):ifself._pubsubisNone:returnself.start(task_id)self._consume_from(task_id)def_consume_from(self,task_id):key=self._get_key_for_task(task_id)ifkeynotinself.subscribed_to:self.subscribed_to.add(key)withself.reconnect_on_error():self._pubsub.subscribe(key)defcancel_for(self,task_id):key=self._get_key_for_task(task_id)self.subscribed_to.discard(key)ifself._pubsub:withself.reconnect_on_error():self._pubsub.unsubscribe(key)
[文档]classRedisBackend(BaseKeyValueStoreBackend,AsyncBackendMixin):"""Redis task result store. It makes use of the following commands: GET, MGET, DEL, INCRBY, EXPIRE, SET, SETEX """ResultConsumer=ResultConsumer#: :pypi:`redis` client module.redis=redisconnection_class_ssl=redis.SSLConnectionifrediselseNone#: Maximum number of connections in the pool.max_connections=Nonesupports_autoexpire=Truesupports_native_join=True#: Maximal length of string value in Redis.#: 512 MB - https://redis.io/topics/data-types_MAX_STR_VALUE_SIZE=536870912def__init__(self,host=None,port=None,db=None,password=None,max_connections=None,url=None,connection_pool=None,**kwargs):super().__init__(expires_type=int,**kwargs)_get=self.app.conf.getifself.redisisNone:raiseImproperlyConfigured(E_REDIS_MISSING.strip())ifhostand'://'inhost:url,host=host,Noneself.max_connections=(max_connectionsor_get('redis_max_connections')orself.max_connections)self._ConnectionPool=connection_poolsocket_timeout=_get('redis_socket_timeout')socket_connect_timeout=_get('redis_socket_connect_timeout')retry_on_timeout=_get('redis_retry_on_timeout')socket_keepalive=_get('redis_socket_keepalive')health_check_interval=_get('redis_backend_health_check_interval')self.connparams={'host':_get('redis_host')or'localhost','port':_get('redis_port')or6379,'db':_get('redis_db')or0,'password':_get('redis_password'),'max_connections':self.max_connections,'socket_timeout':socket_timeoutandfloat(socket_timeout),'retry_on_timeout':retry_on_timeoutorFalse,'socket_connect_timeout':socket_connect_timeoutandfloat(socket_connect_timeout),}username=_get('redis_username')ifusername:# We're extra careful to avoid including this configuration value# if it wasn't specified since older versions of py-redis# don't support specifying a username.# Only Redis>6.0 supports username/password authentication.# TODO: Include this in connparams' definition once we drop# support for py-redis<3.4.0.self.connparams['username']=usernameifhealth_check_interval:self.connparams["health_check_interval"]=health_check_interval# absent in redis.connection.UnixDomainSocketConnectionifsocket_keepalive:self.connparams['socket_keepalive']=socket_keepalive# "redis_backend_use_ssl" must be a dict with the keys:# 'ssl_cert_reqs', 'ssl_ca_certs', 'ssl_certfile', 'ssl_keyfile'# (the same as "broker_use_ssl")ssl=_get('redis_backend_use_ssl')ifssl:self.connparams.update(ssl)self.connparams['connection_class']=self.connection_class_sslifurl:self.connparams=self._params_from_url(url,self.connparams)# If we've received SSL parameters via query string or the# redis_backend_use_ssl dict, check ssl_cert_reqs is valid. If set# via query string ssl_cert_reqs will be a string so convert it hereif('connection_class'inself.connparamsandissubclass(self.connparams['connection_class'],redis.SSLConnection)):ssl_cert_reqs_missing='MISSING'ssl_string_to_constant={'CERT_REQUIRED':CERT_REQUIRED,'CERT_OPTIONAL':CERT_OPTIONAL,'CERT_NONE':CERT_NONE,'required':CERT_REQUIRED,'optional':CERT_OPTIONAL,'none':CERT_NONE}ssl_cert_reqs=self.connparams.get('ssl_cert_reqs',ssl_cert_reqs_missing)ssl_cert_reqs=ssl_string_to_constant.get(ssl_cert_reqs,ssl_cert_reqs)ifssl_cert_reqsnotinssl_string_to_constant.values():raiseValueError(E_REDIS_SSL_CERT_REQS_MISSING_INVALID)ifssl_cert_reqs==CERT_OPTIONAL:logger.warning(W_REDIS_SSL_CERT_OPTIONAL)elifssl_cert_reqs==CERT_NONE:logger.warning(W_REDIS_SSL_CERT_NONE)self.connparams['ssl_cert_reqs']=ssl_cert_reqsself.url=urlself.connection_errors,self.channel_errors=(get_redis_error_classes()ifget_redis_error_classeselse((),()))self.result_consumer=self.ResultConsumer(self,self.app,self.accept,self._pending_results,self._pending_messages,)def_params_from_url(self,url,defaults):scheme,host,port,username,password,path,query=_parse_url(url)connparams=dict(defaults,**dictfilter({'host':host,'port':port,'username':username,'password':password,'db':query.pop('virtual_host',None)}))ifscheme=='socket':# use 'path' as path to the socket… in this case# the database number should be given in 'query'connparams.update({'connection_class':self.redis.UnixDomainSocketConnection,'path':'/'+path,})# host+port are invalid options when using this connection type.connparams.pop('host',None)connparams.pop('port',None)connparams.pop('socket_connect_timeout')else:connparams['db']=pathssl_param_keys=['ssl_ca_certs','ssl_certfile','ssl_keyfile','ssl_cert_reqs']ifscheme=='redis':# If connparams or query string contain ssl params, raise errorif(any(keyinconnparamsforkeyinssl_param_keys)orany(keyinqueryforkeyinssl_param_keys)):raiseValueError(E_REDIS_SSL_PARAMS_AND_SCHEME_MISMATCH)ifscheme=='rediss':connparams['connection_class']=redis.SSLConnection# The following parameters, if present in the URL, are encoded. We# must add the decoded values to connparams.forssl_settinginssl_param_keys:ssl_val=query.pop(ssl_setting,None)ifssl_val:connparams[ssl_setting]=unquote(ssl_val)# db may be string and start with / like in kombu.db=connparams.get('db')or0db=db.strip('/')ifisinstance(db,str)elsedbconnparams['db']=int(db)forkey,valueinquery.items():ifkeyinredis.connection.URL_QUERY_ARGUMENT_PARSERS:query[key]=redis.connection.URL_QUERY_ARGUMENT_PARSERS[key](value)# Query parameters override other parametersconnparams.update(query)returnconnparams
[文档]defset(self,key,value,**retry_policy):ifisinstance(value,str)andlen(value)>self._MAX_STR_VALUE_SIZE:raiseBackendStoreError('value too large for Redis backend')returnself.ensure(self._set,(key,value),**retry_policy)
[文档]defapply_chord(self,header_result_args,body,**kwargs):# If any of the child results of this chord are complex (ie. group# results themselves), we need to save `header_result` to ensure that# the expected structure is retained when we finish the chord and pass# the results onward to the body in `on_chord_part_return()`. We don't# do this is all cases to retain an optimisation in the common case# where a chord header is comprised of simple result objects.ifnotisinstance(header_result_args[1],_regen):header_result=self.app.GroupResult(*header_result_args)ifany(isinstance(nr,GroupResult)fornrinheader_result.results):header_result.save(backend=self)
[文档]defon_chord_part_return(self,request,state,result,propagate=None,**kwargs):app=self.apptid,gid,group_index=request.id,request.group,request.group_indexifnotgidornottid:returnifgroup_indexisNone:group_index='+inf'client=self.clientjkey=self.get_key_for_group(gid,'.j')tkey=self.get_key_for_group(gid,'.t')skey=self.get_key_for_group(gid,'.s')result=self.encode_result(result,state)encoded=self.encode([1,tid,state,result])withclient.pipeline()aspipe:pipeline=(pipe.zadd(jkey,{encoded:group_index}).zcount(jkey,"-inf","+inf")ifself._chord_zsetelsepipe.rpush(jkey,encoded).llen(jkey)).get(tkey).get(skey)ifself.expires:pipeline=pipeline \
.expire(jkey,self.expires) \
.expire(tkey,self.expires) \
.expire(skey,self.expires)_,readycount,totaldiff,chord_size_bytes=pipeline.execute()[:4]totaldiff=int(totaldiffor0)ifchord_size_bytes:try:callback=maybe_signature(request.chord,app=app)total=int(chord_size_bytes)+totaldiffifreadycount==total:header_result=GroupResult.restore(gid)ifheader_resultisnotNone:# If we manage to restore a `GroupResult`, then it must# have been complex and saved by `apply_chord()` earlier.## Before we can join the `GroupResult`, it needs to be# manually marked as ready to avoid blockingheader_result.on_ready()# We'll `join()` it to get the results and ensure they are# structured as intended rather than the flattened version# we'd construct without any other information.join_func=(header_result.join_nativeifheader_result.supports_native_joinelseheader_result.join)withallow_join_result():resl=join_func(timeout=app.conf.result_chord_join_timeout,propagate=True)else:# Otherwise simply extract and decode the results we# stashed along the way, which should be faster for large# numbers of simple results in the chord header.decode,unpack=self.decode,self._unpack_chord_resultwithclient.pipeline()aspipe:ifself._chord_zset:pipeline=pipe.zrange(jkey,0,-1)else:pipeline=pipe.lrange(jkey,0,total)resl,=pipeline.execute()resl=[unpack(tup,decode)fortupinresl]try:callback.delay(resl)exceptExceptionasexc:# pylint: disable=broad-exceptlogger.exception('Chord callback for %r raised: %r',request.group,exc)returnself.chord_error_from_stack(callback,ChordError(f'Callback error: {exc!r}'),)finally:withclient.pipeline()aspipe:pipe \
.delete(jkey) \
.delete(tkey) \
.delete(skey) \
.execute()exceptChordErrorasexc:logger.exception('Chord %r raised: %r',request.group,exc)returnself.chord_error_from_stack(callback,exc)exceptExceptionasexc:# pylint: disable=broad-exceptlogger.exception('Chord %r raised: %r',request.group,exc)returnself.chord_error_from_stack(callback,ChordError(f'Join error: {exc!r}'),)
ifgetattr(redis,"sentinel",None):classSentinelManagedSSLConnection(redis.sentinel.SentinelManagedConnection,redis.SSLConnection):"""Connect to a Redis server using Sentinel + TLS. Use Sentinel to identify which Redis server is the current master to connect to and when connecting to the Master server, use an SSL Connection. """
[文档]classSentinelBackend(RedisBackend):"""Redis sentinel task result store."""# URL looks like `sentinel://0.0.0.0:26347/3;sentinel://0.0.0.0:26348/3`_SERVER_URI_SEPARATOR=";"sentinel=getattr(redis,"sentinel",None)connection_class_ssl=SentinelManagedSSLConnectionifsentinelelseNonedef__init__(self,*args,**kwargs):ifself.sentinelisNone:raiseImproperlyConfigured(E_REDIS_SENTINEL_MISSING.strip())super().__init__(*args,**kwargs)
[文档]defas_uri(self,include_password=False):"""Return the server addresses as URIs, sanitizing the password or not."""# Allow superclass to do work if we don't need to force sanitizationifinclude_password:returnsuper().as_uri(include_password=include_password,)# Otherwise we need to ensure that all components get sanitized rather# by passing them one by one to the `kombu` helperuri_chunks=(maybe_sanitize_url(chunk)forchunkin(self.urlor"").split(self._SERVER_URI_SEPARATOR))# Similar to the superclass, strip the trailing slash from URIs with# all components empty other than the schemereturnself._SERVER_URI_SEPARATOR.join(uri[:-1]ifuri.endswith(":///")elseuriforuriinuri_chunks)
def_params_from_url(self,url,defaults):chunks=url.split(self._SERVER_URI_SEPARATOR)connparams=dict(defaults,hosts=[])forchunkinchunks:data=super()._params_from_url(url=chunk,defaults=defaults)connparams['hosts'].append(data)forparamin("host","port","db","password"):connparams.pop(param)# Adding db/password in connparams to connect to the correct instanceforparamin("db","password"):ifconnparams['hosts']andparaminconnparams['hosts'][0]:connparams[param]=connparams['hosts'][0].get(param)returnconnparamsdef_get_sentinel_instance(self,**params):connparams=params.copy()hosts=connparams.pop("hosts")min_other_sentinels=self._transport_options.get("min_other_sentinels",0)sentinel_kwargs=self._transport_options.get("sentinel_kwargs",{})sentinel_instance=self.sentinel.Sentinel([(cp['host'],cp['port'])forcpinhosts],min_other_sentinels=min_other_sentinels,sentinel_kwargs=sentinel_kwargs,**connparams)returnsentinel_instancedef_get_pool(self,**params):sentinel_instance=self._get_sentinel_instance(**params)master_name=self._transport_options.get("master_name",None)returnsentinel_instance.master_for(service_name=master_name,redis_class=self._get_client(),).connection_pool