| import numpy as np
|
| import pandas as pd
|
|
|
| import networkx as nx
|
| from community import community_louvain
|
| from scipy.spatial.distance import pdist, squareform
|
| from scipy.cluster.hierarchy import linkage, to_tree
|
| from networkx.algorithms.community import greedy_modularity_communities
|
| from Bio import Phylo
|
| from Bio.Phylo.BaseTree import Tree, Clade
|
| import matplotlib.pyplot as plt
|
| import sys
|
| import gradio as gr
|
| import os
|
| import hashlib
|
| from pathlib import Path
|
| import pandas as pd
|
| from io import StringIO
|
| from usalign_runner import USalignRunner
|
| import pandas as pd
|
| import numpy as np
|
| from rpy2.robjects import pandas2ri, r, Formula
|
| from rpy2.robjects.packages import importr
|
| from rpy2.robjects.vectors import StrVector, FloatVector, IntVector
|
| from rpy2.robjects.conversion import localconverter
|
| import rpy2.robjects as ro
|
| import os
|
|
|
| from r_functions import get_r_matrix,export_matrix_to_newick_r,export_similarity_network_r
|
|
|
def get_TM_mat_from_df(df):
    """Build a TM-score matrix DataFrame from pairwise USalign output rows.

    Args:
        df: DataFrame with columns '#PDBchain1', 'PDBchain2', 'TM1', 'TM2'
            (one row per aligned chain pair, as produced by USalign).

    Returns:
        pd.DataFrame: square matrix indexed and columned by cleaned chain
        names (every '/' removed and the '.pdb:A' suffix stripped).
        The diagonal is 1.0; entry [chain2, chain1] holds TM1 and
        [chain1, chain2] holds TM2, so the matrix is asymmetric whenever
        TM1 != TM2.
    """
    unique_chains = sorted(set(df['#PDBchain1']).union(df['PDBchain2']))
    chain_to_idx = {chain: idx for idx, chain in enumerate(unique_chains)}

    n = len(unique_chains)
    matrix = np.eye(n)  # a chain aligned with itself scores 1.0

    for _, row in df.iterrows():
        # The index map covers every chain in df by construction, so no
        # membership check is needed (the original's check was always true).
        i = chain_to_idx[row['#PDBchain1']]
        j = chain_to_idx[row['PDBchain2']]
        matrix[j, i] = row['TM1']
        matrix[i, j] = row['TM2']

    # Strip path separators and the common '.pdb:A' suffix for readable labels.
    columns_names = [chain.replace("/", "").replace(".pdb:A", "")
                     for chain in unique_chains]
    return pd.DataFrame(matrix, columns=columns_names, index=columns_names)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def scipy_to_biopython(Z, labels):
    """Convert a scipy linkage matrix into a Bio.Phylo tree.

    Args:
        Z: linkage matrix from scipy.cluster.hierarchy.linkage.
        labels: sequence of leaf names, indexed by scipy leaf node id.

    Returns:
        Bio.Phylo.BaseTree.Tree rooted at the top of the dendrogram.
    """
    root_node = to_tree(Z, rd=False)

    def _to_clade(node):
        # Leaves carry a name; internal nodes carry their two children.
        if node.is_leaf():
            return Clade(branch_length=node.dist, name=labels[node.id])
        children = [_to_clade(node.left), _to_clade(node.right)]
        return Clade(branch_length=node.dist, clades=children)

    return Tree(_to_clade(root_node))
|
|
|
def write_str_to_file(s: str, file_path: str):
    """Write *s* to *file_path* as UTF-8 text, overwriting any existing file."""
    with open(file_path, mode='w', encoding="utf8") as out:
        out.write(s)
