import datetime
import functools
import json
import warnings
from decimal import Decimal
from enum import Enum, IntEnum
from typing import TYPE_CHECKING, Any, Callable, Optional, Type, TypeVar, Union
from uuid import UUID, uuid4

from pypika import functions
from pypika.enums import SqlTypes
from pypika.terms import Term

from tortoise import timezone
from tortoise.exceptions import ConfigurationError, FieldError
from tortoise.fields.base import Field
from tortoise.timezone import get_default_timezone, get_timezone, get_use_tz, localtime
from tortoise.validators import MaxLengthValidator

# Prefer ciso8601 for datetime parsing; when it is not installed, fall back to
# iso8601's ``parse_date`` with ``default_timezone=None`` bound in.
try:
    from ciso8601 import parse_datetime
except ImportError:  # pragma: nocoverage
    from iso8601 import parse_date

    parse_datetime = functools.partial(parse_date, default_timezone=None)

# ``Model`` is only referenced in string annotations, so it is imported for
# type checkers only.
if TYPE_CHECKING:  # pragma: nocoverage
    from tortoise.models import Model

# Names exported by ``from <this module> import *``.
# NOTE(review): "FloatField" is listed here but no definition is visible in this
# view, and ``TimeField`` is defined below but not listed -- confirm both.
__all__ = (
    "BigIntField",
    "BinaryField",
    "BooleanField",
    "CharEnumField",
    "CharField",
    "DateField",
    "DatetimeField",
    "DecimalField",
    "FloatField",
    "IntEnumField",
    "IntField",
    "JSONField",
    "SmallIntField",
    "TextField",
    "TimeDeltaField",
    "UUIDField",
)

# Doing this we can replace json dumps/loads with different implementations
JsonDumpsFunc = Callable[[Any], str]
JsonLoadsFunc = Callable[[Union[str, bytes]], Any]
# Default codec: the stdlib ``json`` module, dumping with compact separators.
JSON_DUMPS: JsonDumpsFunc = functools.partial(json.dumps, separators=(",", ":"))
JSON_LOADS: JsonLoadsFunc = json.loads

try:
    # Use orjson as an optional accelerator
    import orjson

    # The result of ``orjson.dumps`` is decoded so the hook returns ``str``.
    JSON_DUMPS = lambda x: orjson.dumps(x).decode()  # noqa: E731
    JSON_LOADS = orjson.loads
except ImportError:  # pragma: nocoverage
    pass
class IntField(Field[int], int):
    """
    Integer field. (32-bit signed)

    ``primary_key`` (bool):
        True if field is Primary Key.
    """

    SQL_TYPE = "INT"
    allows_generated = True

    # Per-backend column definitions for a generated primary key.
    class _db_postgres:
        GENERATED_SQL = "SERIAL NOT NULL PRIMARY KEY"

    class _db_sqlite:
        GENERATED_SQL = "INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL"

    class _db_mysql:
        GENERATED_SQL = "INT NOT NULL PRIMARY KEY AUTO_INCREMENT"

    class _db_mssql:
        GENERATED_SQL = "INT IDENTITY(1,1) NOT NULL PRIMARY KEY"

    class _db_oracle:
        GENERATED_SQL = "INT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY NOT NULL"

    def __init__(self, primary_key: Optional[bool] = None, **kwargs: Any) -> None:
        is_primary_key = primary_key or kwargs.get("pk")
        if is_primary_key:
            # A primary key is generated unless ``generated`` was passed as falsy;
            # whatever was passed is normalised to a real bool.
            requested = kwargs.get("generated", True)
            kwargs["generated"] = bool(requested)
        super().__init__(primary_key=primary_key, **kwargs)

    @property
    def constraints(self) -> dict:
        """Inclusive ``ge``/``le`` bounds of a signed 32-bit integer."""
        return {"ge": -(2**31), "le": 2**31 - 1}
