import asyncio
import inspect
import re
from copy import copy, deepcopy
from functools import partial
from typing import (
    Any,
    Awaitable,
    Callable,
    Dict,
    Generator,
    Iterable,
    List,
    Optional,
    Set,
    Tuple,
    Type,
    TypedDict,
    TypeVar,
    Union,
    cast,
)

from pypika import Order, Query, Table
from pypika.terms import Term
from typing_extensions import Self

from tortoise import connections
from tortoise.backends.base.client import BaseDBAsyncClient
from tortoise.exceptions import (
    ConfigurationError,
    DoesNotExist,
    IncompleteInstanceError,
    IntegrityError,
    ObjectDoesNotExistError,
    OperationalError,
    ParamsError,
)
from tortoise.fields.base import Field
from tortoise.fields.data import IntField
from tortoise.fields.relational import (
    BackwardFKRelation,
    BackwardOneToOneRelation,
    ForeignKeyFieldInstance,
    ManyToManyFieldInstance,
    ManyToManyRelation,
    NoneAwaitable,
    OneToOneFieldInstance,
    ReverseRelation,
)
from tortoise.filters import FilterInfoDict, get_filters_for_field
from tortoise.functions import Function
from tortoise.indexes import Index
from tortoise.manager import Manager
from tortoise.queryset import (
    BulkCreateQuery,
    BulkUpdateQuery,
    ExistsQuery,
    Q,
    QuerySet,
    QuerySetSingle,
    RawSQLQuery,
)
from tortoise.router import router
from tortoise.signals import Signals
from tortoise.transactions import in_transaction

MODEL = TypeVar("MODEL", bound="Model")
# Sentinel meaning "argument not supplied" where ``None`` is itself a valid value.
EMPTY = object()


def get_together(meta: "Model.Meta", together: str) -> Tuple[Tuple[str, ...], ...]:
    """
    Read a "together"-style option from a ``Meta`` class.

    :param meta: The ``Meta`` class to read from.
    :param together: Name of the attribute to read (e.g. ``"unique_together"``).
    :return: The attribute value; a flat list/tuple whose first item is a string
        is wrapped into a one-element tuple. Defaults to ``()``.
    """
    _together = getattr(meta, together, ())

    if _together and isinstance(_together, (list, tuple)) and isinstance(_together[0], str):
        _together = (_together,)

    # return without validation, validation will be done further in the code
    return _together


def prepare_default_ordering(meta: "Model.Meta") -> Tuple[Tuple[str, Order], ...]:
    """
    Parse ``Meta.ordering`` into a tuple of resolved ordering entries.

    :param meta: The ``Meta`` class to read ``ordering`` from (defaults to ``()``).
    :return: One entry per ordering string, as produced by
        ``QuerySet._resolve_ordering_string``.
    """
    ordering_list = getattr(meta, "ordering", ())

    parsed_ordering = tuple(
        QuerySet._resolve_ordering_string(ordering) for ordering in ordering_list
    )

    return parsed_ordering


class FkSetterKwargs(TypedDict):
    """Keyword arguments shared by the lazy FK getter and setter partials."""

    _key: str
    relation_field: str
    to_field: str


def _fk_setter(
    self: "Model",
    value: "Optional[Model]",
    _key: str,
    relation_field: str,
    to_field: str,
) -> None:
    """Store ``value`` under ``_key`` and mirror its ``to_field`` into ``relation_field``."""
    setattr(self, relation_field, getattr(value, to_field) if value else None)
    setattr(self, _key, value)


def _fk_getter(
    self: "Model", _key: str, ftype: "Type[Model]", relation_field: str, to_field: str
) -> Awaitable:
    """
    Return the cached related object, or an awaitable that resolves it.

    When nothing is cached under ``_key``, a ``ftype`` query filtered on
    ``to_field`` is returned if ``relation_field`` holds a value, otherwise
    ``NoneAwaitable``.
    """
    try:
        return getattr(self, _key)
    except AttributeError:
        value = getattr(self, relation_field)
        if value is not None:
            return ftype.filter(**{to_field: value}).first()
        return NoneAwaitable


def _rfk_getter(
    self: "Model", _key: str, ftype: "Type[Model]", frelfield: str, from_field: str
) -> ReverseRelation:
    """Return the ``ReverseRelation`` cached under ``_key``, creating it on first access."""
    val = getattr(self, _key, None)
    if val is None:
        val = ReverseRelation(ftype, frelfield, self, from_field)
        setattr(self, _key, val)
    return val


def _ro2o_getter(
    self: "Model", _key: str, ftype: "Type[Model]", frelfield: str, from_field: str
) -> "QuerySetSingle[Optional[Model]]":
    """Return the reverse one-to-one value cached under ``_key``, building the query once."""
    if hasattr(self, _key):
        return getattr(self, _key)

    val = ftype.filter(**{frelfield: getattr(self, from_field)}).first()
    setattr(self, _key, val)
    return val


def _m2m_getter(
    self: "Model", _key: str, field_object: ManyToManyFieldInstance
) -> ManyToManyRelation:
    """Return the ``ManyToManyRelation`` cached under ``_key``, creating it on first access."""
    val = getattr(self, _key, None)
    if val is None:
        val = ManyToManyRelation(self, field_object)
        setattr(self, _key, val)
    return val


def _get_comments(cls: "Type[Model]") -> Dict[str, str]:
    """
    Get comments exactly before attributes

    It can be multiline comment. The placeholder "{model}" will be replaced with
    the name of the model class. We require that the comments are in #: (with a
    colon) format, so you can differentiate between private and public comments.

    :param cls: The class we need to extract comments from its source.
    :return: The dictionary of comments by field name
    """
    try:
        source = inspect.getsource(cls)
    except (TypeError, OSError):  # pragma: nocoverage
        return {}
    comments = {}

    # ``source`` is the same for every MRO entry, so scan it only once.
    matches = re.findall(r"((?:(?!\n|^)[^\w\n]*#:.*?\n)+?)[^\w\n]*(\w+)\s*[:=]", source)

    for cls_ in reversed(cls.__mro__):
        if cls_ is object:
            continue
        for match in matches:
            field_name = match[1]
            # Extract text
            comment = re.sub(r"(^\s*#:\s*|\s*$)", "", match[0], flags=re.MULTILINE)
            # Class name template
            comments[field_name] = comment.replace("{model}", cls_.__name__)

    return comments


