importtypesfromcopyimportcopyfromtypingimport(TYPE_CHECKING,Any,AsyncIterator,Callable,Dict,Generator,Generic,Iterable,List,Optional,Set,Tuple,Type,TypeVar,Union,cast,overload,)frompypikaimportJoinType,Order,Tablefrompypika.analyticsimportCountfrompypika.functionsimportCastfrompypika.queriesimportQueryBuilderfrompypika.termsimportCase,Field,Term,ValueWrapperfromtyping_extensionsimportLiteral,Protocolfromtortoise.backends.base.clientimportBaseDBAsyncClient,Capabilitiesfromtortoise.exceptionsimport(DoesNotExist,FieldError,IntegrityError,MultipleObjectsReturned,ParamsError,)fromtortoise.expressionsimportExpression,F,Q,RawSQLfromtortoise.fields.relationalimport(ForeignKeyFieldInstance,OneToOneFieldInstance,RelationalField,)fromtortoise.filtersimportFilterInfoDictfromtortoise.functionsimportFunctionfromtortoise.query_utilsimportPrefetch,QueryModifier,_get_joins_for_related_fieldfromtortoise.routerimportrouterfromtortoise.utilsimportchunk# Empty placeholder - Should never be edited.QUERY:QueryBuilder=QueryBuilder()ifTYPE_CHECKING:# pragma: nocoveragefromtortoise.modelsimportModelMODEL=TypeVar("MODEL",bound="Model")T_co=TypeVar("T_co",covariant=True)SINGLE=TypeVar("SINGLE",bound=bool)
class QuerySetSingle(Protocol[T_co]):
    """
    Awaiting on this will resolve a single instance of the Model object, and not a sequence.

    This is a structural (typing-only) protocol: the methods below carry no
    implementation and exist purely to describe the call signatures.
    """

    def __await__(self) -> Generator[Any, None, T_co]:  # pragma: nocoverage
        """Awaiting yields a single ``T_co`` value."""

    def prefetch_related(
        self, *args: Union[str, Prefetch]
    ) -> "QuerySetSingle[T_co]":  # pragma: nocoverage
        """Stub: accepts relation names or ``Prefetch`` objects, stays single-result."""

    def select_related(self, *args: str) -> "QuerySetSingle[T_co]":  # pragma: nocoverage
        """Stub: accepts relation names, stays single-result."""

    def annotate(self, **kwargs: Function) -> "QuerySetSingle[T_co]":  # pragma: nocoverage
        """Stub: accepts named annotations, stays single-result."""

    def only(self, *fields_for_select: str) -> "QuerySetSingle[T_co]":  # pragma: nocoverage
        """Stub: accepts field names, stays single-result."""

    def values_list(
        self, *fields_: str, flat: bool = False
    ) -> "ValuesListQuery[Literal[True]]":  # pragma: nocoverage
        """Stub: switches to a single-result ``ValuesListQuery``."""

    def values(
        self, *args: str, **kwargs: str
    ) -> "ValuesQuery[Literal[True]]":  # pragma: nocoverage
        """Stub: switches to a single-result ``ValuesQuery``."""
