|
|
|
|
|
import os |
|
|
import re |
|
|
from pprint import pprint |
|
|
from io import StringIO |
|
|
from typing import Literal, Optional |
|
|
import tkinter as tk |
|
|
from tkinter import filedialog, messagebox, ttk |
|
|
from urllib.error import HTTPError as URLLibHTTPError |
|
|
|
|
|
|
|
|
import pandas as pd |
|
|
import numpy as np |
|
|
|
|
|
from sklearn.ensemble import RandomForestClassifier |
|
|
from sklearn import svm |
|
|
from sklearn.model_selection import RandomizedSearchCV |
|
|
from sklearn.metrics import ( |
|
|
classification_report, |
|
|
accuracy_score, |
|
|
f1_score, |
|
|
recall_score, |
|
|
precision_score, |
|
|
confusion_matrix, |
|
|
) |
|
|
from sklearn.decomposition import PCA |
|
|
from sklearn.preprocessing import StandardScaler, LabelEncoder |
|
|
from sklearn.pipeline import Pipeline |
|
|
from sklearn.manifold import TSNE |
|
|
import umap |
|
|
from sklearn.model_selection import train_test_split |
|
|
from sklearn.base import BaseEstimator |
|
|
|
|
|
import requests |
|
|
from requests.exceptions import HTTPError as RequestsHTTPError |
|
|
from Bio import Entrez |
|
|
from Bio import SeqIO |
|
|
from tqdm import tqdm |
|
|
|
|
|
|
|
|
import seaborn as sns |
|
|
import matplotlib.pyplot as plt |
|
|
from matplotlib.figure import Figure |
|
|
|
|
|
from esm.models.esmc import ESMC |
|
|
from esm.sdk.api import ESMProtein, LogitsConfig, ESMProteinError, LogitsOutput |
|
|
from transformers import T5Tokenizer, T5EncoderModel, PreTrainedModel |
|
|
|
|
|
from joblib import load |
|
|
|
|
|
import torch |
|
|
|
|
|
import sys |
|
|
import os |
|
|
|
|
|
# Parent of the current working directory. train_rf/train_svm write their
# 'Plots' and 'Classification_Reports' outputs underneath this folder.
project_root = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
|
|
|
|
|
def load_emb(path: str, acc: list[str]) -> np.ndarray:
    """
    Load one pooled embedding per accession from ``.npy`` files.

    Each accession ``a`` is read from ``<path>/<a>.npy`` and reduced by rank:

    - 3 dimensions: axis 0 is squeezed away, then the result is averaged
      over its first axis.
    - 2 dimensions: averaged over the first axis.
    - any other rank: used unchanged.

    Args:
        path (str): Directory containing the ``.npy`` files.
        acc (list[str]): Accessions to load; output rows follow this order.

    Returns:
        np.ndarray: The processed embeddings stacked row-wise (``np.vstack``).

    Raises:
        FileNotFoundError: If ``path`` does not exist, or if ``np.load``
            cannot find the file of an accession.
        ValueError: If ``acc`` is empty (``np.vstack`` rejects an empty
            list) or a 3-D array cannot be squeezed along axis 0.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(f"The specified path does not exist: {path}")

    rows = []

    # The bar advances once per accession, so its total must be len(acc);
    # counting the .npy files in the directory is wrong whenever `acc` is a
    # subset of what is stored there.
    for a in tqdm(acc, desc='Cargando embeddings', total=len(acc)):
        emb: np.ndarray = np.load(os.path.join(path, f"{a}.npy"))

        if emb.ndim == 3:
            emb = emb.squeeze(axis=0).mean(axis=0)
        elif emb.ndim == 2:
            emb = emb.mean(axis=0)

        rows.append(emb)

    return np.vstack(rows)
|
|
|
|
|
def confusion(title: str, y_true: np.ndarray, y_pred: np.ndarray) -> Figure:
    """
    Plot a confusion matrix for the given true and predicted labels.

    The matrix is normalised over the predicted labels (``normalize='pred'``),
    drawn as a seaborn heatmap and displayed with ``plt.show()``.

    Args:
        title (str): Text appended to the "Confusion Matrix - " plot title.
        y_true (np.ndarray): True labels.
        y_pred (np.ndarray): Predicted labels.

    Returns:
        Figure: The matplotlib figure that holds the heatmap.
    """
    # Build the class list from BOTH arrays and pass it to confusion_matrix
    # explicitly. Taking it from y_true alone leaves the tick labels shorter
    # than the matrix whenever a class is predicted but absent from y_true.
    class_names = np.unique(
        np.concatenate([np.asarray(y_true), np.asarray(y_pred)])
    )

    cm = confusion_matrix(y_true=y_true,
                          y_pred=y_pred,
                          labels=class_names,
                          normalize='pred')

    tick_labels = list(class_names)
    fig = plt.figure(figsize=(10, 10))
    sns.heatmap(cm, annot=True, fmt='.2f', cmap='Greys',
                xticklabels=tick_labels, yticklabels=tick_labels)

    plt.xlabel('Predicted Label')
    plt.ylabel('True Label')
    plt.title(f'Confusion Matrix - {title}')
    plt.tight_layout()
    plt.show()

    return fig
|
|
|
|
|
def plot_umap(x: np.ndarray, y: np.ndarray, title: str) -> None:
    """
    Show a 2D UMAP projection of ``x`` as a scatter plot coloured by ``y``.

    The features are standardised with ``StandardScaler`` before being
    reduced by ``umap.UMAP(n_neighbors=30, random_state=42)``.

    Args:
        x (np.ndarray): Feature matrix handed to the scaler and the reducer.
        y (np.ndarray): One label per sample; used as the hue.
        title (str): Plot title.

    Returns:
        None: The figure is displayed with ``plt.show()``.
    """
    standardized = StandardScaler().fit_transform(x)
    projection = np.array(
        umap.UMAP(n_neighbors=30, random_state=42).fit_transform(standardized)
    )

    frame = pd.DataFrame({
        'UMAP1': projection[:, 0],
        'UMAP2': projection[:, 1],
        'Label': y,
    })

    plt.figure(figsize=(14, 6))
    # sns.scatterplot returns the Axes it drew on.
    ax = sns.scatterplot(data=frame, x='UMAP1', y='UMAP2', hue='Label', alpha=0.7)
    ax.set_title(title)
    ax.set_xlabel('UMAP Component 1')
    ax.set_ylabel('UMAP Component 2')
    plt.legend(title='Labels', bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.tight_layout()
    plt.show()
|
|
|
|
|
|
|
|
|
|
|
def plot_pca(x : np.ndarray, labels: np.ndarray, title: str) -> None:
    """
    Show the first two principal components of ``x`` as a scatter plot.

    The data runs through a ``StandardScaler`` -> ``PCA(n_components=2)``
    pipeline; the explained-variance ratio of both components is appended
    to the plot title.

    Args:
        x (np.ndarray): Input data handed to the scaler/PCA pipeline.
        labels (np.ndarray): One label per sample; used as the hue.
        title (str): Leading part of the plot title.

    Returns:
        None: The figure is displayed with ``plt.show()``.
    """
    pipe = Pipeline([
        ('scaler', StandardScaler()),
        ('pca', PCA(n_components=2, random_state=42)),
    ])
    components = pipe.fit_transform(x)
    explained = pipe.named_steps['pca'].explained_variance_ratio_

    frame = pd.DataFrame({
        'PC1': components[:, 0],
        'PC2': components[:, 1],
        'Label': labels,
    })

    plt.figure(figsize=(14, 6))
    # sns.scatterplot returns the Axes it drew on.
    ax = sns.scatterplot(data=frame, x='PC1', y='PC2', hue='Label', alpha=0.7)
    ax.set_title(f'{title} - Explained Variance: {explained[0]:.2f}, {explained[1]:.2f}')
    ax.set_xlabel('First Principal Component')
    ax.set_ylabel('Second Principal Component')
    plt.legend(title='Labels', bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.tight_layout()
    plt.show()
|
|
|
|
|
|
|
|
def tsne_plot(x: np.ndarray, labels: np.ndarray, title: str) -> None:
    """
    Show a 2D t-SNE projection of ``x`` as a scatter plot coloured by label.

    The features are standardised with ``StandardScaler`` first. The
    perplexity is 60, lowered to ``n_samples - 1`` for datasets that have 60
    samples or fewer.

    Args:
        x (np.ndarray): Feature matrix handed to the scaler and to t-SNE.
        labels (np.ndarray): One label per sample; used as the hue.
        title (str): Plot title.

    Returns:
        None: The figure is displayed with ``plt.show()``.
    """
    x_scaled = StandardScaler().fit_transform(x)

    # scikit-learn requires perplexity < n_samples, so the fixed value of 60
    # would raise on small datasets. Larger datasets keep exactly 60.
    n_samples = x_scaled.shape[0]
    perplexity = min(60, max(1, n_samples - 1))

    tsne = TSNE(n_components=2, perplexity=perplexity, random_state=42)
    tsne_fit = tsne.fit_transform(x_scaled)

    df_plot = pd.DataFrame({
        't-SNE1': tsne_fit[:, 0],
        't-SNE2': tsne_fit[:, 1],
        'Label': labels,
    })

    plt.figure(figsize=(14, 6))
    # sns.scatterplot returns the Axes it drew on.
    ax = sns.scatterplot(data=df_plot, x='t-SNE1', y='t-SNE2', hue='Label', alpha=0.7)
    ax.set_title(title)
    ax.set_xlabel('First t-SNE Component')
    ax.set_ylabel('Second t-SNE Component')
    plt.legend(title='Labels', bbox_to_anchor=(1.05, 1), loc='upper left')
    plt.tight_layout()
    plt.show()
|
|
|
|
|
def plot_emb(x: np.ndarray, labels : np.ndarray, model_name: str):
    """
    Plot the embeddings with PCA, t-SNE and UMAP, in that order.

    Args:
        x (np.ndarray): Feature matrix forwarded to each plotting function.
        labels (np.ndarray): One label per sample, used to colour the points.
        model_name (str): Name of the embedding model; printed and appended
            to each plot title.
    """
    print(f"Plotting embeddings for: {model_name}")

    # Each entry pairs a plotting function with the title it receives.
    plots = (
        (plot_pca, f'PCA - {model_name}'),
        (tsne_plot, f't-SNE - {model_name}'),
        (plot_umap, f'UMAP - {model_name}'),
    )
    for plotter, plot_title in plots:
        plotter(x, labels, title=plot_title)
|
|
|
|
|
def evaluate(model: BaseEstimator,
             x_test: np.ndarray,
             y_test: np.ndarray) -> dict:
    """
    Score a fitted classifier on test data.

    Args:
        model (BaseEstimator): Fitted estimator exposing ``predict``.
        x_test (np.ndarray): Test features.
        y_test (np.ndarray): True labels for ``x_test``.

    Returns:
        dict: Recall, precision and F1 under three averaging modes, keyed
        ``'<Metric>_<average>'`` with ``Metric`` in
        ``Recall``/``Precision``/``F1`` and ``average`` in
        ``weighted``/``micro``/``macro`` (nine entries in total).

    Side Effects:
        Prints the resulting dictionary with ``pprint``.
    """
    y_pred = model.predict(x_test)

    scorers = (
        ('Recall', recall_score),
        ('Precision', precision_score),
        ('F1', f1_score),
    )

    result = {}
    # The '*_micro' entries used to be computed with average='macro'; driving
    # the key and the averaging mode from the same variable keeps them in sync.
    for average in ('weighted', 'micro', 'macro'):
        for metric_name, scorer in scorers:
            result[f'{metric_name}_{average}'] = scorer(y_test, y_pred, average=average)

    pprint(result)
    return result
|
|
|
|
|
|
|
|
|
|
|
def train_rf(title: str,
             x: np.ndarray,
             y : np.ndarray,
             params: dict) -> tuple[RandomForestClassifier, dict, LabelEncoder]:
    """
    Train a RandomForestClassifier, evaluate it and persist the reports.

    Labels are encoded with a ``LabelEncoder`` and the data is split 67/33
    (stratified, ``random_state=42``). The classifier is fitted on the
    training part and scored on the test part.

    Args:
        title (str): Used in the confusion-matrix title and in the output
            file names.
        x (np.ndarray): Feature matrix.
        y (np.ndarray): Target labels for ``x``.
        params (dict): Keyword arguments for ``RandomForestClassifier``.

    Returns:
        tuple[RandomForestClassifier, dict, LabelEncoder]:
            - the fitted classifier,
            - the metrics dictionary returned by ``evaluate``,
            - the fitted label encoder.

    Side Effects:
        - Prints the metrics (via ``evaluate``) and a classification report.
        - Writes ``<title>_classification_report.csv`` to
          ``<project_root>/Classification_Reports``.
        - Shows the confusion matrix and saves it as
          ``<title>_confusion_matrix.png`` in ``<project_root>/Plots``.
          Both folders are created when missing.
    """
    le = LabelEncoder()
    y_encoded = le.fit_transform(y)

    x_train, x_test, y_train, y_test = train_test_split(
        x, y_encoded, test_size=0.33, stratify=y_encoded, random_state=42
    )

    classifier: RandomForestClassifier = RandomForestClassifier(**params)
    classifier.fit(x_train, y_train)

    y_pred = classifier.predict(x_test)
    evaluation = evaluate(classifier, x_test, y_test)

    # Create the output folders up front: to_csv/savefig do not create
    # missing parent directories and would fail after training has finished.
    report_dir = os.path.join(project_root, 'Classification_Reports')
    plot_dir = os.path.join(project_root, 'Plots')
    os.makedirs(report_dir, exist_ok=True)
    os.makedirs(plot_dir, exist_ok=True)

    classification = classification_report(y_test,
                                           y_pred,
                                           zero_division=0,
                                           target_names=le.classes_,
                                           output_dict=True)
    pd.DataFrame(classification).to_csv(
        os.path.join(report_dir, f'{title}_classification_report.csv'),
        index=True,
    )

    print(classification_report(y_test,
                                y_pred,
                                zero_division=0,
                                target_names=le.classes_))

    # The confusion matrix is drawn with the original (decoded) class names.
    y_pred_str = le.inverse_transform(y_pred)
    y_test_str = le.inverse_transform(y_test)

    fig = confusion(title=title, y_true=y_test_str, y_pred=y_pred_str)
    fig.savefig(os.path.join(plot_dir, f'{title}_confusion_matrix.png'))

    return classifier, evaluation, le
|
|
|
|
|
def train_svm(title: str, x: np.ndarray, y: np.ndarray, params: dict) -> tuple[Pipeline, dict, LabelEncoder]:
    """
    Train a scaled SVM pipeline, evaluate it and persist the reports.

    Labels are encoded with a ``LabelEncoder`` and the data is split 67/33
    (stratified, ``random_state=42``). A ``StandardScaler`` -> ``SVC``
    pipeline is fitted on the training part and scored on the test part.

    Args:
        title (str): Used in the confusion-matrix title and in the output
            file names.
        x (np.ndarray): Feature matrix.
        y (np.ndarray): Target labels for ``x``.
        params (dict): SVC parameters keyed with an ``'svm__'`` prefix (the
            format returned by ``randomSVM``). Keys without that prefix are
            ignored.

    Returns:
        tuple[Pipeline, dict, LabelEncoder]:
            - the fitted pipeline (scaler + SVC),
            - the metrics dictionary returned by ``evaluate``,
            - the fitted label encoder.

    Side Effects:
        - Prints the metrics (via ``evaluate``) and a classification report.
        - Shows the confusion matrix and saves it as
          ``<title>_confusion_matrix.png`` in ``<project_root>/Plots``.
        - Writes ``<title>_classification_report.csv`` to
          ``<project_root>/Classification_Reports``.
          Both folders are created when missing.
    """
    le = LabelEncoder()
    y_encoded = le.fit_transform(y)

    x_train, x_test, y_train, y_test = train_test_split(
        x, y_encoded, test_size=0.33, stratify=y_encoded, random_state=42
    )

    # Strip the pipeline-step prefix so the values can be passed to SVC directly.
    svc_params = {
        k.removeprefix('svm__'): v
        for k, v in params.items()
        if k.startswith('svm__')
    }
    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('svm', svm.SVC(**svc_params)),
    ])
    pipeline.fit(x_train, y_train)

    y_pred = pipeline.predict(x_test)
    evaluation = evaluate(model=pipeline, x_test=x_test, y_test=y_test)

    # Create the output folders up front: savefig/to_csv do not create
    # missing parent directories and would fail after training has finished.
    plot_dir = os.path.join(project_root, 'Plots')
    report_dir = os.path.join(project_root, 'Classification_Reports')
    os.makedirs(plot_dir, exist_ok=True)
    os.makedirs(report_dir, exist_ok=True)

    # The confusion matrix is drawn with the original (decoded) class names.
    y_pred_str = le.inverse_transform(y_pred)
    y_test_str = le.inverse_transform(y_test)

    fig = confusion(title=title, y_true=y_test_str, y_pred=y_pred_str)
    fig.savefig(os.path.join(plot_dir, f'{title}_confusion_matrix.png'))

    classification = classification_report(y_test,
                                           y_pred,
                                           zero_division=0,
                                           target_names=le.classes_,
                                           output_dict=True)
    pd.DataFrame(classification).to_csv(
        os.path.join(report_dir, f'{title}_classification_report.csv'),
        index=True,
    )

    # Printed as well so this function behaves like train_rf.
    print(classification_report(y_test,
                                y_pred,
                                zero_division=0,
                                target_names=le.classes_))

    return pipeline, evaluation, le
|
|
|
|
|
|
|
|
def randomSVM(x: np.ndarray, y: np.ndarray) -> dict:
    """
    Run a randomized hyperparameter search for a scaled SVM pipeline.

    The labels are label-encoded and only the training part of a stratified
    67/33 split (``random_state=42``) is searched. The estimator is a
    ``StandardScaler`` -> ``SVC`` pipeline; 10 candidates are sampled and
    scored with weighted F1 under 3-fold cross-validation.

    Args:
        x (np.ndarray): Feature matrix.
        y (np.ndarray): Target labels for ``x``.

    Returns:
        dict: ``best_params_`` of the search, keyed with the ``'svm__'``
        step prefix. The same dictionary is printed with ``pprint``.
    """
    y_encoded = LabelEncoder().fit_transform(y)
    x_train, _, y_train, _ = train_test_split(
        x, y_encoded, test_size=0.33, stratify=y_encoded, random_state=42
    )

    search_space = {
        'svm__C': [0.001, 0.01, 0.1, 1, 10, 100, 1000],
        'svm__kernel': ['rbf'],
        'svm__gamma': ['scale', 'auto', 0.001, 0.01, 0.1, 1, 10],
        'svm__shrinking': [True, False],
        'svm__class_weight': ['balanced'],
        'svm__probability': [True],
        'svm__tol': [1e-5, 1e-4, 1e-3, 1e-2],
        'svm__max_iter': [-1, 5000, 7500, 10000],
        'svm__decision_function_shape': ['ovr', 'ovo'],
        'svm__cache_size': [200, 400, 600],
    }

    scaled_svc = Pipeline([
        ('scaler', StandardScaler()),
        ('svm', svm.SVC()),
    ])

    random_search = RandomizedSearchCV(
        estimator=scaled_svc,
        param_distributions=search_space,
        n_iter=10,
        scoring='f1_weighted',
        cv=3,
        verbose=2,
        random_state=42,
        n_jobs=-1,
    )
    random_search.fit(x_train, y_train)

    best_params = random_search.best_params_
    pprint(best_params)
    return best_params
|
|
|
|
|
def randomSearch(x: np.ndarray, y: np.ndarray) -> dict:
    """
    Run a randomized hyperparameter search for a RandomForestClassifier.

    The labels are label-encoded and only the training part of a stratified
    67/33 split (``random_state=42``) is searched. 10 candidates are sampled
    and scored with weighted F1 under 3-fold cross-validation.

    Args:
        x (np.ndarray): Feature matrix.
        y (np.ndarray): Target labels for ``x``.

    Returns:
        dict: ``best_params_`` of the search. The same dictionary is printed
        with ``pprint`` after a 'Best Params' header.
    """
    le = LabelEncoder()
    y_encoded = le.fit_transform(y)

    x_train, _, y_train, _ = train_test_split(
        x, y_encoded, test_size=0.33, stratify=y_encoded, random_state=42
    )
    classifier: RandomForestClassifier = RandomForestClassifier(random_state=42)

    param_grid = {
        'n_estimators': [100, 200, 300, 400, 500],
        'max_depth': [None, 10, 20, 30, 40, 50],
        'min_samples_split': [2, 5, 10, 15, 20],
        'min_samples_leaf': [1, 2, 4, 8, 10],
        'max_features': ['sqrt', 'log2', None, 0.3, 0.5, 0.7],
        'bootstrap': [True, False],
        'criterion': ['gini', 'entropy'],
        'max_leaf_nodes': [None, 10, 50, 100, 200],
        'class_weight': ['balanced'],
    }

    # random_state pins which candidates are sampled. The split and the
    # forest are already seeded, and randomSVM seeds its search the same way;
    # without it two runs can return different "best" parameters.
    rf_random = RandomizedSearchCV(estimator=classifier,
                                   param_distributions=param_grid,
                                   n_iter=10,
                                   scoring='f1_weighted',
                                   cv=3,
                                   verbose=2,
                                   random_state=42,
                                   n_jobs=-1)

    rf_random.fit(X=x_train, y=y_train)

    print('Best Params')
    pprint(rf_random.best_params_)

    return rf_random.best_params_
|
|
|
|
|
def fetch_uniprot_sequence(uniprot_id: str) -> str | None:
    """
    Fetch the amino-acid sequence for a UniProt ID.

    The UniProtKB FASTA endpoint is queried first. If it answers with HTTP
    200 but the body is not exactly one FASTA record, the UniSave endpoint is
    queried and the first record of its response is used.

    Args:
        uniprot_id (str): UniProt ID to fetch the sequence for.

    Returns:
        str | None: The raw sequence (no FASTA header), or None when it could
        not be retrieved. Every failure is reported with ``print``.

    Note:
        A non-200 answer from the UniProtKB endpoint ends the lookup; UniSave
        is only tried after a 200 answer that cannot be parsed. Network
        errors raised by ``requests`` are reported and turned into None.
    """
    url = f"https://rest.uniprot.org/uniprotkb/{uniprot_id}.fasta"
    try:
        response = requests.get(url, timeout=60)
    except requests.RequestException as e:
        # Timeouts/connection errors must not abort a long batch of lookups.
        print(f'URL inválido o no accesible: {url} ({e})')
        return None

    if response.status_code != 200:
        print(f'URL inválido o no accesible: {url}')
        return None

    try:
        record = SeqIO.read(StringIO(response.text), "fasta")
    except ValueError:
        # SeqIO.read needs exactly one record; fall back to UniSave below.
        record = None
    if record is not None:
        return str(record.seq)

    url = f"https://rest.uniprot.org/unisave/{uniprot_id}.fasta"
    try:
        response = requests.get(url, timeout=60)
    except requests.RequestException as e:
        print(f'UniSave URL inválido: {url} ({e})')
        return None

    if response.status_code != 200:
        print(f'UniSave URL inválido: {url}')
        return None

    # Split in front of every '>' and keep the first chunk that actually is
    # a FASTA entry. Indexing a fixed position raised IndexError when the
    # body was empty.
    entries = re.split(r"(?=>)", response.text.strip())
    first_entry = next((entry for entry in entries if entry.startswith(">")), None)

    if first_entry is not None:
        try:
            record = SeqIO.read(StringIO(first_entry), "fasta")
            return str(record.seq)
        except ValueError:
            pass  # reported below together with the "no entry" case

    print(f'No se pudo obtener la entrada FASTA para {uniprot_id} desde UniSave')
    return None
|
|
|
|
|
def fetch_refseq_sequence(refseq_id: str) -> str | None:
    """
    Fetch a protein sequence by accession, first from NCBI, then from RCSB.

    ``Entrez.efetch`` (db ``protein``, FASTA) is tried first. When that fails,
    the RCSB FASTA endpoint is queried with the same identifier.

    Args:
        refseq_id (str): Accession passed to both services.

    Returns:
        str | None: The raw sequence (no FASTA header), or None when both
        lookups fail. Each failure is reported with ``print``.
    """
    # HTTPError derives from URLError; catching the base class also covers
    # connection-level failures (DNS, refused connection, timeout).
    from urllib.error import URLError

    # NOTE(review): both values are empty strings here - presumably scrubbed
    # placeholders. Confirm how Entrez treats an empty e-mail/API key and
    # supply real values if needed.
    Entrez.email = ""
    Entrez.api_key = ""

    try:
        handle = Entrez.efetch(
            db="protein",
            id=refseq_id,
            rettype="fasta",
            retmode="text"
        )
        try:
            record = SeqIO.read(handle, "fasta")
        finally:
            # Close the handle even when parsing fails; it used to leak on
            # the ValueError path.
            handle.close()
        return str(record.seq)

    except (URLError, ValueError) as e:
        print(f"[Entrez] Failed for `{refseq_id}`: {e}")

    # Entrez failed: try the RCSB FASTA endpoint.
    try:
        url = f"https://www.rcsb.org/fasta/entry/{refseq_id}"
        response = requests.get(url, timeout=60)
        response.raise_for_status()
        record = SeqIO.read(StringIO(response.text), "fasta")
        return str(record.seq)

    except (requests.RequestException, ValueError) as e2:
        # RequestException is the base of requests' HTTPError, so HTTP
        # status errors and transport errors are both handled here.
        print(f"[RCSB] Failed for `{refseq_id}`: {e2}")

    return None
|
|
|
|
|
def _fetch_sequence_for_row(idx, row): |
|
|
""" |
|
|
Helper to fetch sequence for a single row. Returns (idx, sequence). |
|
|
""" |
|
|
sequence = None |
|
|
|
|
|
swiss_id = row.get('SwissProt_ID') |
|
|
if swiss_id and not pd.isna(swiss_id): |
|
|
try: |
|
|
sequence = fetch_uniprot_sequence(swiss_id) |
|
|
except (URLLibHTTPError, RequestsHTTPError) as e: |
|
|
print(f"Warning: SwissProt fetch failed for {swiss_id} with HTTP {e}") |
|
|
sequence = None |
|
|
|
|
|
|
|
|
if not sequence and row.get('Refseq_Accession') and not pd.isna(row['Refseq_Accession']): |
|
|
try: |
|
|
sequence = fetch_refseq_sequence(row['Refseq_Accession']) |
|
|
except (URLLibHTTPError, RequestsHTTPError) as e: |
|
|
print(f"Warning: RefSeq fetch failed for {row['Refseq_Accession']} with HTTP {e}") |
|
|
sequence = None |
|
|
|
|
|
|
|
|
if not sequence and row.get('Other_Accession') and not pd.isna(row['Other_Accession']): |
|
|
try: |
|
|
sequence = fetch_refseq_sequence(row['Other_Accession']) |
|
|
except (URLLibHTTPError, RequestsHTTPError) as e: |
|
|
print(f"Warning: RefSeq fetch failed for {row['Other_Accession']} with HTTP {e}") |
|
|
sequence = None |
|
|
|
|
|
return idx, sequence |
|
|
|
|
|
|
|
|
def fetch_sequences_for_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a copy of ``df`` whose 'sequence' column holds fetched sequences.

    Rows are processed one after another with ``_fetch_sequence_for_row``;
    the input frame itself is not modified.

    Args:
        df (pd.DataFrame): Input frame with the ID columns read by
            ``_fetch_sequence_for_row``.

    Returns:
        pd.DataFrame: Copy of ``df`` with a 'sequence' column (None where no
        sequence could be retrieved).

    Side Effects:
        Shows a tqdm progress bar (non-empty input only) and prints a
        success summary.
    """
    result_df = df.copy()
    if 'sequence' not in result_df.columns:
        result_df['sequence'] = None

    total_rows = len(result_df)

    if total_rows:
        for idx, row in tqdm(result_df.iterrows(), total=total_rows,
                             desc="Retrieving sequences", unit="row"):
            _, seq = _fetch_sequence_for_row(idx, row)
            result_df.at[idx, 'sequence'] = seq

    print("Sequence retrieval complete")
    success_count = result_df['sequence'].notna().sum()
    # An empty frame used to raise ZeroDivisionError here.
    success_pct = round(success_count / total_rows * 100, 2) if total_rows else 0.0
    print(f"Successfully retrieved {success_count} out of {total_rows} sequences "
          f"({success_pct}%)")
    return result_df