class MetaInfo:
    """Per-model metadata: options read from ``Meta`` plus derived field bookkeeping."""

    __slots__ = (
        "abstract",
        "db_table",
        "schema",
        "app",
        "fields",
        "db_fields",
        "m2m_fields",
        "o2o_fields",
        "backward_o2o_fields",
        "fk_fields",
        "backward_fk_fields",
        "fetch_fields",
        "fields_db_projection",
        "_inited",
        "fields_db_projection_reverse",
        "filters",
        "fields_map",
        "default_connection",
        "basequery",
        "basequery_all_fields",
        "basetable",
        "_filters",
        "unique_together",
        "manager",
        "indexes",
        "pk_attr",
        "generated_db_fields",
        "_model",
        "table_description",
        "pk",
        "db_pk_column",
        "db_native_fields",
        "db_default_fields",
        "db_complex_fields",
        "_default_ordering",
        "_ordering_validated",
    )

    def __init__(self, meta: "Model.Meta") -> None:
        """
        Read the supported options from ``meta`` and initialise empty bookkeeping.

        :param meta: The model's ``Meta`` class; missing attributes fall back to defaults.
        """
        self.abstract: bool = getattr(meta, "abstract", False)
        self.manager: Manager = getattr(meta, "manager", Manager())
        self.db_table: str = getattr(meta, "table", "")
        self.schema: Optional[str] = getattr(meta, "schema", None)
        self.app: Optional[str] = getattr(meta, "app", None)
        self.unique_together: Tuple[Tuple[str, ...], ...] = get_together(meta, "unique_together")
        self.indexes: Tuple[Tuple[str, ...], ...] = get_together(meta, "indexes")
        self._default_ordering: Tuple[Tuple[str, Order], ...] = prepare_default_ordering(meta)
        self._ordering_validated: bool = False
        self.fields: Set[str] = set()
        self.db_fields: Set[str] = set()
        self.m2m_fields: Set[str] = set()
        self.fk_fields: Set[str] = set()
        self.o2o_fields: Set[str] = set()
        self.backward_fk_fields: Set[str] = set()
        self.backward_o2o_fields: Set[str] = set()
        self.fetch_fields: Set[str] = set()
        self.fields_db_projection: Dict[str, str] = {}
        self.fields_db_projection_reverse: Dict[str, str] = {}
        self._filters: Dict[str, FilterInfoDict] = {}
        self.filters: Dict[str, FilterInfoDict] = {}
        self.fields_map: Dict[str, Field] = {}
        self._inited: bool = False
        self.default_connection: Optional[str] = None
        self.basequery: Query = Query()
        self.basequery_all_fields: Query = Query()
        self.basetable: Table = Table("")
        self.pk_attr: str = getattr(meta, "pk_attr", "")
        self.generated_db_fields: Tuple[str, ...] = None  # type: ignore
        self._model: Type["Model"] = None  # type: ignore
        self.table_description: str = getattr(meta, "table_description", "")
        self.pk: Field = None  # type: ignore
        self.db_pk_column: str = ""
        self.db_native_fields: List[Tuple[str, str, Field]] = []
        self.db_default_fields: List[Tuple[str, str, Field]] = []
        self.db_complex_fields: List[Tuple[str, str, Field]] = []

    @property
    def full_name(self) -> str:
        """``"<app>.<ModelClassName>"`` for the bound model."""
        return f"{self.app}.{self._model.__name__}"

    def add_field(self, name: str, value: Field) -> None:
        """
        Register an extra field on the model and refresh the derived field sets.

        :param name: Attribute name of the field on the model.
        :param value: The field instance to register.
        :raises ConfigurationError: If ``name`` is already present in ``fields_map``.
        """
        if name in self.fields_map:
            raise ConfigurationError(f"Field {name} already present in meta")
        value.model = self._model
        self.fields_map[name] = value
        value.model_field_name = name

        if value.has_db_field:
            self.fields_db_projection[name] = value.source_field or name

        if isinstance(value, ManyToManyFieldInstance):
            self.m2m_fields.add(name)
        elif isinstance(value, BackwardOneToOneRelation):
            self.backward_o2o_fields.add(name)
        elif isinstance(value, BackwardFKRelation):
            self.backward_fk_fields.add(name)

        field_filters = get_filters_for_field(
            field_name=name, field=value, source_field=value.source_field or name
        )
        self._filters.update(field_filters)
        self.finalise_fields()

    @property
    def db(self) -> BaseDBAsyncClient:
        """
        The connection registered under ``default_connection``.

        :raises ConfigurationError: If ``default_connection`` is ``None``.
        """
        if self.default_connection is None:
            raise ConfigurationError(
                f"default_connection for the model {self._model} cannot be None"
            )
        return connections.get(self.default_connection)

    @property
    def ordering(self) -> Tuple[Tuple[str, Order], ...]:
        """
        The parsed default ordering.

        :raises ConfigurationError: If the last ``finalise_fields`` run found an
            ordering entry whose leading field name is not a known field.
        """
        if not self._ordering_validated:
            unknown_fields = {f for f, _ in self._default_ordering} - self.fields
            raise ConfigurationError(
                f"Unknown fields {','.join(unknown_fields)} in "
                f"default ordering for model {self._model.__name__}"
            )
        return self._default_ordering

    def get_filter(self, key: str) -> FilterInfoDict:
        """Return the filter registered under ``key`` (``KeyError`` if absent)."""
        return self.filters[key]

    def finalise_model(self) -> None:
        """
        Finalise the model after it had been fully loaded.
        """
        self.finalise_fields()
        self._generate_filters()
        self._generate_lazy_fk_m2m_fields()
        self._generate_db_fields()

    def finalise_fields(self) -> None:
        """Recompute every set/mapping derived from ``fields_map`` and the projections."""
        self.db_fields = set(self.fields_db_projection.values())
        self.fields = set(self.fields_map.keys())
        self.fields_db_projection_reverse = {
            value: key for key, value in self.fields_db_projection.items()
        }
        self.fetch_fields = (
            self.m2m_fields
            | self.backward_fk_fields
            | self.fk_fields
            | self.backward_o2o_fields
            | self.o2o_fields
        )

        generated_fields = [
            (field.source_field or field.model_field_name)
            for field in self.fields_map.values()
            if field.generated
        ]
        self.generated_db_fields = tuple(generated_fields)

        # Only the part before the first "__" has to be a field of this model.
        self._ordering_validated = all(
            field_name.split("__")[0] in self.fields for field_name, _ in self._default_ordering
        )

    def _install_lazy_fk_property(self, key: str) -> None:
        """Attach the lazy getter/setter/deleter property for one FK-style field."""
        _key = f"_{key}"
        # Used for both ``fk_fields`` and ``o2o_fields`` entries; only attributes
        # common to both kinds of field object are read here.
        fk_field_object = cast(ForeignKeyFieldInstance, self.fields_map[key])
        relation_field = cast(str, fk_field_object.source_field)
        to_field = fk_field_object.to_field_instance.model_field_name
        property_kwargs: FkSetterKwargs = dict(
            _key=_key,
            relation_field=relation_field,
            to_field=to_field,
        )
        setattr(
            self._model,
            key,
            property(
                partial(
                    _fk_getter,
                    ftype=fk_field_object.related_model,
                    **property_kwargs,
                ),
                partial(
                    _fk_setter,
                    **property_kwargs,
                ),
                # Deleting the attribute is the same as assigning ``None``.
                partial(
                    _fk_setter,
                    value=None,
                    **property_kwargs,
                ),
            ),
        )

    def _generate_lazy_fk_m2m_fields(self) -> None:
        """Install the lazy relation properties on the model class."""
        # Create lazy FK fields on model.
        for key in self.fk_fields:
            self._install_lazy_fk_property(key)

        # Create lazy reverse FK fields on model.
        for key in self.backward_fk_fields:
            _key = f"_{key}"
            backward_fk_field_object: BackwardFKRelation = self.fields_map[key]  # type: ignore
            setattr(
                self._model,
                key,
                property(
                    partial(
                        _rfk_getter,
                        _key=_key,
                        ftype=backward_fk_field_object.related_model,
                        frelfield=backward_fk_field_object.relation_field,
                        from_field=backward_fk_field_object.to_field_instance.model_field_name,
                    )
                ),
            )

        # Create lazy one to one fields on model.
        for key in self.o2o_fields:
            self._install_lazy_fk_property(key)

        # Create lazy reverse one to one fields on model.
        for key in self.backward_o2o_fields:
            _key = f"_{key}"
            backward_o2o_field_object: BackwardOneToOneRelation = self.fields_map[  # type: ignore
                key
            ]
            setattr(
                self._model,
                key,
                property(
                    partial(
                        _ro2o_getter,
                        _key=_key,
                        ftype=backward_o2o_field_object.related_model,
                        frelfield=backward_o2o_field_object.relation_field,
                        from_field=backward_o2o_field_object.to_field_instance.model_field_name,
                    ),
                ),
            )

        # Create lazy M2M fields on model.
        for key in self.m2m_fields:
            _key = f"_{key}"
            field_object = cast(ManyToManyFieldInstance, self.fields_map[key])
            setattr(
                self._model,
                key,
                property(partial(_m2m_getter, _key=_key, field_object=field_object)),
            )

    def _generate_db_fields(self) -> None:
        """Sort DB columns into native / default-converter / complex conversion buckets."""
        self.db_default_fields.clear()
        self.db_complex_fields.clear()
        self.db_native_fields.clear()

        for key in self.db_fields:
            model_field = self.fields_db_projection_reverse[key]
            field = self.fields_map[model_field]

            is_native_field_type = field.field_type in self.db.executor_class.DB_NATIVE
            # True when the field class does not override ``Field.to_python_value``.
            default_converter = field.__class__.to_python_value is Field.to_python_value

            if is_native_field_type and (default_converter or field.skip_to_python_if_native):
                self.db_native_fields.append((key, model_field, field))
            elif default_converter:
                self.db_default_fields.append((key, model_field, field))
            else:
                self.db_complex_fields.append((key, model_field, field))

    def _generate_filters(self) -> None:
        """Fill ``filters`` from ``_filters``, applying the executor's operator overrides."""
        get_overridden_filter_func = self.db.executor_class.get_overridden_filter_func
        for key, filter_info in self._filters.items():
            overridden_operator = get_overridden_filter_func(
                filter_func=filter_info["operator"]
            )
            if overridden_operator:
                # Copy so the entry kept in ``_filters`` is left untouched.
                filter_info = copy(filter_info)
                filter_info["operator"] = overridden_operator
            self.filters[key] = filter_info


