from __future__ import annotations

from copy import copy
from typing import TYPE_CHECKING, List, Optional, Tuple

from pypika import Table
from pypika.terms import Criterion

from tortoise.exceptions import OperationalError
from tortoise.fields.relational import (
    BackwardFKRelation,
    ManyToManyFieldInstance,
    RelationalField,
)

if TYPE_CHECKING:  # pragma: nocoverage
    from tortoise.queryset import QuerySet

# A table paired with the criterion used to link it in.
TableCriterionTuple = Tuple[Table, Criterion]


def _get_joins_for_related_field(
    table: Table, related_field: RelationalField, related_field_name: str
) -> List[TableCriterionTuple]:
    """
    Build the ``(table, criterion)`` pairs linking ``table`` to the model behind a relation.

    :param table: Table the relation is traversed from.
    :param related_field: Relational field describing the relation.
    :param related_field_name: Name of the relation; used to build the alias
        ``{table name}__{related_field_name}`` for the related table.
    :return: Two pairs (through table, then related table) for many-to-many
        relations, one pair for every other relation kind.
    """
    required_joins: List[TableCriterionTuple] = []

    related_table: Table = related_field.related_model._meta.basetable
    if isinstance(related_field, ManyToManyFieldInstance):
        through_table = Table(related_field.through)
        # Hop 1: pk column of the field's own model -> backward key of the through table.
        required_joins.append(
            (
                through_table,
                table[related_field.model._meta.db_pk_column]
                == through_table[related_field.backward_key],
            )
        )
        # Hop 2: forward key of the through table -> pk column of the related model.
        required_joins.append(
            (
                related_table,
                through_table[related_field.forward_key]
                == related_table[related_field.related_model._meta.db_pk_column],
            )
        )
    elif isinstance(related_field, BackwardFKRelation):
        to_field_source_field = (
            related_field.to_field_instance.source_field
            or related_field.to_field_instance.model_field_name
        )
        # Alias the related table only when it compares equal to the table we join from.
        if table == related_table:
            related_table = related_table.as_(f"{table.get_table_name()}__{related_field_name}")
        required_joins.append(
            (
                related_table,
                table[to_field_source_field] == related_table[related_field.relation_source_field],
            )
        )
    else:
        to_field_source_field = (
            related_field.to_field_instance.source_field
            or related_field.to_field_instance.model_field_name
        )
        from_field = related_field.model._meta.fields_map[related_field.source_field]  # type: ignore
        from_field_source_field = from_field.source_field or from_field.model_field_name
        # Every remaining relation kind always gets an aliased related table.
        related_table = related_table.as_(f"{table.get_table_name()}__{related_field_name}")
        required_joins.append(
            (
                related_table,
                related_table[to_field_source_field] == table[from_field_source_field],
            )
        )
    return required_joins


class EmptyCriterion(Criterion):  # type: ignore[misc]
    """Falsy placeholder criterion: combining it with ``|`` or ``&`` yields the other operand."""

    def __or__(self, other: Criterion) -> Criterion:
        return other

    def __and__(self, other: Criterion) -> Criterion:
        return other

    def __bool__(self) -> bool:
        return False


def _and(left: Criterion, right: Criterion) -> Criterion:
    """Return ``left`` when only ``left`` is truthy, otherwise ``left & right``."""
    if left and not right:
        return left
    return left & right


def _or(left: Criterion, right: Criterion) -> Criterion:
    """Return ``left`` when only ``left`` is truthy, otherwise ``left | right``."""
    if left and not right:
        return left
    return left | right