class BigIntField(IntField):
    """
    Big integer field. (64-bit signed)

    ``primary_key`` (bool):
        True if field is Primary Key.
    """

    SQL_TYPE = "BIGINT"

    # Per-backend column definitions for a generated primary key.
    class _db_postgres:
        GENERATED_SQL = "BIGSERIAL NOT NULL PRIMARY KEY"

    class _db_mysql:
        GENERATED_SQL = "BIGINT NOT NULL PRIMARY KEY AUTO_INCREMENT"

    class _db_mssql:
        GENERATED_SQL = "BIGINT IDENTITY(1,1) NOT NULL PRIMARY KEY"

    class _db_oracle:
        # Oracle overrides the plain column type as well as the generated one.
        SQL_TYPE = "INT"
        GENERATED_SQL = "INT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY NOT NULL"

    @property
    def constraints(self) -> dict:
        """Inclusive ``ge``/``le`` bounds of a signed 64-bit integer."""
        return {"ge": -(2**63), "le": 2**63 - 1}
class SmallIntField(IntField):
    """
    Small integer field. (16-bit signed)

    ``primary_key`` (bool):
        True if field is Primary Key.
    """

    SQL_TYPE = "SMALLINT"

    # Per-backend column definitions for a generated primary key.
    class _db_postgres:
        GENERATED_SQL = "SMALLSERIAL NOT NULL PRIMARY KEY"

    class _db_mysql:
        GENERATED_SQL = "SMALLINT NOT NULL PRIMARY KEY AUTO_INCREMENT"

    class _db_mssql:
        GENERATED_SQL = "SMALLINT IDENTITY(1,1) NOT NULL PRIMARY KEY"

    class _db_oracle:
        GENERATED_SQL = "SMALLINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY NOT NULL"

    @property
    def constraints(self) -> dict:
        """Inclusive ``ge``/``le`` bounds of a signed 16-bit integer."""
        return {"ge": -(2**15), "le": 2**15 - 1}
class CharField(Field[str]):
    """
    Character field.

    You must provide the following:

    ``max_length`` (int):
        Maximum length of the field in characters. Must be at least 1.
    """

    field_type = str

    class _db_oracle:
        """Oracle override, built around the owning field to read its length."""

        def __init__(self, field: "CharField") -> None:
            self.field = field

        @property
        def SQL_TYPE(self) -> str:
            return f"NVARCHAR2({self.field.max_length})"

    def __init__(self, max_length: int, **kwargs: Any) -> None:
        length = int(max_length)
        if length < 1:
            raise ConfigurationError("'max_length' must be >= 1")
        # Set before the parent constructor runs, as the original did.
        self.max_length = length
        super().__init__(**kwargs)
        length_validator = MaxLengthValidator(self.max_length)
        self.validators.append(length_validator)

    @property
    def SQL_TYPE(self) -> str:  # type: ignore
        """Column type, sized from ``max_length``."""
        return f"VARCHAR({self.max_length})"

    @property
    def constraints(self) -> dict:
        """Expose ``max_length`` as the field's only constraint."""
        return {"max_length": self.max_length}
class TextField(Field[str], str):  # type: ignore
    """
    Large Text field.

    Cannot be indexed or made unique; using it as a primary key is deprecated.
    """

    indexable = False
    SQL_TYPE = "TEXT"

    class _db_mysql:
        SQL_TYPE = "LONGTEXT"

    class _db_mssql:
        SQL_TYPE = "NVARCHAR(MAX)"

    class _db_oracle:
        SQL_TYPE = "NCLOB"

    def __init__(
        self,
        primary_key: Optional[bool] = None,
        unique: bool = False,
        db_index: bool = False,
        **kwargs: Any,
    ) -> None:
        used_as_pk = primary_key or kwargs.get("pk")
        if used_as_pk:
            # stacklevel=2 attributes the warning to the code constructing the field.
            warnings.warn(
                "TextField as a PrimaryKey is Deprecated, use CharField instead",
                DeprecationWarning,
                stacklevel=2,
            )

        if unique:
            unique_message = (
                "TextField doesn't support unique indexes,"
                " consider CharField or another strategy"
            )
            raise ConfigurationError(unique_message)

        wants_index = db_index or kwargs.get("index")
        if wants_index:
            raise ConfigurationError("TextField can't be indexed, consider CharField")

        super().__init__(primary_key=primary_key, **kwargs)
class BooleanField(Field[bool]):
    """
    Boolean field.

    Stored as ``BOOL`` by default, with per-backend overrides below.
    """

    # ``bool`` cannot be subclassed, so the Python type is declared explicitly
    # instead of being mixed in as a base class.
    field_type = bool
    SQL_TYPE = "BOOL"

    class _db_sqlite:
        SQL_TYPE = "INT"

    class _db_mssql:
        SQL_TYPE = "BIT"

    class _db_oracle:
        SQL_TYPE = "NUMBER(1)"
