import operator
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    Iterator,
    List,
    Optional,
    Tuple,
    Type,
    Union,
    cast,
)

from pypika import Case as PypikaCase
from pypika import Field as PypikaField
from pypika import Table
from pypika.functions import DistinctOptionFunction
from pypika.terms import ArithmeticExpression, Criterion
from pypika.terms import Function as PypikaFunction
from pypika.terms import Term
from pypika.utils import format_alias_sql

from tortoise.exceptions import ConfigurationError, FieldError, OperationalError
from tortoise.fields.relational import (
    BackwardFKRelation,
    ForeignKeyFieldInstance,
    RelationalField,
)
from tortoise.filters import FilterInfoDict
from tortoise.query_utils import QueryModifier, _get_joins_for_related_field

if TYPE_CHECKING:  # pragma: nocoverage
    from pypika.queries import Selectable

    from tortoise.fields.base import Field
    from tortoise.models import Model
    from tortoise.queryset import AwaitableQuery


class F(PypikaField):  # type: ignore
    """
    Field reference that can be resolved against a model.

    Thin subclass of the pypika ``Field`` adding a resolver that maps model field
    names onto their database columns.
    """

    @classmethod
    def resolver_arithmetic_expression(
        cls,
        model: "Type[Model]",
        arithmetic_expression_or_field: Term,
    ) -> Tuple[Term, Optional[PypikaField]]:
        """
        Resolve field names inside a term to the database columns of ``model``.

        Note that the passed term is updated in place: a field gets its ``name``
        replaced by the database column, and an arithmetic expression gets its
        ``left``/``right`` operands replaced by their resolved counterparts.

        :param model: Model whose ``_meta`` is used to look the fields up.
        :param arithmetic_expression_or_field: A pypika field, an arithmetic
            expression (resolved recursively), or any other term (returned as is).
        :return: Tuple of the resolved term and the field object found on the model
            (``None`` if no field was involved).
        :raises FieldError: If a field is not in ``fields_db_projection`` of the model,
            or if an arithmetic expression mixes fields of different types.
        """
        field_object = None

        if isinstance(arithmetic_expression_or_field, PypikaField):
            name = arithmetic_expression_or_field.name
            # Only the projection lookup is guarded, so that a KeyError coming from
            # anywhere else is not misreported as a missing field.
            try:
                db_column = model._meta.fields_db_projection[name]
            except KeyError as exc:
                raise FieldError(
                    f"There is no non-virtual field {name} on Model {model.__name__}"
                ) from exc
            arithmetic_expression_or_field.name = db_column

            field_object = model._meta.fields_map.get(name, None)
            if field_object:
                # Fields may provide a dialect specific cast wrapper for use in functions.
                func = field_object.get_for_dialect(
                    model._meta.db.capabilities.dialect, "function_cast"
                )
                if func:
                    arithmetic_expression_or_field = func(
                        field_object, arithmetic_expression_or_field
                    )
        elif isinstance(arithmetic_expression_or_field, ArithmeticExpression):
            # Left operand is resolved before the right one; the last field object
            # found wins, provided all of them share the same type.
            for side in ("left", "right"):
                operand = getattr(arithmetic_expression_or_field, side)
                resolved, side_field_object = cls.resolver_arithmetic_expression(model, operand)
                setattr(arithmetic_expression_or_field, side, resolved)
                if side_field_object:
                    if field_object and type(field_object) is not type(side_field_object):
                        raise FieldError(
                            "Cannot use arithmetic expression between different field type"
                        )
                    field_object = side_field_object

        return arithmetic_expression_or_field, field_object


class Subquery(Term):  # type: ignore
    """
    Term that wraps a query so it can be embedded in another statement.

    :param query: The query to embed; its ``as_query()`` result produces the SQL.
    """

    def __init__(self, query: "AwaitableQuery"):
        super().__init__()
        self.query = query

    def get_sql(self, **kwargs: Any) -> str:
        """Return the SQL of the wrapped query, forwarding ``kwargs`` to it."""
        return self.query.as_query().get_sql(**kwargs)

    def as_(self, alias: str) -> "Selectable":
        """Return the wrapped query aliased as ``alias``."""
        return self.query.as_query().as_(alias)