|
|
|
|
|
|
|
|
|
|
|
def esm_embed(model: ESMC,
              seq : str,
              acc : str,
              device : torch.device = torch.device(
                  'cuda' if torch.cuda.is_available()
                  else 'cpu'
              )) -> Optional[np.ndarray]:
    """
    Compute a pooled ESM embedding for one protein sequence.

    The sequence is encoded with ``model.encode``, moved to ``device`` and
    passed to ``model.logits`` with ``return_embeddings=True``. The returned
    embeddings are reduced by rank: a 3-D array is squeezed along axis 0 and
    averaged over its first axis, a 2-D array is averaged over its first
    axis, anything else is returned unchanged.

    Args:
        model (ESMC): ESM model used for encoding and for the forward pass.
        seq (str): Amino-acid sequence of the protein.
        acc (str): Accession of the protein; only used in error messages.
        device (torch.device, optional): Device the encoded protein is moved
            to. Defaults to CUDA when available, otherwise CPU.

    Returns:
        Optional[np.ndarray]: The pooled embedding, or None when the model
        returned no embeddings or the forward pass failed.

    Raises:
        ESMProteinError: If ``model.encode`` returns an error object.

    Side Effects:
        Shows a ``messagebox.showerror`` dialog on every failure.
    """
    protein: ESMProtein = ESMProtein(sequence=seq)
    encoded = model.encode(protein)

    # Inspect the result BEFORE calling .to(device). The original check ran
    # after .to(), so an ESMProteinError returned by encode() would already
    # have failed on the .to() call and the check was never reached.
    # NOTE(review): assumes the error object has no .to() method - confirm.
    if isinstance(encoded, ESMProteinError):
        messagebox.showerror("Error", f"Error processing {acc}: {encoded}")
        raise encoded

    protein_tensor = encoded.to(device)

    try:
        output: LogitsOutput = model.logits(protein_tensor,
                                            LogitsConfig(sequence=True,
                                                         return_embeddings=True))

        if output is None or output.embeddings is None:
            return None

        arr_output: np.ndarray = output.embeddings.cpu().numpy()

        if arr_output.ndim == 3:
            arr_output = arr_output.squeeze(axis=0).mean(axis=0)
        elif arr_output.ndim == 2:
            arr_output = arr_output.mean(axis=0)

        return arr_output

    except (ESMProteinError, RuntimeError) as e:
        messagebox.showerror("Error", f"Error processing {acc}: {e}")
        return None