|
|
|
|
|
def build_graph_from_mat_df(TM_score_matrix, threshold=0.75):
    """Build an undirected similarity graph from a TM-score matrix.

    Args:
        TM_score_matrix: square DataFrame of TM-scores, labels on the index.
        threshold: minimum score for a pair to receive an edge.

    Returns:
        networkx.Graph with one node per matrix label and an edge for every
        off-diagonal cell >= threshold.
    """
    graph = nx.Graph()
    labels = TM_score_matrix.index
    graph.add_nodes_from(labels)

    # Every (row, col) position at or above the threshold becomes an edge;
    # the diagonal (self-similarity) is skipped.
    hit_rows, hit_cols = np.where(TM_score_matrix.values >= threshold)
    graph.add_edges_from(
        (labels[r], labels[c]) for r, c in zip(hit_rows, hit_cols) if r != c
    )
    return graph
|
|
|
def fill_community_to_graph(G):
    """Detect Louvain communities on G and record them as node attributes.

    Each node gets a 'cluster' attribute holding its community id.

    Returns:
        dict mapping node -> community id (the Louvain partition).
    """
    communities = community_louvain.best_partition(G)
    nx.set_node_attributes(G, communities, 'cluster')
    return communities
|
|
|
|
|
def get_graph_fig(G, partition):
    """Draw the similarity network colored by community and return the figure.

    Args:
        G: networkx graph to draw.
        partition: mapping node -> community id; its values drive node colors.

    Returns:
        matplotlib.figure.Figure containing the rendered network.
    """
    plt.figure(figsize=(12, 10))
    layout = nx.spring_layout(G)
    node_colors = list(partition.values())
    nx.draw_networkx_nodes(G, layout, node_size=50,
                           cmap=plt.cm.tab20, node_color=node_colors)
    nx.draw_networkx_edges(G, layout, alpha=0.3)
    plt.title("Structure Similarity Network")
    plt.axis('off')
    return plt.gcf()
|
|
|
|
|
|
|
def calculate_md5(files):
    """Compute one MD5 digest over the contents of several uploaded files.

    Files are processed in sorted order of their .name path, so the same
    set of uploads always yields the same hash regardless of upload order.

    Args:
        files: List of file objects from Gradio upload (each has a .name path).

    Returns:
        str: hex MD5 digest of the concatenated file contents.
    """
    digest = hashlib.md5()
    for fobj in sorted(files, key=lambda item: item.name):
        with open(fobj.name, "rb") as handle:
            chunk = handle.read(4096)
            while chunk:
                digest.update(chunk)
                chunk = handle.read(4096)
    return digest.hexdigest()
|
|
|
def save_pdb_files(files, data_dir='./data'):
    """Save uploaded PDB files under <data_dir>/<md5>/pdb and write a pdb_list.

    Args:
        files: list of Gradio upload objects (each exposes a .name path).
        data_dir: root directory for saved data.

    Returns:
        str: newline-joined status messages, or "No files uploaded".
    """
    if not files:
        return "No files uploaded"

    data_path = Path(data_dir)
    data_path.mkdir(parents=True, exist_ok=True)

    # Content-addressed subdirectory: the same upload set always maps to
    # the same directory, so repeat uploads are idempotent.
    md5_hash = calculate_md5(files)

    file_dir = os.path.join(data_path, md5_hash, "pdb")
    # Bug fix: the original wrapped os.mkdir in bare `except: pass`, which
    # silently swallowed every OS error (permissions, bad path, ...).
    os.makedirs(file_dir, exist_ok=True)
    print(f"Created directory: {file_dir}")

    list_file = os.path.join(data_path, md5_hash, "pdb_list")

    filenames = []
    results = []
    for file in files:
        original_filename = os.path.basename(file.name)
        filenames.append(original_filename)

        target_path = os.path.join(file_dir, original_filename)
        print(f"Saving to: {target_path}")

        # Copy via context managers so both handles are always closed
        # (the original left the source handle open).
        with open(file.name, "rb") as src, open(target_path, "wb") as dst:
            dst.write(src.read())
        results.append(f"Saved {original_filename}")

    with open(list_file, "w") as f:
        f.write("\n".join(filenames))
    results.append(f"Created list file: {list_file}")

    return "\n".join(results)