class DecimalField(Field[Decimal], Decimal):
    """
    Accurate decimal field.

    You must provide the following:

    ``max_digits`` (int):
        Max digits of significance of the decimal field. Must be at least 1.
    ``decimal_places`` (int):
        How many of those significant digits is after the decimal point.
        Must not be negative.

    Both values are coerced with ``int()`` before being validated and stored.

    Raises ``ConfigurationError`` if either value is out of range.
    """

    skip_to_python_if_native = True

    def __init__(self, max_digits: int, decimal_places: int, **kwargs: Any) -> None:
        # Coerce once and keep the coerced values. Previously only the checks used
        # int(), so an input such as "2" passed validation and then failed in
        # ``'0' * decimal_places`` (or leaked a non-integer into SQL_TYPE).
        max_digits = int(max_digits)
        decimal_places = int(decimal_places)
        if max_digits < 1:
            raise ConfigurationError("'max_digits' must be >= 1")
        if decimal_places < 0:
            raise ConfigurationError("'decimal_places' must be >= 0")
        super().__init__(**kwargs)
        self.max_digits = max_digits
        self.decimal_places = decimal_places
        # Quantisation template: "1" for whole numbers, otherwise "1." followed by
        # one zero per decimal place.
        self.quant = Decimal("1" if decimal_places == 0 else f"1.{'0' * decimal_places}")

    def to_python_value(self, value: Any) -> Optional[Decimal]:
        """
        Convert ``value`` to a ``Decimal`` quantised to ``self.quant`` and normalised.

        ``None`` is passed through unchanged. The result is validated before
        being returned.
        """
        if value is not None:
            value = Decimal(value).quantize(self.quant).normalize()
        self.validate(value)
        return value

    @property
    def SQL_TYPE(self) -> str:  # type: ignore
        """Column type built from ``max_digits`` and ``decimal_places``."""
        return f"DECIMAL({self.max_digits},{self.decimal_places})"

    class _db_sqlite:
        SQL_TYPE = "VARCHAR(40)"

        def function_cast(self, term: Term) -> Term:
            """Wrap ``term`` in a cast to ``NUMERIC``."""
            return functions.Cast(term, SqlTypes.NUMERIC)
# In case of queryset with filter `__year`/`__month`/`__day` ..., value can be int, float or str. Example:# `await MyModel.filter(created_at__year=2024)`# `await MyModel.filter(created_at__year=2024.0)`# `await MyModel.filter(created_at__year='2024')`DatetimeFieldQueryValueType=TypeVar("DatetimeFieldQueryValueType",datetime.datetime,int,float,str)
class DatetimeField(Field[datetime.datetime], datetime.datetime):
    """
    Datetime field.

    ``auto_now`` and ``auto_now_add`` are mutually exclusive.
    You can opt to set neither or only ONE of them.

    ``auto_now`` (bool):
        Always set to the current time (``timezone.now()``) on save.
    ``auto_now_add`` (bool):
        Set to the current time (``timezone.now()``) on save while the field
        is still ``None``.
    """

    SQL_TYPE = "TIMESTAMP"

    class _db_mysql:
        SQL_TYPE = "DATETIME(6)"

    class _db_postgres:
        SQL_TYPE = "TIMESTAMPTZ"

    class _db_mssql:
        SQL_TYPE = "DATETIME2"

    class _db_oracle:
        SQL_TYPE = "TIMESTAMP WITH TIME ZONE"

    def __init__(self, auto_now: bool = False, auto_now_add: bool = False, **kwargs: Any) -> None:
        both_requested = auto_now_add and auto_now
        if both_requested:
            raise ConfigurationError("You can choose only 'auto_now' or 'auto_now_add'")
        super().__init__(**kwargs)
        self.auto_now = auto_now
        # ``auto_now`` also switches the ``auto_now_add`` flag on.
        self.auto_now_add = auto_now | auto_now_add

    def to_python_value(self, value: Any) -> Optional[datetime.datetime]:
        """
        Convert a raw value into a timezone-aware datetime.

        Datetimes are used as-is, ints are treated as timestamps and anything
        else goes through ``parse_datetime``. Naive results are made aware with
        ``get_timezone()``; aware ones go through ``localtime``. ``None`` is
        passed through. The result is validated before it is returned.
        """
        if value is not None:
            if not isinstance(value, datetime.datetime):
                if isinstance(value, int):
                    value = datetime.datetime.fromtimestamp(value)
                else:
                    value = parse_datetime(value)

            if timezone.is_naive(value):
                value = timezone.make_aware(value, get_timezone())
            else:
                value = localtime(value)

        self.validate(value)
        return value

    def to_db_value(
        self, value: Optional[DatetimeFieldQueryValueType], instance: "Union[Type[Model], Model]"
    ) -> Optional[DatetimeFieldQueryValueType]:
        """
        Prepare ``value`` for the database.

        For a model instance with ``auto_now`` (or ``auto_now_add`` and an unset
        field) the current time is written to the instance and returned.
        Otherwise a naive datetime is made aware in UTC, with a
        ``RuntimeWarning``, when time zone support is active.
        """
        # Only do this if it is a Model instance, not class. Test for guaranteed instance var
        if hasattr(instance, "_saved_in_db"):
            needs_stamp = self.auto_now or (
                self.auto_now_add and getattr(instance, self.model_field_name) is None
            )
            if needs_stamp:
                now = timezone.now()
                setattr(instance, self.model_field_name, now)
                return now  # type:ignore[return-value]

        if (
            value is not None
            and isinstance(value, datetime.datetime)
            and get_use_tz()
            and timezone.is_naive(value)
        ):
            warnings.warn(
                "DateTimeField %s received a naive datetime (%s)"
                " while time zone support is active." % (self.model_field_name, value),
                RuntimeWarning,
            )
            value = timezone.make_aware(value, "UTC")

        self.validate(value)
        return value

    @property
    def constraints(self) -> dict:
        """Mark the field ``readOnly`` when ``auto_now_add`` is set."""
        return {"readOnly": True} if self.auto_now_add else {}