class ModelMeta(type):
    """Metaclass that collects ``Field`` attributes and attaches a ``MetaInfo`` as ``_meta``."""

    __slots__ = ()

    def __new__(mcs, name: str, bases: Tuple[Type, ...], attrs: dict):
        """
        Build a model class.

        Collects fields from ``attrs`` and the bases, determines the primary key
        (adding an ``id`` ``IntField`` when none is declared on a non-abstract
        model), moves the fields out of the class namespace into ``_meta`` and
        wires fields and managers to the new class.

        :raises ConfigurationError: On two primary keys, on a generated primary key
            whose field type does not allow it, or when ``id`` exists but is not a
            primary-key field while no other primary key is declared.
        """
        fields_db_projection: Dict[str, str] = {}
        fields_map: Dict[str, Field] = {}
        filters: Dict[str, FilterInfoDict] = {}
        fk_fields: Set[str] = set()
        m2m_fields: Set[str] = set()
        o2o_fields: Set[str] = set()
        meta_class: "Model.Meta" = attrs.get("Meta", type("Meta", (), {}))
        pk_attr: str = "id"

        # Searching for Field attributes in the class hierarchy
        def __search_for_field_attributes(base: Type, attrs: dict) -> None:
            """
            Searching for class attributes of type fields.Field
            in the given class.

            If an attribute of the class is an instance of fields.Field,
            then it will be added to the fields dict. But only, if the
            key is not already in the dict. So derived classes have a higher
            precedence. Multiple Inheritance is supported from left to right.

            After checking the given class, the function will look into
            the classes according to the MRO (method resolution order).

            The MRO is 'natural' order, in which python traverses methods and
            fields. For more information on the magic behind check out:
            `The Python 2.3 Method Resolution Order
            <https://www.python.org/download/releases/2.3/mro/>`_.
            """
            for parent in base.__mro__[1:]:
                __search_for_field_attributes(parent, attrs)
            meta = getattr(base, "_meta", None)
            if meta:
                # For abstract classes
                for key, value in meta.fields_map.items():
                    attrs[key] = value
                # For abstract classes manager
                for key, value in base.__dict__.items():
                    if isinstance(value, Manager) and key not in attrs:
                        attrs[key] = value.__class__()
            else:
                # For mixin classes
                for key, value in base.__dict__.items():
                    if isinstance(value, Field) and key not in attrs:
                        attrs[key] = value

        # Start searching for fields in the base classes.
        inherited_attrs: dict = {}
        for base in bases:
            __search_for_field_attributes(base, inherited_attrs)
        if inherited_attrs:
            # Ensure that the inherited fields are before the defined ones.
            attrs = {**inherited_attrs, **attrs}

        if name != "Model":
            custom_pk_present = False
            for key, value in attrs.items():
                if isinstance(value, Field):
                    if value.pk:
                        if custom_pk_present:
                            raise ConfigurationError(
                                f"Can't create model {name} with two primary keys,"
                                " only single primary key is supported"
                            )
                        if value.generated and not value.allows_generated:
                            raise ConfigurationError(
                                f"Field '{key}' ({value.__class__.__name__}) can't be DB-generated"
                            )
                        custom_pk_present = True
                        pk_attr = key

            if not custom_pk_present and not getattr(meta_class, "abstract", None):
                if "id" not in attrs:
                    attrs = {"id": IntField(primary_key=True), **attrs}

                if not isinstance(attrs["id"], Field) or not attrs["id"].pk:
                    raise ConfigurationError(
                        f"Can't create model {name} without explicit primary key if field 'id'"
                        " already present"
                    )

            for key, value in attrs.items():
                if isinstance(value, Field):
                    if getattr(meta_class, "abstract", None):
                        value = deepcopy(value)

                    fields_map[key] = value
                    value.model_field_name = key

                    if isinstance(value, OneToOneFieldInstance):
                        o2o_fields.add(key)
                    elif isinstance(value, ForeignKeyFieldInstance):
                        fk_fields.add(key)
                    elif isinstance(value, ManyToManyFieldInstance):
                        m2m_fields.add(key)
                    else:
                        fields_db_projection[key] = value.source_field or key
                        field, source_field = fields_map[key], fields_db_projection[key]
                        filters.update(
                            get_filters_for_field(
                                field_name=key, field=field, source_field=source_field
                            )
                        )
                        if value.pk:
                            # Register the same filters under the "pk" alias.
                            filters.update(
                                get_filters_for_field(
                                    field_name="pk", field=field, source_field=source_field
                                )
                            )

        # Clean the class attributes
        for slot in fields_map:
            attrs.pop(slot, None)
        attrs["_meta"] = meta = MetaInfo(meta_class)

        meta.fields_map = fields_map
        meta.fields_db_projection = fields_db_projection
        meta._filters = filters
        meta.fk_fields = fk_fields
        meta.backward_fk_fields = set()
        meta.o2o_fields = o2o_fields
        meta.backward_o2o_fields = set()
        meta.m2m_fields = m2m_fields
        meta.default_connection = None
        meta.pk_attr = pk_attr
        meta.pk = fields_map.get(pk_attr)  # type: ignore

        if meta.pk:
            meta.db_pk_column = meta.pk.source_field or meta.pk_attr

        meta._inited = False
        # A class without any field is treated as abstract.
        if not fields_map:
            meta.abstract = True

        new_class = super().__new__(mcs, name, bases, attrs)
        for field in meta.fields_map.values():
            field.model = new_class  # type: ignore

        for fname, comment in _get_comments(new_class).items():  # type: ignore
            if fname in fields_map:
                fields_map[fname].docstring = comment
                if fields_map[fname].description is None:
                    fields_map[fname].description = comment.split("\n")[0]

        if new_class.__doc__ and not meta.table_description:
            meta.table_description = inspect.cleandoc(new_class.__doc__).split("\n")[0]
        for key, value in attrs.items():
            if isinstance(value, Manager):
                value._model = new_class
        meta._model = new_class  # type: ignore
        meta.manager._model = new_class
        meta.finalise_fields()
        return new_class

    def __getitem__(cls: Type[MODEL], key: Any) -> QuerySetSingle[MODEL]:  # type: ignore
        """Support ``ModelClass[pk]`` by delegating to ``_getbypk``."""
        return cls._getbypk(key)  # type: ignore