|
|
|
|
|
def predict_with_esm(fasta_path : str,
                     model : Literal['esmc_600m', 'esmc_300m'],
                     device : torch.device = torch.device('cuda' if torch.cuda.is_available()
                                                          else 'cpu')
                     ) -> None:
    """
    Embed the sequences of a FASTA file with ESM C and predict their classes
    with a pre-trained SVM.

    Steps:
        1. Ask for an output directory (returns silently when cancelled).
        2. Validate ``fasta_path`` and read sequences/IDs via ``fasta_to_seq``.
        3. Embed every sequence with ``esm_embed`` while updating a Tkinter
           progress bar.
        4. Load the SVM and its label encoder from the ``Models`` folder next
           to this file's parent directory.
        5. Compute class probabilities and sort them per sequence, highest
           first.
        6. Ask where to save and write the result with
           ``save_predictions_to_txt``.
        7. Print up to three sample predictions.

    Args:
        fasta_path (str): Path to the input FASTA file.
        model (Literal['esmc_600m', 'esmc_300m']): ESM C checkpoint name; also
            selects which SVM and label-encoder files are loaded.
        device (torch.device, optional): Device for the ESM model. Defaults to
            CUDA when available, otherwise CPU.

    Returns:
        None

    Side Effects:
        - Opens file dialogs and message boxes; creates a progress window.
        - Writes the predictions to the chosen text file.
        - Prints progress, errors and sample predictions to the console.
    """
    output_dir = filedialog.askdirectory(title="Select output directory")
    if not output_dir:
        return

    if fasta_path is None or not os.path.exists(fasta_path):
        messagebox.showerror("Error", "Invalid FASTA file path.")
        return

    result = fasta_to_seq(fasta_path)
    # fasta_to_seq returns None on a parse error, but two EMPTY lists for a
    # file without records - both mean there is nothing to embed.
    if result is None or not result[0]:
        messagebox.showerror("Error", "No sequences found in FASTA file.")
        return
    seq, ids = result
    total = len(seq)

    # Hidden root that owns the progress window.
    root = tk.Tk()
    root.withdraw()

    progress_win = tk.Toplevel(root)
    progress_win.title("Embedding Progress")
    progress_label = tk.Label(progress_win, text="Embedding sequences...")
    progress_label.pack(padx=10, pady=5)
    progress = ttk.Progressbar(progress_win, length=300, mode='determinate', maximum=total)
    progress.pack(padx=10, pady=10)

    client: ESMC = ESMC.from_pretrained(model).to(device)
    embeddings = {}
    for done, (sequence, acc) in enumerate(zip(seq, ids), start=1):
        emb = esm_embed(model=client,
                        seq=sequence,
                        acc=acc,
                        device=device)
        if emb is not None:
            embeddings[acc] = emb

        progress['value'] = done
        progress_win.update_idletasks()

    progress_label.config(text="Embedding complete!")
    tk.Button(progress_win, text="Close", command=progress_win.destroy).pack(pady=5)

    # Every embedding may have failed; predict_proba cannot take an empty array.
    if not embeddings:
        messagebox.showerror("Error", "No embeddings could be generated.")
        return

    messagebox.showinfo("Info", "Loading SVM for predictions...")
    models_root: str = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
    model_path = os.path.join(models_root, 'Models/ESMC-300m_svm.joblib'
                              if model == 'esmc_300m'
                              else 'Models/ESMC-600m_svm.joblib')
    le_path = os.path.join(models_root, 'Models/esm_300m_le_svm.joblib'
                           if model == 'esmc_300m'
                           else 'Models/ESMC-600m_le_svm.joblib')

    try:
        predictor = load(model_path)
        le: LabelEncoder = load(le_path)
    except FileNotFoundError as e:
        # Either file can be the missing one; report the path that failed.
        missing = e.filename or model_path
        print(f"Error: Could not find the model file '{missing}'")
        return

    sequence_ids = list(embeddings.keys())
    x = np.array(list(embeddings.values()))
    messagebox.showinfo("Info", "Making predictions...")
    y_pred_proba = predictor.predict_proba(x)

    if hasattr(predictor, 'classes_'):
        class_names = le.inverse_transform(predictor.classes_)
    else:
        class_names = [f"Class_{i}" for i in range(y_pred_proba.shape[1])]

    # Per sequence: classes and probabilities ordered by descending probability.
    predictions_dict = {}
    for i, seq_id in enumerate(sequence_ids):
        class_prob_pairs = sorted(zip(class_names, y_pred_proba[i]),
                                  key=lambda pair: pair[1], reverse=True)
        sorted_classes, sorted_probs = zip(*class_prob_pairs)
        predictions_dict[seq_id] = (list(sorted_classes), list(sorted_probs))

    input_filename = f"{os.path.splitext(os.path.basename(fasta_path))[0]}_predictions.txt"
    output_file = filedialog.asksaveasfilename(title="Save Predictions",
                                               initialdir=output_dir,
                                               initialfile=input_filename,
                                               defaultextension=".txt",
                                               filetypes=[("Text files", "*.txt"), ("All files", "*.*")]
                                               )

    # A cancelled dialog yields an empty path; opening it would raise.
    if not output_file:
        print("Save cancelled; predictions were not written.")
        return

    print(f"Saving predictions to {output_file}...")
    save_predictions_to_txt(predictions_dict, output_file)
    print("Predictions saved successfully!")
    print(f"Total sequences processed: {len(embeddings)}")

    print("\nSample predictions:")
    for seq_id, (classes, probs) in list(predictions_dict.items())[:3]:
        pred_str = ", ".join([f"{cls} ({prob:.4f})" for cls, prob in zip(classes, probs)])
        print(f"{seq_id}: {pred_str}")