class AwaitableQuery(Generic[MODEL]):
    """
    Common base for the query objects in this module.

    Holds the model, the pypika query being built, the chosen DB client and the
    bookkeeping (joined tables, annotations) shared by filter/ordering resolution.
    Subclasses must implement ``_make_query`` and ``_execute``.
    """

    __slots__ = (
        "_joined_tables",
        "query",
        "model",
        "_db",
        "capabilities",
        "_annotations",
    )

    def __init__(self, model: Type[MODEL]) -> None:
        self._joined_tables: List[Table] = []
        self.model: "Type[MODEL]" = model
        # Shared empty placeholder; replaced by a real query in ``_make_query``.
        self.query: QueryBuilder = QUERY
        self._db: BaseDBAsyncClient = None  # type: ignore
        self.capabilities: Capabilities = model._meta.db.capabilities
        self._annotations: Dict[str, Expression] = {}

    def _choose_db(self, for_write: bool = False) -> BaseDBAsyncClient:
        """
        Return the connection that will be used if this query is executed now.

        An explicitly set ``_db`` wins; otherwise the router is asked for a
        read or write connection, falling back to the model's default db.

        :param for_write: Ask the router for a write connection instead of a read one.
        :return: BaseDBAsyncClient:
        """
        if self._db:
            return self._db
        if for_write:
            db = router.db_for_write(self.model)
        else:
            db = router.db_for_read(self.model)
        return db or self.model._meta.db

    def _apply_joins(self, joins: Iterable[Any]) -> None:
        """
        Left-outer-join every ``(table, criterion)`` pair whose table is not joined yet.

        :param joins: Iterable of pairs; item ``[0]`` is the table, item ``[1]`` the ON criterion.
        """
        for join in joins:
            if join[0] not in self._joined_tables:
                self.query = self.query.join(join[0], how=JoinType.left_outer).on(join[1])
                self._joined_tables.append(join[0])

    def resolve_filters(
        self,
        model: "Type[Model]",
        q_objects: List[Q],
        annotations: Dict[str, Any],
        custom_filters: Dict[str, FilterInfoDict],
    ) -> None:
        """
        Builds the common filters for a QuerySet.

        :param model: The Model this queryset is based on.
        :param q_objects: The Q expressions to apply.
        :param annotations: Extra annotations to add.
        :param custom_filters: Pre-resolved filters to be passed through.
        """
        has_aggregate = self._resolve_annotate(annotations)

        modifier = QueryModifier()
        for node in q_objects:
            node._annotations = annotations
            node._custom_filters = custom_filters
            modifier &= node.resolve(model, model._meta.basetable)

        where_criterion, joins, having_criterion = modifier.get_query_modifiers()
        self._apply_joins(joins)

        self.query._wheres = where_criterion
        self.query._havings = having_criterion

        # Aggregates combined with joins/having/order-by need an explicit
        # GROUP BY over every column of the base table.
        if has_aggregate and (self._joined_tables or having_criterion or self.query._orderbys):
            self.query = self.query.groupby(
                *[self.model._meta.basetable[field] for field in self.model._meta.db_fields]
            )

    def _join_table_by_field(
        self, table: Table, related_field_name: str, related_field: RelationalField
    ) -> Table:
        """
        Join the table(s) needed to reach ``related_field`` from ``table``.

        :return: The table of the last join, i.e. the related model's table.
        """
        joins = _get_joins_for_related_field(table, related_field, related_field_name)
        self._apply_joins(joins)
        return joins[-1][0]

    @staticmethod
    def _resolve_ordering_string(ordering: str) -> Tuple[str, Order]:
        """
        Split an ordering string into ``(field_name, order)``.

        A leading ``-`` means descending; anything else is ascending. An empty
        string is returned unchanged (ascending) so that callers can report it
        as an unknown field instead of crashing on an index lookup.
        """
        if ordering.startswith("-"):
            return ordering[1:], Order.desc
        return ordering, Order.asc

    def resolve_ordering(
        self,
        model: "Type[Model]",
        table: Table,
        orderings: Iterable[Tuple[str, str]],
        annotations: Dict[str, Any],
    ) -> None:
        """
        Applies standard ordering to QuerySet.

        :param model: The Model this queryset is based on.
        :param table: ``pypika.Table`` to keep track of the virtual SQL table
            (to allow self referential joins)
        :param orderings: What columns/order to order by
        :param annotations: Annotations that may be ordered on
        :raises FieldError: If a field provided does not exist in model.
        """
        # Do not apply default ordering for annotated queries to not mess them up
        if not orderings and self.model._meta.ordering and not annotations:
            orderings = self.model._meta.ordering

        for ordering in orderings:
            field_name = ordering[0]
            if field_name in model._meta.fetch_fields:
                # NOTE(review): the message says "Filtering" although this path handles
                # ordering; kept verbatim so existing callers/tests are unaffected.
                raise FieldError(
                    "Filtering by relation is not possible. Filter by nested field of related model"
                )

            related_field_name, __, forwarded = field_name.partition("__")
            if related_field_name in model._meta.fetch_fields:
                # Ordering across a relation: join it, then recurse on the remainder.
                related_field = cast(RelationalField, model._meta.fields_map[related_field_name])
                related_table = self._join_table_by_field(table, related_field_name, related_field)
                self.resolve_ordering(
                    related_field.related_model,
                    related_table,
                    [(forwarded, ordering[1])],
                    {},
                )
            elif field_name in annotations:
                annotation = annotations[field_name]
                if isinstance(annotation, Term):
                    self.query = self.query.orderby(annotation, order=ordering[1])
                else:
                    annotation_info = annotation.resolve(self.model, table)
                    self.query = self.query.orderby(annotation_info["field"], order=ordering[1])
            else:
                field_object = model._meta.fields_map.get(field_name)
                if not field_object:
                    raise FieldError(f"Unknown field {field_name} for model {model.__name__}")

                field_name = field_object.source_field or field_name
                field = table[field_name]

                # Some dialects register a cast to apply before ordering on a field.
                func = field_object.get_for_dialect(
                    model._meta.db.capabilities.dialect, "function_cast"
                )
                if func:
                    field = func(field_object, field)

                self.query = self.query.orderby(field, order=ordering[1])

    def _resolve_annotate(self, extra_annotations: Dict[str, Any]) -> bool:
        """
        Resolve own and extra annotations, joining tables and selecting fields as needed.

        Only annotations stored on this query (``self._annotations``) are added
        to the SELECT list; ``extra_annotations`` contribute joins only.

        :return: ``True`` if any resolved annotation field is an aggregate.
        """
        if not self._annotations and not extra_annotations:
            return False

        table = self.model._meta.basetable
        all_annotations = {**self._annotations, **extra_annotations}
        annotation_info = {}
        for key, annotation in all_annotations.items():
            if isinstance(annotation, Term):
                annotation_info[key] = {"joins": [], "field": annotation}
            else:
                annotation_info[key] = annotation.resolve(self.model, table)

        for key, info in annotation_info.items():
            for join in info["joins"]:
                self._join_table_by_field(*join)
            if key in self._annotations:
                self.query._select_other(info["field"].as_(key))

        return any(info["field"].is_aggregate for info in annotation_info.values())

    def sql(self, **kwargs) -> str:
        """Return the actual SQL."""
        return self.as_query().get_sql(**kwargs)

    def as_query(self) -> QueryBuilder:
        """Return the actual query."""
        if self._db is None:
            self._db = self._choose_db()  # type: ignore
        self._make_query()
        return self.query

    def _make_query(self) -> None:
        """Build ``self.query``; must be implemented by subclasses."""
        raise NotImplementedError()  # pragma: nocoverage

    async def _execute(self) -> Any:
        """Run the built query; must be implemented by subclasses."""
        raise NotImplementedError()  # pragma: nocoverage
