import networkx as nx
import numpy as np
import pandas as pd


def make_network(hdb, df, wt_nan=1e12, min_lambda=-1):
    """
    Constructs a directed graph from HDBSCAN's condensed tree.

    Parameters:
        hdb: HDBSCAN object with condensed_tree_ attribute.
        df (pd.DataFrame): Dataframe containing node attributes. Rows are
            matched to nodes by position: the i-th row is attached to node i
            (the dataframe's index labels are ignored).
        wt_nan (float): Weight assigned when lambda_val is not finite
            (NaN or infinite).
        min_lambda (float): When greater than 0, only edges whose lambda_val
            is strictly greater than this value are added. Nodes are always
            added, so filtering can leave nodes without edges. Values <= 0
            (the default) disable filtering.

    Returns:
        nx.DiGraph: A directed graph with nodes and edges from the condensed
        tree. Each edge carries a 'weight' attribute.
    """
    raw_tree = hdb.condensed_tree_._raw_tree
    G = nx.DiGraph()

    # Add every node id that appears as a parent or a child. `.tolist()`
    # yields plain Python ints, matching the int() keys used for the edges.
    G.add_nodes_from(np.union1d(raw_tree["parent"], raw_tree["child"]).tolist())

    # Add edges with weights
    filter_edges = min_lambda > 0
    for row in raw_tree:
        lambda_val = row["lambda_val"]
        # `not (a > b)` rather than `a <= b` so that a NaN lambda_val is
        # filtered out as well (every comparison with NaN is False).
        if filter_edges and not lambda_val > min_lambda:
            continue
        weight = lambda_val if np.isfinite(lambda_val) else wt_nan
        G.add_edge(int(row["parent"]), int(row["child"]), weight=weight)

    # Assign attributes to existing nodes (row position == node id)
    for node_id, (_, row) in enumerate(df.iterrows()):
        if node_id in G:
            G.nodes[node_id].update(row.to_dict())

    return G


def get_subtree_from_ancestor(
    graph: nx.DiGraph,
    accession: str,
    number_of_predecessors: int = 2,
    accession_attr: str = "accession",
) -> nx.DiGraph:
    """
    Given a graph and an accession, find the subtree rooted at the n-th
    ancestor of the node with that accession.

    Parameters:
    - graph: A directed NetworkX graph (assumed to be a tree)
    - accession: The accession string to look for in node attributes
    - number_of_predecessors: How many parent links to climb from the matched
      node (default 2). If the root is reached first, climbing stops there.
    - accession_attr: Name of the node attribute holding the accession

    Returns:
    - A copy of the subgraph rooted at that ancestor

    Raises:
    - ValueError: If no node carries the given accession
    """
    # Step 1: Find node ID by accession (first match wins)
    target_node = next(
        (
            node_id
            for node_id, attrs in graph.nodes(data=True)
            if attrs.get(accession_attr) == accession
        ),
        None,
    )
    if target_node is None:
        raise ValueError(f"Accession '{accession}' not found in graph.")

    # Step 2: Climb the requested number of levels
    ancestor_node = target_node
    for _ in range(number_of_predecessors):
        # Assumes single parent (tree); None is never a valid networkx node,
        # so it is a safe "no parent" sentinel even when node 0 is a parent.
        parent = next(graph.predecessors(ancestor_node), None)
        if parent is None:
            break  # Reached the root early; use what we have
        ancestor_node = parent

    # Step 3: Get all descendants of this ancestor, plus the ancestor itself
    members = nx.descendants(graph, ancestor_node)
    members.add(ancestor_node)

    # Step 4: Return the induced subgraph
    return graph.subgraph(members).copy()


