[文档]classDOT:"""Constants related to the dot format."""HEAD=dedent("""{IN}{type}{id} {{{INp}graph [{attrs}] """)ATTR='{name}={value}'NODE='{INp}"{0}" [{attrs}]'EDGE='{INp}"{0}" {dir} "{1}" [{attrs}]'ATTRSEP=', 'DIRS={'graph':'--','digraph':'->'}TAIL='{IN}}}'
[文档]classCycleError(Exception):"""A cycle was detected in an acyclic graph."""
[文档]classDependencyGraph:"""A directed acyclic graph of objects and their dependencies. Supports a robust topological sort to detect the order in which they must be handled. Takes an optional iterator of ``(obj, dependencies)`` tuples to build the graph from. Warning: Does not support cycle detection. """def__init__(self,it=None,formatter=None):self.formatter=formatterorGraphFormatter()self.adjacent={}ifitisnotNone:self.update(it)
[文档]defadd_arc(self,obj):"""Add an object to the graph."""self.adjacent.setdefault(obj,[])
[文档]defadd_edge(self,A,B):"""Add an edge from object ``A`` to object ``B``. I.e. ``A`` depends on ``B``. """self[A].append(B)
[文档]defconnect(self,graph):"""Add nodes from another graph."""self.adjacent.update(graph.adjacent)
[文档]deftopsort(self):"""Sort the graph topologically. Returns: List: of objects in the order in which they must be handled. """graph=DependencyGraph()components=self._tarjan72()NC={node:componentforcomponentincomponentsfornodeincomponent}forcomponentincomponents:graph.add_arc(component)fornodeinself:node_c=NC[node]forsuccessorinself[node]:successor_c=NC[successor]ifnode_c!=successor_c:graph.add_edge(node_c,successor_c)return[t[0]fortingraph._khan62()]
[文档]defvalency_of(self,obj):"""Return the valency (degree) of a vertex in the graph."""try:l=[len(self[obj])]exceptKeyError:return0fornodeinself[obj]:l.append(self.valency_of(node))returnsum(l)
[文档]defupdate(self,it):"""Update graph with data from a list of ``(obj, deps)`` tuples."""tups=list(it)forobj,_intups:self.add_arc(obj)forobj,depsintups:fordepindeps:self.add_edge(obj,dep)
[文档]defedges(self):"""Return generator that yields for all edges in the graph."""return(objforobj,adjinself.items()ifadj)
def_khan62(self):"""Perform Khan's simple topological sort algorithm from '62. See https://en.wikipedia.org/wiki/Topological_sorting """count=Counter()result=[]fornodeinself:forsuccessorinself[node]:count[successor]+=1ready=[nodefornodeinselfifnotcount[node]]whileready:node=ready.pop()result.append(node)forsuccessorinself[node]:count[successor]-=1ifcount[successor]==0:ready.append(successor)result.reverse()returnresultdef_tarjan72(self):"""Perform Tarjan's algorithm to find strongly connected components. See Also: :wikipedia:`Tarjan%27s_strongly_connected_components_algorithm` """result,stack,low=[],[],{}defvisit(node):ifnodeinlow:returnnum=len(low)low[node]=numstack_pos=len(stack)stack.append(node)forsuccessorinself[node]:visit(successor)low[node]=min(low[node],low[successor])ifnum==low[node]:component=tuple(stack[stack_pos:])stack[stack_pos:]=[]result.append(component)foritemincomponent:low[item]=len(self)fornodeinself:visit(node)returnresult
[文档]defto_dot(self,fh,formatter=None):"""Convert the graph to DOT format. Arguments: fh (IO): A file, or a file-like object to write the graph to. formatter (celery.utils.graph.GraphFormatter): Custom graph formatter to use. """seen=set()draw=formatterorself.formatterdefP(s):print(bytes_to_str(s),file=fh)defif_not_seen(fun,obj):ifdraw.label(obj)notinseen:P(fun(obj))seen.add(draw.label(obj))P(draw.head())forobj,adjacentinself.items():ifnotadjacent:if_not_seen(draw.terminal_node,obj)forreqinadjacent:if_not_seen(draw.node,obj)P(draw.edge(obj,req))P(draw.tail())