"""Task Annotations.Annotations is a nice term for monkey-patching task classesin the configuration.This prepares and performs the annotations in the:setting:`task_annotations` setting."""fromcelery.utils.functionalimportfirstmethod,mlazyfromcelery.utils.importsimportinstantiate_first_match=firstmethod('annotate')_first_match_any=firstmethod('annotate_any')__all__=('MapAnnotation','prepare','resolve_all')
def prepare(annotations):
    """Expand the :setting:`task_annotations` setting.

    Arguments:
        annotations: ``None``, a single annotation, or a list/tuple of
            annotations.  Each annotation may be a dict, a string, or
            any other object.

    Returns:
        An empty tuple when ``annotations`` is ``None``; otherwise a list
        with one entry per annotation, where dicts are wrapped in
        ``MapAnnotation``, strings are wrapped in
        ``mlazy(instantiate, <string>)`` and every other object is
        passed through unchanged.
    """
    def expand_annotation(annotation):
        # Plain dicts become annotation maps.
        if isinstance(annotation, dict):
            return MapAnnotation(annotation)
        # Strings are handed to ``instantiate`` through ``mlazy``.
        # NOTE(review): presumably a dotted path to a class that is
        # instantiated lazily on first use -- confirm against celery.utils.
        elif isinstance(annotation, str):
            return mlazy(instantiate, annotation)
        # Anything else is assumed to already be an annotation object.
        return annotation

    if annotations is None:
        return ()
    elif not isinstance(annotations, (list, tuple)):
        # A lone annotation is treated as a one-element sequence.
        annotations = (annotations,)
    return [expand_annotation(anno) for anno in annotations]
def resolve_all(anno, task):
    """Resolve all pending annotations.

    Arguments:
        anno: The annotation objects to query; passed as-is to the
            module-level ``_first_match`` and ``_first_match_any``
            dispatchers.
        task: The task passed to ``_first_match`` along with ``anno``.

    Returns:
        A generator yielding the truthy results among, in order,
        ``_first_match(anno, task)`` and ``_first_match_any(anno)``.
    """
    # Both lookups run immediately at call time; only the filtering of
    # falsy results is deferred to iteration.
    candidates = (_first_match(anno, task), _first_match_any(anno))
    return (match for match in candidates if match)