| | |
| | import os |
| | import re |
| | from pprint import pprint |
| | from io import StringIO |
| | from typing import Literal, Optional |
| | import tkinter as tk |
| | from tkinter import filedialog, messagebox, ttk |
| | from urllib.error import HTTPError as URLLibHTTPError |
| |
|
| |
|
| | import pandas as pd |
| | import numpy as np |
| |
|
| | from sklearn.ensemble import RandomForestClassifier |
| | from sklearn import svm |
| | from sklearn.model_selection import RandomizedSearchCV |
| | from sklearn.metrics import ( |
| | classification_report, |
| | accuracy_score, |
| | f1_score, |
| | recall_score, |
| | precision_score, |
| | confusion_matrix, |
| | ) |
| | from sklearn.decomposition import PCA |
| | from sklearn.preprocessing import StandardScaler, LabelEncoder |
| | from sklearn.pipeline import Pipeline |
| | from sklearn.manifold import TSNE |
| | import umap |
| | from sklearn.model_selection import train_test_split |
| | from sklearn.base import BaseEstimator |
| |
|
| | import requests |
| | from requests.exceptions import HTTPError as RequestsHTTPError |
| | from Bio import Entrez |
| | from Bio import SeqIO |
| | from tqdm import tqdm |
| |
|
| | |
| | import seaborn as sns |
| | import matplotlib.pyplot as plt |
| |
|
| | from esm.models.esmc import ESMC |
| | from esm.sdk.api import ESMProtein, LogitsConfig, ESMProteinError, LogitsOutput |
| | from transformers import T5Tokenizer, T5EncoderModel, PreTrainedModel |
| |
|
| | from joblib import load |
| |
|
| | import torch |
| |
|
def load_emb(path: str, acc: list[str]) -> np.ndarray:
    """
    Load per-accession embeddings from ``.npy`` files and stack them into one array.

    Each embedding is read from ``<path>/<accession>.npy`` and reduced as follows:
    - 3 dimensions: the leading axis is squeezed (it must have length 1) and the
      result is averaged along its first axis.
    - 2 dimensions: averaged along the first axis.
    - Otherwise: used as is.

    Args:
        path (str): Directory where the embedding .npy files are stored.
        acc (list[str]): Accession identifiers; one file is loaded per entry, in order.
    Returns:
        np.ndarray: Result of ``np.vstack`` over the processed embeddings, one row per accession.
    Raises:
        FileNotFoundError: If ``path`` does not exist, or if ``np.load`` cannot find an accession file.
        ValueError: If ``acc`` is empty (``np.vstack`` has nothing to stack) or a 3-D
            embedding has a leading axis whose length is not 1.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(f"The specified path does not exist: {path}")

    rows = []

    # The bar iterates over `acc`, so tqdm is left to infer the total from it;
    # the number of .npy files present in the directory is unrelated to the
    # number of accessions actually requested.
    for a in tqdm(acc, desc='Cargando embeddings'):
        emb: np.ndarray = np.load(os.path.join(path, f"{a}.npy"))

        if emb.ndim == 3:
            emb = emb.squeeze(axis=0).mean(axis=0)
        elif emb.ndim == 2:
            emb = emb.mean(axis=0)

        rows.append(emb)

    return np.vstack(rows)
| |
|
def confusion(title: str, y_true: np.ndarray, y_pred: np.ndarray) -> None:
    """
    Plot a confusion matrix for the given true and predicted labels.

    The matrix is normalized over the predicted labels (``normalize='pred'``),
    i.e. per column, and drawn as an annotated grey-scale heatmap.

    Args:
        title (str): Suffix for the plot title (``'Confusion Matrix - <title>'``).
        y_true (np.ndarray): True labels.
        y_pred (np.ndarray): Predicted labels.
    Side Effects:
        Opens a matplotlib figure and calls ``plt.show()``.
    """
    # Use the union of true and predicted labels and pass it explicitly, so the
    # tick labels always line up with the rows/columns of the matrix, even when
    # a class is predicted that never occurs in y_true.
    class_names = np.unique(np.concatenate([np.asarray(y_true), np.asarray(y_pred)]))

    cm = confusion_matrix(y_true=y_true,
                          y_pred=y_pred,
                          labels=class_names,
                          normalize='pred')

    tick_labels = list(class_names)
    plt.figure(figsize=(10, 10))
    sns.heatmap(cm, annot=True, fmt='.2f', cmap='Greys',
                xticklabels=tick_labels, yticklabels=tick_labels)

    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.title(f'Confusion Matrix - {title}')
    plt.tight_layout()
    plt.show()
| |
|
def plot_umap(x: np.ndarray, y: np.ndarray, title: str) -> None:
    """
    Show a 2D UMAP projection of the data, colored by label.

    The features are standardized with ``StandardScaler`` before being
    projected with ``umap.UMAP(n_neighbors=30, random_state=42)``.

    Parameters:
        x (np.ndarray): Feature matrix, one row per sample.
        y (np.ndarray): Label for each sample, used as the scatter hue.
        title (str): Title for the plot.
    Returns:
        None: The scatter plot is displayed with ``plt.show()``.
    """
    projector = umap.UMAP(n_neighbors=30, random_state=42)
    standardized = StandardScaler().fit_transform(x)
    coords = np.array(projector.fit_transform(standardized))

    frame = pd.DataFrame({
        'UMAP1': coords[:, 0],
        'UMAP2': coords[:, 1],
        'Label': y,
    })

    plt.figure(figsize=(14, 6))
    ax = sns.scatterplot(data=frame, x='UMAP1', y='UMAP2', hue='Label', alpha=0.7)
    ax.set_title(title)
    ax.set_xlabel('UMAP Component 1')
    ax.set_ylabel('UMAP Component 2')
    plt.legend(title='Labels', bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.tight_layout()
    plt.show()
| |
|
| |
|
| |
|
def plot_pca(x: np.ndarray, labels: np.ndarray, title: str) -> None:
    """
    Show a scatter plot of the first two principal components of the data.

    The data is standardized and reduced by a ``StandardScaler`` -> ``PCA``
    pipeline; the explained variance ratio of both components is appended to
    the plot title.

    Parameters:
        x (np.ndarray): Input data, one row per sample.
        labels (np.ndarray): Label for each sample, used as the scatter hue.
        title (str): Prefix of the plot title.
    Returns:
        None: The scatter plot is displayed with ``plt.show()``.
    """
    reduction = Pipeline([
        ('scaler', StandardScaler()),
        ('pca', PCA(n_components=2, random_state=42)),
    ])
    components = reduction.fit_transform(x)
    explained = reduction.named_steps['pca'].explained_variance_ratio_

    frame = pd.DataFrame({
        'PC1': components[:, 0],
        'PC2': components[:, 1],
        'Label': labels,
    })

    plt.figure(figsize=(14, 6))
    ax = sns.scatterplot(data=frame, x='PC1', y='PC2', hue='Label', alpha=0.7)
    ax.set_title(f'{title} - Explained Variance: {explained[0]:.2f}, {explained[1]:.2f}')
    ax.set_xlabel('First Principal Component')
    ax.set_ylabel('Second Principal Component')
    plt.legend(title='Labels', bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.tight_layout()
    plt.show()
| |
|
| |
|
def tsne_plot(x: np.ndarray, labels: np.ndarray, title: str) -> None:
    """
    Show a 2D t-SNE projection of the data, colored by label.

    The features are standardized with ``StandardScaler`` before t-SNE is fitted.

    Args:
        x (np.ndarray): Feature matrix, one row per sample.
        labels (np.ndarray): Label for each sample, used as the scatter hue.
        title (str): Title for the plot.
    Side Effects:
        Opens a matplotlib figure and calls ``plt.show()``.
    """
    x_scaled = StandardScaler().fit_transform(x)

    # scikit-learn requires perplexity < n_samples; keep the usual value of 60
    # but clamp it so small datasets can still be plotted.
    n_samples = x_scaled.shape[0]
    perplexity = min(60, max(n_samples - 1, 1))

    tsne = TSNE(n_components=2, perplexity=perplexity, random_state=42)
    tsne_fit = tsne.fit_transform(x_scaled)

    df_plot = pd.DataFrame({
        't-SNE1': tsne_fit[:, 0],
        't-SNE2': tsne_fit[:, 1],
        'Label': labels,
    })

    plt.figure(figsize=(14, 6))
    ax = sns.scatterplot(data=df_plot, x='t-SNE1', y='t-SNE2', hue='Label', alpha=0.7)
    ax.set_title(title)
    ax.set_xlabel('First t-SNE Component')
    ax.set_ylabel('Second t-SNE Component')
    plt.legend(title='Labels', bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.tight_layout()
    plt.show()
| |
|
def plot_emb(x: np.ndarray, labels: np.ndarray, model_name: str):
    """
    Plot the embeddings with PCA, t-SNE and UMAP, in that order.

    Args:
        x (np.ndarray): Feature matrix, one row per sample.
        labels (np.ndarray): Label for each sample, used for coloring the scatter plots.
        model_name (str): Name of the model that produced the embeddings; used in
            the console message and in each plot title.
    """
    print(f"Plotting embeddings for: {model_name}")

    projections = (
        ('PCA', plot_pca),
        ('t-SNE', tsne_plot),
        ('UMAP', plot_umap),
    )
    for method, plotter in projections:
        plotter(x, labels, title=f'{method} - {model_name}')
| |
|
def evaluate(model: BaseEstimator,
             x_test: np.ndarray,
             y_test: np.ndarray) -> dict:
    """
    Score a fitted classifier on held-out data.

    Parameters:
        model: Fitted estimator exposing a ``predict`` method.
        x_test: Features of the test dataset.
        y_test: True labels of the test dataset.
    Returns:
        dict: Metrics keyed by name:
            - 'Accuracy': overall accuracy.
            - 'Recall': weighted recall.
            - 'Precision': weighted precision.
            - 'F1': weighted F1 score.
    Side Effects:
        Pretty-prints the metrics dictionary.
    """
    predictions = model.predict(x_test)

    result = {
        'Accuracy': accuracy_score(y_test, predictions),
        'Recall': recall_score(y_test, predictions, average='weighted'),
        'Precision': precision_score(y_test, predictions, average='weighted'),
        'F1': f1_score(y_test, predictions, average='weighted'),
    }

    pprint(result)
    return result
| | |
| |
|
| |
|
def train_rf(title: str,
             x: np.ndarray,
             y: np.ndarray,
             params: dict) -> tuple[RandomForestClassifier, dict, LabelEncoder]:
    """
    Fit a RandomForestClassifier on a stratified train split and report its test performance.

    Labels are integer-encoded with a ``LabelEncoder``; 33% of the data is held
    out (stratified, ``random_state=42``) for evaluation.

    Args:
        title (str): Title for the confusion matrix plot.
        x (np.ndarray): Feature matrix.
        y (np.ndarray): Target labels matching the rows of ``x``.
        params (dict): Keyword arguments for ``RandomForestClassifier``.
    Returns:
        tuple[RandomForestClassifier, dict, LabelEncoder]:
            - the fitted classifier,
            - the metrics dictionary returned by ``evaluate``,
            - the fitted label encoder.
    Side Effects:
        - Prints the metrics and a classification report to stdout.
        - Displays a confusion matrix plot.
    """
    encoder = LabelEncoder()
    encoded_labels = encoder.fit_transform(y)

    x_train, x_test, y_train, y_test = train_test_split(
        x,
        encoded_labels,
        test_size=0.33,
        stratify=encoded_labels,
        random_state=42,
    )

    forest: RandomForestClassifier = RandomForestClassifier(**params)
    forest.fit(x_train, y_train)
    predicted = forest.predict(x_test)

    metrics = evaluate(forest, x_test, y_test)

    report = classification_report(y_test,
                                   predicted,
                                   zero_division=0,
                                   target_names=encoder.classes_)
    print(report)

    # The plot uses the original label values rather than the integer codes.
    predicted_names = encoder.inverse_transform(predicted)
    true_names = encoder.inverse_transform(y_test)
    confusion(title=title, y_true=true_names, y_pred=predicted_names)

    return forest, metrics, encoder
| |
|
def train_svm(title: str, x: np.ndarray, y: np.ndarray, params: dict) -> tuple[Pipeline, dict, LabelEncoder]:
    """
    Fit a StandardScaler + SVC pipeline on a stratified train split and report its test performance.

    Labels are integer-encoded with a ``LabelEncoder``; 33% of the data is held
    out (stratified, ``random_state=42``) for evaluation.

    Args:
        title (str): Title for the confusion matrix plot.
        x (np.ndarray): Feature matrix.
        y (np.ndarray): Target labels matching the rows of ``x``.
        params (dict): SVC parameters. Only keys starting with ``'svm__'`` are
            used (with that prefix removed); any other key is ignored.

    Returns:
        tuple[Pipeline, dict, LabelEncoder]:
            - the fitted pipeline (scaler followed by SVC),
            - the metrics dictionary returned by ``evaluate``,
            - the fitted label encoder.

    Side Effects:
        - Displays a confusion matrix plot using the provided title.
        - Prints the metrics and a classification report to stdout.
    """
    encoder = LabelEncoder()
    encoded_labels = encoder.fit_transform(y)

    x_train, x_test, y_train, y_test = train_test_split(
        x, encoded_labels, test_size=0.33, stratify=encoded_labels, random_state=42
    )

    # Strip the pipeline-style prefix so the values can be handed to SVC directly.
    svc_kwargs = {}
    for key, value in params.items():
        if key.startswith('svm__'):
            svc_kwargs[key.replace('svm__', '')] = value

    model = Pipeline([
        ('scaler', StandardScaler()),
        ('svm', svm.SVC(**svc_kwargs)),
    ])
    model.fit(x_train, y_train)

    predicted = model.predict(x_test)
    metrics = evaluate(model=model, x_test=x_test, y_test=y_test)

    # The plot uses the original label values rather than the integer codes.
    predicted_names = encoder.inverse_transform(predicted)
    true_names = encoder.inverse_transform(y_test)
    confusion(title=title, y_true=true_names, y_pred=predicted_names)

    print(classification_report(y_test, predicted, zero_division=0, target_names=encoder.classes_))

    return model, metrics, encoder
| |
|
| |
|
def randomSVM(x: np.ndarray, y: np.ndarray) -> dict:
    """
    Run a randomized hyperparameter search for a StandardScaler + SVC pipeline.

    The labels are integer-encoded and the search is fitted on the training
    portion of a stratified 67/33 split (``random_state=42``). Ten candidates
    are sampled and scored with weighted F1 under 3-fold cross-validation.

    Args:
        x (np.ndarray): Feature matrix, one row per sample.
        y (np.ndarray): Target labels, one per sample.

    Returns:
        dict: Best parameters found, keyed with the ``'svm__'`` pipeline prefix.

    Side Effects:
        Pretty-prints the best parameters (and the search's own verbose output).
    """
    encoded_labels = LabelEncoder().fit_transform(y)
    x_train, _, y_train, _ = train_test_split(
        x,
        encoded_labels,
        test_size=0.33,
        stratify=encoded_labels,
        random_state=42,
    )

    search_space = {
        'svm__C': [0.001, 0.01, 0.1, 1, 10, 100, 1000],
        'svm__kernel': ['rbf'],
        'svm__gamma': ['scale', 'auto', 0.001, 0.01, 0.1, 1, 10],
        'svm__shrinking': [True, False],
        'svm__class_weight': ['balanced'],
        'svm__probability': [True],
        'svm__tol': [1e-5, 1e-4, 1e-3, 1e-2],
        'svm__max_iter': [-1, 5000, 7500, 10000],
        'svm__decision_function_shape': ['ovr', 'ovo'],
        'svm__cache_size': [200, 400, 600],
    }

    scaled_svm = Pipeline([
        ('scaler', StandardScaler()),
        ('svm', svm.SVC()),
    ])

    search = RandomizedSearchCV(
        estimator=scaled_svm,
        param_distributions=search_space,
        n_iter=10,
        scoring='f1_weighted',
        cv=3,
        verbose=2,
        random_state=42,
        n_jobs=-1,
    )
    search.fit(x_train, y_train)

    best = search.best_params_
    pprint(best)
    return best
| |
|
def randomSearch(x: np.ndarray, y: np.ndarray) -> dict:
    """
    Run a randomized hyperparameter search for a RandomForestClassifier.

    The labels are integer-encoded and the search is fitted on the training
    portion of a stratified 67/33 split (``random_state=42``). Ten candidates
    are sampled and scored with weighted F1 under 3-fold cross-validation.

    Args:
        x (np.ndarray): Feature matrix, one row per sample.
        y (np.ndarray): Target labels, one per sample.
    Returns:
        dict: The best hyperparameters found during the randomized search.
    Side Effects:
        Prints the best parameters (and the search's own verbose output).
    """
    le = LabelEncoder()
    y_encoded = le.fit_transform(y)

    x_train, _, y_train, _ = train_test_split(
        x, y_encoded, test_size=0.33, stratify=y_encoded, random_state=42
    )
    classifier: RandomForestClassifier = RandomForestClassifier(random_state=42)

    param_grid = {
        'n_estimators': [100, 200, 300, 400, 500],
        'max_depth': [None, 10, 20, 30, 40, 50],
        'min_samples_split': [2, 5, 10, 15, 20],
        'min_samples_leaf': [1, 2, 4, 8, 10],
        'max_features': ['sqrt', 'log2', None, 0.3, 0.5, 0.7],
        'bootstrap': [True, False],
        'criterion': ['gini', 'entropy'],
        'max_leaf_nodes': [None, 10, 50, 100, 200],
        'class_weight': ['balanced'],
    }

    # random_state fixes which candidates are sampled, so repeated runs explore
    # the same configurations (matching the SVM search in this module).
    rf_random = RandomizedSearchCV(estimator=classifier,
                                   param_distributions=param_grid,
                                   n_iter=10,
                                   scoring='f1_weighted',
                                   cv=3,
                                   verbose=2,
                                   random_state=42,
                                   n_jobs=-1)

    rf_random.fit(X=x_train, y=y_train)

    print('Best Params')
    pprint(rf_random.best_params_)

    return rf_random.best_params_
| |
|
def fetch_uniprot_sequence(uniprot_id: str) -> str | None:
    """
    Fetch the amino-acid sequence for a UniProt ID.

    The UniProtKB FASTA endpoint is queried first. If it answers with HTTP 200
    but the body cannot be parsed as a single FASTA record, the UniSave
    endpoint is queried and the first FASTA entry of its response is used.

    Args:
        uniprot_id: UniProt ID to fetch the sequence for.
    Returns:
        str | None: The raw amino-acid sequence (no FASTA header), or None when
        no sequence could be obtained. Every failure path prints a message.
    Note:
        HTTP error statuses are reported via ``print`` and result in None; this
        function does not call ``raise_for_status``. Exceptions raised by
        ``requests.get`` itself (e.g. connection errors, timeouts) are not
        caught here.
    """
    url = f"https://rest.uniprot.org/uniprotkb/{uniprot_id}.fasta"
    response = requests.get(url, timeout=60)

    if response.status_code != 200:
        print(f'URL inválido o no accesible: {url}')
        return None

    try:
        record = SeqIO.read(StringIO(response.text), "fasta")
        return str(record.seq)
    except ValueError:
        # SeqIO.read raises ValueError unless the body holds exactly one
        # record; fall back to UniSave in that case.
        url = f"https://rest.uniprot.org/unisave/{uniprot_id}.fasta"

    response = requests.get(url, timeout=60)
    if response.status_code != 200:
        print(f'UniSave URL inválido: {url}')
        return None

    try:
        # Splitting on a look-ahead keeps each '>' with its entry; when the
        # text starts with '>', element 0 is the empty string and element 1 is
        # the first entry. A body without any header yields a single element,
        # hence the IndexError handling.
        entries = re.split(r"(?=>)", response.text.strip())
        record = SeqIO.read(StringIO(entries[1]), "fasta")
        return str(record.seq)
    except (IndexError, ValueError):
        print(f'No se pudo obtener la entrada FASTA para {uniprot_id} desde UniSave')
        return None
| |
|
def fetch_refseq_sequence(refseq_id: str) -> str | None:
    """
    Fetch the protein sequence for an accession via NCBI Entrez, falling back to RCSB.

    Entrez (``db="protein"``, FASTA) is tried first. If that raises an HTTP
    error or the response is not a single FASTA record, the RCSB FASTA
    endpoint is queried with the same identifier.

    Args:
        refseq_id: Accession to look up.
    Returns:
        str | None: The raw amino-acid sequence, or None if both sources fail.
    Side Effects:
        Sets ``Entrez.email`` and ``Entrez.api_key`` (module-level Biopython
        settings) on every call and prints a message for each failed source.
    """
    # NOTE(review): both credentials are empty strings here — presumably
    # placeholders to be filled in; confirm before relying on Entrez.
    Entrez.email = ""
    Entrez.api_key = ""

    try:
        handle = Entrez.efetch(
            db="protein",
            id=refseq_id,
            rettype="fasta",
            retmode="text"
        )
        try:
            record = SeqIO.read(handle, "fasta")
        finally:
            # Close the handle even when parsing fails.
            handle.close()
        return str(record.seq)

    except (URLLibHTTPError, ValueError) as e:
        print(f"[Entrez] Failed for `{refseq_id}`: {e}")

    try:
        url = f"https://www.rcsb.org/fasta/entry/{refseq_id}"
        response = requests.get(url, timeout=60)
        response.raise_for_status()
        record = SeqIO.read(StringIO(response.text), "fasta")
        return str(record.seq)

    except (RequestsHTTPError, ValueError) as e2:
        print(f"[RCSB] Failed for `{refseq_id}`: {e2}")

    return None
| |
|
| | def _fetch_sequence_for_row(idx, row): |
| | """ |
| | Helper to fetch sequence for a single row. Returns (idx, sequence). |
| | """ |
| | sequence = None |
| | |
| | swiss_id = row.get('SwissProt_ID') |
| | if swiss_id and not pd.isna(swiss_id): |
| | try: |
| | sequence = fetch_uniprot_sequence(swiss_id) |
| | except (URLLibHTTPError, RequestsHTTPError) as e: |
| | print(f"Warning: SwissProt fetch failed for {swiss_id} with HTTP {e}") |
| | sequence = None |
| |
|
| | |
| | if not sequence and row.get('Refseq_Accession') and not pd.isna(row['Refseq_Accession']): |
| | try: |
| | sequence = fetch_refseq_sequence(row['Refseq_Accession']) |
| | except (URLLibHTTPError, RequestsHTTPError) as e: |
| | print(f"Warning: RefSeq fetch failed for {row['Refseq_Accession']} with HTTP {e}") |
| | sequence = None |
| |
|
| | |
| | if not sequence and row.get('Other_Accession') and not pd.isna(row['Other_Accession']): |
| | try: |
| | sequence = fetch_refseq_sequence(row['Other_Accession']) |
| | except (URLLibHTTPError, RequestsHTTPError) as e: |
| | print(f"Warning: RefSeq fetch failed for {row['Other_Accession']} with HTTP {e}") |
| | sequence = None |
| |
|
| | return idx, sequence |
| |
|
| | |
def fetch_sequences_for_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a copy of ``df`` with a 'sequence' column filled by per-row lookups.

    Rows are processed sequentially; each row's sequence is whatever
    ``_fetch_sequence_for_row`` returns for it (None when nothing was found).
    Values in an existing 'sequence' column are overwritten.

    Args:
        df: Input DataFrame with ID columns.

    Returns:
        A new DataFrame with the 'sequence' column set; ``df`` is not modified.

    Side Effects:
        Shows a tqdm progress bar and prints a success summary.
    """
    result_df = df.copy()
    if 'sequence' not in result_df.columns:
        result_df['sequence'] = None

    total_rows = len(result_df)

    for idx, row in tqdm(result_df.iterrows(), total=total_rows, desc="Retrieving sequences", unit="row"):
        _, seq = _fetch_sequence_for_row(idx, row)
        result_df.at[idx, 'sequence'] = seq

    print("Sequence retrieval complete")
    success_count = int(result_df['sequence'].notna().sum())
    # Guard the percentage so an empty frame reports 0.0% instead of dividing by zero.
    success_pct = round(success_count / total_rows * 100, 2) if total_rows else 0.0
    print(f"Successfully retrieved {success_count} out of {total_rows} sequences "
          f"({success_pct}%)")
    return result_df
| |
|
| |
|
| | |
def esm_embed(model: ESMC,
              seq: str,
              acc: str,
              device: torch.device = torch.device(
                  'cuda' if torch.cuda.is_available()
                  else 'cpu'
              )) -> Optional[np.ndarray]:
    """
    Generate an embedding for a protein sequence using an ESM model.

    The sequence is encoded with ``model.encode``, passed through
    ``model.logits`` with ``return_embeddings=True``, and the returned
    embeddings are reduced: a 3-D array is squeezed on its leading axis and
    averaged along the next one; a 2-D array is averaged along its first axis.

    Args:
        model (ESMC): The ESM model used for encoding and generating embeddings.
        seq (str): The amino acid sequence of the protein.
        acc (str): The accession identifier for the protein (used for error reporting).
        device (torch.device, optional): Device the encoded protein is moved to.
            Defaults to CUDA if available, otherwise CPU.
    Returns:
        Optional[np.ndarray]: The embedding for the sequence, or None if the
        model returned no embeddings or an error occurred while computing them.
    Raises:
        ESMProteinError: If ``model.encode`` returns an error object instead of a tensor.
    Side Effects:
        Displays an error message using `messagebox.showerror` if an error occurs during processing.
    """
    protein: ESMProtein = ESMProtein(sequence=seq)
    encoded = model.encode(protein)

    # Check for an error result before touching the tensor API: calling
    # `.to(device)` first would fail on an error object and hide the real cause.
    if isinstance(encoded, ESMProteinError):
        messagebox.showerror("Error", f"Error processing {acc}: {encoded}")
        raise encoded

    protein_tensor = encoded.to(device)

    try:
        output: LogitsOutput = model.logits(protein_tensor,
                                            LogitsConfig(sequence=True,
                                                         return_embeddings=True))

        if output is not None and output.embeddings is not None:
            arr_output: np.ndarray = output.embeddings.cpu().numpy()

            if arr_output.ndim == 3:
                arr_output = arr_output.squeeze(axis=0).mean(axis=0)
            elif arr_output.ndim == 2:
                arr_output = arr_output.mean(axis=0)

            return arr_output
    except (ESMProteinError, RuntimeError) as e:
        messagebox.showerror("Error", f"Error processing {acc}: {e}")

    return None
| |
|
def predict_with_esm(fasta_path: str,
                     model: Literal['esmc_600m', 'esmc_300m'],
                     device: torch.device = torch.device('cuda' if torch.cuda.is_available()
                                                         else 'cpu')
                     ) -> None:
    """
    Embed the sequences of a FASTA file with ESM and predict class probabilities.

    Steps:
        1. Ask the user for an output directory.
        2. Validate the FASTA path and read its sequences and IDs.
        3. Embed every sequence with the selected ESM model while updating a
           Tkinter progress bar.
        4. Load the classifier and label encoder stored under ``Models/`` one
           directory above this file (the ``*_svm.joblib`` artifacts).
        5. Compute class probabilities and rank the classes per sequence.
        6. Ask where to save the results, write them, and print a short sample.

    Args:
        fasta_path (str): Path to the input FASTA file containing protein sequences.
        model (Literal['esmc_600m', 'esmc_300m']): Name of the ESM model to use for embedding.
        device (torch.device, optional): Device to run the model on. Defaults to CUDA if available.
    Returns:
        None. The function returns early (after reporting the reason) when a
        dialog is cancelled, the FASTA file is invalid or empty, no embedding
        could be produced, or a model file is missing.
    Side Effects:
        - Opens file dialogs and message boxes.
        - Creates and updates a Tkinter progress bar window.
        - Saves prediction results to a text file.
        - Prints progress and sample predictions to the console.
    """
    output_dir = filedialog.askdirectory(title="Select output directory")
    if not output_dir:
        return

    if fasta_path is None or not os.path.exists(fasta_path):
        messagebox.showerror("Error", "Invalid FASTA file path.")
        return

    result = fasta_to_seq(fasta_path)
    # fasta_to_seq yields None on a parse error and empty lists for a file
    # without records; neither leaves anything to predict on.
    if result is None or not result[0]:
        messagebox.showerror("Error", "No sequences found in FASTA file.")
        return
    seq, ids = result
    total = len(seq)

    root = tk.Tk()
    root.withdraw()

    progress_win = tk.Toplevel(root)
    progress_win.title("Embedding Progress")
    progress_label = tk.Label(progress_win, text="Embedding sequences...")
    progress_label.pack(padx=10, pady=5)
    progress = ttk.Progressbar(progress_win, length=300, mode='determinate', maximum=total)
    progress.pack(padx=10, pady=10)

    client: ESMC = ESMC.from_pretrained(model).to(device)
    embeddings = {}
    for done, (sequence, acc) in enumerate(zip(seq, ids), start=1):
        emb = esm_embed(model=client,
                        seq=sequence,
                        acc=acc,
                        device=device)
        if emb is not None:
            embeddings[acc] = emb

        # Advance the bar for every sequence, including those that failed.
        progress['value'] = done
        progress_win.update_idletasks()

    progress_label.config(text="Embedding complete!")
    tk.Button(progress_win, text="Close", command=progress_win.destroy).pack(pady=5)

    if not embeddings:
        # Every sequence failed to embed; the classifier cannot be given an empty matrix.
        messagebox.showerror("Error", "No embeddings could be generated.")
        return

    messagebox.showinfo("Info", "Loading SVM for predictions...")
    project_root: str = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
    # NOTE(review): the 300m label-encoder file name follows a different naming
    # pattern than the other three artifacts — confirm it matches the file on disk.
    if model == 'esmc_300m':
        model_file = 'Models/ESMC-300m_svm.joblib'
        le_file = 'Models/esm_300m_le_svm.joblib'
    else:
        model_file = 'Models/ESMC-600m_svm.joblib'
        le_file = 'Models/ESMC-600m_le_svm.joblib'
    model_path = os.path.join(project_root, model_file)
    le_path = os.path.join(project_root, le_file)

    try:
        predictor = load(model_path)
        le: LabelEncoder = load(le_path)
    except FileNotFoundError as e:
        # Either artifact may be the missing one; report the path from the
        # exception when it is available.
        missing = e.filename or model_path
        print(f"Error: Could not find the model file '{missing}'")
        return

    sequence_ids = list(embeddings.keys())
    x = np.array(list(embeddings.values()))
    messagebox.showinfo("Info", "Making predictions...")
    y_pred_proba = predictor.predict_proba(x)

    # Map the classifier's encoded classes back to their original names when
    # available; otherwise fall back to positional placeholders.
    if hasattr(predictor, 'classes_'):
        class_names = le.inverse_transform(predictor.classes_)
    else:
        class_names = [f"Class_{i}" for i in range(y_pred_proba.shape[1])]

    predictions_dict = {}
    for seq_id, probabilities in zip(sequence_ids, y_pred_proba):
        ranked = sorted(zip(class_names, probabilities), key=lambda pair: pair[1], reverse=True)
        sorted_classes, sorted_probs = zip(*ranked)
        predictions_dict[seq_id] = (list(sorted_classes), list(sorted_probs))

    input_filename = f"{os.path.splitext(os.path.basename(fasta_path))[0]}_predictions.txt"
    output_file = filedialog.asksaveasfilename(title="Save Predictions",
                                               initialdir=output_dir,
                                               initialfile=input_filename,
                                               defaultextension=".txt",
                                               filetypes=[("Text files", "*.txt"), ("All files", "*.*")]
                                               )
    if not output_file:
        # The save dialog was cancelled; there is no path to write to.
        print("Save cancelled; predictions were not written.")
        return

    print(f"Saving predictions to {output_file}...")
    save_predictions_to_txt(predictions_dict, output_file)
    print("Predictions saved successfully!")
    print(f"Total sequences processed: {len(embeddings)}")

    print("\nSample predictions:")
    for seq_id, (classes, probs) in list(predictions_dict.items())[:3]:
        pred_str = ", ".join([f"{cls} ({prob:.4f})" for cls, prob in zip(classes, probs)])
        print(f"{seq_id}: {pred_str}")
| |
|
def prost_embed_sequence(seq: str,
                         acc: str,
                         tokenizer: T5Tokenizer,
                         model: PreTrainedModel,
                         device: torch.device = torch.device(
                             'cuda:0'
                             if torch.cuda.is_available()
                             else 'cpu'
                         )) -> Optional[np.ndarray]:
    """
    Embed a protein sequence with a T5 encoder and return the averaged embedding.

    Args:
        seq (str): The amino acid sequence to embed. The residues U, Z, O and B
            are replaced with 'X' and the residues are space-separated before tokenization.
        acc (str): Accession or identifier for the sequence, used in messages.
        tokenizer (T5Tokenizer): Tokenizer matching ``model``.
        model (PreTrainedModel): Encoder model; moved to ``device`` and cast to
            half precision on non-CPU devices, full precision on CPU.
        device (torch.device, optional): Device to run on. Defaults to CUDA if available, otherwise CPU.
    Returns:
        Optional[np.ndarray]: Mean of the selected token embeddings, or None if
        the sequence is too short to leave any token to average, or if a
        RuntimeError/ValueError occurs (a message is printed in each case).
    """
    model = model.to(device)
    model = model.half() if str(device) != 'cpu' else model.float()

    seq = re.sub(r"[UZOB]", "X", seq)
    seq = " ".join(list(seq))

    try:
        ids = tokenizer(seq, add_special_tokens=True, return_tensors='pt')
        ids = {k: v.to(device) for k, v in ids.items()}

        with torch.no_grad():
            embedding_repr = model(
                ids['input_ids'],
                attention_mask=ids['attention_mask']
            )

        # Number of attended tokens minus one.
        real_len = ids['attention_mask'][0].sum().item() - 1

        # The slice below is [1:real_len]; it is empty whenever real_len <= 1,
        # and averaging an empty slice would yield NaNs instead of a usable vector.
        if real_len <= 1:
            print(f"Sequence too short after tokenization for {acc}")
            return None

        # NOTE(review): the slice skips position 0, yet no prefix token is added
        # to `seq` in this function, so this appears to drop the first residue.
        # Confirm against how the downstream classifiers were trained before
        # changing it.
        emb = embedding_repr.last_hidden_state[0, 1:real_len]
        emb_avg = emb.mean(dim=0).cpu().numpy()

        return emb_avg

    except RuntimeError as e:
        print(f"RuntimeError while processing {acc}: {e}")
        return None
    except ValueError as e:
        print(f"ValueError while processing {acc}: {e}")
        return None
| |
|
def fasta_to_seq(fasta_file: str) -> Optional[tuple[list[str], list[str]]]:
    """
    Read a FASTA file and return its sequences together with their record IDs.

    Args:
        fasta_file (str): Path to the FASTA file to be read (opened as UTF-8).
    Returns:
        Optional[tuple[list[str], list[str]]]: ``(sequences, ids)`` as two
        parallel lists in file order (both empty when the file has no
        records), or None if parsing raised a ValueError, in which case the
        error is printed.
    """
    with open(fasta_file, 'r', encoding='utf-8') as handle:
        try:
            parsed = [(str(rec.seq), str(rec.id)) for rec in SeqIO.parse(handle, "fasta")]
        except ValueError as e:
            print(f"Error reading {fasta_file}: {e}")
            return None

    sequences = [sequence for sequence, _ in parsed]
    ids = [record_id for _, record_id in parsed]
    return sequences, ids
| |
|
def save_predictions_to_txt(predictions_dict: dict[str, tuple[list[str], list[float]]],
                            output_file: str) -> None:
    """
    Write predictions to a comma-separated text file, most probable class first.

    The file starts with a fixed header line (``Sequence_ID`` followed by
    ``Prediction 1`` .. ``Prediction 6``). Each following line holds a sequence
    ID and its classes formatted as ``<class> (<probability with 4 decimals>)``,
    ordered by descending probability.

    Args:
        predictions_dict: Dictionary with sequence_id as key and (class_names, probabilities) as value.
        output_file: Path to the output text file (written as UTF-8, overwriting any existing file).
    """
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write("Sequence_ID,Prediction 1,Prediction 2,Prediction 3,Prediction 4,Prediction 5,Prediction 6\n")

        for seq_id, (class_names, probabilities) in predictions_dict.items():
            # Rank the classes here so the output is ordered regardless of how
            # the caller ordered them; the sort is stable for equal probabilities.
            ranked = sorted(zip(class_names, probabilities),
                            key=lambda pair: pair[1],
                            reverse=True)

            pred_line = ",".join(f"{cls} ({prob:.4f})" for cls, prob in ranked)
            f.write(f"{seq_id},{pred_line}\n")
| |
|
def predict_with_prost(fasta_path: str):
    """
    Embed sequences from a FASTA file with ProstT5 and predict their locations.

    Workflow:
        1. Ask the user for an output directory.
        2. Read the sequences and ids from ``fasta_path``.
        3. Load the ProstT5 tokenizer/encoder and embed every sequence while
           updating a progress window.
        4. Load the saved SVM classifier and label encoder from the project's
           ``Models`` directory and compute class probabilities.
        5. Ask where to save the results and write them with
           ``save_predictions_to_txt``.

    The function returns early (writing nothing) when the path is invalid,
    a dialog is cancelled, the FASTA file yields no sequences, the ProstT5
    model fails to load with a ``RuntimeError``, no sequence can be embedded,
    or the saved classifier files are missing.

    Args:
        fasta_path (str): Path to the FASTA file holding the input sequences.

    Returns:
        None
    """
    if not fasta_path or not os.path.exists(fasta_path):
        print("Invalid FASTA file path.")
        return

    # Hidden root window that owns the dialogs and the progress window.
    root = tk.Tk()
    root.withdraw()

    output_dir = filedialog.askdirectory(title="Select output directory")
    if not output_dir:
        return

    result = fasta_to_seq(fasta_path)
    # A file that parses but contains no records comes back as two empty
    # lists, not None; treat it the same so we never load the model or call
    # the classifier with nothing to process.
    if result is None or not result[0]:
        messagebox.showerror("Error", "No sequences found in FASTA file.")
        return

    sequences, ids = result
    total = len(sequences)

    # Progress window shown while the model loads and sequences are embedded.
    progress_win = tk.Toplevel(root)
    progress_win.title("Embedding Progress")
    progress_label = tk.Label(progress_win, text="Embedding sequences...")
    progress_label.pack(padx=10, pady=5)
    progress = ttk.Progressbar(progress_win, length=300, mode='determinate', maximum=total)
    progress.pack(padx=10, pady=10)

    try:
        progress_label.config(text="Loading ProstT5 model...")
        progress_win.update_idletasks()

        tokenizer = T5Tokenizer.from_pretrained("Rostlab/ProstT5", do_lower_case=False, legacy=True)
        model = T5EncoderModel.from_pretrained("Rostlab/ProstT5")

        progress_label.config(text="Model loaded successfully! Embedding sequences...")
        progress_win.update_idletasks()

    except RuntimeError as e:
        progress_win.destroy()
        if "Cannot allocate memory" in str(e):
            messagebox.showerror(
                "Memory Error",
                "Insufficient memory to load ProstT5 model.\n\n"
                "Please try:\n"
                "1. Close other applications\n"
                "2. Restart your computer\n"
                "3. Clear the model cache:\n"
                "   rm -rf ~/.cache/huggingface/hub/models--Rostlab--ProstT5/"
            )
        else:
            messagebox.showerror("Runtime Error", f"Error loading model: {str(e)}")
        return

    # Keyed by record id, so a later record with a duplicate id replaces the
    # earlier one. Sequences whose embedding fails (None) are left out.
    embeddings = {}

    for i, (seq, acc) in enumerate(zip(sequences, ids)):
        emb = prost_embed_sequence(seq, acc, tokenizer, model)
        if emb is not None:
            embeddings[acc] = emb

        progress['value'] = i + 1
        progress_win.update_idletasks()

    if not embeddings:
        # Every sequence failed to embed; the classifier cannot be called on
        # an empty feature matrix, so stop here with a clear message.
        progress_win.destroy()
        messagebox.showerror("Error", "No embeddings could be generated for the provided sequences.")
        return

    progress_label.config(text="Embedding complete!")
    tk.Button(progress_win, text="Close", command=progress_win.destroy).pack(pady=5)

    messagebox.showinfo("Info", "Loading SVM model for predictions...")
    project_root: str = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
    model_path = os.path.join(project_root, 'Models/Prost T5_svm.joblib')
    le_path = os.path.join(project_root, 'Models/Prost T5_le_svm.joblib')

    try:
        # NOTE: joblib files are pickles; only load model files that ship
        # with the project.
        predictor = load(model_path)
        le: LabelEncoder = load(le_path)

    except FileNotFoundError:
        print(f"Error: Could not find the model file '{model_path}'")
        return

    sequence_ids = list(embeddings.keys())
    x = np.array(list(embeddings.values()))

    print("Making predictions...")
    y_pred_proba = predictor.predict_proba(x)

    # Map the classifier's encoded classes back to their original labels;
    # fall back to generic names when the predictor exposes no classes_.
    if hasattr(predictor, 'classes_'):
        class_names = le.inverse_transform(predictor.classes_)
    else:
        class_names = [f"Class_{i}" for i in range(y_pred_proba.shape[1])]

    # Per sequence: classes and probabilities ordered most likely first.
    predictions_dict = {}
    for i, seq_id in enumerate(sequence_ids):
        class_prob_pairs = sorted(zip(class_names, y_pred_proba[i]), key=lambda x: x[1], reverse=True)
        sorted_classes, sorted_probs = zip(*class_prob_pairs)
        predictions_dict[seq_id] = (list(sorted_classes), list(sorted_probs))

    input_filename = f"{os.path.splitext(os.path.basename(fasta_path))[0]}_predictions.txt"
    output_file = filedialog.asksaveasfilename(
        title="Save Predictions",
        initialdir=output_dir,
        initialfile=input_filename,
        defaultextension=".txt",
        filetypes=[("Text files", "*.txt"), ("All files", "*.*")]
    )

    if not output_file:
        # The dialog returns an empty value when cancelled; opening that as a
        # path would raise, so stop without writing anything.
        print("Save cancelled; predictions were not written.")
        return

    print(f"Saving predictions to {output_file}...")
    save_predictions_to_txt(predictions_dict, output_file)
    print("Predictions saved successfully!")
    print(f"Total sequences processed: {len(embeddings)}")

    print("\nSample predictions:")
    for seq_id, (classes, probs) in list(predictions_dict.items())[:3]:
        pred_str = ", ".join([f"{cls} ({prob:.4f})" for cls, prob in zip(classes, probs)])
        print(f"{seq_id}: {pred_str}")
| |
|
| |
|