"""Structure-similarity pipeline helpers.

Turns USalign pairwise alignment output into a TM-score matrix, saves uploaded
PDB files into a per-upload directory keyed by an MD5 of their contents, and
drives the R helpers (via rpy2) that export a Newick tree and a similarity
network.
"""

import hashlib
import os
import sys
from io import StringIO
from pathlib import Path

import numpy as np
import pandas as pd
# import fastcluster
import networkx as nx
from community import community_louvain
from scipy.spatial.distance import pdist, squareform
from scipy.cluster.hierarchy import linkage, to_tree
from networkx.algorithms.community import greedy_modularity_communities
from Bio import Phylo
from Bio.Phylo.BaseTree import Tree, Clade
import matplotlib.pyplot as plt
import gradio as gr
from rpy2.robjects import pandas2ri, r, Formula
from rpy2.robjects.packages import importr
from rpy2.robjects.vectors import StrVector, FloatVector, IntVector
from rpy2.robjects.conversion import localconverter
import rpy2.robjects as ro

from usalign_runner import USalignRunner
from r_functions import get_r_matrix, export_matrix_to_newick_r, export_similarity_network_r


def get_TM_mat_from_df(df):
    """Build a square TM-score matrix from a pairwise alignment table.

    Args:
        df: DataFrame with the columns ``#PDBchain1``, ``PDBchain2``, ``TM1``
            and ``TM2`` (one row per aligned pair).

    Returns:
        pandas.DataFrame: an ``n x n`` frame over the sorted union of chain
        names found in both chain columns. The diagonal starts at 1.0; for a
        row ``(chain1, chain2)`` with indices ``(i, j)`` the cell ``[j, i]``
        receives ``TM1`` and ``[i, j]`` receives ``TM2``. Row and column
        labels are the chain names with ``"/"`` and ``".pdb:A"`` removed.
    """
    unique_chains = sorted(set(df['#PDBchain1'].unique()).union(set(df['PDBchain2'].unique())))
    chain_to_idx = {chain: idx for idx, chain in enumerate(unique_chains)}

    matrix = np.eye(len(unique_chains))
    # Every chain in either column is in chain_to_idx by construction, so the
    # lookups below cannot miss.
    for chain1, chain2, tm1, tm2 in zip(df['#PDBchain1'], df['PDBchain2'], df['TM1'], df['TM2']):
        i = chain_to_idx[chain1]
        j = chain_to_idx[chain2]
        matrix[j, i] = tm1
        matrix[i, j] = tm2

    columns_names = [chain.replace("/", "").replace(".pdb:A", "") for chain in unique_chains]
    return pd.DataFrame(matrix, columns=columns_names, index=columns_names)


# Disabled: depends on the `fastcluster` import that is commented out above.
# def get_cluster_z_from_df(df):
#     dist_matrix = pdist(df, metric='euclidean')
#     Z = fastcluster.linkage(dist_matrix, method='ward')
#     return Z


def scipy_to_biopython(Z, labels):
    """Convert a SciPy linkage matrix into a Bio.Phylo tree.

    Args:
        Z: linkage matrix accepted by ``scipy.cluster.hierarchy.to_tree``.
        labels: sequence indexed by leaf id, giving each leaf's name.

    Returns:
        Bio.Phylo.BaseTree.Tree: binary tree mirroring the linkage hierarchy.
    """
    tree = to_tree(Z, rd=False)

    def build_clade(node):
        # NOTE(review): branch_length is set to the node's own `dist` value
        # as reported by SciPy, not to a parent-child difference - confirm
        # this is the intended branch-length convention.
        if node.is_leaf():
            return Clade(branch_length=node.dist, name=labels[node.id])
        left = build_clade(node.left)
        right = build_clade(node.right)
        return Clade(branch_length=node.dist, clades=[left, right])

    root = build_clade(tree)
    return Tree(root)


def write_str_to_file(s: str, file_path: str):
    """Write ``s`` to ``file_path`` as UTF-8, replacing any existing file."""
    with open(file_path, 'w', encoding="utf8") as f:
        f.write(s)