class DateField(Field[datetime.date], datetime.date):
    """
    Date field.

    Values that are not already ``datetime.date`` instances are parsed with
    ``parse_datetime`` and reduced to their date part, in both directions.
    """

    skip_to_python_if_native = True
    SQL_TYPE = "DATE"

    def to_python_value(self, value: Any) -> Optional[datetime.date]:
        """Return ``value`` as a date (``None`` passes through), validated."""
        needs_parsing = value is not None and not isinstance(value, datetime.date)
        if needs_parsing:
            parsed = parse_datetime(value)
            value = parsed.date()
        self.validate(value)
        return value

    def to_db_value(
        self, value: Optional[Union[datetime.date, str]], instance: "Union[Type[Model], Model]"
    ) -> Optional[datetime.date]:
        """Return ``value`` as a date for storage (``None`` passes through), validated."""
        needs_parsing = value is not None and not isinstance(value, datetime.date)
        if needs_parsing:
            parsed = parse_datetime(value)
            value = parsed.date()
        self.validate(value)
        return value
class TimeField(Field[datetime.time], datetime.time):
    """
    Time field.

    ``auto_now`` and ``auto_now_add`` are mutually exclusive.

    ``auto_now`` (bool):
        Always set to the current time of day on save.
    ``auto_now_add`` (bool):
        Set to the current time of day on save while the field is still ``None``.
    """

    skip_to_python_if_native = True
    SQL_TYPE = "TIME"

    class _db_oracle:
        SQL_TYPE = "NVARCHAR2(8)"

    class _db_mysql:
        SQL_TYPE = "TIME(6)"

    class _db_postgres:
        SQL_TYPE = "TIMETZ"

    def __init__(self, auto_now: bool = False, auto_now_add: bool = False, **kwargs: Any) -> None:
        both_requested = auto_now_add and auto_now
        if both_requested:
            raise ConfigurationError("You can choose only 'auto_now' or 'auto_now_add'")
        super().__init__(**kwargs)
        self.auto_now = auto_now
        # ``auto_now`` also switches the ``auto_now_add`` flag on.
        self.auto_now_add = auto_now | auto_now_add

    def to_python_value(self, value: Any) -> Optional[Union[datetime.time, datetime.timedelta]]:
        """
        Convert a raw value to a time.

        Strings are parsed with ``time.fromisoformat``. A ``timedelta`` is
        returned untouched and unvalidated. Naive times receive the default
        timezone. ``None`` passes through. Other results are validated.
        """
        if value is None:
            self.validate(value)
            return value

        result = datetime.time.fromisoformat(value) if isinstance(value, str) else value
        if isinstance(result, datetime.timedelta):
            return result
        if timezone.is_naive(result):
            result = result.replace(tzinfo=get_default_timezone())
        self.validate(result)
        return result

    def to_db_value(
        self,
        value: Optional[Union[datetime.time, datetime.timedelta]],
        instance: "Union[Type[Model], Model]",
    ) -> Optional[Union[datetime.time, datetime.timedelta]]:
        """
        Prepare ``value`` for the database.

        For a model instance with ``auto_now`` (or ``auto_now_add`` and an unset
        field) the current time of day is written to the instance and returned.
        A ``timedelta`` is returned as-is. A naive time receives the default
        timezone, with a ``RuntimeWarning``, when time zone support is active.
        """
        # Only do this if it is a Model instance, not class. Test for guaranteed instance var
        if hasattr(instance, "_saved_in_db"):
            needs_stamp = self.auto_now or (
                self.auto_now_add and getattr(instance, self.model_field_name) is None
            )
            if needs_stamp:
                now = timezone.now().time()
                setattr(instance, self.model_field_name, now)
                return now

        if value is not None:
            if isinstance(value, datetime.timedelta):
                return value
            if get_use_tz() and timezone.is_naive(value):
                warnings.warn(
                    "TimeField %s received a naive time (%s)"
                    " while time zone support is active." % (self.model_field_name, value),
                    RuntimeWarning,
                )
                value = value.replace(tzinfo=get_default_timezone())

        self.validate(value)
        return value