class QuerySet(AwaitableQuery[MODEL]):
    """
    Lazily-built, chainable SELECT query for a model.

    Every chaining method works on a clone, so the queryset it is called on is
    left untouched. Awaiting the queryset builds and executes the query.
    """

    __slots__ = (
        "fields",
        "_prefetch_map",
        "_prefetch_queries",
        "_single",
        "_raise_does_not_exist",
        "_db",
        "_limit",
        "_offset",
        "_fields_for_select",
        "_filter_kwargs",
        "_orderings",
        "_q_objects",
        "_distinct",
        "_having",
        "_custom_filters",
        "_group_bys",
        "_select_for_update",
        "_select_for_update_nowait",
        "_select_for_update_skip_locked",
        "_select_for_update_of",
        "_select_related",
        "_select_related_idx",
        "_use_indexes",
        "_force_indexes",
    )

    def __init__(self, model: Type[MODEL]) -> None:
        super().__init__(model)
        self.fields: Set[str] = model._meta.db_fields
        self._prefetch_map: Dict[str, Set[Union[str, Prefetch]]] = {}
        self._prefetch_queries: Dict[str, List[Tuple[Optional[str], QuerySet]]] = {}
        self._single: bool = False
        self._raise_does_not_exist: bool = False
        self._limit: Optional[int] = None
        self._offset: Optional[int] = None
        self._filter_kwargs: Dict[str, Any] = {}
        self._orderings: List[Tuple[str, Any]] = []
        self._q_objects: List[Q] = []
        self._distinct: bool = False
        self._having: Dict[str, Any] = {}
        self._custom_filters: Dict[str, FilterInfoDict] = {}
        self._fields_for_select: Tuple[str, ...] = ()
        self._group_bys: Tuple[str, ...] = ()
        self._select_for_update: bool = False
        self._select_for_update_nowait: bool = False
        self._select_for_update_skip_locked: bool = False
        self._select_for_update_of: Set[str] = set()
        self._select_related: Set[str] = set()
        self._select_related_idx: List[
            Tuple["Type[Model]", int, str, "Type[Model]", Iterable[Optional[str]]]
        ] = []  # format with: model,idx,model_name,parent_model
        self._force_indexes: Set[str] = set()
        self._use_indexes: Set[str] = set()

    def _clone(self) -> "QuerySet[MODEL]":
        """
        Return a copy of this queryset that can be modified independently.

        ``__init__`` is bypassed on purpose; every slot is transferred by hand.
        Mutable containers are shallow-copied so that in-place changes on the
        clone (``.add()``, ``.append()``, ``.update()``) never leak back.
        """
        queryset = self.__class__.__new__(self.__class__)
        queryset.fields = self.fields
        queryset.model = self.model
        queryset.query = self.query
        queryset.capabilities = self.capabilities
        queryset._prefetch_map = copy(self._prefetch_map)
        queryset._prefetch_queries = copy(self._prefetch_queries)
        queryset._single = self._single
        queryset._raise_does_not_exist = self._raise_does_not_exist
        queryset._db = self._db
        queryset._limit = self._limit
        queryset._offset = self._offset
        queryset._fields_for_select = self._fields_for_select
        queryset._filter_kwargs = copy(self._filter_kwargs)
        queryset._orderings = copy(self._orderings)
        queryset._joined_tables = copy(self._joined_tables)
        queryset._q_objects = copy(self._q_objects)
        queryset._distinct = self._distinct
        queryset._annotations = copy(self._annotations)
        queryset._having = copy(self._having)
        queryset._custom_filters = copy(self._custom_filters)
        queryset._group_bys = copy(self._group_bys)
        queryset._select_for_update = self._select_for_update
        queryset._select_for_update_nowait = self._select_for_update_nowait
        queryset._select_for_update_skip_locked = self._select_for_update_skip_locked
        # These containers are mutated in place by select_related()/force_index()/
        # use_index(); sharing them by reference would modify the source queryset.
        queryset._select_for_update_of = copy(self._select_for_update_of)
        queryset._select_related = copy(self._select_related)
        queryset._select_related_idx = copy(self._select_related_idx)
        queryset._force_indexes = copy(self._force_indexes)
        queryset._use_indexes = copy(self._use_indexes)
        return queryset

    def _filter_or_exclude(self, *args: Q, negate: bool, **kwargs: Any) -> "QuerySet[MODEL]":
        """
        Shared implementation of ``filter`` and ``exclude``.

        :param args: Q objects to add.
        :param negate: Invert every condition (``exclude`` semantics).
        :param kwargs: ``field=value`` lookups; each becomes its own Q object.
        :raises TypeError: If a positional argument is not a Q object.
        """
        queryset = self._clone()
        for arg in args:
            if not isinstance(arg, Q):
                raise TypeError("expected Q objects as args")
            queryset._q_objects.append(~arg if negate else arg)

        for key, value in kwargs.items():
            condition = Q(**{key: value})
            queryset._q_objects.append(~condition if negate else condition)

        return queryset

    def filter(self, *args: Q, **kwargs: Any) -> "QuerySet[MODEL]":
        """
        Filters QuerySet by given kwargs. You can filter by related objects like this:

        .. code-block:: python3

            Team.filter(events__tournament__name='Test')

        You can also pass Q objects to filters as args.
        """
        return self._filter_or_exclude(*args, negate=False, **kwargs)

    def exclude(self, *args: Q, **kwargs: Any) -> "QuerySet[MODEL]":
        """
        Same as .filter(), but with appends all args with NOT
        """
        return self._filter_or_exclude(*args, negate=True, **kwargs)

    def order_by(self, *orderings: str) -> "QuerySet[MODEL]":
        """
        Accept args to filter by in format like this:

        .. code-block:: python3

            .order_by('name', '-tournament__name')

        Supports ordering by related models too.
        A '-' before the name will result in descending sort order, default is ascending.

        Calling this replaces any ordering set previously.

        :raises FieldError: If unknown field has been provided.
        """
        queryset = self._clone()
        new_ordering = []
        for ordering in orderings:
            field_name, order_type = self._resolve_ordering_string(ordering)

            # Only the first path segment is validated here; nested segments are
            # checked when the query is built.
            if not (
                field_name.split("__")[0] in self.model._meta.fields
                or field_name in self._annotations
            ):
                raise FieldError(f"Unknown field {field_name} for model {self.model.__name__}")
            new_ordering.append((field_name, order_type))
        queryset._orderings = new_ordering
        return queryset

    def limit(self, limit: int) -> "QuerySet[MODEL]":
        """
        Limits QuerySet to given length.

        :raises ParamsError: Limit should be non-negative number.
        """
        if limit < 0:
            raise ParamsError("Limit should be non-negative number")

        queryset = self._clone()
        queryset._limit = limit
        return queryset

    def offset(self, offset: int) -> "QuerySet[MODEL]":
        """
        Query offset for QuerySet.

        :raises ParamsError: Offset should be non-negative number.
        """
        if offset < 0:
            raise ParamsError("Offset should be non-negative number")

        queryset = self._clone()
        queryset._offset = offset
        # Backends flagged ``requires_limit`` get a large default limit when none is set.
        if self.capabilities.requires_limit and queryset._limit is None:
            queryset._limit = 1000000
        return queryset

    def __getitem__(self, key: slice) -> "QuerySet[MODEL]":
        """
        Query offset and limit for Queryset.

        :raises ParamsError: QuerySet indices must be slices.
        :raises ParamsError: Slice steps should be 1 or None.
        :raises ParamsError: Slice start should be non-negative number or None.
        :raises ParamsError: Slice stop should be non-negative number greater that slice start,
            or None.
        """
        if not isinstance(key, slice):
            raise ParamsError("QuerySet indices must be slices.")

        if not (key.step is None or (isinstance(key.step, int) and key.step == 1)):
            raise ParamsError("Slice steps should be 1 or None.")

        start = key.start if key.start is not None else 0

        if not isinstance(start, int) or start < 0:
            raise ParamsError("Slice start should be non-negative number or None.")
        if key.stop is not None and (not isinstance(key.stop, int) or key.stop <= start):
            raise ParamsError(
                "Slice stop should be non-negative number greater that slice start, or None.",
            )

        queryset = self.offset(start)
        # A validated stop is always > start >= 0, so this matches a truthiness check.
        if key.stop is not None:
            queryset = queryset.limit(key.stop - start)
        return queryset

    def distinct(self) -> "QuerySet[MODEL]":
        """
        Make QuerySet distinct.

        Only makes sense in combination with a ``.values()`` or ``.values_list()`` as it
        precedes all the fetched fields with a distinct.
        """
        queryset = self._clone()
        queryset._distinct = True
        return queryset

    def select_for_update(
        self, nowait: bool = False, skip_locked: bool = False, of: Tuple[str, ...] = ()
    ) -> "QuerySet[MODEL]":
        """
        Make QuerySet select for update.

        Returns a queryset that will lock rows until the end of the transaction,
        generating a SELECT ... FOR UPDATE SQL statement on supported databases.

        When the backend does not support it, the queryset is returned unchanged.
        """
        if self.capabilities.support_for_update:
            queryset = self._clone()
            queryset._select_for_update = True
            queryset._select_for_update_nowait = nowait
            queryset._select_for_update_skip_locked = skip_locked
            queryset._select_for_update_of = set(of)
            return queryset
        return self

    def annotate(self, **kwargs: Union[Expression, Term]) -> "QuerySet[MODEL]":
        """
        Annotate result with aggregation or function result.

        Each key also becomes filterable: filters for it are registered as
        custom filters on the returned queryset. Values are stored as given;
        no type validation is performed here.
        """
        from tortoise.models import get_filters_for_field

        queryset = self._clone()
        for key, annotation in kwargs.items():
            queryset._annotations[key] = annotation
            queryset._custom_filters.update(get_filters_for_field(key, None, key))
        return queryset

    def group_by(self, *fields: str) -> "QuerySet[MODEL]":
        """
        Make QuerySet returns list of dict or tuple with group by.

        Must call before .values() or .values_list()
        """
        queryset = self._clone()
        queryset._group_bys = fields
        return queryset

    def values_list(self, *fields_: str, flat: bool = False) -> "ValuesListQuery[Literal[False]]":
        """
        Make QuerySet returns list of tuples for given args instead of objects.

        If call after `.get()`, `.get_or_none()` or `.first()` return tuples for given args
        instead of object.

        If ``flat=True`` and only one arg is passed can return flat list or just scalar.

        If no arguments are passed it will default to a tuple containing all fields
        in order of declaration.
        """
        # Default: every db-backed field in declaration order, then the annotations.
        fields_for_select_list = fields_ or (
            [field for field in self.model._meta.fields_map if field in self.model._meta.db_fields]
            + list(self._annotations.keys())
        )
        return ValuesListQuery(
            db=self._db,
            model=self.model,
            q_objects=self._q_objects,
            single=self._single,
            raise_does_not_exist=self._raise_does_not_exist,
            flat=flat,
            fields_for_select_list=fields_for_select_list,
            distinct=self._distinct,
            limit=self._limit,
            offset=self._offset,
            orderings=self._orderings,
            annotations=self._annotations,
            custom_filters=self._custom_filters,
            group_bys=self._group_bys,
            force_indexes=self._force_indexes,
            use_indexes=self._use_indexes,
        )

    def values(self, *args: str, **kwargs: str) -> "ValuesQuery[Literal[False]]":
        """
        Make QuerySet return dicts instead of objects.

        If call after `.get()`, `.get_or_none()` or `.first()` return dict instead of object.

        Can pass names of fields to fetch, or as a ``field_name='name_in_dict'`` kwarg.

        If no arguments are passed it will default to a dict containing all fields.

        :raises FieldError: If duplicate key has been provided.
        """
        if args or kwargs:
            fields_for_select: Dict[str, str] = {}
            for field in args:
                if field in fields_for_select:
                    raise FieldError(f"Duplicate key {field}")
                fields_for_select[field] = field

            for return_as, field in kwargs.items():
                if return_as in fields_for_select:
                    raise FieldError(f"Duplicate key {return_as}")
                fields_for_select[return_as] = field
        else:
            _fields = [
                field
                for field in self.model._meta.fields_map
                if field in self.model._meta.fields_db_projection
            ] + list(self._annotations.keys())

            fields_for_select = {field: field for field in _fields}

        return ValuesQuery(
            db=self._db,
            model=self.model,
            q_objects=self._q_objects,
            single=self._single,
            raise_does_not_exist=self._raise_does_not_exist,
            fields_for_select=fields_for_select,
            distinct=self._distinct,
            limit=self._limit,
            offset=self._offset,
            orderings=self._orderings,
            annotations=self._annotations,
            custom_filters=self._custom_filters,
            group_bys=self._group_bys,
            force_indexes=self._force_indexes,
            use_indexes=self._use_indexes,
        )

    def delete(self) -> "DeleteQuery":
        """
        Delete all objects in QuerySet.
        """
        return DeleteQuery(
            db=self._db,
            model=self.model,
            q_objects=self._q_objects,
            annotations=self._annotations,
            custom_filters=self._custom_filters,
            limit=self._limit,
            orderings=self._orderings,
        )

    def update(self, **kwargs: Any) -> "UpdateQuery":
        """
        Update all objects in QuerySet with given kwargs.

        .. admonition: Example:

            .. code-block:: py3

                await Employee.filter(occupation='developer').update(salary=5000)

        Will instead of returning a resultset, update the data in the DB itself.
        """
        return UpdateQuery(
            db=self._db,
            model=self.model,
            update_kwargs=kwargs,
            q_objects=self._q_objects,
            annotations=self._annotations,
            custom_filters=self._custom_filters,
            limit=self._limit,
            orderings=self._orderings,
        )

    def count(self) -> "CountQuery":
        """
        Return count of objects in queryset instead of objects.
        """
        return CountQuery(
            db=self._db,
            model=self.model,
            q_objects=self._q_objects,
            annotations=self._annotations,
            custom_filters=self._custom_filters,
            limit=self._limit,
            offset=self._offset,
            force_indexes=self._force_indexes,
            use_indexes=self._use_indexes,
        )

    def all(self) -> "QuerySet[MODEL]":
        """
        Return the whole QuerySet.
        Essentially a no-op except as the only operation.
        """
        return self._clone()

    def raw(self, sql: str) -> "RawSQLQuery":
        """
        Return the QuerySet from raw SQL
        """
        return RawSQLQuery(model=self.model, db=self._db, sql=sql)

    def first(self) -> QuerySetSingle[Optional[MODEL]]:
        """
        Limit queryset to one object and return one object instead of list.
        """
        queryset = self._clone()
        queryset._limit = 1
        queryset._single = True
        return queryset  # type: ignore

    def get(self, *args: Q, **kwargs: Any) -> QuerySetSingle[MODEL]:
        """
        Fetch exactly one object matching the parameters.

        On execution, no match raises ``DoesNotExist`` and more than one match
        raises ``MultipleObjectsReturned``.
        """
        queryset = self.filter(*args, **kwargs)
        # Two rows are enough to tell "exactly one" apart from "several".
        queryset._limit = 2
        queryset._single = True
        queryset._raise_does_not_exist = True
        return queryset  # type: ignore

    async def in_bulk(
        self, id_list: Iterable[Union[str, int]], field_name: str
    ) -> Dict[str, MODEL]:
        """
        Return a dictionary mapping each of the given IDs to the object with that ID.

        :param id_list: A list of field values
        :param field_name: Must be a unique field
        """
        objs = await self.filter(**{f"{field_name}__in": id_list})
        return {getattr(obj, field_name): obj for obj in objs}

    def bulk_create(
        self,
        objects: Iterable[MODEL],
        batch_size: Optional[int] = None,
        ignore_conflicts: bool = False,
        update_fields: Optional[Iterable[str]] = None,
        on_conflict: Optional[Iterable[str]] = None,
    ) -> "BulkCreateQuery[MODEL]":
        """
        This method inserts the provided list of objects into the database in an
        efficient manner (generally only 1 query, no matter how many objects there are).

        :param on_conflict: On conflict index name
        :param update_fields: Update fields when conflicts
        :param ignore_conflicts: Ignore conflicts when inserting
        :param objects: List of objects to bulk create
        :param batch_size: How many objects are created in a single query

        :raises ValueError: If params do not meet specifications
        """
        if ignore_conflicts and update_fields:
            raise ValueError(
                "ignore_conflicts and update_fields are mutually exclusive.",
            )
        if not ignore_conflicts:
            # update_fields and on_conflict only make sense together.
            if (update_fields and not on_conflict) or (on_conflict and not update_fields):
                raise ValueError("update_fields and on_conflict need set in same time.")

        return BulkCreateQuery(
            db=self._db,
            model=self.model,
            objects=objects,
            batch_size=batch_size,
            ignore_conflicts=ignore_conflicts,
            update_fields=update_fields,
            on_conflict=on_conflict,
        )

    def bulk_update(
        self,
        objects: Iterable[MODEL],
        fields: Iterable[str],
        batch_size: Optional[int] = None,
    ) -> "BulkUpdateQuery[MODEL]":
        """
        Update the given fields in each of the given objects in the database.

        :param objects: List of objects to bulk update
        :param fields: The fields to update
        :param batch_size: How many objects are updated in a single query

        :raises ValueError: If objects have no primary key set
        """
        # Materialise first: validating a one-shot iterator (e.g. a generator)
        # would otherwise exhaust it before the query gets to see the objects.
        objects = list(objects)
        if any(obj.pk is None for obj in objects):
            raise ValueError("All bulk_update() objects must have a primary key set.")
        return BulkUpdateQuery(
            db=self._db,
            model=self.model,
            q_objects=self._q_objects,
            annotations=self._annotations,
            custom_filters=self._custom_filters,
            limit=self._limit,
            orderings=self._orderings,
            objects=objects,
            fields=fields,
            batch_size=batch_size,
        )

    def get_or_none(self, *args: Q, **kwargs: Any) -> QuerySetSingle[Optional[MODEL]]:
        """
        Fetch exactly one object matching the parameters.

        Unlike ``get``, no match resolves to ``None`` instead of raising.
        """
        queryset = self.filter(*args, **kwargs)
        queryset._limit = 2
        queryset._single = True
        return queryset  # type: ignore

    def only(self, *fields_for_select: str) -> "QuerySet[MODEL]":
        """
        Fetch ONLY the specified fields to create a partial model.

        Persisting changes on the model is allowed only when:

        * All the fields you want to update is specified in ``<model>.save(update_fields=[...])``
        * You included the Model primary key in the ``.only(...)``

        To protect against common mistakes we ensure that errors get raised:

        * If you access a field that is not specified, you will get an ``AttributeError``.
        * If you do a ``<model>.save()`` a ``IncompleteInstanceError`` will be raised
          as the model is, as requested, incomplete.
        * If you do a ``<model>.save(update_fields=[...])`` and you didn't include the primary key
          in the ``.only(...)``, then ``IncompleteInstanceError`` will be raised indicating
          that updates can't be done without the primary key being known.
        * If you do a ``<model>.save(update_fields=[...])`` and one of the fields in
          ``update_fields`` was not in the ``.only(...)``, then ``IncompleteInstanceError``
          as that field is not available to be updated.
        """
        queryset = self._clone()
        queryset._fields_for_select = fields_for_select
        return queryset

    def select_related(self, *fields: str) -> "QuerySet[MODEL]":
        """
        Return a new QuerySet instance that will select related objects.

        If fields are specified, they must be ForeignKey fields and only those
        related objects are included in the selection.
        """
        queryset = self._clone()
        queryset._select_related.update(fields)
        return queryset

    def force_index(self, *index_names: str) -> "QuerySet[MODEL]":
        """
        The FORCE INDEX hint acts like USE INDEX (index_list),
        with the addition that a table scan is assumed to be very expensive.

        When the backend does not support index hints, the queryset is returned unchanged.
        """
        if self.capabilities.support_index_hint:
            queryset = self._clone()
            queryset._force_indexes.update(index_names)
            return queryset
        return self

    def use_index(self, *index_names: str) -> "QuerySet[MODEL]":
        """
        The USE INDEX (index_list) hint tells MySQL to use only one of the named indexes
        to find rows in the table.

        When the backend does not support index hints, the queryset is returned unchanged.
        """
        if self.capabilities.support_index_hint:
            queryset = self._clone()
            queryset._use_indexes.update(index_names)
            return queryset
        return self

    def prefetch_related(self, *args: Union[str, Prefetch]) -> "QuerySet[MODEL]":
        """
        Like ``.fetch_related()`` on instance, but works on all objects in QuerySet.

        :raises FieldError: If the field to prefetch on is not a relation, or not found.
        """
        queryset = self._clone()
        # The string-based prefetch map is rebuilt from scratch on every call.
        queryset._prefetch_map = {}

        for relation in args:
            if isinstance(relation, Prefetch):
                relation.resolve_for_queryset(queryset)
                continue

            first_level_field, __, forwarded_prefetch = relation.partition("__")
            if first_level_field not in self.model._meta.fetch_fields:
                if first_level_field in self.model._meta.fields:
                    raise FieldError(
                        f"Field {first_level_field} on {self.model._meta.full_name} is not a relation"
                    )
                raise FieldError(
                    f"Relation {first_level_field} for {self.model._meta.full_name} not found"
                )

            nested = queryset._prefetch_map.setdefault(first_level_field, set())
            if forwarded_prefetch:
                nested.add(forwarded_prefetch)
        return queryset

    async def explain(self) -> Any:
        """Fetch and return information about the query execution plan.

        This is done by executing an ``EXPLAIN`` query whose exact prefix depends
        on the database backend, as documented below.

        - PostgreSQL: ``EXPLAIN (FORMAT JSON, VERBOSE) ...``
        - SQLite: ``EXPLAIN QUERY PLAN ...``
        - MySQL: ``EXPLAIN FORMAT=JSON ...``

        .. note::
            This is only meant to be used in an interactive environment for debugging
            and query optimization.
            **The output format may (and will) vary greatly depending on the database backend.**
        """
        if self._db is None:
            self._db = self._choose_db()  # type: ignore
        self._make_query()
        return await self._db.executor_class(model=self.model, db=self._db).execute_explain(
            self.query
        )

    def using_db(self, _db: Optional[BaseDBAsyncClient]) -> "QuerySet[MODEL]":
        """
        Executes query in provided db client.
        Useful for transactions workaround.

        A falsy ``_db`` keeps the currently selected client.
        """
        queryset = self._clone()
        queryset._db = _db if _db else queryset._db
        return queryset

    def _join_table_with_select_related(
        self,
        model: "Type[Model]",
        table: Table,
        field: str,
        forwarded_fields: str,
        path: Iterable[Optional[str]],
    ) -> QueryBuilder:
        """
        Join ``field`` of ``model`` and select the related model's columns.

        Recurses through ``forwarded_fields`` (the remaining ``__``-separated path).

        :param model: Model owning ``field``.
        :param table: Table that ``model`` is selected from in this query.
        :param field: Relation name on ``model``.
        :param forwarded_fields: Remaining path below ``field``; may be empty.
        :param path: Relation names walked so far, starting with ``None`` for the root.
        :return: The updated query (also stored on ``self.query``).
        :raises FieldError: If ``field`` is unknown, or is a plain column that is
            being traversed like a relation.
        """
        if field in model._meta.fields_db_projection and forwarded_fields:
            raise FieldError(f'Field "{field}" for model "{model.__name__}" is not relation')
        field_object = cast(RelationalField, model._meta.fields_map.get(field))
        if not field_object:
            raise FieldError(f'Unknown field "{field}" for model "{model.__name__}"')

        table = self._join_table_by_field(table, field, field_object)
        related_fields = field_object.related_model._meta.db_fields
        append_item = (
            field_object.related_model,
            len(related_fields),
            field,
            model,
            path,
        )
        # NOTE(review): nesting was reconstructed from whitespace-stripped source.
        # Columns are selected only for a newly registered entry, which keeps the
        # selected columns in step with ``_select_related_idx`` — confirm upstream.
        if append_item not in self._select_related_idx:
            self._select_related_idx.append(append_item)
            for related_field in related_fields:
                self.query = self.query.select(
                    table[related_field].as_(f"{table.get_table_name()}.{related_field}")
                )

        if forwarded_fields:
            field, __, forwarded_fields_ = forwarded_fields.partition("__")
            self.query = self._join_table_with_select_related(
                model=field_object.related_model,
                table=table,
                field=field,
                forwarded_fields=forwarded_fields_,
                path=(*path, field),
            )
        return self.query

    def _make_query(self) -> None:
        """Assemble ``self.query`` from the state collected by the chaining methods."""
        # clean tmp records first
        self._select_related_idx = []
        self._joined_tables = []

        table = self.model._meta.basetable
        if self._fields_for_select:
            # ``only()``: select just the requested columns.
            self._select_related_idx.append(
                (self.model, len(self._fields_for_select), table, self.model, (None,))
            )
            db_fields_for_select = [
                table[self.model._meta.fields_db_projection[field]].as_(field)
                for field in self._fields_for_select
            ]
            self.query = copy(self.model._meta.basequery).select(*db_fields_for_select)
        else:
            self.query = copy(self.model._meta.basequery_all_fields)
            self._select_related_idx.append(
                (
                    self.model,
                    len(self.model._meta.db_fields) + len(self._annotations),
                    table,
                    self.model,
                    (None,),
                )
            )

        self.resolve_ordering(
            self.model, self.model._meta.basetable, self._orderings, self._annotations
        )
        self.resolve_filters(
            model=self.model,
            q_objects=self._q_objects,
            annotations=self._annotations,
            custom_filters=self._custom_filters,
        )
        if self._limit is not None:
            self.query._limit = self._limit
        if self._offset:
            self.query._offset = self._offset
        if self._distinct:
            self.query._distinct = True
        if self._select_for_update:
            self.query = self.query.for_update(
                self._select_for_update_nowait,
                self._select_for_update_skip_locked,
                self._select_for_update_of,
            )
        for related in self._select_related:
            field, __, forwarded_fields = related.partition("__")
            self.query = self._join_table_with_select_related(
                model=self.model,
                table=self.model._meta.basetable,
                field=field,
                forwarded_fields=forwarded_fields,
                path=(None, field),
            )
        if self._force_indexes:
            self.query._force_indexes = []
            self.query = self.query.force_index(*self._force_indexes)
        if self._use_indexes:
            self.query._use_indexes = []
            self.query = self.query.use_index(*self._use_indexes)

    def __await__(self) -> Generator[Any, None, List[MODEL]]:
        """Pick a connection if needed, build the query and execute it."""
        if self._db is None:
            # SELECT ... FOR UPDATE is routed as a write.
            self._db = self._choose_db(self._select_for_update)  # type: ignore
        self._make_query()
        return self._execute().__await__()

    async def __aiter__(self) -> AsyncIterator[MODEL]:
        """Execute the query once, then yield the results one by one."""
        for val in await self:
            yield val

    async def _execute(self) -> List[MODEL]:
        """
        Run the built query and shape the result.

        :return: The list of instances, or for single-object querysets the
            instance itself / ``None``.
        :raises DoesNotExist: Single-object queryset created by ``get`` found no row.
        :raises MultipleObjectsReturned: Single-object queryset found more than one row.
        """
        instance_list = await self._db.executor_class(
            model=self.model,
            db=self._db,
            prefetch_map=self._prefetch_map,
            prefetch_queries=self._prefetch_queries,
            select_related_idx=self._select_related_idx,
        ).execute_select(self.query, custom_fields=list(self._annotations.keys()))
        if self._single:
            if len(instance_list) == 1:
                return instance_list[0]
            if not instance_list:
                if self._raise_does_not_exist:
                    raise DoesNotExist(self.model)
                return None  # type: ignore
            raise MultipleObjectsReturned(self.model)
        return instance_list