class Model(metaclass=ModelMeta):
    """
    Base class for all Tortoise ORM Models.
    """

    # I don't like this here, but it makes auto completion and static analysis much happier
    _meta = MetaInfo(None)  # type: ignore
    _listeners: Dict[Signals, Dict[Type[MODEL], List[Callable]]] = {  # type: ignore
        Signals.pre_save: {},
        Signals.post_save: {},
        Signals.pre_delete: {},
        Signals.post_delete: {},
    }

    def __init__(self, **kwargs: Any) -> None:
        """
        Create an unsaved instance from field values.

        :param kwargs: Field values; fields not passed receive their default.
            Coroutine-function defaults are deferred and stored in
            ``_await_when_save`` instead of being called here.
        """
        # self._meta is a very common attribute lookup, lets cache it.
        meta = self._meta
        self._partial = False
        self._saved_in_db = False
        self._custom_generated_pk = False
        self._await_when_save: Dict[str, Callable[[], Awaitable[Any]]] = {}

        # Assign defaults for missing fields
        for key in meta.fields.difference(self._set_kwargs(kwargs)):
            field_object = meta.fields_map[key]
            field_default = field_object.default
            if inspect.iscoroutinefunction(field_default):
                self._await_when_save[key] = field_default
            elif callable(field_default):
                setattr(self, key, field_default())
            else:
                # Copy so instances never share one mutable default object.
                setattr(self, key, deepcopy(field_object.default))

    def __setattr__(self, key, value):
        # set field value override async default function
        if hasattr(self, "_await_when_save"):
            self._await_when_save.pop(key, None)
        super().__setattr__(key, value)

    def _set_kwargs(self, kwargs: dict) -> Set[str]:
        """
        Assign ``kwargs`` onto the instance, converting plain field values.

        Keys that match no known field are not assigned.

        :param kwargs: Mapping of field name to value.
        :return: The keys of ``kwargs``, every relational ("fetch") field name and
            the source field of each FK / one-to-one key that was passed.
        :raises OperationalError: If a related object has not been saved yet.
        :raises ValueError: If ``None`` is passed for a non-nullable field.
        :raises ConfigurationError: If a backward or many-to-many relation is passed.
        """
        meta = self._meta

        # Assign values and do type conversions
        passed_fields = {*kwargs.keys()} | meta.fetch_fields

        for key, value in kwargs.items():
            if key in meta.fk_fields or key in meta.o2o_fields:
                if value and not value._saved_in_db:
                    raise OperationalError(
                        f"You should first call .save() on {value} before referring to it"
                    )
                setattr(self, key, value)
                passed_fields.add(meta.fields_map[key].source_field)
            elif key in meta.fields_db_projection:
                field_object = meta.fields_map[key]
                if field_object.pk and field_object.generated:
                    self._custom_generated_pk = True
                if value is None and not field_object.null:
                    raise ValueError(f"{key} is non nullable field, but null was passed")
                setattr(self, key, field_object.to_python_value(value))
            elif key in meta.backward_fk_fields:
                raise ConfigurationError(
                    "You can't set backward relations through init, change related model instead"
                )
            elif key in meta.backward_o2o_fields:
                raise ConfigurationError(
                    "You can't set backward one to one relations through init,"
                    " change related model instead"
                )
            elif key in meta.m2m_fields:
                raise ConfigurationError(
                    "You can't set m2m relations through init, use m2m_manager instead"
                )

        return passed_fields

    @classmethod
    def _init_from_db(cls: Type[MODEL], **kwargs: Any) -> MODEL:
        """
        Build an instance from a database row without running ``__init__``.

        :param kwargs: Row values; the fast path looks them up by DB column name.
        :return: The instance, with ``_saved_in_db`` set. ``_partial`` is set when
            a column is missing from ``kwargs``.
        """
        self = cls.__new__(cls)
        self._partial = False
        self._saved_in_db = True
        self._custom_generated_pk = self._meta.db_pk_column not in self._meta.generated_db_fields
        self._await_when_save = {}

        meta = self._meta
        inited_keys: Set[str] = set()

        try:
            # This is like so for performance reasons.
            #  We want to avoid conditionals and calling .to_python_value()
            # Native fields are fields that are already converted to/from python to DB type
            #  by the DB driver
            for key, model_field, field in meta.db_native_fields:
                setattr(self, model_field, kwargs[key])
                inited_keys.add(key)
            # Fields that don't override .to_python_value() are converted without a call
            #  as we already know what we will be doing.
            for key, model_field, field in meta.db_default_fields:
                if (value := kwargs[key]) is not None:
                    value = field.field_type(value)
                setattr(self, model_field, value)
                inited_keys.add(key)
            # These fields need manual .to_python_value()
            for key, model_field, field in meta.db_complex_fields:
                setattr(self, model_field, field.to_python_value(kwargs[key]))
                inited_keys.add(key)
        except KeyError:
            # A column was missing: mark the instance partial and assign whatever
            # remaining values were supplied, one key at a time.
            self._partial = True
            native_fields: List[Field] = [f for *_, f in meta.db_native_fields]
            default_fields = None
            for key, value in kwargs.items():
                if key in inited_keys or key not in meta.fields_map:
                    continue
                if (field := meta.fields_map[key]) not in native_fields:
                    if default_fields is None:
                        default_fields = [f for *_, f in meta.db_default_fields]
                    if field in default_fields:
                        if value is not None:
                            value = field.field_type(value)
                    else:
                        value = field.to_python_value(value)
                setattr(self, key, value)

        return self

    def __str__(self) -> str:
        return f"<{self.__class__.__name__}>"

    def __repr__(self) -> str:
        if self.pk:
            return f"<{self.__class__.__name__}: {self.pk}>"
        return f"<{self.__class__.__name__}>"

    def __hash__(self) -> int:
        if not self.pk:
            raise TypeError("Model instances without id are unhashable")
        return hash(self.pk)

    def __iter__(self):
        """Yield ``(name, value)`` for every entry of ``_meta.db_fields``."""
        for field in self._meta.db_fields:
            yield field, getattr(self, field)

    def __eq__(self, other: object) -> bool:
        # Equal only when both the exact type and the primary key match.
        return type(other) is type(self) and self.pk == other.pk  # type: ignore

    def _get_pk_val(self) -> Any:
        return getattr(self, self._meta.pk_attr, None)

    def _set_pk_val(self, value: Any) -> None:
        setattr(self, self._meta.pk_attr, value)

    pk = property(_get_pk_val, _set_pk_val)
    """
    Alias to the models Primary Key.
    Can be used as a field name when doing filtering e.g. ``.filter(pk=...)`` etc...
    """

    @classmethod
    async def _getbypk(cls: Type[MODEL], key: Any) -> MODEL:
        """
        Fetch a single instance by primary key.

        :raises ObjectDoesNotExistError: If the lookup raises ``DoesNotExist`` or
            ``ValueError``.
        """
        try:
            return await cls.get(pk=key)
        except (DoesNotExist, ValueError):
            raise ObjectDoesNotExistError(cls, cls._meta.pk_attr, key)
