"""Versioned mixin class and other utilities."""importdatetimefromsqlalchemyimportand_fromsqlalchemyimportColumnfromsqlalchemyimportDateTimefromsqlalchemyimporteventfromsqlalchemyimportForeignKeyConstraintfromsqlalchemyimportfuncfromsqlalchemyimportinspectfromsqlalchemyimportIntegerfromsqlalchemyimportPrimaryKeyConstraintfromsqlalchemyimportselectfromsqlalchemyimportutilfromsqlalchemy.ormimportattributesfromsqlalchemy.ormimportobject_mapperfromsqlalchemy.orm.excimportUnmappedColumnErrorfromsqlalchemy.orm.relationshipsimportRelationshipPropertydefcol_references_table(col,table):forfkincol.foreign_keys:iffk.references(table):returnTruereturnFalsedef_is_versioning_col(col):return"version_meta"incol.infodef_history_mapper(local_mapper):cls=local_mapper.class_ifcls.__dict__.get("_history_mapper_configured",False):returncls._history_mapper_configured=Truesuper_mapper=local_mapper.inheritspolymorphic_on=Nonesuper_fks=[]properties=util.OrderedDict()ifsuper_mapper:super_history_mapper=super_mapper.class_.__history_mapper__else:super_history_mapper=Noneif(notsuper_mapperorlocal_mapper.local_tableisnotsuper_mapper.local_table):version_meta={"version_meta":True}# add column.info to identify# columns specific to versioninghistory_table=local_mapper.local_table.to_metadata(local_mapper.local_table.metadata,name=local_mapper.local_table.name+"_history",)foridxinhistory_table.indexes:ifidx.nameisnotNone:idx.name+="_history"idx.unique=Falsefororig_c,history_cinzip(local_mapper.local_table.c,history_table.c):orig_c.info["history_copy"]=history_chistory_c.unique=Falsehistory_c.default=history_c.server_default=Nonehistory_c.autoincrement=Falseifsuper_mapperandcol_references_table(orig_c,super_mapper.local_table):assertsuper_history_mapperisnotNonesuper_fks.append((history_c.key,list(super_history_mapper.local_table.primary_key)[0],))iforig_cislocal_mapper.polymorphic_on:polymorphic_on=history_corig_prop=local_mapper.get_property_by_column(orig_c)# carry over column re-mappingsif(len(orig_prop.columns)>1ororig_prop.columns[0].key!=orig_prop.key):properties[orig_prop.key]=tuple(col.info["history_copy"]forcolinorig_prop.columns)forconstinlist(history_table.constraints):ifnotisinstance(const,(PrimaryKeyConstraint,ForeignKeyConstraint)):history_table.constraints.discard(const)# "version" stores the integer version id. This column is# required.history_table.append_column(Column("version",Integer,primary_key=True,autoincrement=False,info=version_meta,))# "changed" column stores the UTC timestamp of when the# history row was created.# This column is optional and can be omitted.history_table.append_column(Column("changed",DateTime,default=lambda:datetime.datetime.now(datetime.timezone.utc),info=version_meta,))ifsuper_mapper:super_fks.append(("version",super_history_mapper.local_table.c.version))ifsuper_fks:history_table.append_constraint(ForeignKeyConstraint(*zip(*super_fks)))else:history_table=Nonesuper_history_table=super_mapper.local_table.metadata.tables[super_mapper.local_table.name+"_history"]# single table inheritance. take any additional columns that may have# been added and add them to the history table.forcolumninlocal_mapper.local_table.c:ifcolumn.keynotinsuper_history_table.c:col=Column(column.name,column.type,nullable=column.nullable)super_history_table.append_column(col)ifnotsuper_mapper:defdefault_version_from_history(context):# Set default value of version column to the maximum of the# version in history columns already present +1# Otherwise re-appearance of deleted rows would cause an error# with the next updatecurrent_parameters=context.get_current_parameters()returncontext.connection.scalar(select(func.coalesce(func.max(history_table.c.version),0)+1).where(and_(*[history_table.c[c.name]==current_parameters.get(c.name,None)forcininspect(local_mapper.local_table).primary_key])))local_mapper.local_table.append_column(Column("version",Integer,# if rows are not being deleted from the main table with# subsequent re-use of primary key, this default can be# "1" instead of running a query per INSERTdefault=default_version_from_history,nullable=False,),replace_existing=True,)local_mapper.add_property("version",local_mapper.local_table.c.version)ifcls.use_mapper_versioning:local_mapper.version_id_col=local_mapper.local_table.c.version# set the "active_history" flag# on on column-mapped attributes so that the old version# of the info is always loaded (currently sets it on all attributes)forpropinlocal_mapper.iterate_properties:prop.active_history=Truesuper_mapper=local_mapper.inheritsifsuper_history_mapper:bases=(super_history_mapper.class_,)ifhistory_tableisnotNone:properties["changed"]=(history_table.c.changed,)+tuple(super_history_mapper.attrs.changed.columns)else:bases=local_mapper.base_mapper.class_.__bases__versioned_cls=type("%sHistory"%cls.__name__,bases,{"_history_mapper_configured":True,"__table__":history_table,"__mapper_args__":dict(inherits=super_history_mapper,polymorphic_identity=local_mapper.polymorphic_identity,polymorphic_on=polymorphic_on,properties=properties,),},)cls.__history_mapper__=versioned_cls.__mapper__classVersioned:use_mapper_versioning=False"""if True, also assign the version column to be tracked by the mapper"""__table_args__={"sqlite_autoincrement":True}"""Use sqlite_autoincrement, to ensure unique integer values are used for new rows even for rows that have been deleted."""def__init_subclass__(cls)->None:insp=inspect(cls,raiseerr=False)ifinspisnotNone:_history_mapper(insp)else:@event.listens_for(cls,"after_mapper_constructed")def_mapper_constructed(mapper,class_):_history_mapper(mapper)super().__init_subclass__()defversioned_objects(iter_):forobjiniter_:ifhasattr(obj,"__history_mapper__"):yieldobjdefcreate_version(obj,session,deleted=False):obj_mapper=object_mapper(obj)history_mapper=obj.__history_mapper__history_cls=history_mapper.class_obj_state=attributes.instance_state(obj)attr={}obj_changed=Falseforom,hminzip(obj_mapper.iterate_to_root(),history_mapper.iterate_to_root()):ifhm.single:continueforhist_colinhm.local_table.c:if_is_versioning_col(hist_col):continueobj_col=om.local_table.c[hist_col.key]# get the value of the# attribute based on the MapperProperty related to the# mapped column. this will allow usage of MapperProperties# that have a different keyname than that of the mapped column.try:prop=obj_mapper.get_property_by_column(obj_col)exceptUnmappedColumnError:# in the case of single table inheritance, there may be# columns on the mapped table intended for the subclass only.# the "unmapped" status of the subclass column on the# base class is a feature of the declarative module.continue# expired object attributes and also deferred cols might not# be in the dict. force it to load no matter what by# using getattr().ifprop.keynotinobj_state.dict:getattr(obj,prop.key)a,u,d=attributes.get_history(obj,prop.key)ifd:attr[prop.key]=d[0]obj_changed=Trueelifu:attr[prop.key]=u[0]elifa:# if the attribute had no value.attr[prop.key]=a[0]obj_changed=Trueifnotobj_changed:# not changed, but we have relationships. OK# check those tooforpropinobj_mapper.iterate_properties:if(isinstance(prop,RelationshipProperty)andattributes.get_history(obj,prop.key,passive=attributes.PASSIVE_NO_INITIALIZE).has_changes()):forpinprop.local_columns:ifp.foreign_keys:obj_changed=Truebreakifobj_changedisTrue:breakifnotobj_changedandnotdeleted:returnattr["version"]=obj.versionhist=history_cls()forkey,valueinattr.items():setattr(hist,key,value)session.add(hist)obj.version+=1defversioned_session(session):@event.listens_for(session,"before_flush")defbefore_flush(session,flush_context,instances):forobjinversioned_objects(session.dirty):create_version(obj,session)forobjinversioned_objects(session.deleted):create_version(obj,session,deleted=True)