import networkx as nx
import numpy as np
import pandas as pd
def make_network(hdb, df, wt_nan=1e12, min_lambda=-1):
    """
    Construct a directed graph from HDBSCAN's condensed tree.

    Every id that appears as a parent or child in the condensed tree becomes
    a node, and each tree row becomes a parent -> child edge whose ``weight``
    is the row's ``lambda_val``.

    Parameters:
        hdb: HDBSCAN object with condensed_tree_ attribute.
        df (pd.DataFrame): Dataframe containing node attributes. Rows are
            matched to nodes by position (the i-th row is attached to node
            id i), not by the dataframe index.
        wt_nan (float): Weight assigned when lambda_val is not finite
            (NaN or +/-inf).
        min_lambda (float): When > 0, an edge is only created for rows with
            lambda_val > min_lambda (rows whose lambda_val is NaN never pass
            this comparison). Values <= 0 disable the filter. Nodes are
            always added, even if all their edges are filtered out.

    Returns:
        nx.DiGraph: A directed graph with nodes and edges from the condensed tree.
    """
    # Fetch the raw tree once instead of re-reading the attribute chain.
    raw_tree = hdb.condensed_tree_._raw_tree
    graph = nx.DiGraph()

    # Normalise ids to plain Python ints so node keys have the same type as
    # the ids used for the edges below (iterating the raw arrays yields numpy
    # scalars), and add them in sorted order for a deterministic node order.
    node_ids = {int(node) for node in raw_tree['parent']}
    node_ids.update(int(node) for node in raw_tree['child'])
    graph.add_nodes_from(sorted(node_ids))

    # Add edges with weights
    for row in raw_tree:
        lambda_val = row['lambda_val']
        # Written as "not >" so NaN lambdas are dropped when filtering is on.
        if min_lambda > 0 and not lambda_val > min_lambda:
            continue
        weight = lambda_val if np.isfinite(lambda_val) else wt_nan
        graph.add_edge(int(row['parent']), int(row['child']), weight=weight)

    # Assign attributes to existing nodes (positional match: row i -> node i)
    for node_id, (_, row) in enumerate(df.iterrows()):
        if node_id in graph:
            graph.nodes[node_id].update(row.to_dict())
    return graph
def get_subtree_from_ancestor(graph: nx.DiGraph, accession: str, number_of_predecessors: int=2) -> nx.DiGraph:
    """
    Given a graph and an accession, return the subtree rooted at an ancestor
    of the node carrying that accession.

    Parameters:
    - graph: A directed NetworkX graph (assumed to be a tree)
    - accession: The accession string to look for in the "accession" node attribute
    - number_of_predecessors: How many parent steps to climb from the matching
      node before taking the subtree (default 2, i.e. the 2nd ancestor). If
      the root is reached earlier, climbing stops there instead of failing.

    Returns:
    - A copy of the subgraph induced by the chosen ancestor and all of its descendants

    Raises:
    - ValueError: if no node has the given accession
    """
    # Step 1: Find node ID by accession (first match in node order)
    target_node = next(
        (node_id for node_id, attrs in graph.nodes(data=True)
         if attrs.get("accession") == accession),
        None,
    )
    if target_node is None:
        raise ValueError(f"Accession '{accession}' not found in graph.")

    # Step 2: Climb the requested number of ancestors
    ancestor_node = target_node
    for _ in range(number_of_predecessors):
        # Assumes single parent (tree): only the first predecessor is followed.
        parent = next(iter(graph.predecessors(ancestor_node)), None)
        if parent is None:
            # Best effort: not enough ancestors, so stay at the highest one found.
            break
        ancestor_node = parent

    # Step 3: Get all descendants of this ancestor, plus the ancestor itself
    members = nx.descendants(graph, ancestor_node)
    members.add(ancestor_node)

    # Step 4: Return the induced subgraph
    return graph.subgraph(members).copy()