|
|
|
|
|
def prost_embed_sequence(seq : str,
                         acc : str,
                         tokenizer : T5Tokenizer,
                         model : PreTrainedModel,
                         device : torch.device = torch.device(
                             'cuda:0'
                             if torch.cuda.is_available()
                             else 'cpu'
                         ))-> Optional[np.ndarray]:
    """
    Embed one protein sequence with a T5 encoder and mean-pool the result.

    Args:
        seq (str): Amino-acid sequence. The letters U, Z, O and B are
            replaced with 'X' and the residues are space-separated before
            tokenization.
        acc (str): Accession of the sequence; only used in printed messages.
        tokenizer (T5Tokenizer): Tokenizer matching ``model``.
        model (PreTrainedModel): Encoder whose ``last_hidden_state`` is
            pooled. It is moved to ``device`` and cast to half precision
            there unless the device is the CPU, where float32 is used.
        device (torch.device, optional): Defaults to ``cuda:0`` when
            available, otherwise CPU.

    Returns:
        Optional[np.ndarray]: Mean of the selected token embeddings, or None
        when the sequence is too short or a RuntimeError/ValueError occurs
        (the reason is printed).
    """
    model = model.to(device)
    model = model.half() if str(device) != 'cpu' else model.float()

    seq = re.sub(r"[UZOB]", "X", seq)
    seq = " ".join(list(seq))

    try:
        ids = tokenizer(seq, add_special_tokens=True, return_tensors='pt')
        ids = {k: v.to(device) for k, v in ids.items()}

        with torch.no_grad():
            embedding_repr = model(
                ids['input_ids'],
                attention_mask=ids['attention_mask']
            )

        # Number of attended tokens minus one.
        real_len = ids['attention_mask'][0].sum().item() - 1

        # The slice below is 1:real_len, which is empty for real_len <= 1;
        # averaging an empty slice would return a NaN vector. Report and
        # bail out (the message used to be printed without returning).
        if real_len <= 1:
            print(f"Sequence too short after tokenization for {acc}")
            return None

        # NOTE(review): the slice skips token 0. That matches inputs carrying
        # a leading prefix token (ProstT5 documents an "<AA2fold>" prefix),
        # but no prefix is added above, so this presumably drops the first
        # residue. Left unchanged: the saved classifiers may have been
        # trained on embeddings produced exactly this way - confirm first.
        emb = embedding_repr.last_hidden_state[0, 1:real_len]
        emb_avg = emb.mean(dim=0).cpu().numpy()

        return emb_avg

    except RuntimeError as e:
        print(f"RuntimeError while processing {acc}: {e}")
        return None
    except ValueError as e:
        print(f"ValueError while processing {acc}: {e}")
        return None