def clone(self: MODEL, pk: Any = EMPTY) -> MODEL:
    """
    Create a new clone of the object that when you do a ``.save()`` will create a new record.

    :param pk: An optionally required value if the model doesn't generate its own primary key.
        Any value you specify here will always be used.
    :return: A copy of the current object without primary key information.
    :raises ParamsError: If pk is required but not provided.
    """
    obj = copy(self)
    if pk is EMPTY:
        pk_field: Field = self._meta.pk
        # Without a generated key or a default there is nothing to fill the pk with.
        if pk_field.generated is False and pk_field.default is None:
            raise ParamsError(
                f"{self._meta.full_name} requires explicit primary key. Please use .clone(pk=<value>)"
            )
        obj.pk = None
    else:
        obj.pk = pk
    # Mark the copy as unsaved so the next ``.save()`` takes the insert path.
    obj._saved_in_db = False
    return obj
def update_from_dict(self: MODEL, data: dict) -> MODEL:
    """
    Updates the current model with the provided dict.
    This can allow mass-updating a model from a dict, also ensuring that datatype conversions happen.

    This will ignore any extra fields, and NOT update the model with them,
    but will raise errors on bad types or updating Many-instance relations.

    :param data: The parameters you want to update in a dict format
    :return: The current model instance

    :raises ConfigurationError: When attempting to update a remote instance
        (e.g. a reverse ForeignKey or ManyToMany relation)
    :raises ValueError: When a passed parameter is not type compatible
    """
    # All assignment and validation is delegated to ``_set_kwargs``.
    self._set_kwargs(data)
    return self
@classmethod
def register_listener(cls, signal: Signals, listener: Callable):
    """
    Register listener to current model class for special Signal.

    Registering the same listener twice for the same class and signal is a no-op.

    :param signal: one of tortoise.signals.Signals
    :param listener: callable listener

    :raises ConfigurationError: When listener is not callable
    """
    if not callable(listener):
        raise ConfigurationError("Signal listener must be callable!")
    # ``setdefault`` at both levels: a signal without a bucket yet gets one
    # instead of failing with ``AttributeError`` on ``None``.
    cls_listeners = cls._listeners.setdefault(signal, {}).setdefault(cls, [])
    if listener not in cls_listeners:
        cls_listeners.append(listener)
async def _set_async_default_field(self) -> None:
    """retrieve value from field's async default value"""
    if hasattr(self, "_await_when_save"):
        # Iterate a copy: assigning an attribute can remove its entry from
        # ``_await_when_save`` while the loop is still running.
        for k, v in self._await_when_save.copy().items():
            setattr(self, k, await v())
        self._await_when_save = {}


async def _wait_for_listeners(self, signal: Signals, *listener_args) -> None:
    """
    Call every listener registered for ``signal`` on this instance's class.

    Each listener is called as ``listener(sender_class, instance, *listener_args)``
    and all results are awaited together.
    """
    cls_listeners = self._listeners.get(signal, {}).get(self.__class__, [])
    listeners = [listener(self.__class__, self, *listener_args) for listener in cls_listeners]
    await asyncio.gather(*listeners)


async def _pre_delete(
    self,
    using_db: Optional[BaseDBAsyncClient] = None,
) -> None:
    """Dispatch ``Signals.pre_delete`` with ``using_db``."""
    await self._wait_for_listeners(Signals.pre_delete, using_db)


async def _post_delete(
    self,
    using_db: Optional[BaseDBAsyncClient] = None,
) -> None:
    """Dispatch ``Signals.post_delete`` with ``using_db``."""
    await self._wait_for_listeners(Signals.post_delete, using_db)


async def _pre_save(
    self,
    using_db: Optional[BaseDBAsyncClient] = None,
    update_fields: Optional[Iterable[str]] = None,
) -> None:
    """Dispatch ``Signals.pre_save`` with ``(using_db, update_fields)``."""
    await self._wait_for_listeners(Signals.pre_save, using_db, update_fields)


async def _post_save(
    self,
    using_db: Optional[BaseDBAsyncClient] = None,
    created: bool = False,
    update_fields: Optional[Iterable[str]] = None,
) -> None:
    """Dispatch ``Signals.post_save`` with ``(created, using_db, update_fields)``."""
    # Listener argument order differs from this method's parameter order.
    await self._wait_for_listeners(Signals.post_save, created, using_db, update_fields)
async def save(
    self,
    using_db: Optional[BaseDBAsyncClient] = None,
    update_fields: Optional[Iterable[str]] = None,
    force_create: bool = False,
    force_update: bool = False,
) -> None:
    """
    Creates/Updates the current model object.

    :param update_fields: If provided, it should be a tuple/list of fields by name.

        This is the subset of fields that should be updated.
        If the object needs to be created ``update_fields`` will be ignored.
    :param using_db: Specific DB connection to use instead of default bound
    :param force_create: Forces creation of the record
    :param force_update: Forces updating of the record

    :raises IncompleteInstanceError: If the model is partial and the fields are not available for persistence.
    :raises IntegrityError: If the model can't be created or updated (specifically if force_create or force_update has been set)
    """
    await self._set_async_default_field()
    db = using_db or self._choose_db(True)
    executor = db.executor_class(model=self.__class__, db=db)
    if self._partial:
        if update_fields:
            # The primary key is the same for every field, so check it once.
            if not hasattr(self, self._meta.pk_attr):
                raise IncompleteInstanceError(
                    f"{self.__class__.__name__} is a partial model without primary key fetched. Partial update not available"
                )
            for field in update_fields:
                if not hasattr(self, field):
                    raise IncompleteInstanceError(
                        f"{self.__class__.__name__} is a partial model, field '{field}' is not available"
                    )
        else:
            raise IncompleteInstanceError(
                f"{self.__class__.__name__} is a partial model, can only be saved with the relevant update_field provided"
            )
    await self._pre_save(db, update_fields)

    if force_create:
        await executor.execute_insert(self)
        created = True
    elif force_update:
        rows = await executor.execute_update(self, update_fields)
        if rows == 0:
            raise IntegrityError(f"Can't update object that doesn't exist. PK: {self.pk}")
        created = False
    else:
        if self._saved_in_db or update_fields:
            if self.pk is None:
                await executor.execute_insert(self)
                created = True
            else:
                await executor.execute_update(self, update_fields)
                created = False
        else:
            # TODO: Do a merge/upsert operation here instead. Let the executor determine an optimal strategy for each DB engine.
            await executor.execute_insert(self)
            created = True

    self._saved_in_db = True

    await self._post_save(db, created, update_fields)