class TimeDeltaField(Field[datetime.timedelta]):
    """
    A field for storing time differences.

    The database value is the duration expressed as an integer number of
    microseconds.
    """

    field_type = datetime.timedelta
    SQL_TYPE = "BIGINT"

    class _db_oracle:
        SQL_TYPE = "NUMBER(19)"

    def to_python_value(self, value: Any) -> Optional[datetime.timedelta]:
        """Validate ``value``, then turn a microsecond count into a ``timedelta``."""
        self.validate(value)

        already_native = value is None or isinstance(value, datetime.timedelta)
        if not already_native:
            return datetime.timedelta(microseconds=value)
        return value

    def to_db_value(
        self, value: Optional[datetime.timedelta], instance: "Union[Type[Model], Model]"
    ) -> Optional[int]:
        """Validate ``value``, then flatten it to total microseconds (``None`` stays ``None``)."""
        self.validate(value)

        if value is None:
            return None
        # 86_400 seconds per day, 1_000_000 microseconds per second.
        whole_seconds = value.days * 86_400 + value.seconds
        return whole_seconds * 1_000_000 + value.microseconds
class JSONField(Field[Union[dict, list]], dict, list):  # type: ignore
    """
    JSON field.

    This field can store dictionaries or lists of any JSON-compliant structure.

    You can specify your own custom JSON encoder/decoder, leaving at the default should work well.
    If you have ``orjson`` installed, we default to using that,
    else the default ``json`` module will be used.

    ``encoder``:
        The custom JSON encoder.
    ``decoder``:
        The custom JSON decoder.
    """

    SQL_TYPE = "JSON"
    indexable = False

    class _db_postgres:
        SQL_TYPE = "JSONB"

    class _db_mssql:
        SQL_TYPE = "NVARCHAR(MAX)"

    class _db_oracle:
        SQL_TYPE = "NCLOB"

    def __init__(
        self,
        encoder: JsonDumpsFunc = JSON_DUMPS,
        decoder: JsonLoadsFunc = JSON_LOADS,
        **kwargs: Any,
    ) -> None:
        super().__init__(**kwargs)
        self.encoder = encoder
        self.decoder = decoder

    def to_db_value(
        self, value: Optional[Union[dict, list, str, bytes]], instance: "Union[Type[Model], Model]"
    ) -> Optional[str]:
        """
        Serialise ``value`` for storage.

        ``str``/``bytes`` input is assumed to already be JSON text: it is run
        through the decoder purely as a check and then stored as text. Any other
        non-``None`` value is passed to the encoder.

        Raises ``FieldError`` if ``str``/``bytes`` input cannot be decoded.
        """
        self.validate(value)
        if value is None:
            return value
        if not isinstance(value, (str, bytes)):
            return self.encoder(value)

        try:
            # The decoded result is discarded; only success or failure matters.
            self.decoder(value)
        except Exception as exc:
            raise FieldError(f"Value {value!r} is invalid json value.") from exc
        return value.decode() if isinstance(value, bytes) else value

    def to_python_value(
        self, value: Optional[Union[str, bytes, dict, list]]
    ) -> Optional[Union[dict, list]]:
        """
        Deserialise ``value`` from the database.

        ``str``/``bytes`` input is decoded and returned directly; anything else
        is validated and returned unchanged.

        Raises ``FieldError`` if ``str``/``bytes`` input cannot be decoded.
        """
        if isinstance(value, (str, bytes)):
            try:
                return self.decoder(value)
            except Exception as exc:
                # Decode leniently: bytes that are not valid UTF-8 must not raise
                # UnicodeDecodeError here and hide the FieldError.
                shown = value if isinstance(value, str) else value.decode(errors="replace")
                raise FieldError(f"Value {shown} is invalid json value.") from exc

        self.validate(value)
        return value