def build_graph_from_mat_df(TM_score_matrix, threshold=0.75):
    """Build an undirected similarity graph from a TM-score matrix.

    Args:
        TM_score_matrix: square DataFrame whose index holds the node names.
        threshold: minimum score for an edge (inclusive).

    Returns:
        networkx.Graph: one node per index entry; an edge joins two distinct
        nodes when the matrix cell in either direction is ``>= threshold``.
    """
    G = nx.Graph()
    G.add_nodes_from(TM_score_matrix.index)

    matrix_values = TM_score_matrix.values
    rows, cols = np.where(matrix_values >= threshold)
    # Self-loops are excluded by skipping the diagonal (i == j).
    edges = [
        (TM_score_matrix.index[i], TM_score_matrix.index[j])
        for i, j in zip(rows, cols)
        if i != j
    ]
    G.add_edges_from(edges)
    return G


def fill_community_to_graph(G):
    """Run Louvain partitioning on ``G`` and store it as node attribute ``cluster``.

    Returns:
        The partition object returned by ``community_louvain.best_partition``.
    """
    partition = community_louvain.best_partition(G)
    nx.set_node_attributes(G, partition, 'cluster')
    return partition


def get_graph_fig(G, partition):
    """Draw ``G`` with a spring layout, colouring nodes by community.

    Args:
        G: graph to draw.
        partition: mapping from node to a numeric community id.

    Returns:
        matplotlib.figure.Figure: the figure the network was drawn on.
    """
    fig = plt.figure(figsize=(12, 10))
    pos = nx.spring_layout(G)

    # Look each node's colour up explicitly rather than relying on the
    # partition dict happening to share G's node order.
    nodes = list(G.nodes)
    node_colors = [partition[node] for node in nodes]

    nx.draw_networkx_nodes(
        G,
        pos,
        nodelist=nodes,
        node_size=50,
        cmap=plt.cm.tab20,
        node_color=node_colors,
    )
    nx.draw_networkx_edges(G, pos, alpha=0.3)
    plt.title("Structure Similarity Network")
    plt.axis('off')
    return fig