async def delete(self, using_db: Optional[BaseDBAsyncClient] = None) -> None:
    """
    Deletes the current model object.

    The pre-delete listeners run before the row is deleted, the post-delete
    listeners after.

    :param using_db: Specific DB connection to use instead of default bound

    :raises OperationalError: If object has never been persisted.
    """
    db = using_db or self._choose_db(True)
    if not self._saved_in_db:
        raise OperationalError("Can't delete unpersisted record")
    await self._pre_delete(db)
    await db.executor_class(model=self.__class__, db=db).execute_delete(self)
    await self._post_delete(db)
async def fetch_related(
    self, *args: Any, using_db: Optional[BaseDBAsyncClient] = None
) -> None:
    """
    Fetch related fields.

    .. code-block:: python3

        await user.fetch_related("emails", "manager")

    :param args: The related fields that should be fetched.
    :param using_db: Specific DB connection to use instead of default bound
    """
    db = using_db or self._choose_db()
    # Reuse the executor's list-based fetch with a single-element list.
    await db.executor_class(model=self.__class__, db=db).fetch_for_list([self], *args)
async def refresh_from_db(
    self,
    fields: Optional[Iterable[str]] = None,
    using_db: Optional[BaseDBAsyncClient] = None,
) -> None:
    """
    Refresh latest data from db. When this method is called without arguments
    all db fields of the model are updated to the values currently present in the database.

    .. code-block:: python3

        await user.refresh_from_db(fields=['name'])

    :param fields: The special fields that to be refreshed.
    :param using_db: Specific DB connection to use instead of default bound.

    :raises OperationalError: If object has never been persisted.
    """
    if not self._saved_in_db:
        raise OperationalError("Can't refresh unpersisted record")
    # ``fields`` is read twice below; materialise it once so a one-shot
    # iterator (e.g. a generator) is not already exhausted the second time.
    selected = tuple(fields or ())
    db = using_db or self._choose_db()
    qs = QuerySet(self.__class__).using_db(db).only(*selected)
    obj = await qs.get(pk=self.pk)
    # NOTE(review): the fallback iterates ``db_fields`` (DB column names) but uses
    # them as attribute names — confirm this is intended for fields whose
    # ``source_field`` differs from the attribute name.
    for field in selected or self._meta.db_fields:
        setattr(self, field, getattr(obj, field, None))
@classmethod
def _choose_db(cls, for_write: bool = False):
    """
    Return the connection that will be used if this query is executed now.

    :param for_write: Whether this query for write.
    :return: BaseDBAsyncClient: the router's choice, or the model's default
        connection when the router returns nothing.
    """
    if for_write:
        db = router.db_for_write(cls)
    else:
        db = router.db_for_read(cls)
    return db or cls._meta.db
@classmethod
async def get_or_create(
    cls,
    defaults: Optional[dict] = None,
    using_db: Optional[BaseDBAsyncClient] = None,
    **kwargs: Any,
) -> Tuple[Self, bool]:
    """
    Fetches the object if exists (filtering on the provided parameters),
    else creates an instance with any unspecified parameters as default values.

    :param defaults: Default values to be added to a created instance if it can't be fetched.
    :param using_db: Specific DB connection to use instead of default bound
    :param kwargs: Query parameters.
    :return: ``(instance, created)``; ``created`` is ``False`` when the object
        was fetched.
    :raises IntegrityError: If create failed
    :raises TransactionManagementError: If transaction error
    :raises ParamsError: If defaults conflict with kwargs
    """
    if not defaults:
        defaults = {}
    db = using_db or cls._choose_db(True)
    try:
        return await cls.filter(**kwargs).using_db(db).get(), False
    except DoesNotExist:
        return await cls._create_or_get(db, defaults, **kwargs)
@classmethod
async def _create_or_get(
    cls, db: BaseDBAsyncClient, defaults: dict, **kwargs
) -> Tuple[Self, bool]:
    """
    Try to create, if fails with IntegrityError then try to get.

    :param db: Connection whose name is used for the create transaction and
        on which the fallback lookup runs.
    :param defaults: Extra values for the created row.
    :param kwargs: Lookup parameters; also used as creation values.
    :raises ParamsError: If a key is present in both ``defaults`` and
        ``kwargs`` with different values.
    :raises IntegrityError: If creation fails and the fallback lookup finds
        no matching row.
    """
    overlapping = defaults.keys() & kwargs.keys()
    for key in overlapping:
        default_value = defaults[key]
        query_value = kwargs[key]
        if default_value != query_value:
            raise ParamsError(f"Conflict value with {key=}: {default_value=} vs {query_value=}")

    # ``defaults`` is applied last, so it wins on (equal-valued) shared keys.
    create_values = dict(kwargs)
    create_values.update(defaults)
    try:
        async with in_transaction(connection_name=db.connection_name) as connection:
            created = await cls.create(using_db=connection, **create_values)
            return created, True
    except IntegrityError as exc:
        try:
            existing = await cls.filter(**kwargs).using_db(db).get()
        except DoesNotExist:
            pass
        else:
            return existing, False
        # The row still cannot be found: surface the original failure.
        raise exc


@classmethod
def _db_queryset(
    cls, using_db: Optional[BaseDBAsyncClient] = None, for_write: bool = False
) -> QuerySet[Self]:
    """
    Return the manager's queryset bound to a connection.

    :param using_db: Connection to bind; when falsy, ``_choose_db`` picks one.
    :param for_write: Passed to ``_choose_db`` when no connection is given.
    """
    connection = using_db or cls._choose_db(for_write)
    queryset = cls._meta.manager.get_queryset()
    return queryset.using_db(connection)
@classmethod
def select_for_update(
    cls,
    nowait: bool = False,
    skip_locked: bool = False,
    of: Tuple[str, ...] = (),
    using_db: Optional[BaseDBAsyncClient] = None,
) -> QuerySet[Self]:
    """
    Make QuerySet select for update.

    Returns a queryset that will lock rows until the end of the transaction,
    generating a SELECT ... FOR UPDATE SQL statement on supported databases.

    :param nowait: Forwarded unchanged to ``QuerySet.select_for_update``.
    :param skip_locked: Forwarded unchanged to ``QuerySet.select_for_update``.
    :param of: Forwarded unchanged to ``QuerySet.select_for_update``.
    :param using_db: Specific DB connection to use instead of default bound
    """
    write_queryset = cls._db_queryset(using_db, for_write=True)
    return write_queryset.select_for_update(nowait, skip_locked, of)