class UUIDField(Field[UUID], UUID):
    """
    UUID Field

    This field can store uuid value.

    If used as a primary key, it will auto-generate a UUID4 by default.
    """

    SQL_TYPE = "CHAR(36)"

    class _db_postgres:
        SQL_TYPE = "UUID"

    def __init__(self, **kwargs: Any) -> None:
        used_as_pk = kwargs.get("primary_key") or kwargs.get("pk", False)
        if used_as_pk:
            # Only fills in the generator when the caller gave no default.
            kwargs.setdefault("default", uuid4)
        super().__init__(**kwargs)

    def to_db_value(self, value: Any, instance: "Union[Type[Model], Model]") -> Optional[str]:
        """Return ``str(value)``; falsy values (such as ``None``) are returned unchanged."""
        return str(value) if value else value

    def to_python_value(self, value: Any) -> Optional[UUID]:
        """Return ``value`` as a ``UUID``; ``None`` and existing UUIDs pass through."""
        if value is not None and not isinstance(value, UUID):
            return UUID(value)
        return value
class BinaryField(Field[bytes], bytes):  # type: ignore
    """
    Binary field.

    This is for storing ``bytes`` objects.
    Note that filter or queryset-update operations are not supported.
    """

    indexable = False
    SQL_TYPE = "BLOB"

    # Backends that use a different binary column type.
    class _db_postgres:
        SQL_TYPE = "BYTEA"

    class _db_mysql:
        SQL_TYPE = "LONGBLOB"

    class _db_mssql:
        SQL_TYPE = "VARBINARY(MAX)"
class IntEnumFieldInstance(SmallIntField):
    """
    Small-integer column whose Python value is a member of ``enum_type``.

    ``enum_type``:
        The enum class. Every member value must convert with ``int()`` and lie
        in ``minimum..32767``, where ``minimum`` is 1 when ``generated`` is set
        and -32768 otherwise.
    ``description``:
        The description of the field. Defaults to a multiline list of
        "name: value" pairs, truncated to 2048 characters.

    Raises ``ConfigurationError`` for a non-integer or out-of-range member value.
    """

    def __init__(
        self,
        enum_type: Type[IntEnum],
        description: Optional[str] = None,
        generated: bool = False,
        **kwargs: Any,
    ) -> None:
        # NOTE(review): ``generated`` only affects the range check below and is not
        # forwarded to the parent constructor -- confirm this is intended.
        minimum = 1 if generated else -32768

        # Validate values
        for item in enum_type:
            try:
                value = int(item.value)
            except (TypeError, ValueError) as exc:
                # int() raises TypeError (not ValueError) for values such as None
                # or tuples; both mean "not an integer enum".
                raise ConfigurationError("IntEnumField only supports integer enums!") from exc
            if not minimum <= value < 32768:
                raise ConfigurationError(
                    f"The valid range of IntEnumField's values is {minimum}..32767!"
                )

        # Automatic description for the field if not specified by the user
        if description is None:
            description = "\n".join(f"{e.name}: {int(e.value)}" for e in enum_type)[:2048]

        super().__init__(description=description, **kwargs)
        self.enum_type = enum_type

    def to_python_value(self, value: Union[int, None]) -> Union[IntEnum, None]:
        """Convert a stored int into its enum member (``None`` stays ``None``), validated."""
        value = self.enum_type(value) if value is not None else None
        self.validate(value)
        return value

    def to_db_value(
        self, value: Union[IntEnum, None, int], instance: "Union[Type[Model], Model]"
    ) -> Union[int, None]:
        """
        Convert an enum member or a plain int into the stored int.

        Any int is round-tripped through ``enum_type``, so a value that is not
        a member is rejected by the enum constructor. The result is validated.
        """
        if isinstance(value, IntEnum):
            value = int(value.value)
        if isinstance(value, int):
            value = int(self.enum_type(value))
        self.validate(value)
        return value