class UpdateQuery(AwaitableQuery):
    """
    UPDATE statement for the rows matched by the given filters.

    Awaiting it executes the statement and resolves to element ``[0]`` of the
    value returned by the client's ``execute_query``.
    """

    __slots__ = (
        "update_kwargs",
        "q_objects",
        "annotations",
        "custom_filters",
        "orderings",
        "limit",
        "values",
    )

    def __init__(
        self,
        model: Type[MODEL],
        update_kwargs: Dict[str, Any],
        db: BaseDBAsyncClient,
        q_objects: List[Q],
        annotations: Dict[str, Any],
        custom_filters: Dict[str, FilterInfoDict],
        limit: Optional[int],
        orderings: List[Tuple[str, str]],
    ) -> None:
        super().__init__(model)
        self.update_kwargs = update_kwargs
        self.q_objects = q_objects
        self.annotations = annotations
        self.custom_filters = custom_filters
        self._db = db
        self.limit = limit
        self.orderings = orderings
        # Positional parameters for the SET clause, filled by ``_make_query``.
        self.values: List[Any] = []

    def _make_query(self) -> None:
        """
        Build the UPDATE statement and collect its positional parameters.

        :raises FieldError: If a keyword is not a model field, or names a virtual field.
        :raises IntegrityError: If a keyword names the primary key.
        """
        # Start from a clean slate: the query may be built more than once
        # (e.g. ``.sql()`` followed by ``await``), and stale parameters would
        # no longer line up with the placeholders.
        self.values = []

        table = self.model._meta.basetable
        self.query = self._db.query_class.update(table)
        if self.capabilities.support_update_limit_order_by and self.limit:
            self.query._limit = self.limit
            self.resolve_ordering(self.model, table, self.orderings, self.annotations)

        self.resolve_filters(
            model=self.model,
            q_objects=self.q_objects,
            annotations=self.annotations,
            custom_filters=self.custom_filters,
        )
        # Need to get executor to get correct column_map
        executor = self._db.executor_class(model=self.model, db=self._db)
        count = 0
        for key, value in self.update_kwargs.items():
            field_object = self.model._meta.fields_map.get(key)
            if not field_object:
                raise FieldError(f"Unknown keyword argument {key} for model {self.model}")
            if field_object.pk:
                raise IntegrityError(f"Field {key} is PK and can not be updated")

            if isinstance(field_object, (ForeignKeyFieldInstance, OneToOneFieldInstance)):
                # Relation given as an object: store the referenced key in the FK column.
                fk_field: str = field_object.source_field  # type: ignore
                db_field = self.model._meta.fields_map[fk_field].source_field
                value = executor.column_map[fk_field](
                    getattr(value, field_object.to_field_instance.model_field_name),
                    None,
                )
            else:
                try:
                    db_field = self.model._meta.fields_db_projection[key]
                except KeyError:
                    raise FieldError(f"Field {key} is virtual and can not be updated") from None
                if isinstance(value, Term):
                    value = F.resolver_arithmetic_expression(self.model, value)[0]
                elif isinstance(value, Function):
                    value = value.resolve(self.model, table)["field"]
                else:
                    value = executor.column_map[key](value, None)

            if isinstance(value, Term):
                # SQL expressions are embedded directly in the statement.
                self.query = self.query.set(db_field, value)
            else:
                # Plain values go through a numbered placeholder.
                self.query = self.query.set(db_field, executor.parameter(count))
                self.values.append(value)
                count += 1

    def __await__(self) -> Generator[Any, None, int]:
        """Pick a write connection if needed, build the statement and execute it."""
        if self._db is None:
            self._db = self._choose_db(True)  # type: ignore
        self._make_query()
        return self._execute().__await__()

    async def _execute(self) -> int:
        """Execute the statement and return element ``[0]`` of the client's result."""
        return (await self._db.execute_query(str(self.query), self.values))[0]