|
|
|
|
|
def fasta_to_seq(fasta_file: str) -> Optional[tuple[list[str], list[str]]]:
    """
    Read a FASTA file and return its sequences together with their record IDs.

    Args:
        fasta_file (str): Path to the FASTA file to be read (opened as UTF-8 text).

    Returns:
        Optional[tuple[list[str], list[str]]]: ``(sequences, ids)`` — two lists of
        equal length in file order, holding each record's sequence and its ID as
        strings. Both lists are empty when the file contains no records.
        ``None`` is returned when parsing raises a ``ValueError``; the error is
        printed, not re-raised.

    Notes:
        Errors raised while opening the file (e.g. a missing file) are not
        handled here and propagate to the caller.
    """
    with open(fasta_file, 'r', encoding='utf-8') as handle:
        try:
            # One pass over the parser, keeping each sequence next to its ID.
            parsed = [
                (str(entry.seq), str(entry.id))
                for entry in SeqIO.parse(handle, "fasta")
            ]
        except ValueError as e:
            print(f"Error reading {fasta_file}: {e}")
            return None

    # Split the (sequence, id) pairs back into two parallel lists.
    seq_list = [pair[0] for pair in parsed]
    id_list = [pair[1] for pair in parsed]
    return seq_list, id_list
|
|
|
|
|
def save_predictions_to_txt(predictions_dict: dict[str, tuple[list[str], list[float]]],
                            output_file: str) -> None:
    """
    Save predictions to a comma-separated text file, best prediction first.

    The file starts with a fixed header line ("Sequence_ID" followed by
    "Prediction 1" ... "Prediction 6"). Every following line describes one
    sequence: its ID, then one "<class> (<probability>)" field per class,
    ordered by decreasing probability and formatted with four decimals.

    Args:
        predictions_dict: Dictionary with sequence_id as key and
            (class_names, probabilities) as value. The two lists are paired
            element-wise and do not need to be pre-sorted.
        output_file: Path to the output text file (overwritten if it exists).
    """
    # NOTE: the header always names six prediction columns, independent of how
    # many classes each entry actually carries.
    header = "Sequence_ID,Prediction 1,Prediction 2,Prediction 3,Prediction 4,Prediction 5,Prediction 6\n"

    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(header)

        for seq_id, (class_names, probabilities) in predictions_dict.items():
            # Rank the (class, probability) pairs and write them in that order.
            # Previously the sorted list was built but the unsorted inputs were
            # written. sorted() is stable, so ties keep their input order and
            # already-sorted input produces the same line as before.
            ranked = sorted(
                zip(class_names, probabilities),
                key=lambda pair: pair[1],
                reverse=True,
            )
            pred_line = ",".join(f"{cls} ({prob:.4f})" for cls, prob in ranked)

            f.write(f"{seq_id},{pred_line}\n")