def graph_to_newick(graph: nx.DiGraph) -> str:
    """
    Convert a directed tree (DiGraph) into a Newick string, using the
    'accession' node attribute as the label and falling back to the node id
    when a node has no such attribute.

    Nothing is written to disk; the caller decides what to do with the string.

    Parameters:
    - graph: A directed NetworkX tree with exactly one node without incoming edges

    Returns:
    - The Newick representation, terminated with ';'

    Raises:
    - ValueError: if the graph does not have exactly one root, or if a cycle
      is reachable from the root
    """
    # Step 1: Identify the root node (no incoming edges)
    roots = [node for node in graph.nodes if graph.in_degree(node) == 0]
    if len(roots) != 1:
        raise ValueError(f"Graph must have exactly one root, found {len(roots)}.")
    root = roots[0]

    # Step 2: Post-order traversal with an explicit stack rather than
    # recursion, so deep trees cannot exhaust Python's recursion limit.
    # Each frame is (node, iterator over remaining children, rendered children).
    exhausted = object()
    frames = [(root, iter(graph.successors(root)), [])]
    on_path = {root}  # nodes currently on the stack, used to detect cycles
    newick_str = ""
    while frames:
        node, pending_children, parts = frames[-1]
        child = next(pending_children, exhausted)
        if child is not exhausted:
            if child in on_path:
                raise ValueError(f"Graph contains a cycle through node {child!r}.")
            on_path.add(child)
            frames.append((child, iter(graph.successors(child)), []))
            continue

        # All children rendered: emit this node and hand it to its parent.
        frames.pop()
        on_path.discard(node)
        # str() keeps join/concatenation working when the attribute is not a string.
        label = str(graph.nodes[node].get("accession", str(node)))
        text = f"({','.join(parts)}){label}" if parts else label
        if frames:
            frames[-1][2].append(text)
        else:
            newick_str = text

    # Step 3: Terminate the Newick string
    return newick_str + ";"
def convert_similarity_to_distance(graph: nx.Graph, similarity_attr: str = "weight") -> nx.Graph:
    """
    Return a copy of the graph whose edges carry an additional "distance"
    attribute computed as 1 / similarity.

    The similarity is read from ``similarity_attr`` (treated as 1.0 when the
    edge lacks that attribute) and is left in place. Edges whose similarity
    is not strictly positive get a distance of infinity.
    """
    converted = graph.copy()
    for _source, _target, attrs in converted.edges(data=True):
        similarity = attrs.get(similarity_attr, 1.0)
        if similarity > 0:
            attrs["distance"] = 1.0 / similarity
        else:
            attrs["distance"] = float("inf")
    return converted
def extract_weighted_subtree_around_accession(
    graph: nx.DiGraph,
    accession: str,
    n: int,
    weight: str = "weight"
) -> tuple[nx.DiGraph, pd.DataFrame]:
    """
    Extract a subtree of the n closest nodes around a given node identified by
    its 'Accession' attribute, and return a DataFrame of nearby accessions and
    distances.

    Distances are shortest-path lengths on the undirected view of the graph,
    so both ancestors and descendants of the source are considered.

    Parameters:
    - graph: NetworkX DiGraph
    - accession: Accession string (center node)
    - n: Number of closest nodes to include (including source); must be >= 0
    - weight: Edge attribute to use for distance (None for unweighted)

    Returns:
    - subgraph: Directed subgraph (nx.DiGraph) induced by the selected nodes
    - df: pandas DataFrame with columns ['accession', 'distance'], excluding
      the source and any selected node without an 'Accession' attribute; the
      columns are present even when there are no rows

    Raises:
    - ValueError: if n is negative or the accession is not found in the graph
    """
    if n < 0:
        # A negative n would silently slice off the farthest node instead.
        raise ValueError(f"n must be non-negative, got {n}.")

    # NOTE(review): the other helpers in this file look up "accession"
    # (lowercase); confirm which key the node attributes actually use.
    accession_column_name = "Accession"

    # Node → accession, built directly from the node data so that nodes
    # sharing an accession are all kept in the distance table.
    node_to_accession = {
        node: data[accession_column_name]
        for node, data in graph.nodes(data=True)
        if accession_column_name in data
    }
    # Accession → node (if an accession is duplicated, the last node wins)
    accession_to_node = {label: node for node, label in node_to_accession.items()}

    source_node = accession_to_node.get(accession)
    if source_node is None:
        raise ValueError(f"Accession '{accession}' not found in graph.")

    # Undirected for symmetric path search
    undirected = graph.to_undirected()

    # Compute distances
    if weight is None:
        path_lengths = nx.single_source_shortest_path_length(undirected, source_node)
    else:
        path_lengths = nx.single_source_dijkstra_path_length(undirected, source_node, weight=weight)

    # Sort by distance, select top N
    sorted_nodes = sorted(path_lengths.items(), key=lambda item: item[1])
    selected_nodes = [node for node, _ in sorted_nodes[:n]]

    # Subgraph from original directed graph
    subgraph = graph.subgraph(selected_nodes).copy()

    # Create DataFrame of distances (excluding self)
    records = [
        {"accession": node_to_accession[node], "distance": dist}
        for node, dist in sorted_nodes[1:n]  # Skip source node
        if node in node_to_accession
    ]
    # Explicit columns keep the documented schema when there are no records.
    df = pd.DataFrame(records, columns=["accession", "distance"])
    return subgraph, df
|