class FieldSelectQuery(AwaitableQuery):
    """
    Base for queries that select individual fields rather than whole model objects.

    Provides the helpers to add (possibly related) fields to the SELECT list,
    resolve GROUP BY terms and find the db-to-python converter for a field.
    """

    # pylint: disable=W0223
    def __init__(self, model: Type[MODEL], annotations: Dict[str, Any]) -> None:
        super().__init__(model)
        self.annotations = annotations

    def _join_table_with_forwarded_fields(
        self, model: Type[MODEL], table: Table, field: str, forwarded_fields: str
    ) -> Tuple[Table, str]:
        """
        Walk a ``relation__...__column`` path, joining tables along the way.

        :return: The table holding the final column and that column's db name.
        :raises FieldError: If a plain column is traversed like a relation, or the
            field is unknown.
        :raises ValueError: If the path ends on a relation instead of a column.
        """
        projection = model._meta.fields_db_projection
        if field in projection:
            if forwarded_fields:
                raise FieldError(f'Field "{field}" for model "{model.__name__}" is not relation')
            return table, projection[field]

        # NOTE(review): this checks the root model (self.model), not the ``model``
        # currently being walked — confirm whether that is intended.
        if not forwarded_fields and field in self.model._meta.fetch_fields:
            raise ValueError(
                'Selecting relation "{}" is not possible, select concrete '
                "field on related model".format(field)
            )

        relation = cast(RelationalField, model._meta.fields_map.get(field))
        if not relation:
            raise FieldError(f'Unknown field "{field}" for model "{model.__name__}"')

        joined_table = self._join_table_by_field(table, field, relation)
        next_field, __, remainder = forwarded_fields.partition("__")

        return self._join_table_with_forwarded_fields(
            model=relation.related_model,
            table=joined_table,
            field=next_field,
            forwarded_fields=remainder,
        )

    def add_field_to_select_query(self, field: str, return_as: str) -> None:
        """
        Add ``field`` to the selection under the alias ``return_as``.

        Annotations are registered on ``self._annotations``; plain and related
        columns are added straight to the query.

        :raises ValueError: If ``field`` is a relation itself.
        :raises FieldError: If ``field`` is unknown.
        """
        meta = self.model._meta
        base_table = meta.basetable

        if field in self.annotations:
            self._annotations[return_as] = self.annotations[field]
        elif field in meta.fields_db_projection:
            column = meta.fields_db_projection[field]
            self.query._select_field(base_table[column].as_(return_as))
        elif field in meta.fetch_fields:
            raise ValueError(
                'Selecting relation "{}" is not possible, select '
                "concrete field on related model".format(field)
            )
        else:
            head, __, tail = field.partition("__")
            if head not in meta.fetch_fields:
                raise FieldError(f'Unknown field "{field}" for model "{self.model.__name__}"')

            related_table, related_column = self._join_table_with_forwarded_fields(
                model=self.model,
                table=base_table,
                field=head,
                forwarded_fields=tail,
            )
            self.query._select_field(related_table[related_column].as_(return_as))

    def resolve_to_python_value(self, model: Type[MODEL], field: str) -> Callable:
        """
        Return the callable that converts a db value of ``field`` to its python value.

        :raises FieldError: If ``field`` cannot be resolved on ``model``.
        """

        def _identity(x):
            return x

        meta = model._meta

        if field in meta.fetch_fields:
            # return as is to get whole model objects
            return _identity

        if any(native[1] == field for native in meta.db_native_fields):
            return _identity

        if field in self.annotations:
            annotated_field = getattr(self.annotations[field], "field_object", None)
            return annotated_field.to_python_value if annotated_field else _identity

        if field in meta.fields_map:
            return meta.fields_map[field].to_python_value

        head, __, tail = field.partition("__")
        if head in meta.fetch_fields:
            # Follow the relation and resolve the remainder on the related model.
            related_model = meta.fields_map[head].related_model  # type: ignore
            return self.resolve_to_python_value(related_model, tail)

        raise FieldError(f'Unknown field "{field}" for model "{model}"')

    def _resolve_group_bys(self, *field_names: str):
        """
        Turn field names into GROUP BY terms, joining related tables where needed.

        Names of annotations on this query are wrapped in ``Term``; everything
        else is resolved to an aliased table column.
        """
        terms = []
        for name in field_names:
            if name in self._annotations:
                terms.append(Term(name))
            else:
                head, __, tail = name.partition("__")
                related_table, related_column = self._join_table_with_forwarded_fields(
                    model=self.model,
                    table=self.model._meta.basetable,
                    field=head,
                    forwarded_fields=tail,
                )
                terms.append(related_table[related_column].as_(name))
        return terms