class RawSQL(Term):  # type: ignore
    """
    Term that emits the given SQL string verbatim.

    :param sql: The SQL text to emit.
    """

    def __init__(self, sql: str):
        super().__init__()
        self.sql = sql

    def get_sql(self, with_alias: bool = False, **kwargs: Any) -> str:
        """
        Return the raw SQL, formatted with the term's alias when ``with_alias`` is set.
        """
        if with_alias:
            return format_alias_sql(sql=self.sql, alias=self.alias, **kwargs)
        return self.sql


class Expression:
    """
    Parent class for expressions
    """

    def resolve(self, model: "Type[Model]", table: Table) -> Any:
        """Resolve the expression for ``model``; must be implemented by subclasses."""
        raise NotImplementedError()
class Q(Expression):
    """
    Q Expression container.
    Q Expressions are a useful tool to compose a query from many small parts.

    :param join_type: Is the join an AND or OR join type?
    :param args: Inner ``Q`` expressions that you want to wrap.
    :param kwargs: Filter statements that this Q object should encapsulate.
    """

    __slots__ = (
        "children",
        "filters",
        "join_type",
        "_is_negated",
        "_annotations",
        "_custom_filters",
    )

    AND = "AND"
    OR = "OR"

    def __init__(self, *args: "Q", join_type: str = AND, **kwargs: Any) -> None:
        if args and kwargs:
            # A node holds either children or filters: fold the filters into an
            # extra child so both can be combined with the same join type.
            newarg = Q(join_type=join_type, **kwargs)
            args = (newarg,) + args
            kwargs = {}
        if not all(isinstance(node, Q) for node in args):
            raise OperationalError("All ordered arguments must be Q nodes")
        #: Contains the sub-Q's that this Q is made up of
        self.children: Tuple[Q, ...] = args
        #: Contains the filters applied to this Q
        self.filters: Dict[str, FilterInfoDict] = kwargs
        if join_type not in {self.AND, self.OR}:
            raise OperationalError("join_type must be AND or OR")
        #: Specifies if this Q does an AND or OR on its children
        self.join_type = join_type
        self._is_negated = False
        self._annotations: Dict[str, Any] = {}
        self._custom_filters: Dict[str, FilterInfoDict] = {}

    def __and__(self, other: "Q") -> "Q":
        """
        Returns a binary AND of Q objects, use ``AND`` operator.

        :raises OperationalError: AND operation requires a Q node
        """
        if not isinstance(other, Q):
            raise OperationalError("AND operation requires a Q node")
        return Q(self, other, join_type=self.AND)

    def __or__(self, other: "Q") -> "Q":
        """
        Returns a binary OR of Q objects, use ``OR`` operator.

        :raises OperationalError: OR operation requires a Q node
        """
        if not isinstance(other, Q):
            raise OperationalError("OR operation requires a Q node")
        return Q(self, other, join_type=self.OR)

    def __invert__(self) -> "Q":
        """
        Returns a negated instance of the Q object, use ``~`` operator.

        The negation is relative to the current state, so ``~~q`` resolves like ``q``.
        """
        q = Q(*self.children, join_type=self.join_type, **self.filters)
        # The copy starts out non-negated; flip relative to this node's own state
        # so that an existing negation is not silently dropped.
        q._is_negated = not self._is_negated
        return q

    def negate(self) -> None:
        """
        Negates the current Q object. (mutation)
        """
        self._is_negated = not self._is_negated

    def _resolve_nested_filter(
        self, model: "Type[Model]", key: str, value: Any, table: Table
    ) -> QueryModifier:
        """
        Resolve a filter that spans a relation (``relation__rest``).

        The part after the first ``__`` is resolved on the related model against the
        last joined table, and the required joins are AND-ed onto the result.
        """
        related_field_name, __, forwarded_fields = key.partition("__")
        related_field = cast(RelationalField, model._meta.fields_map[related_field_name])
        required_joins = _get_joins_for_related_field(table, related_field, related_field_name)
        q = Q(**{forwarded_fields: value})
        q._annotations = self._annotations
        q._custom_filters = self._custom_filters
        modifier = q.resolve(
            model=related_field.related_model,
            table=required_joins[-1][0],
        )
        return QueryModifier(joins=required_joins) & modifier

    def _resolve_custom_kwarg(
        self, model: "Type[Model]", key: str, value: Any, table: Table
    ) -> QueryModifier:
        """
        Resolve a filter on an annotation registered in ``_custom_filters``.

        Aggregate annotations produce a HAVING criterion, all others a WHERE criterion.
        """
        having_info = self._custom_filters[key]
        annotation = self._annotations[having_info["field"]]
        if isinstance(annotation, Term):
            annotation_info = {"field": annotation}
        else:
            annotation_info = annotation.resolve(model, table)

        # Named ``filter_func`` to avoid shadowing the imported ``operator`` module.
        filter_func = having_info["operator"]
        overridden_operator = model._meta.db.executor_class.get_overridden_filter_func(
            filter_func=filter_func
        )
        if overridden_operator:
            filter_func = overridden_operator
        if annotation_info["field"].is_aggregate:
            modifier = QueryModifier(having_criterion=filter_func(annotation_info["field"], value))
        else:
            modifier = QueryModifier(where_criterion=filter_func(annotation_info["field"], value))
        return modifier

    def _process_filter_kwarg(
        self, model: "Type[Model]", key: str, value: Any, table: Table
    ) -> Tuple[Criterion, Optional[Tuple[Table, Criterion]]]:
        """
        Build the criterion (and the join, if the filter needs one) for a model filter.

        :return: Tuple of the criterion and either ``None`` or a ``(table, criterion)``
            join description.
        """
        join = None

        # ``key=None`` is treated as ``key__isnull=True`` when such a filter exists.
        if value is None and f"{key}__isnull" in model._meta.filters:
            param = model._meta.get_filter(f"{key}__isnull")
            value = True
        else:
            param = model._meta.get_filter(key)

        pk_db_field = model._meta.db_pk_column
        if param.get("table"):
            # The filter lives on another table that is joined via its backward key.
            join = (
                param["table"],
                table[pk_db_field] == param["table"][param["backward_key"]],
            )
            if param.get("value_encoder"):
                value = param["value_encoder"](value, model)
            criterion = param["operator"](param["table"][param["field"]], value)
        else:
            if isinstance(value, Term):
                encoded_value = value
            else:
                field_object = model._meta.fields_map[param["field"]]
                encoded_value = (
                    param["value_encoder"](value, model, field_object)
                    if param.get("value_encoder")
                    else model._meta.db.executor_class._field_to_db(field_object, value, model)
                )
            op = param["operator"]
            # this is an ugly hack
            if op == operator.eq:
                encoded_value = model._meta.db.query_class._builder()._wrapper_cls(encoded_value)
            criterion = op(table[param["source_field"]], encoded_value)
        return criterion, join

    def _resolve_regular_kwarg(
        self, model: "Type[Model]", key: str, value: Any, table: Table
    ) -> QueryModifier:
        """
        Resolve a non-annotation filter, either as nested (relation spanning) filter
        or as a filter known to the model itself.
        """
        if key not in model._meta.filters and key.split("__")[0] in model._meta.fetch_fields:
            modifier = self._resolve_nested_filter(model, key, value, table)
        else:
            criterion, join = self._process_filter_kwarg(model, key, value, table)
            joins = [join] if join else []
            modifier = QueryModifier(where_criterion=criterion, joins=joins)
        return modifier

    def _get_actual_filter_params(
        self, model: "Type[Model]", key: str, value: Any
    ) -> Tuple[str, Any]:
        """
        Normalise a raw filter key/value pair.

        For FK/O2O fields the key becomes the field's ``source_field``; for FK/O2O/M2M
        fields a value having a ``pk`` attribute is replaced by that ``pk``.

        :raises FieldError: If ``key`` is not a known filter parameter.
        """
        filter_key = key
        if key in model._meta.fk_fields or key in model._meta.o2o_fields:
            field_object = model._meta.fields_map[key]
            if hasattr(value, "pk"):
                filter_value = value.pk
            else:
                filter_value = value
            filter_key = cast(str, field_object.source_field)
        elif key in model._meta.m2m_fields:
            if hasattr(value, "pk"):
                filter_value = value.pk
            else:
                filter_value = value
        elif (
            key.split("__")[0] in model._meta.fetch_fields
            or key in self._custom_filters
            or key in model._meta.filters
        ):
            filter_value = value
        else:
            allowed = sorted(
                model._meta.fields | model._meta.fetch_fields | set(self._custom_filters)
            )
            raise FieldError(f"Unknown filter param '{key}'. Allowed base values are {allowed}")
        return filter_key, filter_value

    def _resolve_kwargs(self, model: "Type[Model]", table: Table) -> QueryModifier:
        """
        Resolve all filters of this node and combine them according to ``join_type``,
        negating the combined result if the node is negated.
        """
        modifier = QueryModifier()
        for raw_key, raw_value in self.filters.items():
            key, value = self._get_actual_filter_params(model, raw_key, raw_value)
            if key in self._custom_filters:
                filter_modifier = self._resolve_custom_kwarg(model, key, value, table)
            else:
                filter_modifier = self._resolve_regular_kwarg(model, key, value, table)

            if self.join_type == self.AND:
                modifier &= filter_modifier
            else:
                modifier |= filter_modifier
        if self._is_negated:
            modifier = ~modifier
        return modifier

    def _resolve_children(self, model: "Type[Model]", table: Table) -> QueryModifier:
        """
        Resolve all child nodes and combine them according to ``join_type``,
        negating the combined result if the node is negated.
        """
        modifier = QueryModifier()
        for node in self.children:
            # Children share the annotations/custom filters of their parent.
            node._annotations = self._annotations
            node._custom_filters = self._custom_filters
            node_modifier = node.resolve(model, table)
            if self.join_type == self.AND:
                modifier &= node_modifier
            else:
                modifier |= node_modifier

        if self._is_negated:
            modifier = ~modifier
        return modifier

    def resolve(
        self,
        model: "Type[Model]",
        table: Table,
    ) -> QueryModifier:
        """
        Resolves the logical Q chain into the parts of a SQL statement.

        :param model: The Model this Q Expression should be resolved on.
        :param table: ``pypika.Table`` to keep track of the virtual SQL table
            (to allow self referential joins)
        """
        if self.filters:
            return self._resolve_kwargs(model, table)
        return self._resolve_children(model, table)
