Spaces:
Paused
Paused
| """ | |
| Text-based visual representations of graphs | |
| """ | |
| import sys | |
| import warnings | |
| from collections import defaultdict | |
| import networkx as nx | |
| from networkx.utils import open_file | |
| __all__ = ["forest_str", "generate_network_text", "write_network_text"] | |
| class BaseGlyphs: | |
| def as_dict(cls): | |
| return { | |
| a: getattr(cls, a) | |
| for a in dir(cls) | |
| if not a.startswith("_") and a != "as_dict" | |
| } | |
| class AsciiBaseGlyphs(BaseGlyphs): | |
| empty: str = "+" | |
| newtree_last: str = "+-- " | |
| newtree_mid: str = "+-- " | |
| endof_forest: str = " " | |
| within_forest: str = ": " | |
| within_tree: str = "| " | |
| class AsciiDirectedGlyphs(AsciiBaseGlyphs): | |
| last: str = "L-> " | |
| mid: str = "|-> " | |
| backedge: str = "<-" | |
| vertical_edge: str = "!" | |
| class AsciiUndirectedGlyphs(AsciiBaseGlyphs): | |
| last: str = "L-- " | |
| mid: str = "|-- " | |
| backedge: str = "-" | |
| vertical_edge: str = "|" | |
| class UtfBaseGlyphs(BaseGlyphs): | |
| # Notes on available box and arrow characters | |
| # https://en.wikipedia.org/wiki/Box-drawing_character | |
| # https://stackoverflow.com/questions/2701192/triangle-arrow | |
| empty: str = "β" | |
| newtree_last: str = "βββ " | |
| newtree_mid: str = "βββ " | |
| endof_forest: str = " " | |
| within_forest: str = "β " | |
| within_tree: str = "β " | |
| class UtfDirectedGlyphs(UtfBaseGlyphs): | |
| last: str = "βββΌ " | |
| mid: str = "βββΌ " | |
| backedge: str = "βΎ" | |
| vertical_edge: str = "β½" | |
| class UtfUndirectedGlyphs(UtfBaseGlyphs): | |
| last: str = "βββ " | |
| mid: str = "βββ " | |
| backedge: str = "β" | |
| vertical_edge: str = "β" | |
| def generate_network_text( | |
| graph, | |
| with_labels=True, | |
| sources=None, | |
| max_depth=None, | |
| ascii_only=False, | |
| vertical_chains=False, | |
| ): | |
| """Generate lines in the "network text" format | |
| This works via a depth-first traversal of the graph and writing a line for | |
| each unique node encountered. Non-tree edges are written to the right of | |
| each node, and connection to a non-tree edge is indicated with an ellipsis. | |
| This representation works best when the input graph is a forest, but any | |
| graph can be represented. | |
| This notation is original to networkx, although it is simple enough that it | |
| may be known in existing literature. See #5602 for details. The procedure | |
| is summarized as follows: | |
| 1. Given a set of source nodes (which can be specified, or automatically | |
| discovered via finding the (strongly) connected components and choosing one | |
| node with minimum degree from each), we traverse the graph in depth first | |
| order. | |
| 2. Each reachable node will be printed exactly once on it's own line. | |
| 3. Edges are indicated in one of four ways: | |
| a. a parent "L-style" connection on the upper left. This corresponds to | |
| a traversal in the directed DFS tree. | |
| b. a backref "<-style" connection shown directly on the right. For | |
| directed graphs, these are drawn for any incoming edges to a node that | |
| is not a parent edge. For undirected graphs, these are drawn for only | |
| the non-parent edges that have already been represented (The edges that | |
| have not been represented will be handled in the recursive case). | |
| c. a child "L-style" connection on the lower right. Drawing of the | |
| children are handled recursively. | |
| d. if ``vertical_chains`` is true, and a parent node only has one child | |
| a "vertical-style" edge is drawn between them. | |
| 4. The children of each node (wrt the directed DFS tree) are drawn | |
| underneath and to the right of it. In the case that a child node has already | |
| been drawn the connection is replaced with an ellipsis ("...") to indicate | |
| that there is one or more connections represented elsewhere. | |
| 5. If a maximum depth is specified, an edge to nodes past this maximum | |
| depth will be represented by an ellipsis. | |
| 6. If a a node has a truthy "collapse" value, then we do not traverse past | |
| that node. | |
| Parameters | |
| ---------- | |
| graph : nx.DiGraph | nx.Graph | |
| Graph to represent | |
| with_labels : bool | str | |
| If True will use the "label" attribute of a node to display if it | |
| exists otherwise it will use the node value itself. If given as a | |
| string, then that attribute name will be used instead of "label". | |
| Defaults to True. | |
| sources : List | |
| Specifies which nodes to start traversal from. Note: nodes that are not | |
| reachable from one of these sources may not be shown. If unspecified, | |
| the minimal set of nodes needed to reach all others will be used. | |
| max_depth : int | None | |
| The maximum depth to traverse before stopping. Defaults to None. | |
| ascii_only : Boolean | |
| If True only ASCII characters are used to construct the visualization | |
| vertical_chains : Boolean | |
| If True, chains of nodes will be drawn vertically when possible. | |
| Yields | |
| ------ | |
| str : a line of generated text | |
| Examples | |
| -------- | |
| >>> graph = nx.path_graph(10) | |
| >>> graph.add_node('A') | |
| >>> graph.add_node('B') | |
| >>> graph.add_node('C') | |
| >>> graph.add_node('D') | |
| >>> graph.add_edge(9, 'A') | |
| >>> graph.add_edge(9, 'B') | |
| >>> graph.add_edge(9, 'C') | |
| >>> graph.add_edge('C', 'D') | |
| >>> graph.add_edge('C', 'E') | |
| >>> graph.add_edge('C', 'F') | |
| >>> nx.write_network_text(graph) | |
| βββ 0 | |
| βββ 1 | |
| βββ 2 | |
| βββ 3 | |
| βββ 4 | |
| βββ 5 | |
| βββ 6 | |
| βββ 7 | |
| βββ 8 | |
| βββ 9 | |
| βββ A | |
| βββ B | |
| βββ C | |
| βββ D | |
| βββ E | |
| βββ F | |
| >>> nx.write_network_text(graph, vertical_chains=True) | |
| βββ 0 | |
| β | |
| 1 | |
| β | |
| 2 | |
| β | |
| 3 | |
| β | |
| 4 | |
| β | |
| 5 | |
| β | |
| 6 | |
| β | |
| 7 | |
| β | |
| 8 | |
| β | |
| 9 | |
| βββ A | |
| βββ B | |
| βββ C | |
| βββ D | |
| βββ E | |
| βββ F | |
| """ | |
| from typing import Any, NamedTuple | |
| class StackFrame(NamedTuple): | |
| parent: Any | |
| node: Any | |
| indents: list | |
| this_islast: bool | |
| this_vertical: bool | |
| collapse_attr = "collapse" | |
| is_directed = graph.is_directed() | |
| if is_directed: | |
| glyphs = AsciiDirectedGlyphs if ascii_only else UtfDirectedGlyphs | |
| succ = graph.succ | |
| pred = graph.pred | |
| else: | |
| glyphs = AsciiUndirectedGlyphs if ascii_only else UtfUndirectedGlyphs | |
| succ = graph.adj | |
| pred = graph.adj | |
| if isinstance(with_labels, str): | |
| label_attr = with_labels | |
| elif with_labels: | |
| label_attr = "label" | |
| else: | |
| label_attr = None | |
| if max_depth == 0: | |
| yield glyphs.empty + " ..." | |
| elif len(graph.nodes) == 0: | |
| yield glyphs.empty | |
| else: | |
| # If the nodes to traverse are unspecified, find the minimal set of | |
| # nodes that will reach the entire graph | |
| if sources is None: | |
| sources = _find_sources(graph) | |
| # Populate the stack with each: | |
| # 1. parent node in the DFS tree (or None for root nodes), | |
| # 2. the current node in the DFS tree | |
| # 2. a list of indentations indicating depth | |
| # 3. a flag indicating if the node is the final one to be written. | |
| # Reverse the stack so sources are popped in the correct order. | |
| last_idx = len(sources) - 1 | |
| stack = [ | |
| StackFrame(None, node, [], (idx == last_idx), False) | |
| for idx, node in enumerate(sources) | |
| ][::-1] | |
| num_skipped_children = defaultdict(lambda: 0) | |
| seen_nodes = set() | |
| while stack: | |
| parent, node, indents, this_islast, this_vertical = stack.pop() | |
| if node is not Ellipsis: | |
| skip = node in seen_nodes | |
| if skip: | |
| # Mark that we skipped a parent's child | |
| num_skipped_children[parent] += 1 | |
| if this_islast: | |
| # If we reached the last child of a parent, and we skipped | |
| # any of that parents children, then we should emit an | |
| # ellipsis at the end after this. | |
| if num_skipped_children[parent] and parent is not None: | |
| # Append the ellipsis to be emitted last | |
| next_islast = True | |
| try_frame = StackFrame( | |
| node, Ellipsis, indents, next_islast, False | |
| ) | |
| stack.append(try_frame) | |
| # Redo this frame, but not as a last object | |
| next_islast = False | |
| try_frame = StackFrame( | |
| parent, node, indents, next_islast, this_vertical | |
| ) | |
| stack.append(try_frame) | |
| continue | |
| if skip: | |
| continue | |
| seen_nodes.add(node) | |
| if not indents: | |
| # Top level items (i.e. trees in the forest) get different | |
| # glyphs to indicate they are not actually connected | |
| if this_islast: | |
| this_vertical = False | |
| this_prefix = indents + [glyphs.newtree_last] | |
| next_prefix = indents + [glyphs.endof_forest] | |
| else: | |
| this_prefix = indents + [glyphs.newtree_mid] | |
| next_prefix = indents + [glyphs.within_forest] | |
| else: | |
| # Non-top-level items | |
| if this_vertical: | |
| this_prefix = indents | |
| next_prefix = indents | |
| else: | |
| if this_islast: | |
| this_prefix = indents + [glyphs.last] | |
| next_prefix = indents + [glyphs.endof_forest] | |
| else: | |
| this_prefix = indents + [glyphs.mid] | |
| next_prefix = indents + [glyphs.within_tree] | |
| if node is Ellipsis: | |
| label = " ..." | |
| suffix = "" | |
| children = [] | |
| else: | |
| if label_attr is not None: | |
| label = str(graph.nodes[node].get(label_attr, node)) | |
| else: | |
| label = str(node) | |
| # Determine if we want to show the children of this node. | |
| if collapse_attr is not None: | |
| collapse = graph.nodes[node].get(collapse_attr, False) | |
| else: | |
| collapse = False | |
| # Determine: | |
| # (1) children to traverse into after showing this node. | |
| # (2) parents to immediately show to the right of this node. | |
| if is_directed: | |
| # In the directed case we must show every successor node | |
| # note: it may be skipped later, but we don't have that | |
| # information here. | |
| children = list(succ[node]) | |
| # In the directed case we must show every predecessor | |
| # except for parent we directly traversed from. | |
| handled_parents = {parent} | |
| else: | |
| # Showing only the unseen children results in a more | |
| # concise representation for the undirected case. | |
| children = [ | |
| child for child in succ[node] if child not in seen_nodes | |
| ] | |
| # In the undirected case, parents are also children, so we | |
| # only need to immediately show the ones we can no longer | |
| # traverse | |
| handled_parents = {*children, parent} | |
| if max_depth is not None and len(indents) == max_depth - 1: | |
| # Use ellipsis to indicate we have reached maximum depth | |
| if children: | |
| children = [Ellipsis] | |
| handled_parents = {parent} | |
| if collapse: | |
| # Collapsing a node is the same as reaching maximum depth | |
| if children: | |
| children = [Ellipsis] | |
| handled_parents = {parent} | |
| # The other parents are other predecessors of this node that | |
| # are not handled elsewhere. | |
| other_parents = [p for p in pred[node] if p not in handled_parents] | |
| if other_parents: | |
| if label_attr is not None: | |
| other_parents_labels = ", ".join( | |
| [ | |
| str(graph.nodes[p].get(label_attr, p)) | |
| for p in other_parents | |
| ] | |
| ) | |
| else: | |
| other_parents_labels = ", ".join( | |
| [str(p) for p in other_parents] | |
| ) | |
| suffix = " ".join(["", glyphs.backedge, other_parents_labels]) | |
| else: | |
| suffix = "" | |
| # Emit the line for this node, this will be called for each node | |
| # exactly once. | |
| if this_vertical: | |
| yield "".join(this_prefix + [glyphs.vertical_edge]) | |
| yield "".join(this_prefix + [label, suffix]) | |
| if vertical_chains: | |
| if is_directed: | |
| num_children = len(set(children)) | |
| else: | |
| num_children = len(set(children) - {parent}) | |
| # The next node can be drawn vertically if it is the only | |
| # remaining child of this node. | |
| next_is_vertical = num_children == 1 | |
| else: | |
| next_is_vertical = False | |
| # Push children on the stack in reverse order so they are popped in | |
| # the original order. | |
| for idx, child in enumerate(children[::-1]): | |
| next_islast = idx == 0 | |
| try_frame = StackFrame( | |
| node, child, next_prefix, next_islast, next_is_vertical | |
| ) | |
| stack.append(try_frame) | |
| def write_network_text( | |
| graph, | |
| path=None, | |
| with_labels=True, | |
| sources=None, | |
| max_depth=None, | |
| ascii_only=False, | |
| end="\n", | |
| vertical_chains=False, | |
| ): | |
| """Creates a nice text representation of a graph | |
| This works via a depth-first traversal of the graph and writing a line for | |
| each unique node encountered. Non-tree edges are written to the right of | |
| each node, and connection to a non-tree edge is indicated with an ellipsis. | |
| This representation works best when the input graph is a forest, but any | |
| graph can be represented. | |
| Parameters | |
| ---------- | |
| graph : nx.DiGraph | nx.Graph | |
| Graph to represent | |
| path : string or file or callable or None | |
| Filename or file handle for data output. | |
| if a function, then it will be called for each generated line. | |
| if None, this will default to "sys.stdout.write" | |
| with_labels : bool | str | |
| If True will use the "label" attribute of a node to display if it | |
| exists otherwise it will use the node value itself. If given as a | |
| string, then that attribute name will be used instead of "label". | |
| Defaults to True. | |
| sources : List | |
| Specifies which nodes to start traversal from. Note: nodes that are not | |
| reachable from one of these sources may not be shown. If unspecified, | |
| the minimal set of nodes needed to reach all others will be used. | |
| max_depth : int | None | |
| The maximum depth to traverse before stopping. Defaults to None. | |
| ascii_only : Boolean | |
| If True only ASCII characters are used to construct the visualization | |
| end : string | |
| The line ending character | |
| vertical_chains : Boolean | |
| If True, chains of nodes will be drawn vertically when possible. | |
| Examples | |
| -------- | |
| >>> graph = nx.balanced_tree(r=2, h=2, create_using=nx.DiGraph) | |
| >>> nx.write_network_text(graph) | |
| βββ 0 | |
| βββΌ 1 | |
| β βββΌ 3 | |
| β βββΌ 4 | |
| βββΌ 2 | |
| βββΌ 5 | |
| βββΌ 6 | |
| >>> # A near tree with one non-tree edge | |
| >>> graph.add_edge(5, 1) | |
| >>> nx.write_network_text(graph) | |
| βββ 0 | |
| βββΌ 1 βΎ 5 | |
| β βββΌ 3 | |
| β βββΌ 4 | |
| βββΌ 2 | |
| βββΌ 5 | |
| β βββΌ ... | |
| βββΌ 6 | |
| >>> graph = nx.cycle_graph(5) | |
| >>> nx.write_network_text(graph) | |
| βββ 0 | |
| βββ 1 | |
| β βββ 2 | |
| β βββ 3 | |
| β βββ 4 β 0 | |
| βββ ... | |
| >>> graph = nx.cycle_graph(5, nx.DiGraph) | |
| >>> nx.write_network_text(graph, vertical_chains=True) | |
| βββ 0 βΎ 4 | |
| β½ | |
| 1 | |
| β½ | |
| 2 | |
| β½ | |
| 3 | |
| β½ | |
| 4 | |
| βββΌ ... | |
| >>> nx.write_network_text(graph, vertical_chains=True, ascii_only=True) | |
| +-- 0 <- 4 | |
| ! | |
| 1 | |
| ! | |
| 2 | |
| ! | |
| 3 | |
| ! | |
| 4 | |
| L-> ... | |
| >>> graph = nx.generators.barbell_graph(4, 2) | |
| >>> nx.write_network_text(graph, vertical_chains=False) | |
| βββ 4 | |
| βββ 5 | |
| β βββ 6 | |
| β βββ 7 | |
| β β βββ 8 β 6 | |
| β β β βββ 9 β 6, 7 | |
| β β βββ ... | |
| β βββ ... | |
| βββ 3 | |
| βββ 0 | |
| β βββ 1 β 3 | |
| β β βββ 2 β 0, 3 | |
| β βββ ... | |
| βββ ... | |
| >>> nx.write_network_text(graph, vertical_chains=True) | |
| βββ 4 | |
| βββ 5 | |
| β β | |
| β 6 | |
| β βββ 7 | |
| β β βββ 8 β 6 | |
| β β β β | |
| β β β 9 β 6, 7 | |
| β β βββ ... | |
| β βββ ... | |
| βββ 3 | |
| βββ 0 | |
| β βββ 1 β 3 | |
| β β β | |
| β β 2 β 0, 3 | |
| β βββ ... | |
| βββ ... | |
| >>> graph = nx.complete_graph(5, create_using=nx.Graph) | |
| >>> nx.write_network_text(graph) | |
| βββ 0 | |
| βββ 1 | |
| β βββ 2 β 0 | |
| β β βββ 3 β 0, 1 | |
| β β β βββ 4 β 0, 1, 2 | |
| β β βββ ... | |
| β βββ ... | |
| βββ ... | |
| >>> graph = nx.complete_graph(3, create_using=nx.DiGraph) | |
| >>> nx.write_network_text(graph) | |
| βββ 0 βΎ 1, 2 | |
| βββΌ 1 βΎ 2 | |
| β βββΌ 2 βΎ 0 | |
| β β βββΌ ... | |
| β βββΌ ... | |
| βββΌ ... | |
| """ | |
| if path is None: | |
| # The path is unspecified, write to stdout | |
| _write = sys.stdout.write | |
| elif hasattr(path, "write"): | |
| # The path is already an open file | |
| _write = path.write | |
| elif callable(path): | |
| # The path is a custom callable | |
| _write = path | |
| else: | |
| raise TypeError(type(path)) | |
| for line in generate_network_text( | |
| graph, | |
| with_labels=with_labels, | |
| sources=sources, | |
| max_depth=max_depth, | |
| ascii_only=ascii_only, | |
| vertical_chains=vertical_chains, | |
| ): | |
| _write(line + end) | |
| def _find_sources(graph): | |
| """ | |
| Determine a minimal set of nodes such that the entire graph is reachable | |
| """ | |
| # For each connected part of the graph, choose at least | |
| # one node as a starting point, preferably without a parent | |
| if graph.is_directed(): | |
| # Choose one node from each SCC with minimum in_degree | |
| sccs = list(nx.strongly_connected_components(graph)) | |
| # condensing the SCCs forms a dag, the nodes in this graph with | |
| # 0 in-degree correspond to the SCCs from which the minimum set | |
| # of nodes from which all other nodes can be reached. | |
| scc_graph = nx.condensation(graph, sccs) | |
| supernode_to_nodes = {sn: [] for sn in scc_graph.nodes()} | |
| # Note: the order of mapping differs between pypy and cpython | |
| # so we have to loop over graph nodes for consistency | |
| mapping = scc_graph.graph["mapping"] | |
| for n in graph.nodes: | |
| sn = mapping[n] | |
| supernode_to_nodes[sn].append(n) | |
| sources = [] | |
| for sn in scc_graph.nodes(): | |
| if scc_graph.in_degree[sn] == 0: | |
| scc = supernode_to_nodes[sn] | |
| node = min(scc, key=lambda n: graph.in_degree[n]) | |
| sources.append(node) | |
| else: | |
| # For undirected graph, the entire graph will be reachable as | |
| # long as we consider one node from every connected component | |
| sources = [ | |
| min(cc, key=lambda n: graph.degree[n]) | |
| for cc in nx.connected_components(graph) | |
| ] | |
| sources = sorted(sources, key=lambda n: graph.degree[n]) | |
| return sources | |
| def forest_str(graph, with_labels=True, sources=None, write=None, ascii_only=False): | |
| """Creates a nice utf8 representation of a forest | |
| This function has been superseded by | |
| :func:`nx.readwrite.text.generate_network_text`, which should be used | |
| instead. | |
| Parameters | |
| ---------- | |
| graph : nx.DiGraph | nx.Graph | |
| Graph to represent (must be a tree, forest, or the empty graph) | |
| with_labels : bool | |
| If True will use the "label" attribute of a node to display if it | |
| exists otherwise it will use the node value itself. Defaults to True. | |
| sources : List | |
| Mainly relevant for undirected forests, specifies which nodes to list | |
| first. If unspecified the root nodes of each tree will be used for | |
| directed forests; for undirected forests this defaults to the nodes | |
| with the smallest degree. | |
| write : callable | |
| Function to use to write to, if None new lines are appended to | |
| a list and returned. If set to the `print` function, lines will | |
| be written to stdout as they are generated. If specified, | |
| this function will return None. Defaults to None. | |
| ascii_only : Boolean | |
| If True only ASCII characters are used to construct the visualization | |
| Returns | |
| ------- | |
| str | None : | |
| utf8 representation of the tree / forest | |
| Examples | |
| -------- | |
| >>> graph = nx.balanced_tree(r=2, h=3, create_using=nx.DiGraph) | |
| >>> print(nx.forest_str(graph)) | |
| βββ 0 | |
| βββΌ 1 | |
| β βββΌ 3 | |
| β β βββΌ 7 | |
| β β βββΌ 8 | |
| β βββΌ 4 | |
| β βββΌ 9 | |
| β βββΌ 10 | |
| βββΌ 2 | |
| βββΌ 5 | |
| β βββΌ 11 | |
| β βββΌ 12 | |
| βββΌ 6 | |
| βββΌ 13 | |
| βββΌ 14 | |
| >>> graph = nx.balanced_tree(r=1, h=2, create_using=nx.Graph) | |
| >>> print(nx.forest_str(graph)) | |
| βββ 0 | |
| βββ 1 | |
| βββ 2 | |
| >>> print(nx.forest_str(graph, ascii_only=True)) | |
| +-- 0 | |
| L-- 1 | |
| L-- 2 | |
| """ | |
| msg = ( | |
| "\nforest_str is deprecated as of version 3.1 and will be removed " | |
| "in version 3.3. Use generate_network_text or write_network_text " | |
| "instead.\n" | |
| ) | |
| warnings.warn(msg, DeprecationWarning) | |
| if len(graph.nodes) > 0: | |
| if not nx.is_forest(graph): | |
| raise nx.NetworkXNotImplemented("input must be a forest or the empty graph") | |
| printbuf = [] | |
| if write is None: | |
| _write = printbuf.append | |
| else: | |
| _write = write | |
| write_network_text( | |
| graph, | |
| _write, | |
| with_labels=with_labels, | |
| sources=sources, | |
| ascii_only=ascii_only, | |
| end="", | |
| ) | |
| if write is None: | |
| # Only return a string if the custom write function was not specified | |
| return "\n".join(printbuf) | |
| def _parse_network_text(lines): | |
| """Reconstructs a graph from a network text representation. | |
| This is mainly used for testing. Network text is for display, not | |
| serialization, as such this cannot parse all network text representations | |
| because node labels can be ambiguous with the glyphs and indentation used | |
| to represent edge structure. Additionally, there is no way to determine if | |
| disconnected graphs were originally directed or undirected. | |
| Parameters | |
| ---------- | |
| lines : list or iterator of strings | |
| Input data in network text format | |
| Returns | |
| ------- | |
| G: NetworkX graph | |
| The graph corresponding to the lines in network text format. | |
| """ | |
| from itertools import chain | |
| from typing import Any, NamedTuple, Union | |
| class ParseStackFrame(NamedTuple): | |
| node: Any | |
| indent: int | |
| has_vertical_child: Union[int, None] | |
| initial_line_iter = iter(lines) | |
| is_ascii = None | |
| is_directed = None | |
| ############## | |
| # Initial Pass | |
| ############## | |
| # Do an initial pass over the lines to determine what type of graph it is. | |
| # Remember what these lines were, so we can reiterate over them in the | |
| # parsing pass. | |
| initial_lines = [] | |
| try: | |
| first_line = next(initial_line_iter) | |
| except StopIteration: | |
| ... | |
| else: | |
| initial_lines.append(first_line) | |
| # The first character indicates if it is an ASCII or UTF graph | |
| first_char = first_line[0] | |
| if first_char in { | |
| UtfBaseGlyphs.empty, | |
| UtfBaseGlyphs.newtree_mid[0], | |
| UtfBaseGlyphs.newtree_last[0], | |
| }: | |
| is_ascii = False | |
| elif first_char in { | |
| AsciiBaseGlyphs.empty, | |
| AsciiBaseGlyphs.newtree_mid[0], | |
| AsciiBaseGlyphs.newtree_last[0], | |
| }: | |
| is_ascii = True | |
| else: | |
| raise AssertionError(f"Unexpected first character: {first_char}") | |
| if is_ascii: | |
| directed_glyphs = AsciiDirectedGlyphs.as_dict() | |
| undirected_glyphs = AsciiUndirectedGlyphs.as_dict() | |
| else: | |
| directed_glyphs = UtfDirectedGlyphs.as_dict() | |
| undirected_glyphs = UtfUndirectedGlyphs.as_dict() | |
| # For both directed / undirected glyphs, determine which glyphs never | |
| # appear as substrings in the other undirected / directed glyphs. Glyphs | |
| # with this property unambiguously indicates if a graph is directed / | |
| # undirected. | |
| directed_items = set(directed_glyphs.values()) | |
| undirected_items = set(undirected_glyphs.values()) | |
| unambiguous_directed_items = [] | |
| for item in directed_items: | |
| other_items = undirected_items | |
| other_supersets = [other for other in other_items if item in other] | |
| if not other_supersets: | |
| unambiguous_directed_items.append(item) | |
| unambiguous_undirected_items = [] | |
| for item in undirected_items: | |
| other_items = directed_items | |
| other_supersets = [other for other in other_items if item in other] | |
| if not other_supersets: | |
| unambiguous_undirected_items.append(item) | |
| for line in initial_line_iter: | |
| initial_lines.append(line) | |
| if any(item in line for item in unambiguous_undirected_items): | |
| is_directed = False | |
| break | |
| elif any(item in line for item in unambiguous_directed_items): | |
| is_directed = True | |
| break | |
| if is_directed is None: | |
| # Not enough information to determine, choose undirected by default | |
| is_directed = False | |
| glyphs = directed_glyphs if is_directed else undirected_glyphs | |
| # the backedge symbol by itself can be ambiguous, but with spaces around it | |
| # becomes unambiguous. | |
| backedge_symbol = " " + glyphs["backedge"] + " " | |
| # Reconstruct an iterator over all of the lines. | |
| parsing_line_iter = chain(initial_lines, initial_line_iter) | |
| ############## | |
| # Parsing Pass | |
| ############## | |
| edges = [] | |
| nodes = [] | |
| is_empty = None | |
| noparent = object() # sentinel value | |
| # keep a stack of previous nodes that could be parents of subsequent nodes | |
| stack = [ParseStackFrame(noparent, -1, None)] | |
| for line in parsing_line_iter: | |
| if line == glyphs["empty"]: | |
| # If the line is the empty glyph, we are done. | |
| # There shouldn't be anything else after this. | |
| is_empty = True | |
| continue | |
| if backedge_symbol in line: | |
| # This line has one or more backedges, separate those out | |
| node_part, backedge_part = line.split(backedge_symbol) | |
| backedge_nodes = [u.strip() for u in backedge_part.split(", ")] | |
| # Now the node can be parsed | |
| node_part = node_part.rstrip() | |
| prefix, node = node_part.rsplit(" ", 1) | |
| node = node.strip() | |
| # Add the backedges to the edge list | |
| edges.extend([(u, node) for u in backedge_nodes]) | |
| else: | |
| # No backedge, the tail of this line is the node | |
| prefix, node = line.rsplit(" ", 1) | |
| node = node.strip() | |
| prev = stack.pop() | |
| if node in glyphs["vertical_edge"]: | |
| # Previous node is still the previous node, but we know it will | |
| # have exactly one child, which will need to have its nesting level | |
| # adjusted. | |
| modified_prev = ParseStackFrame( | |
| prev.node, | |
| prev.indent, | |
| True, | |
| ) | |
| stack.append(modified_prev) | |
| continue | |
| # The length of the string before the node characters give us a hint | |
| # about our nesting level. The only case where this doesn't work is | |
| # when there are vertical chains, which is handled explicitly. | |
| indent = len(prefix) | |
| curr = ParseStackFrame(node, indent, None) | |
| if prev.has_vertical_child: | |
| # In this case we know prev must be the parent of our current line, | |
| # so we don't have to search the stack. (which is good because the | |
| # indentation check wouldn't work in this case). | |
| ... | |
| else: | |
| # If the previous node nesting-level is greater than the current | |
| # nodes nesting-level than the previous node was the end of a path, | |
| # and is not our parent. We can safely pop nodes off the stack | |
| # until we find one with a comparable nesting-level, which is our | |
| # parent. | |
| while curr.indent <= prev.indent: | |
| prev = stack.pop() | |
| if node == "...": | |
| # The current previous node is no longer a valid parent, | |
| # keep it popped from the stack. | |
| stack.append(prev) | |
| else: | |
| # The previous and current nodes may still be parents, so add them | |
| # back onto the stack. | |
| stack.append(prev) | |
| stack.append(curr) | |
| # Add the node and the edge to its parent to the node / edge lists. | |
| nodes.append(curr.node) | |
| if prev.node is not noparent: | |
| edges.append((prev.node, curr.node)) | |
| if is_empty: | |
| # Sanity check | |
| assert len(nodes) == 0 | |
| # Reconstruct the graph | |
| cls = nx.DiGraph if is_directed else nx.Graph | |
| new = cls() | |
| new.add_nodes_from(nodes) | |
| new.add_edges_from(edges) | |
| return new | |