class ValuesListQuery(FieldSelectQuery, Generic[SINGLE]):
    """
    Query that resolves to tuples of selected field values (or bare values when ``flat``).

    Awaiting it builds the SQL, executes it and returns a list with one entry per row;
    when ``single`` is set it returns just one entry (or ``None`` / raises, see ``_execute``).
    """

    __slots__ = (
        "flat",
        "fields",
        "limit",
        "offset",
        "distinct",
        "orderings",
        "annotations",
        "custom_filters",
        "q_objects",
        "single",
        "raise_does_not_exist",
        "fields_for_select_list",
        "group_bys",
        "force_indexes",
        "use_indexes",
    )

    def __init__(
        self,
        model: Type[MODEL],
        db: BaseDBAsyncClient,
        q_objects: List[Q],
        single: bool,
        raise_does_not_exist: bool,
        fields_for_select_list: Union[Tuple[str, ...], List[str]],
        limit: Optional[int],
        offset: Optional[int],
        distinct: bool,
        orderings: List[Tuple[str, str]],
        flat: bool,
        annotations: Dict[str, Any],
        custom_filters: Dict[str, FilterInfoDict],
        group_bys: Tuple[str, ...],
        force_indexes: Set[str],
        use_indexes: Set[str],
    ) -> None:
        """
        :param model: The Model this query is based on.
        :param db: The connection to execute on; when ``None`` one is chosen on await.
        :param q_objects: The Q expressions to filter by.
        :param single: Resolve to a single entry instead of a list.
        :param raise_does_not_exist: With ``single``, raise instead of returning ``None``
            when no row matches.
        :param fields_for_select_list: The fields to select, in output order.
        :param limit: Maximum number of rows, ``None`` for no limit.
        :param offset: Number of rows to skip, ``None`` for no offset.
        :param distinct: Whether to select only distinct rows.
        :param orderings: The orderings to apply.
        :param flat: Return bare values instead of 1-tuples; needs exactly one field.
        :param annotations: Annotations that can be selected, ordered or filtered by.
        :param custom_filters: Pre-resolved filters to be passed through.
        :param group_bys: The fields to group by.
        :param force_indexes: Index names passed to ``force_index``.
        :param use_indexes: Index names passed to ``use_index``.

        :raises TypeError: If ``flat`` is set and not exactly one field is selected.
        """
        super().__init__(model, annotations)
        if flat and (len(fields_for_select_list) != 1):
            raise TypeError("You can flat value_list only if contains one field")

        # Columns are aliased by their position ("0", "1", ...), so that result rows can
        # be turned into tuples in selection order.
        self.fields = {str(i): field for i, field in enumerate(fields_for_select_list)}
        self.limit = limit
        self.offset = offset
        self.distinct = distinct
        self.orderings = orderings
        self.custom_filters = custom_filters
        self.q_objects = q_objects
        self.single = single
        self.raise_does_not_exist = raise_does_not_exist
        self.fields_for_select_list = fields_for_select_list
        self.flat = flat
        self._db = db
        self.group_bys = group_bys
        self.force_indexes = force_indexes
        self.use_indexes = use_indexes

    def _make_query(self) -> None:
        """Build ``self.query`` from scratch out of the stored query parameters."""
        self._joined_tables = []
        self.query = copy(self.model._meta.basequery)

        for positional_number, field in self.fields.items():
            self.add_field_to_select_query(field, positional_number)

        self.resolve_ordering(
            model=self.model,
            table=self.model._meta.basetable,
            orderings=self.orderings,
            annotations=self.annotations,
        )
        self.resolve_filters(
            model=self.model,
            q_objects=self.q_objects,
            annotations=self.annotations,
            custom_filters=self.custom_filters,
        )

        # ``None`` means "no limit". An explicit limit of 0 must be kept so that it
        # yields no rows, instead of being mistaken for "no limit" by a truthiness test.
        if self.limit is not None:
            self.query._limit = self.limit
        # An offset of 0 skips nothing, so a truthiness test is sufficient here.
        if self.offset:
            self.query._offset = self.offset
        if self.distinct:
            self.query._distinct = True
        if self.group_bys:
            self.query._groupbys = self._resolve_group_bys(*self.group_bys)

        # The index lists are replaced with fresh ones before adding to them, presumably
        # so that the copied query does not share (and mutate) the base query's lists.
        if self.force_indexes:
            self.query._force_indexes = []
            self.query = self.query.force_index(*self.force_indexes)
        if self.use_indexes:
            self.query._use_indexes = []
            self.query = self.query.use_index(*self.use_indexes)

    @overload
    def __await__(
        self: "ValuesListQuery[Literal[False]]",
    ) -> Generator[Any, None, List[Tuple[Any, ...]]]:
        ...

    @overload
    def __await__(
        self: "ValuesListQuery[Literal[True]]",
    ) -> Generator[Any, None, Tuple[Any, ...]]:
        ...

    def __await__(self) -> Generator[Any, None, Union[List[Any], Tuple[Any, ...]]]:
        """Choose a connection if none is set, build the query and execute it."""
        if self._db is None:
            self._db = self._choose_db()  # type: ignore
        self._make_query()
        return self._execute().__await__()  # pylint: disable=E1101

    async def __aiter__(self: "ValuesListQuery[Any]") -> AsyncIterator[Any]:
        """Execute the query and yield its result entries one by one."""
        for val in await self:
            yield val

    async def _execute(self) -> Union[List[Any], Tuple]:
        """
        Run the built query and convert every row to Python values.

        :return: A list with one entry per row; each entry is a tuple of the selected
            values, or the bare value when ``flat``. With ``single``, the only entry
            itself, or ``None`` when there is no row and ``raise_does_not_exist`` is unset.

        :raises DoesNotExist: With ``single`` and ``raise_does_not_exist``, if no row matched.
        :raises MultipleObjectsReturned: With ``single``, if more than one row matched.
        """
        _, result = await self._db.execute_query(str(self.query))

        # (column alias, converter) pairs in selection order.
        columns = [
            (key, self.resolve_to_python_value(self.model, name))
            for key, name in self.fields.items()
        ]

        lst_values: List[Any]
        if self.flat:
            # ``flat`` guarantees a single selected field, which is aliased "0".
            to_python = columns[0][1]
            lst_values = [to_python(entry["0"]) for entry in result]
        else:
            lst_values = [
                tuple(to_python(entry[column]) for column, to_python in columns)
                for entry in result
            ]

        if not self.single:
            return lst_values

        if len(lst_values) == 1:
            return lst_values[0]
        if not lst_values:
            if self.raise_does_not_exist:
                raise DoesNotExist(self.model)
            return None  # type: ignore
        raise MultipleObjectsReturned(self.model)