class Function(Expression):
    """
    Function/Aggregate base.

    :param field: Field name
    :param default_values: Extra parameters to the function.

    .. attribute:: database_func
        :annotation: pypika.terms.Function

        The pypika function this represents.

    .. attribute:: populate_field_object
        :annotation: bool = False

        Enable populate_field_object where we want to try and preserve the field type.
    """

    __slots__ = ("field", "field_object", "default_values")

    database_func = PypikaFunction
    # Enable populate_field_object where we want to try and preserve the field type.
    populate_field_object = False

    def __init__(
        self, field: Union[str, F, ArithmeticExpression, "Function"], *default_values: Any
    ) -> None:
        self.field = field
        self.field_object: "Optional[Field]" = None
        self.default_values = default_values

    def _get_function_field(
        self, field: Union[ArithmeticExpression, PypikaField, str], *default_values
    ):
        """Wrap ``field`` (plus the extra parameters) in ``database_func``."""
        return self.database_func(field, *default_values)

    def _resolve_field_for_model(self, model: "Type[Model]", table: Table, field: str) -> dict:
        """
        Walk a ``rel__rel__field`` path and return the joins and the final column.

        :return: Dict with keys ``"joins"`` (list of ``(table, field name, relation)``
            tuples) and ``"field"`` (the resolved term).
        :raises ConfigurationError: If a non-final path element is not a fetchable relation.
        """
        join_chain = []
        *relation_names, last_field = field.split("__")

        for relation_name in relation_names:
            if relation_name not in model._meta.fetch_fields:
                raise ConfigurationError(f"{field} not resolvable")

            relation = cast(RelationalField, model._meta.fields_map[relation_name])
            join_chain.append((table, relation_name, relation))

            # Step over to the related model and its table for the next path element.
            model = relation.related_model
            next_table: Table = model._meta.basetable
            if isinstance(relation, ForeignKeyFieldInstance):
                # Only FK's can be to same table, so we only auto-alias FK join tables
                next_table = next_table.as_(f"{table.get_table_name()}__{relation_name}")
            table = next_table

        if last_field in model._meta.fetch_fields:
            # Path ends on a relation: target the primary key column of the related table.
            relation = cast(RelationalField, model._meta.fields_map[last_field])
            related_meta = relation.related_model._meta
            join_chain.append((table, last_field, relation))
            target_table = related_meta.basetable

            if isinstance(relation, BackwardFKRelation) and table == target_table:
                target_table = target_table.as_(f"{table.get_table_name()}__{last_field}")

            term = target_table[related_meta.db_pk_column]
        else:
            # Path ends on a plain field: prefer its source_field as column name.
            plain_field = model._meta.fields_map[last_field]
            column_name = plain_field.source_field if plain_field.source_field else last_field
            term = table[column_name]

            if self.populate_field_object:
                self.field_object = model._meta.fields_map.get(last_field, None)
                if self.field_object:  # pragma: nobranch
                    cast_func = self.field_object.get_for_dialect(
                        model._meta.db.capabilities.dialect, "function_cast"
                    )
                    if cast_func:
                        term = cast_func(self.field_object, term)

        return {"joins": join_chain, "field": term}

    def _resolve_default_values(self, model: "Type[Model]", table: Table) -> Iterator[Any]:
        """Yield the extra parameters, resolving nested ``Function`` instances to terms."""
        for extra in self.default_values:
            yield extra.resolve(model, table)["field"] if isinstance(extra, Function) else extra

    def resolve(self, model: "Type[Model]", table: Table) -> dict:
        """
        Used to resolve the Function statement for SQL generation.

        :param model: Model the function is applied on to.
        :param table: ``pypika.Table`` to keep track of the virtual SQL table
            (to allow self referential joins)
        :return: Dict with keys ``"joins"`` and ``"field"``
        """
        default_values = self._resolve_default_values(model, table)

        if isinstance(self.field, str):
            resolved = self._resolve_field_for_model(model, table, self.field)
        elif isinstance(self.field, Function):
            resolved = self.field.resolve(model, table)
        else:
            # F objects and arithmetic expressions carry no joins of their own.
            term, found_field_object = F.resolver_arithmetic_expression(model, self.field)
            if self.populate_field_object:
                self.field_object = found_field_object
            return {"joins": [], "field": self._get_function_field(term, *default_values)}

        resolved["field"] = self._get_function_field(resolved["field"], *default_values)
        return resolved
