|
|
import networkx as nx |
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
|
|
|
|
|
|
def make_network(hdb, df, wt_nan=1e12, min_lambda=-1):
    """
    Constructs a directed graph from HDBSCAN's condensed tree.

    Every parent and child id found in the condensed tree becomes a node,
    and every tree row becomes a ``parent -> child`` edge whose ``weight``
    is that row's ``lambda_val``. The rows of ``df`` are then attached as
    node attributes by position: the i-th row is merged into node ``i``
    when such a node exists.

    Parameters:
    hdb: HDBSCAN object with condensed_tree_ attribute.
    df (pd.DataFrame): Dataframe containing node attributes.
    wt_nan (float): Weight assigned when lambda_val is not finite
        (NaN or infinite).
    min_lambda (float): When greater than 0, only edges whose lambda_val is
        strictly greater than this value are added (the nodes themselves
        are kept either way). Values <= 0, including the default, disable
        the filter.

    Returns:
    nx.DiGraph: A directed graph with nodes and edges from the condensed tree.
    """
    raw_tree = hdb.condensed_tree_._raw_tree

    G = nx.DiGraph()

    # Convert ids to plain ints up front so node keys have the same type as
    # the edge endpoints added below (the raw tree fields are presumably
    # numpy arrays, whose elements would otherwise be stored as numpy scalars).
    all_nodes = {int(node) for node in raw_tree['parent']}
    all_nodes.update(int(node) for node in raw_tree['child'])
    G.add_nodes_from(all_nodes)

    filter_edges = min_lambda > 0
    for row in raw_tree:
        parent, child, lambda_val = int(row['parent']), int(row['child']), row['lambda_val']
        # Written as "not (x > y)" on purpose: a NaN lambda_val compares
        # false and is therefore dropped while the filter is active.
        if filter_edges and not lambda_val > min_lambda:
            continue
        weight = lambda_val if np.isfinite(lambda_val) else wt_nan
        G.add_edge(parent, child, weight=weight)

    # Attributes are matched by row position, not by the dataframe index.
    for node_id, (_, row) in enumerate(df.iterrows()):
        if node_id in G:
            G.nodes[node_id].update(row.to_dict())

    return G
|
|
|
|
|
|
|
|
|
|
|
def get_subtree_from_ancestor(graph: nx.DiGraph, accession: str, number_of_predecessors: int=2) -> nx.DiGraph:
    """
    Given a graph and an accession, find the subtree rooted at the n-th
    ancestor of the node with that accession.

    Parameters:
    - graph: A directed NetworkX graph (assumed to be a tree)
    - accession: The accession string to look for in the "accession" node attribute
    - number_of_predecessors: How many steps to walk up from the matching node
      before taking the subtree (default 2). The walk stops early when a node
      without predecessors is reached; a value <= 0 roots the subtree at the
      matching node itself.

    Returns:
    - A copy of the subgraph containing the chosen ancestor and all of its descendants

    Raises:
    - ValueError: if no node carries the given accession
    """
    # First node whose "accession" attribute matches.
    target_node = next(
        (node_id for node_id, attrs in graph.nodes(data=True) if attrs.get("accession") == accession),
        None,
    )
    if target_node is None:
        raise ValueError(f"Accession '{accession}' not found in graph.")

    # Climb towards the root, honouring the requested number of steps.
    ancestor_node = target_node
    for _ in range(number_of_predecessors):
        # In a tree each node has at most one predecessor; if there are
        # several, the first one reported by the graph is followed.
        parent = next(iter(graph.predecessors(ancestor_node)), None)
        if parent is None:
            break
        ancestor_node = parent

    subtree_nodes = nx.descendants(graph, ancestor_node)
    subtree_nodes.add(ancestor_node)

    return graph.subgraph(subtree_nodes).copy()
