hievi / network_utils.py
pswap's picture
accession_column_name
83322d0 verified
import networkx as nx
import numpy as np
import pandas as pd
def make_network(hdb, df, wt_nan=1e12,min_lambda = -1):
"""
Constructs a directed graph from HDBSCAN's condensed tree.
Parameters:
hdb: HDBSCAN object with condensed_tree_ attribute.
df (pd.DataFrame): Dataframe containing node attributes.
wt_nan (float): Weight assigned when lambda_val is NaN.
Returns:
nx.DiGraph: A directed graph with nodes and edges from the condensed tree.
"""
G = nx.DiGraph()
# Add all nodes from the condensed tree
all_nodes = set(hdb.condensed_tree_._raw_tree['parent']).union(set(hdb.condensed_tree_._raw_tree['child']))
G.add_nodes_from(all_nodes)
# Add edges with weights
for row in hdb.condensed_tree_._raw_tree:
parent, child, lambda_val = int(row['parent']), int(row['child']), row['lambda_val']
weight = lambda_val if np.isfinite(lambda_val) else wt_nan
if min_lambda > 0:
if lambda_val > min_lambda:
G.add_edge(parent, child, weight=weight)#, distance=1 / weight if weight != 0 else wt_nan)
else:
G.add_edge(parent, child, weight=weight)#, distance=1 / weight if weight != 0 else wt_nan)
# Assign attributes to existing nodes
for i, (idx, row) in enumerate(df.iterrows()):
node_id = int(i)
if node_id in G.nodes:
G.nodes[node_id].update(row.to_dict())
return G
def get_subtree_from_ancestor(graph: nx.DiGraph, accession: str, number_of_predecessors: int=2) -> nx.DiGraph:
"""
Given a graph and an accession, find the subtree rooted at the 2nd ancestor of the node with that accession.
Parameters:
- graph: A directed NetworkX graph (assumed to be a tree)
- accession: The accession string to look for in node attributes
Returns:
- A subgraph rooted at the 2nd ancestor of the node with the given accession
"""
# Step 1: Find node ID by accession
target_node = None
for node_id, attrs in graph.nodes(data=True):
if attrs.get("accession") == accession:
target_node = node_id
break
if target_node is None:
raise ValueError(f"Accession '{accession}' not found in graph.")
# Step 2: Go to the 2nd ancestor
current = target_node
for i in range(2):
predecessors = list(graph.predecessors(current))
if not predecessors:
#raise ValueError(f"Node '{current}' does not have enough ancestors.")
break
current = predecessors[0] # Assumes single parent (tree)
ancestor_node = current
# Step 3: Get all descendants of this ancestor
descendants = nx.descendants(graph, ancestor_node)
descendants.add(ancestor_node) # Include the ancestor itself
# Step 4: Return the induced subgraph
return graph.subgraph(descendants).copy()
def graph_to_newick(graph: nx.DiGraph) -> str:
"""
Converts a directed tree (DiGraph) into Newick format using the 'accession'
attribute as the node label, and writes it to a file.
Parameters:
- graph: A directed NetworkX tree
- output_file: Path to the output Newick file
"""
# Step 1: Identify the root node (no incoming edges)
roots = [n for n in graph.nodes if graph.in_degree(n) == 0]
if len(roots) != 1:
raise ValueError(f"Graph must have exactly one root, found {len(roots)}.")
root = roots[0]
# Step 2: Recursive function to convert to Newick
def to_newick(node):
children = list(graph.successors(node))
label = graph.nodes[node].get("accession", str(node))
if children:
return f"({','.join(to_newick(child) for child in children)}){label}"
else:
return label
# Step 3: Convert and write to file
newick_str = to_newick(root) + ";"
return newick_str
def convert_similarity_to_distance(graph: nx.Graph, similarity_attr: str = "weight") -> nx.Graph:
"""
Returns a copy of the graph where edge weights are inverted: distance = 1 / similarity.
Assumes similarity > 0 for all edges.
"""
g = graph.copy()
for u, v, data in g.edges(data=True):
sim = data.get(similarity_attr, 1.0)
data["distance"] = 1.0 / sim if sim > 0 else float("inf")
return g
def extract_weighted_subtree_around_accession(
graph: nx.DiGraph,
accession: str,
n: int,
weight: str = "weight"
) -> tuple[nx.DiGraph, pd.DataFrame]:
"""
Extract a subtree of the n closest nodes around a given node identified by 'accession',
and return a DataFrame of nearby accessions and distances.
Parameters:
- graph: NetworkX DiGraph
- accession: Accession string (center node)
- n: Number of closest nodes to include (including source)
- weight: Edge attribute to use for distance (None for unweighted)
Returns:
- subgraph: Directed subgraph (nx.DiGraph)
- df: pandas DataFrame with columns ['accession', 'distance'], excluding source
"""
accession_column_name = "Accession"
# Accession → node
accession_to_node = {
data[accession_column_name]: node for node, data in graph.nodes(data=True) if accession_column_name in data
}
node_to_accession = {v: k for k, v in accession_to_node.items()}
source_node = accession_to_node.get(accession)
if source_node is None:
raise ValueError(f"Accession '{accession}' not found in graph.")
# Undirected for symmetric path search
undirected = graph.to_undirected()
# Compute distances
if weight is None:
path_lengths = nx.single_source_shortest_path_length(undirected, source_node)
else:
path_lengths = nx.single_source_dijkstra_path_length(undirected, source_node, weight=weight)
# Sort by distance, select top N
sorted_nodes = sorted(path_lengths.items(), key=lambda x: x[1])
selected_nodes = [node for node, _ in sorted_nodes[:n]]
# Subgraph from original directed graph
subgraph = graph.subgraph(selected_nodes).copy()
# Create DataFrame of distances (excluding self)
records = [
{"accession": node_to_accession[node], "distance": dist}
for node, dist in sorted_nodes[1:n] # Skip source node
if node in node_to_accession
]
df = pd.DataFrame(records)
return subgraph, df