@classmethod
async def update_or_create(
    cls: Type[MODEL],
    defaults: Optional[dict] = None,
    using_db: Optional[BaseDBAsyncClient] = None,
    **kwargs: Any,
) -> Tuple[MODEL, bool]:
    """
    Update the object matching ``kwargs`` with ``defaults``, or create it.

    The lookup is done with ``select_for_update`` inside a transaction. When
    a row is found it is updated and ``(instance, False)`` is returned;
    otherwise the work is handed to ``_create_or_get``.

    :param defaults: Default values used to update the object.
    :param using_db: Specific DB connection to use instead of default bound
    :param kwargs: Query parameters.
    """
    values = defaults or {}
    db = using_db or cls._choose_db(True)
    async with in_transaction(connection_name=db.connection_name) as connection:
        locked = cls.select_for_update().using_db(connection)
        instance = await locked.get_or_none(**kwargs)
        if instance:
            updated = instance.update_from_dict(values)
            await updated.save(using_db=connection)
            return instance, False
    # No existing row: creation happens after the locking transaction closes.
    return await cls._create_or_get(db, values, **kwargs)
@classmethod
async def create(
    cls: Type[MODEL], using_db: Optional[BaseDBAsyncClient] = None, **kwargs: Any
) -> MODEL:
    """
    Create a record in the DB and returns the object.

    .. code-block:: python3

        user = await User.create(name="...", email="...")

    Equivalent to:

    .. code-block:: python3

        user = User(name="...", email="...")
        await user.save()

    :param using_db: Specific DB connection to use instead of default bound
    :param kwargs: Model parameters.
    """
    new_obj = cls(**kwargs)
    # Mark as not yet persisted before saving with ``force_create``.
    new_obj._saved_in_db = False
    connection = using_db or cls._choose_db(True)
    await new_obj.save(using_db=connection, force_create=True)
    return new_obj
@classmethod
def bulk_update(
    cls: Type[MODEL],
    objects: Iterable[MODEL],
    fields: Iterable[str],
    batch_size: Optional[int] = None,
    using_db: Optional[BaseDBAsyncClient] = None,
) -> "BulkUpdateQuery[MODEL]":
    """
    Update the given fields in each of the given objects in the database.

    The work is delegated to ``QuerySet.bulk_update`` on a write queryset;
    the returned query object is awaited by the caller.

    .. code-block:: python3

        users = [
            await User.create(name="...", email="..."),
            await User.create(name="...", email="...")
        ]
        users[0].name = 'name1'
        users[1].name = 'name2'

        await User.bulk_update(users, fields=['name'])

    :param objects: List of objects to bulk update
    :param fields: The fields to update
    :param batch_size: How many objects are updated in a single query
    :param using_db: Specific DB connection to use instead of default bound
    """
    write_queryset = cls._db_queryset(using_db, for_write=True)
    return write_queryset.bulk_update(objects, fields, batch_size)
@classmethod
async def in_bulk(
    cls: Type[MODEL],
    id_list: Iterable[Union[str, int]],
    field_name: str = "pk",
    using_db: Optional[BaseDBAsyncClient] = None,
) -> Dict[str, MODEL]:
    """
    Return a dictionary mapping each of the given IDs to the object with that ID.

    :param id_list: A list of field values
    :param field_name: Must be a unique field
    :param using_db: Specific DB connection to use instead of default bound
    """
    queryset = cls._db_queryset(using_db)
    return await queryset.in_bulk(id_list, field_name)
@classmethod
def bulk_create(
    cls: Type[MODEL],
    objects: Iterable[MODEL],
    batch_size: Optional[int] = None,
    ignore_conflicts: bool = False,
    update_fields: Optional[Iterable[str]] = None,
    on_conflict: Optional[Iterable[str]] = None,
    using_db: Optional[BaseDBAsyncClient] = None,
) -> "BulkCreateQuery[MODEL]":
    """
    Bulk insert operation:

    .. note::
        The bulk insert operation will do the minimum to ensure that the object
        created in the DB has all the defaults and generated fields set,
        but may be incomplete reference in Python.

        e.g. ``IntField`` primary keys will not be populated.

    This is recommended only for throw away inserts where you want to ensure optimal
    insert performance.

    .. code-block:: python3

        await User.bulk_create([
            User(name="...", email="..."),
            User(name="...", email="...")
        ])

    :param objects: List of objects to bulk create
    :param batch_size: How many objects are created in a single query
    :param ignore_conflicts: Ignore conflicts when inserting
    :param update_fields: Update fields when conflicts
    :param on_conflict: On conflict index name
    :param using_db: Specific DB connection to use instead of default bound
    """
    write_queryset = cls._db_queryset(using_db, for_write=True)
    return write_queryset.bulk_create(
        objects, batch_size, ignore_conflicts, update_fields, on_conflict
    )
@classmethod
def first(
    cls, using_db: Optional[BaseDBAsyncClient] = None
) -> QuerySetSingle[Optional[Self]]:
    """
    Generates a QuerySet that returns the first record.

    :param using_db: Specific DB connection to use instead of default bound
    """
    queryset = cls._db_queryset(using_db)
    return queryset.first()
@classmethod
def filter(cls, *args: Q, **kwargs: Any) -> QuerySet[Self]:
    """
    Generates a QuerySet with the filter applied.

    The queryset is taken from the model's manager.

    :param args: Q functions containing constraints. Will be AND'ed.
    :param kwargs: Simple filter constraints.
    """
    base_queryset = cls._meta.manager.get_queryset()
    return base_queryset.filter(*args, **kwargs)
@classmethod
def exclude(cls, *args: Q, **kwargs: Any) -> QuerySet[Self]:
    """
    Generates a QuerySet with the exclude applied.

    The queryset is taken from the model's manager.

    :param args: Q functions containing constraints. Will be AND'ed.
    :param kwargs: Simple filter constraints.
    """
    base_queryset = cls._meta.manager.get_queryset()
    return base_queryset.exclude(*args, **kwargs)
@classmethod
def annotate(cls, **kwargs: Union[Function, Term]) -> QuerySet[Self]:
    """
    Annotates the result set with extra Functions/Aggregations/Expressions.

    The queryset is taken from the model's manager.

    :param kwargs: Parameter name and the Function/Aggregation to annotate with.
    """
    base_queryset = cls._meta.manager.get_queryset()
    return base_queryset.annotate(**kwargs)
@classmethod
def all(cls, using_db: Optional[BaseDBAsyncClient] = None) -> QuerySet[Self]:
    """
    Returns the complete QuerySet.

    :param using_db: Specific DB connection to use instead of default bound
    """
    complete_queryset = cls._db_queryset(using_db)
    return complete_queryset