|
|
|
|
|
|
|
|
def graph_to_newick(graph: nx.DiGraph) -> str:
    """
    Converts a directed tree (DiGraph) into a Newick string using the 'accession'
    attribute as the node label (falling back to the node id when the
    attribute is missing). Nothing is written to disk; the string is returned.

    Parameters:
    - graph: A directed NetworkX tree

    Returns:
    - The Newick representation of the tree, terminated by ';'

    Raises:
    - ValueError: if the graph does not have exactly one root (node with
      in-degree 0), or if a cycle is reachable from the root
    """
    roots = [node for node in graph.nodes if graph.in_degree(node) == 0]
    if len(roots) != 1:
        raise ValueError(f"Graph must have exactly one root, found {len(roots)}.")
    root = roots[0]

    # Explicit work stack instead of recursion, so deep trees cannot hit the
    # interpreter's recursion limit. Entries are (action, payload) pairs:
    #   "visit" -> expand a node, "emit" -> append literal text,
    #   "leave" -> the node's whole subtree has been written.
    pieces: list[str] = []
    on_path = set()  # internal nodes currently being expanded (cycle guard)
    stack = [("visit", root)]

    while stack:
        action, payload = stack.pop()

        if action == "emit":
            pieces.append(payload)
            continue
        if action == "leave":
            on_path.discard(payload)
            continue

        node = payload
        if node in on_path:
            raise ValueError("Graph contains a cycle; cannot convert to Newick.")

        # str() so that non-string attribute values cannot break the join.
        label = str(graph.nodes[node].get("accession", str(node)))
        children = list(graph.successors(node))
        if not children:
            pieces.append(label)
            continue

        on_path.add(node)
        pieces.append("(")
        # Pushed in reverse so entries pop in the order
        # child_1 , child_2 , ... child_k )label leave.
        stack.append(("leave", node))
        stack.append(("emit", f"){label}"))
        for position, child in enumerate(reversed(children)):
            if position:
                stack.append(("emit", ","))
            stack.append(("visit", child))

    return "".join(pieces) + ";"
|
|
|
|
|
|
|
|
|
|
|
def convert_similarity_to_distance(graph: nx.Graph, similarity_attr: str = "weight") -> nx.Graph:
    """
    Returns a copy of the graph in which every edge carries an extra
    "distance" attribute equal to 1 / similarity.

    The similarity is read from ``similarity_attr`` (treated as 1.0 when the
    edge lacks it) and is left untouched. Edges whose similarity is not
    greater than 0 get a distance of infinity. The input graph is not modified.
    """
    converted = graph.copy()

    for _, _, attrs in converted.edges(data=True):
        similarity = attrs.get(similarity_attr, 1.0)
        if similarity > 0:
            attrs["distance"] = 1.0 / similarity
        else:
            attrs["distance"] = float("inf")

    return converted
|
|
|
|
|
def extract_weighted_subtree_around_accession(
    graph: nx.DiGraph,
    accession: str,
    n: int,
    weight: str = "weight"
) -> tuple[nx.DiGraph, pd.DataFrame]:
    """
    Extract a subtree of the n closest nodes around a given node identified by 'accession',
    and return a DataFrame of nearby accessions and distances.

    Distances are shortest-path lengths measured on an undirected view of the
    graph, so both ancestors and descendants of the source can be selected.

    Parameters:
    - graph: NetworkX DiGraph
    - accession: Accession string (center node), matched against the "Accession" node attribute
    - n: Number of closest nodes to include (including source); must not be negative
    - weight: Edge attribute to use for distance (None for unweighted)

    Returns:
    - subgraph: Directed subgraph (nx.DiGraph) induced by the selected nodes
    - df: pandas DataFrame with columns ['accession', 'distance'], excluding source;
      selected nodes that carry no accession are left out. The columns are
      present even when no rows are returned.

    Raises:
    - ValueError: if n is negative or the accession is not found in the graph
    """
    if n < 0:
        # A negative n would otherwise slice from the end and silently
        # return almost the whole graph.
        raise ValueError(f"n must be non-negative, got {n}.")

    # NOTE(review): the other helpers in this file read the lowercase
    # "accession" attribute; confirm which spelling the node data uses.
    accession_column_name = "Accession"

    accession_to_node = {
        data[accession_column_name]: node
        for node, data in graph.nodes(data=True)
        if accession_column_name in data
    }
    node_to_accession = {node: acc for acc, node in accession_to_node.items()}

    source_node = accession_to_node.get(accession)
    if source_node is None:
        raise ValueError(f"Accession '{accession}' not found in graph.")

    # Ignore edge direction so the neighbourhood extends both up and down the tree.
    undirected = graph.to_undirected()

    if weight is None:
        path_lengths = nx.single_source_shortest_path_length(undirected, source_node)
    else:
        path_lengths = nx.single_source_dijkstra_path_length(undirected, source_node, weight=weight)

    # Closest first; the source itself is expected at position 0 with distance 0.
    sorted_nodes = sorted(path_lengths.items(), key=lambda item: item[1])
    selected_nodes = [node for node, _ in sorted_nodes[:n]]

    subgraph = graph.subgraph(selected_nodes).copy()

    records = [
        {"accession": node_to_accession[node], "distance": dist}
        for node, dist in sorted_nodes[1:n]
        if node in node_to_accession
    ]
    # Explicit columns keep the documented schema when there are no records.
    df = pd.DataFrame(records, columns=["accession", "distance"])

    return subgraph, df
|
|
|