def graph_to_newick(graph: nx.DiGraph, label_attr: str = "accession") -> str:
    """
    Converts a directed tree (DiGraph) into a Newick string, labelling each
    node with its 'accession' attribute (falling back to the node id).

    Parameters:
    - graph: A directed NetworkX tree
    - label_attr: Node attribute used as the label

    Returns:
    - The Newick representation, terminated by ';'. Labels are emitted
      verbatim (no quoting or escaping of Newick special characters).

    Raises:
    - ValueError: If the graph does not have exactly one root, or if it
      contains a cycle
    """
    # Step 1: Identify the root node (no incoming edges)
    roots = [n for n in graph.nodes if graph.in_degree(n) == 0]
    if len(roots) != 1:
        raise ValueError(f"Graph must have exactly one root, found {len(roots)}.")
    root = roots[0]
    if not nx.is_directed_acyclic_graph(graph):
        raise ValueError("Graph contains a cycle and cannot be written as Newick.")

    # Step 2: Depth-first traversal with an explicit stack, so that deep
    # trees cannot exceed Python's recursion limit.
    # Each frame is (node, iterator over its children, rendered child strings).
    frames = [(root, iter(graph.successors(root)), [])]
    newick_str = ""
    while frames:
        node, pending, parts = frames[-1]
        child = next(pending, None)  # None is never a valid networkx node
        if child is not None:
            frames.append((child, iter(graph.successors(child)), []))
            continue

        # All children rendered: emit this node and hand it to its parent.
        frames.pop()
        label = str(graph.nodes[node].get(label_attr, str(node)))
        text = f"({','.join(parts)}){label}" if parts else label
        if frames:
            frames[-1][2].append(text)
        else:
            newick_str = text

    # Step 3: Terminate the Newick string
    return newick_str + ";"


def convert_similarity_to_distance(graph: nx.Graph, similarity_attr: str = "weight") -> nx.Graph:
    """
    Returns a copy of the graph where every edge gains a 'distance' attribute
    equal to 1 / similarity.

    Parameters:
    - graph: Input graph; it is not modified
    - similarity_attr: Edge attribute holding the similarity. Edges without
      it are treated as having similarity 1.0.

    Returns:
    - A copy of the graph with 'distance' set on every edge. Non-positive
      similarities map to an infinite distance.
    """
    g = graph.copy()
    for _, _, data in g.edges(data=True):
        sim = data.get(similarity_attr, 1.0)
        data["distance"] = 1.0 / sim if sim > 0 else float("inf")
    return g


def extract_weighted_subtree_around_accession(
    graph: nx.DiGraph,
    accession: str,
    n: int,
    weight: str = "weight",
    accession_attr: str = "Accession",
) -> tuple[nx.DiGraph, pd.DataFrame]:
    """
    Extract a subtree of the n closest nodes around a given node identified by
    'accession', and return a DataFrame of nearby accessions and distances.

    Parameters:
    - graph: NetworkX DiGraph
    - accession: Accession string (center node)
    - n: Number of closest nodes to include (including source). Values below
      1 produce an empty subgraph and an empty DataFrame.
    - weight: Edge attribute to use for distance (None for unweighted)
    - accession_attr: Name of the node attribute holding the accession.
      NOTE: the default is capitalised ("Accession"), unlike the lowercase
      "accession" default of get_subtree_from_ancestor and graph_to_newick.

    Returns:
    - subgraph: Directed subgraph (nx.DiGraph)
    - df: pandas DataFrame with columns ['accession', 'distance'], excluding
      source. The columns are present even when there are no rows.

    Raises:
    - ValueError: If no node carries the given accession
    """
    # Accession -> node (later nodes win on duplicate accessions)
    accession_to_node = {
        data[accession_attr]: node
        for node, data in graph.nodes(data=True)
        if accession_attr in data
    }
    source_node = accession_to_node.get(accession)
    if source_node is None:
        raise ValueError(f"Accession '{accession}' not found in graph.")
    node_to_accession = {node: acc for acc, node in accession_to_node.items()}

    # Undirected for symmetric path search
    undirected = graph.to_undirected()

    # Compute distances
    if weight is None:
        path_lengths = nx.single_source_shortest_path_length(undirected, source_node)
    else:
        path_lengths = nx.single_source_dijkstra_path_length(
            undirected, source_node, weight=weight
        )

    # Sort by distance, select top N. Clamp so a negative n cannot silently
    # slice from the far end of the list.
    limit = max(n, 0)
    nearest = sorted(path_lengths.items(), key=lambda item: item[1])[:limit]

    # Subgraph from original directed graph
    subgraph = graph.subgraph([node for node, _ in nearest]).copy()

    # Create DataFrame of distances; the source is excluded by identity
    # rather than by assuming it is the first entry.
    records = [
        {"accession": node_to_accession[node], "distance": dist}
        for node, dist in nearest
        if node != source_node and node in node_to_accession
    ]
    df = pd.DataFrame(records, columns=["accession", "distance"])

    return subgraph, df