class Aggregate(Function):
    """
    Base for SQL Aggregates.

    :param field: Field name
    :param default_values: Extra parameters to the function.
    :param distinct: Flag for aggregate with distinction
    :param _filter: Optional ``Q`` expression; when given, a field resolved from a
        field name is wrapped in ``CASE WHEN <filter> THEN <field> ELSE NULL END``.
    """

    database_func = DistinctOptionFunction

    def __init__(
        self,
        field: Union[str, F, ArithmeticExpression],
        *default_values: Any,
        distinct: bool = False,
        _filter: Optional[Q] = None,
    ) -> None:
        super().__init__(field, *default_values)
        self.distinct = distinct
        self.filter = _filter

    def _get_function_field(
        self, field: Union[ArithmeticExpression, PypikaField, str], *default_values
    ):
        """Build the database function, marking it as DISTINCT when requested."""
        function = self.database_func(field, *default_values)
        return function.distinct() if self.distinct else function

    def _resolve_field_for_model(self, model: "Type[Model]", table: Table, field: str) -> dict:
        """
        Resolve the field like the base class does and apply the optional filter.
        """
        resolved = super()._resolve_field_for_model(model, table, field)
        if not self.filter:
            return resolved

        modifier = QueryModifier()
        modifier &= self.filter.resolve(model, model._meta.basetable)
        # Only the WHERE part of the filter is used to build the CASE expression.
        where_criterion, _joins, _having_criterion = modifier.get_query_modifiers()
        resolved["field"] = PypikaCase().when(where_criterion, resolved["field"]).else_(None)
        return resolved