|
|
|
def run_usalign(md5_hash):
    """Run USalign on the uploaded PDB files and return results as a DataFrame.

    Args:
        md5_hash: content hash naming the ./data/<md5_hash> working directory.

    Returns:
        pd.DataFrame: parsed tab-separated USalign output on success, or a
        single-column 'Error' DataFrame describing the failure.
    """
    try:
        runner = USalignRunner()
        data_path = Path("./data")
        pdb_dir = os.path.join(data_path, md5_hash, "pdb")
        list_file = os.path.join(data_path, md5_hash, "pdb_list")
        print(str(pdb_dir))
        print(str(list_file))
        return_code, stdout, stderr = runner.run_alignment(
            target_dir=str(pdb_dir),
            pdb_list_file=str(list_file)
        )
        print(stdout)
        print(stderr)
        if return_code == 0:
            df = pd.read_csv(StringIO(stdout), sep="\t",
                             encoding=sys.getdefaultencoding())
            # USalign pads header names with whitespace; normalize them.
            df.columns = [col.strip() for col in df.columns]
            return df
        return pd.DataFrame({"Error": [stderr]})
    except Exception as e:
        # Bug fix: the original returned `stderr` here, which is unbound
        # whenever the exception fires before run_alignment() returns,
        # masking the real error with a NameError.
        return pd.DataFrame({"Error": [str(e)]})
|
|
|
def run_community_analysis(results_df, data_dir, md5_hash, threshold):
    """Run the R-backed community analysis pipeline and collect output files.

    Args:
        results_df: USalign pairwise results DataFrame.
        data_dir: root data directory for all outputs.
        md5_hash: content hash naming the working subdirectory.
        threshold: TM-score edge threshold for the similarity network export.

    Returns:
        dict with 'tm_matrix' (DataFrame), 'newick_str' (str) and 'files'
        (list of written file paths) on success, or {'Error': message}.
    """
    try:
        tm_matrix = get_TM_mat_from_df(results_df)

        # Bug fix: the data_dir parameter was previously ignored and every
        # path was hard-coded to "data"; outputs now honor data_dir.
        out_dir = os.path.join(data_dir, md5_hash)
        tm_file = os.path.join(out_dir, "tm_matrix.csv")
        newick_file = os.path.join(out_dir, "clustering.newick")
        network_edges_file = os.path.join(out_dir, "network_cytoscape_export.xlsx")
        cluster_file = os.path.join(out_dir, "cluster_assignments.csv")

        # Convert the pandas matrix to an R data frame for the R helpers.
        with localconverter(ro.default_converter + pandas2ri.converter):
            r_tm_matrix = ro.conversion.py2rpy(tm_matrix)

        # R side writes the newick file and returns the newick string.
        result = export_matrix_to_newick_r(r_tm_matrix, newick_file)
        newick_str = result[0]

        # R side writes the Cytoscape edge table and cluster assignments.
        export_similarity_network_r(threshold, r_tm_matrix,
                                    network_edges_file, cluster_file)

        tm_matrix.to_csv(tm_file)

        return {
            "tm_matrix": tm_matrix,
            "newick_str": newick_str,
            "files": [
                tm_file,
                newick_file,
                network_edges_file,
                cluster_file,
            ],
        }
    except Exception as e:
        print("Error", str(e))
        return {"Error": str(e)}
|
|
|
|
|
|
|
def get_dataframe_from_network(G, partition):
    """Export a graph's edges plus isolated nodes as a Cytoscape-style table.

    Args:
        G: networkx graph.
        partition: community assignment; currently not reflected in the
            output (the original built a cluster-membership dict from it
            and then discarded it — that dead code has been removed).

    Returns:
        pd.DataFrame with 'Source'/'Target' columns: one row per edge,
        plus one row per degree-0 node with an empty Target.
    """
    edges_df = pd.DataFrame([list(edge) for edge in G.edges()],
                            columns=["Source", "Target"])

    # Isolated nodes have no edge rows; emit them with an empty Target so
    # they still appear in the exported table.
    singleton_nodes = [n for n in G.nodes if G.degree[n] == 0]
    singleton_df = pd.DataFrame([[node, ""] for node in singleton_nodes],
                                columns=["Source", "Target"])

    return pd.concat([edges_df, singleton_df], ignore_index=True)
|
|
|
|
|
| |