def calculate_md5(files):
    """Calculate one MD5 hash over the contents of several files.

    The files are processed in order of their ``name`` attribute so that the
    result does not depend on upload order.

    Args:
        files: objects exposing a ``name`` attribute that is a readable path
            (e.g. Gradio upload objects).

    Returns:
        str: hexadecimal MD5 digest of the concatenated file contents.
    """
    hash_md5 = hashlib.md5()
    sorted_files = sorted(files, key=lambda x: x.name)
    for file in sorted_files:
        with open(file.name, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
    return hash_md5.hexdigest()


def save_pdb_files(files, data_dir='./data'):
    """Save uploaded PDB files under ``<data_dir>/<md5>/pdb``.

    Also writes ``<data_dir>/<md5>/pdb_list`` containing the saved file
    names, one per line.

    Args:
        files: objects exposing a ``name`` attribute that is a readable path.
        data_dir: root directory for all uploads; created if missing.

    Returns:
        str: ``"No files uploaded"`` when ``files`` is empty, otherwise a
        newline-joined log of what was saved.
    """
    if not files:
        return "No files uploaded"

    md5_hash = calculate_md5(files)
    run_dir = Path(data_dir) / md5_hash
    pdb_dir = run_dir / "pdb"
    # parents=True creates data_dir and the md5 directory too; exist_ok=True
    # makes re-uploading the same set of files a no-op here.
    pdb_dir.mkdir(parents=True, exist_ok=True)
    print(f"Created directory: {pdb_dir}")

    list_file = run_dir / "pdb_list"
    filenames = []
    results = []

    for file in files:
        original_filename = os.path.basename(file.name)
        filenames.append(original_filename)

        target_path = pdb_dir / original_filename
        print(f"Saving to: {target_path}")

        # Read the source completely before opening the target for writing,
        # so the data survives even if both paths point at the same file.
        payload = Path(file.name).read_bytes()
        with open(target_path, "wb") as f:
            f.write(payload)
        results.append(f"Saved {original_filename}")

    with open(list_file, "w") as f:
        f.write("\n".join(filenames))
    results.append(f"Created list file: {list_file}")

    return "\n".join(results)


def run_usalign(md5_hash, data_dir="./data"):
    """Run USalign on a saved upload and return the result table.

    Args:
        md5_hash: name of the upload directory under ``data_dir``.
        data_dir: root directory that holds the upload directories.

    Returns:
        pandas.DataFrame: the tab-separated USalign output parsed into a
        frame (column names stripped of surrounding whitespace). On a
        non-zero return code or any exception, a single-column frame named
        ``Error`` holding the stderr text or the exception message.
    """
    try:
        runner = USalignRunner()
        data_path = Path(data_dir)
        pdb_dir = os.path.join(data_path, md5_hash, "pdb")
        list_file = os.path.join(data_path, md5_hash, "pdb_list")
        print(str(pdb_dir))
        print(str(list_file))

        return_code, stdout, stderr = runner.run_alignment(
            target_dir=str(pdb_dir),
            pdb_list_file=str(list_file)
        )
        print(stdout)
        print(stderr)

        if return_code == 0:
            df = pd.read_csv(StringIO(stdout), sep="\t", encoding=sys.getdefaultencoding())
            df.columns = [col.strip() for col in df.columns]
            return df
        return pd.DataFrame({"Error": [stderr]})
    except Exception as e:
        # Report the exception itself: `stderr` is not bound yet when the
        # failure happens before run_alignment returns.
        return pd.DataFrame({"Error": [str(e)]})


def run_community_analysis(results_df, data_dir, md5_hash, threshold):
    """Run the clustering / network export pipeline for one upload.

    Args:
        results_df: USalign result table accepted by ``get_TM_mat_from_df``.
        data_dir: NOTE(review): accepted but not used - outputs are written
            under the literal ``data/<md5_hash>`` directory. Confirm whether
            callers expect this argument to be honoured.
        md5_hash: name of the upload directory.
        threshold: passed through to ``export_similarity_network_r``.

    Returns:
        dict: on success the keys ``tm_matrix`` (DataFrame), ``newick_str``
        and ``files`` (list of output paths); on failure ``{"Error": <msg>}``.
    """
    try:
        tm_matrix = get_TM_mat_from_df(results_df)

        tm_file = os.path.join("data", md5_hash, "tm_matrix.csv")
        newick_file = os.path.join("data", md5_hash, "clustering.newick")
        network_edges_file = os.path.join("data", md5_hash, "network_cytoscape_export.xlsx")
        cluster_file = os.path.join("data", md5_hash, "cluster_assignments.csv")

        # NOTE(review): the R calls are kept inside the converter context so
        # the pandas conversion rules are active for them - confirm this
        # matches the original scoping of the `with` block.
        with localconverter(ro.default_converter + pandas2ri.converter):
            r_tm_matrix = ro.conversion.py2rpy(tm_matrix)
            result = export_matrix_to_newick_r(r_tm_matrix, newick_file)
            newick_str = result[0]
            export_similarity_network_r(threshold, r_tm_matrix, network_edges_file, cluster_file)

        tm_matrix.to_csv(tm_file)

        return {
            "tm_matrix": tm_matrix,
            "newick_str": newick_str,
            "files": [
                tm_file,
                newick_file,
                network_edges_file,
                cluster_file,
            ],
        }
    except Exception as e:
        print("Error", str(e))
        return {"Error": str(e)}


def get_dataframe_from_network(G, partition):
    """Flatten a graph into a ``Source``/``Target`` edge table.

    Args:
        G: graph to export.
        partition: kept for interface compatibility; it does not influence
            the returned frame.

    Returns:
        pandas.DataFrame: one row per edge of ``G``, followed by one row per
        degree-0 node with an empty-string ``Target``.
    """
    edges_data = [list(edge) for edge in G.edges()]
    edges_df = pd.DataFrame(edges_data, columns=["Source", "Target"])

    # Isolated nodes have no edge row, so give each one a row of its own.
    singleton_nodes = [n for n in G.nodes if G.degree[n] == 0]
    singleton_data = [[node, ""] for node in singleton_nodes]
    singleton_df = pd.DataFrame(singleton_data, columns=["Source", "Target"])

    # Combine edge rows and isolated-node rows.
    combined_df = pd.concat([edges_df, singleton_df], ignore_index=True)
    return combined_df

# Export to a CSV file (disabled):
# combined_df.to_csv("structure_based_similarity_network_cytoscape_export.csv", index=False)