@classmethod
def get(
    cls, *args: Q, using_db: Optional[BaseDBAsyncClient] = None, **kwargs: Any
) -> QuerySetSingle[Self]:
    """
    Fetches a single record for a Model type using the provided filter parameters.

    .. code-block:: python3

        user = await User.get(username="foo")

    :param using_db: The DB connection to use
    :param args: Q functions containing constraints. Will be AND'ed.
    :param kwargs: Simple filter constraints.

    :raises MultipleObjectsReturned: If provided search returned more than one object.
    :raises DoesNotExist: If object can not be found.
    """
    queryset = cls._db_queryset(using_db)
    return queryset.get(*args, **kwargs)
@classmethod
def raw(cls, sql: str, using_db: Optional[BaseDBAsyncClient] = None) -> "RawSQLQuery":
    """
    Build a RAW SQL query for this model; awaiting it returns the result.

    .. code-block:: python3

        result = await User.raw("select * from users where name like '%test%'")

    :param using_db: The specific DB connection to use
    :param sql: The raw sql.
    """
    queryset = cls._db_queryset(using_db)
    return queryset.raw(sql)
@classmethod
def exists(
    cls: Type[MODEL], *args: Q, using_db: Optional[BaseDBAsyncClient] = None, **kwargs: Any
) -> ExistsQuery:
    """
    Return True/False whether record exists with the provided filter parameters.

    .. code-block:: python3

        result = await User.exists(username="foo")

    :param using_db: The specific DB connection to use.
    :param args: Q functions containing constraints. Will be AND'ed.
    :param kwargs: Simple filter constraints.
    """
    queryset = cls._db_queryset(using_db)
    matching = queryset.filter(*args, **kwargs)
    return matching.exists()
@classmethod
def get_or_none(
    cls, *args: Q, using_db: Optional[BaseDBAsyncClient] = None, **kwargs: Any
) -> QuerySetSingle[Optional[Self]]:
    """
    Fetches a single record for a Model type using the provided filter parameters or None.

    .. code-block:: python3

        user = await User.get_or_none(username="foo")

    :param using_db: The specific DB connection to use.
    :param args: Q functions containing constraints. Will be AND'ed.
    :param kwargs: Simple filter constraints.
    """
    queryset = cls._db_queryset(using_db)
    return queryset.get_or_none(*args, **kwargs)
@classmethod
async def fetch_for_list(
    cls,
    instance_list: "Iterable[Model]",
    *args: Any,
    using_db: Optional[BaseDBAsyncClient] = None,
) -> None:
    """
    Fetches related models for provided list of Model objects.

    :param instance_list: List of Model objects to fetch relations for.
    :param args: Relation names to fetch.
    :param using_db: DO NOT USE
    """
    connection = using_db if using_db else cls._choose_db()
    executor = connection.executor_class(model=cls, db=connection)
    await executor.fetch_for_list(instance_list, *args)
@classmethod
def _check(cls) -> None:
    """
    Calls various checks to validate the model.

    :raises ConfigurationError: If the model has not been configured correctly.
    """
    for option in ("unique_together", "indexes"):
        cls._check_together(option)


@classmethod
def _check_together(cls, together: str) -> None:
    """
    Check the value of the ``together``-named meta option
    (called for "unique_together" and "indexes").

    The option must be a list/tuple whose elements are lists, tuples or
    ``Index`` objects; every referenced name must exist in
    ``cls._meta.fields_map`` and must not be a ManyToMany field.

    :param together: Name of the attribute on ``cls._meta`` to validate.
    :raises ConfigurationError: If the model has not been configured correctly.
    """
    option_value = getattr(cls._meta, together)
    if not isinstance(option_value, (tuple, list)):
        raise ConfigurationError(f"'{cls.__name__}.{together}' must be a list or tuple.")

    if not all(isinstance(entry, (tuple, list, Index)) for entry in option_value):
        raise ConfigurationError(
            f"All '{cls.__name__}.{together}' elements must be lists or tuples."
        )

    for entry in option_value:
        # An Index carries its field names on ``.fields``.
        field_names = entry.fields if isinstance(entry, Index) else entry
        for field_name in field_names:
            field = cls._meta.fields_map.get(field_name)
            if not field:
                raise ConfigurationError(
                    f"'{cls.__name__}.{together}' has no '{field_name}' field."
                )
            if isinstance(field, ManyToManyFieldInstance):
                raise ConfigurationError(
                    f"'{cls.__name__}.{together}' '{field_name}' field refers to ManyToMany field."
                )
@classmethod
def describe(cls, serializable: bool = True) -> dict:
    """
    Describes this model.

    :param serializable:
        ``False`` if you want raw python objects,
        ``True`` for JSON-serializable data. (Defaults to ``True``)

    :return:
        A dictionary containing the model description.

        The base dict has a fixed set of keys that reference a list of fields
        (or a single field in the case of the primary key):

        .. code-block:: python3

            {
                "name":                 str     # Qualified model name
                "app":                  str     # 'App' namespace
                "table":                str     # DB table name
                "abstract":             bool    # Is the model Abstract?
                "description":          str     # Description of table (nullable)
                "docstring":            str     # Model docstring (nullable)
                "unique_together":      [...]   # List of List containing field names that
                                                #  are unique together
                "indexes":              [...]   # Value of the ``indexes`` meta option
                "pk_field":             {...}   # Primary key field
                "data_fields":          [...]   # Data fields
                "fk_fields":            [...]   # Foreign Key fields FROM this model
                "backward_fk_fields":   [...]   # Foreign Key fields TO this model
                "o2o_fields":           [...]   # OneToOne fields FROM this model
                "backward_o2o_fields":  [...]   # OneToOne fields TO this model
                "m2m_fields":           [...]   # Many-to-Many fields
            }

        Each field is specified as defined in :meth:`tortoise.fields.base.Field.describe`
    """
    meta = cls._meta
    fields_map = meta.fields_map
    pk_attr = meta.pk_attr
    # Loop-invariant: compute the set of non-fetch field names once instead
    # of once per entry of ``fields_map``.
    plain_names = meta.fields - meta.fetch_fields

    def _describe_named(names) -> list:
        # Describe, in ``fields_map`` order, every field whose name is in ``names``.
        return [
            field.describe(serializable)
            for name, field in fields_map.items()
            if name in names
        ]

    return {
        "name": meta.full_name,
        "app": meta.app,
        "table": meta.db_table,
        "abstract": meta.abstract,
        "description": meta.table_description or None,
        "docstring": inspect.cleandoc(cls.__doc__ or "") or None,
        "unique_together": meta.unique_together or [],
        "indexes": meta.indexes or [],
        "pk_field": fields_map[pk_attr].describe(serializable),
        "data_fields": [
            field.describe(serializable)
            for name, field in fields_map.items()
            if name != pk_attr and name in plain_names
        ],
        "fk_fields": _describe_named(meta.fk_fields),
        "backward_fk_fields": _describe_named(meta.backward_fk_fields),
        "o2o_fields": _describe_named(meta.o2o_fields),
        "backward_o2o_fields": _describe_named(meta.backward_o2o_fields),
        "m2m_fields": _describe_named(meta.m2m_fields),
    }
class Meta:
    """
    Configuration holder for a Model: declare a nested ``Meta`` class on the
    model to set its metadata.

    Usage:

    .. code-block:: python3

        class Foo(Model):
            ...

            class Meta:
                table="custom_table"
                unique_together=(("field_a", "field_b"), )
    """