|
|
|
|
|
def predict_with_prost(fasta_path: str) -> None:
    """
    Embed the sequences of a FASTA file with ProstT5 and predict their locations.

    Steps:
        1. Ask the user for an output directory.
        2. Read the sequences and IDs with ``fasta_to_seq``.
        3. Load the "Rostlab/ProstT5" tokenizer and encoder, then embed every
           sequence with ``prost_embed_sequence`` while a progress bar is shown.
        4. Load the SVM and its label encoder from ``<project root>/Models``,
           compute class probabilities and rank them per sequence.
        5. Ask where to save the result and write it with
           ``save_predictions_to_txt``; print a short summary.

    Args:
        fasta_path (str): Path to the FASTA file whose sequences are predicted.

    Returns:
        None. The function returns early (after reporting the reason) when the
        path is invalid, a dialog is cancelled, the FASTA file yields no
        sequences, the ProstT5 model cannot be loaded, no sequence could be
        embedded, or a model file is missing.

    Notes:
        - Embeddings are keyed by record ID, so records sharing an ID keep only
          the embedding of the last one.
        - The hidden Tk root is destroyed on the exit paths that leave no
          window behind; when the progress window stays open (with its "Close"
          button) the root is kept alive for it.
    """
    if not fasta_path or not os.path.exists(fasta_path):
        print("Invalid FASTA file path.")
        return

    # Hidden root window: only serves as parent for the dialogs and the
    # progress window created below.
    root = tk.Tk()
    root.withdraw()

    output_dir = filedialog.askdirectory(title="Select output directory")
    if not output_dir:
        # Dialog cancelled: release the otherwise orphaned hidden root.
        root.destroy()
        return

    result = fasta_to_seq(fasta_path)
    # fasta_to_seq gives None on a parse error and two empty lists for a file
    # without records; neither case leaves anything to embed.
    if result is None or not result[0]:
        messagebox.showerror("Error", "No sequences found in FASTA file.")
        root.destroy()
        return

    sequences, ids = result
    total = len(sequences)

    # Progress window
    progress_win = tk.Toplevel(root)
    progress_win.title("Embedding Progress")
    progress_label = tk.Label(progress_win, text="Embedding sequences...")
    progress_label.pack(padx=10, pady=5)
    progress = ttk.Progressbar(progress_win, length=300, mode='determinate', maximum=total)
    progress.pack(padx=10, pady=10)

    try:
        progress_label.config(text="Loading ProstT5 model...")
        progress_win.update_idletasks()

        tokenizer = T5Tokenizer.from_pretrained("Rostlab/ProstT5", do_lower_case=False, legacy=True)
        model = T5EncoderModel.from_pretrained("Rostlab/ProstT5")

        progress_label.config(text="Model loaded successfully! Embedding sequences...")
        progress_win.update_idletasks()

    except (RuntimeError, OSError) as e:
        # OSError is handled as well: failed downloads / missing cache files
        # and OS-level "[Errno 12] Cannot allocate memory" are reported as
        # OSError, which previously escaped and left the progress window stuck.
        progress_win.destroy()
        if "Cannot allocate memory" in str(e):
            messagebox.showerror(
                "Memory Error",
                "Insufficient memory to load ProstT5 model.\n\n"
                "Please try:\n"
                "1. Close other applications\n"
                "2. Restart your computer\n"
                "3. Clear the model cache:\n"
                "   rm -rf ~/.cache/huggingface/hub/models--Rostlab--ProstT5/"
            )
        else:
            messagebox.showerror("Runtime Error", f"Error loading model: {str(e)}")
        # Destroy the root only after the message box has been shown.
        root.destroy()
        return

    embeddings = {}

    for done, (seq, acc) in enumerate(zip(sequences, ids), start=1):
        emb = prost_embed_sequence(seq, acc, tokenizer, model)
        # Sequences that failed to embed (None) are skipped, not aborted on.
        if emb is not None:
            embeddings[acc] = emb

        progress['value'] = done
        progress_win.update_idletasks()

    if not embeddings:
        # Every sequence failed: predict_proba on an empty array would raise.
        progress_win.destroy()
        messagebox.showerror("Error", "No sequences could be embedded; nothing to predict.")
        root.destroy()
        return

    progress_label.config(text="Embedding complete!")
    tk.Button(progress_win, text="Close", command=progress_win.destroy).pack(pady=5)

    messagebox.showinfo("Info", "Loading SVM model for predictions...")
    project_root: str = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
    model_path = os.path.join(project_root, 'Models/Prost T5_svm.joblib')
    le_path = os.path.join(project_root, 'Models/Prost T5_le_svm.joblib')

    try:
        # NOTE: joblib files are pickle-based; only load model files that come
        # from a trusted source.
        predictor = load(model_path)
        le: LabelEncoder = load(le_path)

    except FileNotFoundError as e:
        # Name the file that is actually missing (SVM or label encoder).
        missing_path = e.filename or model_path
        print(f"Error: Could not find the model file '{missing_path}'")
        return

    sequence_ids = list(embeddings.keys())
    x = np.array(list(embeddings.values()))

    print("Making predictions...")
    y_pred_proba = predictor.predict_proba(x)

    # Translate the predictor's encoded classes back to their original labels;
    # fall back to generic names if the predictor exposes no classes_.
    if hasattr(predictor, 'classes_'):
        class_names = le.inverse_transform(predictor.classes_)
    else:
        class_names = [f"Class_{i}" for i in range(y_pred_proba.shape[1])]

    predictions_dict = {}
    for seq_id, row in zip(sequence_ids, y_pred_proba):
        # Most probable class first.
        ranked = sorted(zip(class_names, row), key=lambda pair: pair[1], reverse=True)
        sorted_classes, sorted_probs = zip(*ranked)
        predictions_dict[seq_id] = (list(sorted_classes), list(sorted_probs))

    input_filename = f"{os.path.splitext(os.path.basename(fasta_path))[0]}_predictions.txt"
    output_file = filedialog.asksaveasfilename(title="Save Predictions",
                                               initialdir=output_dir,
                                               initialfile=input_filename,
                                               defaultextension=".txt",
                                               filetypes=[("Text files", "*.txt"), ("All files", "*.*")]
                                               )

    if not output_file:
        # The save dialog returns an empty value when cancelled; writing to it
        # would fail, so stop here instead.
        print("Save cancelled; predictions were not written.")
        return

    print(f"Saving predictions to {output_file}...")
    save_predictions_to_txt(predictions_dict, output_file)
    print("Predictions saved successfully!")
    print(f"Total sequences processed: {len(embeddings)}")

    print("\nSample predictions:")
    for seq_id, (classes, probs) in list(predictions_dict.items())[:3]:
        pred_str = ", ".join([f"{cls} ({prob:.4f})" for cls, prob in zip(classes, probs)])
        print(f"{seq_id}: {pred_str}")
|
|
|
|
|
|