class QueryModifier:
    """
    Internal structure used to generate SQL Queries.

    :param where_criterion: Criterion stored as ``where_criterion``; a falsy value
        is replaced by an :class:`EmptyCriterion`.
    :param joins: List of ``(table, criterion)`` pairs; a falsy value is replaced by ``[]``.
    :param having_criterion: Criterion stored as ``having_criterion``; a falsy value
        is replaced by an :class:`EmptyCriterion`.
    """

    def __init__(
        self,
        where_criterion: Optional[Criterion] = None,
        joins: Optional[List[TableCriterionTuple]] = None,
        having_criterion: Optional[Criterion] = None,
    ) -> None:
        self.where_criterion: Criterion = where_criterion or EmptyCriterion()
        self.joins = joins or []
        self.having_criterion: Criterion = having_criterion or EmptyCriterion()

    def __and__(self, other: QueryModifier) -> QueryModifier:
        """AND the where and having criteria pairwise and concatenate both join lists."""
        return self.__class__(
            where_criterion=_and(self.where_criterion, other.where_criterion),
            joins=self.joins + other.joins,
            having_criterion=_and(self.having_criterion, other.having_criterion),
        )

    def _and_criterion(self) -> Criterion:
        """Fold this modifier's where and having criteria into a single criterion."""
        return _and(self.where_criterion, self.having_criterion)

    def __or__(self, other: QueryModifier) -> QueryModifier:
        """
        OR two modifiers and concatenate both join lists.

        When either side carries a having criterion, each side's where and having
        criteria are folded together first and the OR of the two becomes the
        result's having criterion; the result then has no where criterion.
        Otherwise only the where criteria are OR-ed.
        """
        where_criterion = having_criterion = None
        if self.having_criterion or other.having_criterion:
            having_criterion = _or(self._and_criterion(), other._and_criterion())
        else:
            where_criterion = (
                (self.where_criterion | other.where_criterion)
                if self.where_criterion and other.where_criterion
                else (self.where_criterion or other.where_criterion)
            )
        return self.__class__(where_criterion, self.joins + other.joins, having_criterion)

    def __invert__(self) -> QueryModifier:
        """
        Negate the modifier, keeping its joins.

        With a having criterion present, where and having are AND-ed and the
        negation becomes the result's having criterion; otherwise a truthy where
        criterion is negated in place. A modifier with neither stays empty.
        """
        where_criterion = having_criterion = None
        if self.having_criterion:
            having_criterion = (self.where_criterion & self.having_criterion).negate()
        elif self.where_criterion:
            where_criterion = self.where_criterion.negate()
        return self.__class__(where_criterion, self.joins, having_criterion)

    def get_query_modifiers(self) -> Tuple[Criterion, List[TableCriterionTuple], Criterion]:
        """
        Returns a tuple of the query criterion.

        :return: ``(where_criterion, joins, having_criterion)``.
        """
        return self.where_criterion, self.joins, self.having_criterion
class Prefetch:
    """
    Prefetcher container. One would directly use this when wanting to attach a custom QuerySet
    for specialised prefetching.

    :param relation: Related field name.
    :param queryset: Custom QuerySet to use for prefetching.
    :param to_attr: Sets the result of the prefetch operation to a custom attribute.
    """

    __slots__ = ("relation", "queryset", "to_attr")

    def __init__(
        self, relation: str, queryset: "QuerySet", to_attr: Optional[str] = None
    ) -> None:
        self.to_attr = to_attr
        self.relation = relation
        self.queryset = queryset
        # NOTE: this overwrites ``query`` on the queryset object that was passed in,
        # replacing it with a copy of the model's base query.
        self.queryset.query = copy(self.queryset.model._meta.basequery)

    def resolve_for_queryset(self, queryset: "QuerySet") -> None:
        """
        Called internally to generate prefetching query.

        Registers this prefetch on ``queryset``: a nested relation
        (``first__rest``) is stored in ``queryset._prefetch_map`` as a new
        :class:`Prefetch` for the remainder, while a single-level relation is
        appended to ``queryset._prefetch_queries`` as ``(to_attr, custom queryset)``.

        :param queryset: QuerySet whose prefetch bookkeeping is updated.
        :raises OperationalError: If field does not exist in model.
        """
        first_level_field, __, forwarded_prefetch = self.relation.partition("__")
        if first_level_field not in queryset.model._meta.fetch_fields:
            raise OperationalError(
                f"relation {first_level_field} for {queryset.model._meta.db_table} not found"
            )

        if forwarded_prefetch:
            # Hand the remaining path (and the custom queryset) down one level.
            if first_level_field not in queryset._prefetch_map:
                queryset._prefetch_map[first_level_field] = set()
            queryset._prefetch_map[first_level_field].add(
                Prefetch(forwarded_prefetch, self.queryset, to_attr=self.to_attr)
            )
        else:
            queryset._prefetch_queries.setdefault(first_level_field, []).append(
                (self.to_attr, self.queryset)
            )