class When(Expression):
    """
    When expression.

    :param args: Q objects
    :param kwargs: keyword criterion like filter
    :param then: value for criterion
    :param negate: false (default)
    """

    def __init__(
        self,
        *args: Q,
        then: Union[str, F, ArithmeticExpression, Function],
        negate: bool = False,
        **kwargs: Any,
    ) -> None:
        self.args = args
        self.then = then
        self.negate = negate
        self.kwargs = kwargs

    def _resolve_q_objects(self) -> List[Q]:
        """
        Collect the positional Q objects followed by one Q per keyword criterion,
        each inverted when ``negate`` is set.

        :raises TypeError: If a positional argument is not a ``Q``.
        """
        collected: List[Q] = []
        for candidate in self.args:
            if not isinstance(candidate, Q):
                raise TypeError("expected Q objects as args")
            collected.append(~candidate if self.negate else candidate)

        collected.extend(
            ~Q(**{name: criterion}) if self.negate else Q(**{name: criterion})
            for name, criterion in self.kwargs.items()
        )
        return collected

    def resolve(self, model: "Type[Model]", table: Table) -> tuple:
        """
        Resolve into a ``(criterion, then-term)`` tuple.

        All Q objects are AND-ed together; they are resolved against the base table
        of ``model``.
        """
        modifier = QueryModifier()
        for q_object in self._resolve_q_objects():
            modifier &= q_object.resolve(model, model._meta.basetable)

        outcome = self.then
        if isinstance(outcome, Function):
            then_term = outcome.resolve(model, table)["field"]
        elif isinstance(outcome, Term):
            then_term = F.resolver_arithmetic_expression(model, outcome)[0]
        else:
            # Plain Python values become SQL constants.
            then_term = Term.wrap_constant(outcome)

        return modifier.where_criterion, then_term
class Case(Expression):
    """
    Case expression.

    :param args: When objects
    :param default: value for 'CASE WHEN ... THEN ... ELSE <default> END'
    """

    def __init__(
        self, *args: When, default: Union[str, F, ArithmeticExpression, Function] = None
    ) -> None:
        self.args = args
        self.default = default

    def resolve(self, model: "Type[Model]", table: Table) -> dict:
        """
        Build the pypika ``CASE`` term from the When clauses and the default.

        :return: Dict with an empty ``"joins"`` list and the case term as ``"field"``.
        :raises TypeError: If a positional argument is not a ``When``.
        """
        case = PypikaCase()
        for clause in self.args:
            if not isinstance(clause, When):
                raise TypeError("expected When objects as args")
            criterion, term = clause.resolve(model, table)
            case = case.when(criterion, term)

        fallback = self.default
        if isinstance(fallback, Function):
            else_term = fallback.resolve(model, table)["field"]
        elif isinstance(fallback, Term):
            else_term = F.resolver_arithmetic_expression(model, fallback)[0]
        else:
            # Plain Python values (including None) become SQL constants.
            else_term = Term.wrap_constant(fallback)

        return {"joins": [], "field": case.else_(else_term)}