IntEnumType = TypeVar("IntEnumType", bound=IntEnum)
def IntEnumField(
    enum_type: Type[IntEnumType],
    description: Optional[str] = None,
    **kwargs: Any,
) -> IntEnumType:
    """
    Enum Field

    A field representing an integer enumeration.

    The description of the field is set automatically if not specified to a multiline list of
    "name: value" pairs.

    **Note**: Valid int value of ``enum_type`` is acceptable.

    ``enum_type``:
        The enum class
    ``description``:
        The description of the field. It is set automatically if not specified to a multiline list
        of "name: value" pairs.
    """
    # Annotated as returning the enum type (hence the ignore below); the object
    # actually built is the field instance.
    field = IntEnumFieldInstance(enum_type, description, **kwargs)
    return field  # type: ignore
class CharEnumFieldInstance(CharField):
    """
    Character column whose Python value is a member of ``enum_type``.

    ``enum_type``:
        The enum class.
    ``description``:
        The description of the field. Defaults to a multiline list of
        "name: value" pairs, truncated to 2048 characters.
    ``max_length``:
        Column length. Zero means: use the longest stringified member value.
    """

    def __init__(
        self,
        enum_type: Type[Enum],
        description: Optional[str] = None,
        max_length: int = 0,
        **kwargs: Any,
    ) -> None:
        # Automatic description for the field if not specified by the user
        if description is None:
            pairs = [f"{member.name}: {member.value!s}" for member in enum_type]
            description = "\n".join(pairs)[:2048]

        # Automatic CharField max_length
        if max_length == 0:
            value_lengths = (len(str(member.value)) for member in enum_type)
            max_length = max(value_lengths, default=0)

        super().__init__(description=description, max_length=max_length, **kwargs)
        self.enum_type = enum_type

    def to_python_value(self, value: Union[str, None]) -> Union[Enum, None]:
        """Validate the stored string, then convert it to its enum member."""
        self.validate(value)
        if value is None:
            return None
        return self.enum_type(value)

    def to_db_value(
        self, value: Union[Enum, None, str], instance: "Union[Type[Model], Model]"
    ) -> Union[str, None]:
        """
        Validate ``value``, then convert an enum member or a string to the stored string.

        A string is round-tripped through ``enum_type``, so a value that is not
        a member is rejected by the enum constructor. Anything else is returned
        unchanged.
        """
        self.validate(value)
        if isinstance(value, Enum):
            member = value
        elif isinstance(value, str):
            member = self.enum_type(value)
        else:
            return value
        return str(member.value)


CharEnumType = TypeVar("CharEnumType", bound=Enum)
def CharEnumField(
    enum_type: Type[CharEnumType],
    description: Optional[str] = None,
    max_length: int = 0,
    **kwargs: Any,
) -> CharEnumType:
    """
    Char Enum Field

    A field representing a character enumeration.

    **Warning**: If ``max_length`` is not specified or equals to zero, the size of represented
    char fields is automatically detected. So if later you update the enum, you need to update your
    table schema as well.

    **Note**: Valid str value of ``enum_type`` is acceptable.

    ``enum_type``:
        The enum class
    ``description``:
        The description of the field. It is set automatically if not specified to a multiline list
        of "name: value" pairs.
    ``max_length``:
        The length of the created CharField. If it is zero it is automatically detected from
        enum_type.
    """
    # Annotated as returning the enum type (hence the ignore below); the object
    # actually built is the field instance.
    field = CharEnumFieldInstance(enum_type, description, max_length, **kwargs)
    return field  # type: ignore