|
|
|
|
|
""" |
|
|
MD Simulation Pipeline - Flask Backend |
|
|
Provides API endpoints for protein processing and file generation |
|
|
""" |
|
|
|
|
|
from flask import Flask, request, jsonify, send_file, render_template, send_from_directory, Response, stream_with_context |
|
|
from flask_cors import CORS |
|
|
import os |
|
|
import sys |
|
|
import json |
|
|
import tempfile |
|
|
import zipfile |
|
|
from pathlib import Path |
|
|
import requests |
|
|
import subprocess |
|
|
import time |
|
|
from Bio.PDB import PDBParser, PDBList |
|
|
import logging |
|
|
import html |
|
|
from collections import defaultdict |
|
|
from .structure_preparation import ( |
|
|
prepare_structure, |
|
|
parse_structure_info, |
|
|
extract_original_residue_info, |
|
|
restore_residue_info_in_pdb, |
|
|
sanity_check_ligand_pdb, |
|
|
merge_protein_and_ligand, |
|
|
) |
|
|
from .Fill_missing_residues import ( |
|
|
get_pdb_id_from_pdb_file, |
|
|
detect_missing_residues, |
|
|
get_chain_sequences, |
|
|
run_esmfold, |
|
|
rebuild_pdb_with_esmfold, |
|
|
write_fasta_for_missing_chains, |
|
|
trim_residues_from_edges, |
|
|
trim_chains_sequences |
|
|
) |
|
|
|
|
|
# Base directory of this package: Jinja templates live in ./html and static
# assets are served from the package root with no URL prefix.
_BASE = Path(__file__).parent

app = Flask(__name__,
            template_folder=str(_BASE / "html"),
            static_folder=str(_BASE),
            static_url_path="")
# Allow cross-origin requests (frontend may be served from another origin).
CORS(app)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# All generated artifacts (prepared structures, docking files, MD inputs) are
# written under ./output relative to the process's current working directory.
OUTPUT_DIR = Path.cwd() / "output"
|
|
|
|
|
def clean_and_create_output_folder():
    """Remove any existing output folder and create a fresh, empty one.

    Returns:
        bool: True when the folder was (re)created successfully, False on error.
    """
    # Hoisted out of the conditional branch so the dependency is visible up front.
    import shutil

    try:
        if OUTPUT_DIR.exists():
            shutil.rmtree(OUTPUT_DIR)
            logger.info(f"Removed existing output folder: {OUTPUT_DIR}")

        OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
        logger.info(f"Created new output folder: {OUTPUT_DIR}")

        return True
    except Exception as e:
        # Realistic failures are OSError from rmtree/mkdir (permissions, busy files);
        # report and signal failure to the caller instead of raising.
        logger.error(f"Error cleaning output folder: {str(e)}")
        return False
|
|
|
|
|
|
|
|
def _ensure_docking_folder():
    """Ensure the docking output folder exists and return its Path."""
    path = OUTPUT_DIR / "docking"
    path.mkdir(parents=True, exist_ok=True)
    return path
|
|
|
|
|
|
|
|
def _minimize_esmfold_chains_streaming(pdb_id, chains_to_minimize, original_for_align=None):
    """
    Minimize ESMFold-generated chains using AMBER with streaming logs.
    Yields log messages in real-time.
    After removing hydrogens, the minimized chain is superimposed to the
    original (true crystal) structure so it stays in the same coordinate
    frame as the ligand and the rest of the system.

    Pipeline per chain: tleap (build topology) -> sander (minimize) ->
    ambpdb (restart -> PDB) -> PyMOL (strip hydrogens, optional alignment).
    Failures in any stage log an error and skip to the next chain.

    Args:
        pdb_id: PDB ID (e.g., '1KE5')
        chains_to_minimize: List of chain IDs to minimize (e.g., ['A', 'B'])
        original_for_align: Path to the true original PDB for superimposition.
            Use 0_original_input_backup.pdb when it exists (true crystal),
            else 0_original_input.pdb. If None, this is computed automatically.

    Yields:
        Log messages as formatted SSE strings
    """
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    # Default alignment reference: prefer the untouched backup of the input.
    if original_for_align is None:
        backup = OUTPUT_DIR / "0_original_input_backup.pdb"
        original_for_align = backup if backup.exists() else (OUTPUT_DIR / "0_original_input.pdb")

    for chain in chains_to_minimize:
        try:
            yield _format_log(f"  Preparing minimization for chain {chain}...")

            # The ESMFold model for this chain must already be on disk.
            esmfold_pdb = OUTPUT_DIR / f"{pdb_id}_chain_{chain}_esmfold.pdb"
            if not esmfold_pdb.exists():
                yield _format_log(f"  ❌ ESMFold PDB not found for chain {chain}: {esmfold_pdb}", 'error')
                continue

            # tleap input: build AMBER topology/coordinates with ff14SB.
            tleap_in = OUTPUT_DIR / f"tleap_{chain}.in"
            with open(tleap_in, 'w') as f:
                f.write("source leaprc.protein.ff14SB\n")
                f.write(f"protein = loadpdb {esmfold_pdb.resolve()}\n")
                f.write(f"saveamberparm protein {pdb_id}_chain_{chain}_esmfold.prmtop {pdb_id}_chain_{chain}_esmfold.inpcrd\n")
                f.write("quit\n")

            yield _format_log(f"  Running tleap for chain {chain}...")
            prmtop = OUTPUT_DIR / f"{pdb_id}_chain_{chain}_esmfold.prmtop"
            inpcrd = OUTPUT_DIR / f"{pdb_id}_chain_{chain}_esmfold.inpcrd"

            # Reuse topology files from a previous run when both exist.
            if not prmtop.exists() or not inpcrd.exists():
                cmd = ["tleap", "-f", str(tleap_in)]
                # bufsize=1 (line-buffered) so tleap output streams as it appears.
                process = subprocess.Popen(
                    cmd,
                    cwd=str(OUTPUT_DIR),
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                    text=True,
                    bufsize=1
                )

                for line in iter(process.stdout.readline, ''):
                    if line.strip():
                        yield _format_log(f"    {line.strip()}")

                process.wait()
                if process.returncode != 0 or not prmtop.exists():
                    yield _format_log(f"  ❌ tleap failed for chain {chain}", 'error')
                    continue

            yield _format_log(f"  ✅ tleap completed for chain {chain}")

            # sander input: 300 minimization cycles (first 150 steepest
            # descent), no periodic box (ntb=0), implicit solvent (igb=1).
            min_in = OUTPUT_DIR / f"min_{chain}.in"
            with open(min_in, 'w') as f:
                f.write("#Two-stage minimization: sidechains first\n")
                f.write(" &cntrl\n")
                f.write(" imin=1, maxcyc=300, ncyc=150,\n")
                f.write(" ntb=0, cut=10.0, igb=1\n")
                f.write(" /\n")

            yield _format_log(f"  Running energy minimization (sander) for chain {chain}...")
            min_out = OUTPUT_DIR / f"min_{chain}.out"
            min_rst = OUTPUT_DIR / f"{pdb_id}_chain_{chain}_esmfold_minimized.rst"

            cmd = [
                "sander",
                "-O",
                "-i", str(min_in),
                "-o", str(min_out),
                "-p", str(prmtop),
                "-c", str(inpcrd),
                "-r", str(min_rst)
            ]

            # sander reports progress in min_out, not on stdout, so stdout is
            # discarded and the output file is tailed below instead.
            process = subprocess.Popen(
                cmd,
                cwd=str(OUTPUT_DIR),
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE,
                text=True,
            )

            # Wait up to ~10 s for sander to create its output file.
            for _ in range(100):
                if min_out.exists():
                    break
                time.sleep(0.1)

            # Tail min_out, yielding complete lines, until sander exits.
            last_pos = 0
            buffer = ""
            while True:
                if min_out.exists():
                    try:
                        with open(min_out, "r") as f:
                            f.seek(last_pos)
                            new = f.read()
                            last_pos = f.tell()
                        buffer += new
                        while "\n" in buffer:
                            line, buffer = buffer.split("\n", 1)
                            if line.strip():
                                yield _format_log(f"    {line.strip()}")
                    except (IOError, OSError):
                        # Transient read failure while sander is writing; retry.
                        pass

                ret = process.poll()
                if ret is not None:
                    # Process finished: flush whatever output remains.
                    if min_out.exists():
                        try:
                            with open(min_out, "r") as f:
                                f.seek(last_pos)
                                new = f.read()
                            buffer += new
                            while "\n" in buffer:
                                line, buffer = buffer.split("\n", 1)
                                if line.strip():
                                    yield _format_log(f"    {line.strip()}")
                            if buffer.strip():
                                yield _format_log(f"    {buffer.strip()}")
                        except (IOError, OSError):
                            pass
                    break
                time.sleep(0.2)

            process.wait()
            if process.returncode != 0 and process.stderr:
                err = process.stderr.read()
                if err.strip():
                    yield _format_log(f"  stderr: {err.strip()}", "error")

            if process.returncode != 0 or not min_rst.exists():
                yield _format_log(f"  ❌ sander minimization failed for chain {chain}", 'error')
                continue

            yield _format_log(f"  ✅ Minimization completed for chain {chain}")

            # Convert the minimized restart file back to PDB with ambpdb
            # (writes to stdout, redirected into min_pdb).
            yield _format_log(f"  Converting minimized structure to PDB (ambpdb) for chain {chain}...")
            min_pdb = OUTPUT_DIR / f"{pdb_id}_chain_{chain}_esmfold_minimized.pdb"
            with open(min_pdb, 'w') as f:
                cmd = [
                    "ambpdb",
                    "-p", str(prmtop),
                    "-c", str(min_rst)
                ]
                result = subprocess.run(
                    cmd,
                    stdout=f,
                    stderr=subprocess.PIPE,
                    text=True,
                )

            if result.returncode != 0 or not min_pdb.exists():
                yield _format_log(f"  ❌ ambpdb failed for chain {chain}: {result.stderr}", 'error')
                continue

            yield _format_log(f"  ✅ PDB conversion completed for chain {chain}")

            # Strip hydrogens (and optionally superimpose onto the original
            # crystal frame) in a single headless PyMOL run.
            yield _format_log(f"  Removing hydrogens using PyMOL for chain {chain}...")
            min_pdb_noH = OUTPUT_DIR / f"{pdb_id}_chain_{chain}_esmfold_minimized_noH.pdb"
            do_superimpose = original_for_align.exists()
            if do_superimpose:
                yield _format_log(f"  Superimposing minimized chain to original (true crystal) frame...")
            try:
                import tempfile

                # CA-only alignment against the matching chain of the original.
                superimpose_block = ""
                if do_superimpose:
                    superimpose_block = f"""
cmd.load("{original_for_align.resolve()}", "orig_ref")
cmd.align("min_chain_{chain} and name CA", "orig_ref and chain {chain} and name CA")
cmd.delete("orig_ref")
"""
                # Script content is flush-left: PyMOL executes it as Python.
                pymol_script = tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False)
                pymol_script.write(f"""
from pymol import cmd
cmd.load("{min_pdb.resolve()}", "min_chain_{chain}")
cmd.remove("hydrogens")
{superimpose_block}
cmd.save("{min_pdb_noH.resolve()}", "min_chain_{chain}")
cmd.quit()
""")
                pymol_script.close()

                result = subprocess.run(
                    ["pymol", "-c", "-Q", pymol_script.name],
                    capture_output=True,
                    text=True,
                    timeout=60
                )

                if result.returncode != 0 or not min_pdb_noH.exists():
                    raise Exception(f"PyMOL failed: {result.stderr}")

                os.unlink(pymol_script.name)
                yield _format_log(f"  ✅ Hydrogens removed for chain {chain}")
                if do_superimpose:
                    yield _format_log(f"  ✅ Minimized chain {chain} superimposed to original frame")
            except Exception as e:
                # Fall back to the hydrogen-containing PDB on any PyMOL failure.
                # NOTE(review): the temporary script file is not unlinked on this
                # path — confirm whether that leak is acceptable.
                yield _format_log(f"  ⚠️ PyMOL hydrogen removal failed, using original: {e}", 'warning')
                min_pdb_noH = min_pdb

            yield _format_log(f"  ✅ Chain {chain} minimization saved to {min_pdb_noH.name}. It will be merged into 1_protein_no_hydrogens.pdb when you run Prepare Structure.")

        except Exception as e:
            # Any unexpected error is reported and the next chain is attempted.
            yield _format_log(f"  ❌ Error minimizing chain {chain}: {str(e)}", 'error')
            import traceback
            logger.error(traceback.format_exc())
            continue
|
|
|
|
|
|
|
|
def _minimize_esmfold_chains(pdb_id, chains_to_minimize):
    """
    Minimize ESMFold-generated chains using AMBER.

    Non-streaming variant of _minimize_esmfold_chains_streaming: runs the
    same tleap -> sander -> ambpdb -> PyMOL pipeline per chain, logging via
    the module logger instead of yielding SSE messages. Failures in any
    stage skip to the next chain.

    Args:
        pdb_id: PDB ID (e.g., '1KE5')
        chains_to_minimize: List of chain IDs to minimize (e.g., ['A', 'B'])

    Returns:
        List of successfully minimized chain IDs
    """
    minimized_chains = []
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    for chain in chains_to_minimize:
        try:
            # The ESMFold model for this chain must already be on disk.
            esmfold_pdb = OUTPUT_DIR / f"{pdb_id}_chain_{chain}_esmfold.pdb"
            if not esmfold_pdb.exists():
                logger.warning(f"ESMFold PDB not found for chain {chain}: {esmfold_pdb}")
                continue

            # tleap input: build AMBER topology/coordinates with ff14SB.
            tleap_in = OUTPUT_DIR / f"tleap_{chain}.in"
            with open(tleap_in, 'w') as f:
                f.write("source leaprc.protein.ff14SB\n")
                f.write(f"protein = loadpdb {esmfold_pdb.resolve()}\n")
                f.write(f"saveamberparm protein {pdb_id}_chain_{chain}_esmfold.prmtop {pdb_id}_chain_{chain}_esmfold.inpcrd\n")
                f.write("quit\n")

            prmtop = OUTPUT_DIR / f"{pdb_id}_chain_{chain}_esmfold.prmtop"
            inpcrd = OUTPUT_DIR / f"{pdb_id}_chain_{chain}_esmfold.inpcrd"

            # Reuse topology files from a previous run when both exist.
            if not prmtop.exists() or not inpcrd.exists():
                cmd = ["tleap", "-f", str(tleap_in)]
                result = subprocess.run(
                    cmd,
                    cwd=str(OUTPUT_DIR),
                    capture_output=True,
                    text=True,
                )
                if result.returncode != 0 or not prmtop.exists():
                    logger.error(f"tleap failed for chain {chain}: {result.stderr}")
                    continue

            # sander input: 300 minimization cycles (first 150 steepest
            # descent), no periodic box (ntb=0), implicit solvent (igb=1).
            min_in = OUTPUT_DIR / f"min_{chain}.in"
            with open(min_in, 'w') as f:
                f.write("#Two-stage minimization: sidechains first\n")
                f.write(" &cntrl\n")
                f.write(" imin=1, maxcyc=300, ncyc=150,\n")
                f.write(" ntb=0, cut=10.0, igb=1\n")
                f.write(" /\n")

            min_out = OUTPUT_DIR / f"min_{chain}.out"
            min_rst = OUTPUT_DIR / f"{pdb_id}_chain_{chain}_esmfold_minimized.rst"

            cmd = [
                "sander",
                "-O",
                "-i", str(min_in),
                "-o", str(min_out),
                "-p", str(prmtop),
                "-c", str(inpcrd),
                "-r", str(min_rst)
            ]

            result = subprocess.run(
                cmd,
                cwd=str(OUTPUT_DIR),
                capture_output=True,
                text=True,
            )

            if result.returncode != 0 or not min_rst.exists():
                logger.error(f"sander minimization failed for chain {chain}: {result.stderr}")
                continue

            # Convert the minimized restart file back to PDB with ambpdb
            # (writes to stdout, redirected into min_pdb).
            min_pdb = OUTPUT_DIR / f"{pdb_id}_chain_{chain}_esmfold_minimized.pdb"
            with open(min_pdb, 'w') as f:
                cmd = [
                    "ambpdb",
                    "-p", str(prmtop),
                    "-c", str(min_rst)
                ]
                result = subprocess.run(
                    cmd,
                    stdout=f,
                    stderr=subprocess.PIPE,
                    text=True,
                )

            if result.returncode != 0 or not min_pdb.exists():
                logger.error(f"ambpdb failed for chain {chain}: {result.stderr}")
                continue

            # Strip hydrogens with a headless PyMOL run; fall back to the
            # hydrogen-containing PDB when PyMOL is unavailable or fails.
            min_pdb_noH = OUTPUT_DIR / f"{pdb_id}_chain_{chain}_esmfold_minimized_noH.pdb"
            try:
                import tempfile
                # Script content is flush-left: PyMOL executes it as Python.
                pymol_script = tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False)
                pymol_script.write(f"""
from pymol import cmd
cmd.load("{min_pdb.resolve()}", "min_chain_{chain}")
cmd.remove("hydrogens")
cmd.save("{min_pdb_noH.resolve()}", "min_chain_{chain}")
cmd.quit()
""")
                pymol_script.close()

                result = subprocess.run(
                    ["pymol", "-c", "-Q", pymol_script.name],
                    capture_output=True,
                    text=True,
                    timeout=60
                )

                if result.returncode != 0 or not min_pdb_noH.exists():
                    raise Exception(f"PyMOL failed: {result.stderr}")

                os.unlink(pymol_script.name)
            except Exception as e:
                # NOTE(review): the temporary script file is not unlinked on
                # this path — confirm whether that leak is acceptable.
                logger.warning(f"PyMOL hydrogen removal failed for chain {chain}, using original: {e}")

                min_pdb_noH = min_pdb

            logger.info(f"Minimized chain {chain} saved to {min_pdb_noH.name}. It will be merged into 1_protein_no_hydrogens.pdb when you run Prepare Structure.")
            minimized_chains.append(chain)

        except Exception as e:
            # Any unexpected error is logged and the next chain is attempted.
            logger.error(f"Error minimizing chain {chain}: {str(e)}")
            import traceback
            logger.error(traceback.format_exc())
            continue

    return minimized_chains
|
|
|
|
|
|
|
|
def _replace_chain_in_pdb(target_pdb, chain_id, source_pdb): |
|
|
""" |
|
|
Replace a specific chain in target_pdb with the chain from source_pdb. |
|
|
|
|
|
Args: |
|
|
target_pdb: Path to target PDB file (will be modified) |
|
|
chain_id: Chain ID to replace |
|
|
source_pdb: Path to source PDB file containing the new chain |
|
|
""" |
|
|
|
|
|
with open(target_pdb, 'r') as f: |
|
|
target_lines = f.readlines() |
|
|
|
|
|
|
|
|
with open(source_pdb, 'r') as f: |
|
|
source_lines = f.readlines() |
|
|
|
|
|
|
|
|
filtered_target = [] |
|
|
for line in target_lines: |
|
|
if line.startswith(('ATOM', 'HETATM')): |
|
|
if len(line) >= 21: |
|
|
chain = line[21] |
|
|
if chain != chain_id: |
|
|
filtered_target.append(line) |
|
|
else: |
|
|
|
|
|
filtered_target.append(line) |
|
|
|
|
|
|
|
|
source_chain_lines = [] |
|
|
for line in source_lines: |
|
|
if line.startswith(('ATOM', 'HETATM')): |
|
|
if len(line) >= 21: |
|
|
chain = line[21] |
|
|
if chain == 'A' or chain == chain_id: |
|
|
|
|
|
new_line = line[:21] + chain_id + line[22:] |
|
|
source_chain_lines.append(new_line) |
|
|
|
|
|
|
|
|
combined = [] |
|
|
for line in filtered_target: |
|
|
if line.startswith('END'): |
|
|
|
|
|
combined.extend(source_chain_lines) |
|
|
combined.append(line) |
|
|
|
|
|
|
|
|
with open(target_pdb, 'w') as f: |
|
|
f.writelines(combined) |
|
|
|
|
|
|
|
|
def _prepare_receptor_for_docking():
    """
    Prepare receptor files for docking using the procedure in python/docking.py:
    1. Run tleap on 1_protein_no_hydrogens.pdb to add hydrogens -> protein.pdb
    2. Run pdb4amber on receptor.pdb -> receptor_fixed.pdb
    3. Prepare receptor PDBQT with Meeko (mk_prepare_receptor.py)

    If ESMFold-completed structure is being used, the receptor will include:
    - Completed chains from ESMFold (for chains that were selected for completion)
    - Original chains (for chains that were not selected for completion)

    Existing receptor files are reused unless they are missing or older than
    their inputs (mtime comparison).

    Returns paths (as Path objects) to receptor PDB and PDBQT.

    Raises:
        FileNotFoundError: when 1_protein_no_hydrogens.pdb is missing.
        RuntimeError: when tleap, pdb4amber, or Meeko fails.
    """
    docking_dir = _ensure_docking_folder()

    # The prepared (hydrogen-free) protein is the required starting point.
    protein_no_h = OUTPUT_DIR / "1_protein_no_hydrogens.pdb"
    if not protein_no_h.exists():
        raise FileNotFoundError(
            f"1_protein_no_hydrogens.pdb not found in {OUTPUT_DIR}. "
            "Please run structure preparation first."
        )

    # A flag file marks that the ESMFold-completed structure should be used.
    flag_file = OUTPUT_DIR / ".use_completed_structure"
    complete_structure_path = OUTPUT_DIR / "0_complete_structure.pdb"
    use_completed = flag_file.exists() and complete_structure_path.exists()

    if use_completed:
        logger.info("ESMFold-completed structure is being used for docking receptor preparation")
        logger.info(f"Completed structure includes: ESMFold-completed chains + original chains not selected for completion")

    tleap_in = docking_dir / "prepare_receptor.in"
    receptor_pdb = docking_dir / "receptor.pdb"

    # Decide whether cached receptor files are stale.
    regenerate_receptor = False
    if not receptor_pdb.exists():
        regenerate_receptor = True
    elif use_completed and complete_structure_path.exists():
        # Regenerate when either input file is newer than the cached receptor.
        receptor_mtime = receptor_pdb.stat().st_mtime
        completed_mtime = complete_structure_path.stat().st_mtime
        protein_mtime = protein_no_h.stat().st_mtime

        if completed_mtime > receptor_mtime or protein_mtime > receptor_mtime:
            logger.info("Regenerating receptor: completed structure or protein file is newer")
            regenerate_receptor = True

    if regenerate_receptor:
        # Remove stale artifacts so the existence checks below are meaningful.
        if receptor_pdb.exists():
            receptor_pdb.unlink()
        receptor_fixed_path = docking_dir / "receptor_fixed.pdb"
        if receptor_fixed_path.exists():
            receptor_fixed_path.unlink()
        receptor_pdbqt_path = docking_dir / "receptor.pdbqt"
        if receptor_pdbqt_path.exists():
            receptor_pdbqt_path.unlink()

        # Step 1: tleap loads the protein with ff14SB and writes receptor.pdb
        # (relative to docking_dir, since tleap runs with cwd=docking_dir).
        protein_no_h_abs = str(protein_no_h.resolve())
        with open(tleap_in, "w") as f:
            f.write("source leaprc.protein.ff14SB\n")
            f.write(f"protein = loadpdb {protein_no_h_abs}\n")
            f.write("savepdb protein receptor.pdb\n")
            f.write("quit\n")

        cmd = ["tleap", "-f", tleap_in.name]
        result = subprocess.run(
            cmd,
            cwd=docking_dir,
            capture_output=True,
            text=True,
        )
        if result.returncode != 0 or not receptor_pdb.exists():
            raise RuntimeError(
                "Failed to prepare receptor with tleap.\n"
                f"STDOUT:\n{result.stdout}\nSTDERR:\n{result.stderr}"
            )

    # Step 2: pdb4amber normalizes the receptor for AMBER-compatible naming.
    receptor_fixed = docking_dir / "receptor_fixed.pdb"
    if regenerate_receptor or not receptor_fixed.exists():
        cmd = [
            "pdb4amber",
            "-i",
            str(receptor_pdb),
            "-o",
            str(receptor_fixed),
        ]
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
        )
        if result.returncode != 0 or not receptor_fixed.exists():
            raise RuntimeError(
                "Failed to run pdb4amber on receptor.\n"
                f"STDOUT:\n{result.stdout}\nSTDERR:\n{result.stderr}"
            )

    # Step 3: Meeko writes receptor.pdbqt into docking_dir.
    receptor_pdbqt = docking_dir / "receptor.pdbqt"
    if regenerate_receptor or not receptor_pdbqt.exists():
        cmd = [
            "mk_prepare_receptor.py",
            "-i",
            str(receptor_fixed),
            "-o",
            "receptor",
            "-p",
        ]
        result = subprocess.run(
            cmd,
            cwd=docking_dir,
            capture_output=True,
            text=True,
        )
        if result.returncode != 0 or not receptor_pdbqt.exists():
            # Meeko complaints about excess inter-residue bonds / paddings
            # indicate clashes, typically from unminimized ESMFold chains.
            error_text = result.stderr + result.stdout
            needs_minimization = (
                "excess inter-residue bond" in error_text or
                ("Expected" in error_text and "paddings" in error_text)
            )

            if needs_minimization:
                # .chains_minimized records which chains were already minimized
                # (comma-separated chain IDs).
                min_status_file = OUTPUT_DIR / ".chains_minimized"
                minimized_chains = []
                if min_status_file.exists():
                    with open(min_status_file, 'r') as f:
                        content = f.read().strip()
                        minimized_chains = content.split(',') if content else []

                error_msg = (
                    "Failed to prepare receptor PDBQT with Meeko due to clashes/disulfide bonds.\n\n"
                )

                if not minimized_chains:
                    # No minimization done yet: guide the user through it.
                    error_msg += (
                        "⚠️ ESMFold-generated chains need energy minimization.\n"
                        "Please go back to the 'Fill Missing Residues' step and:\n"
                        "1. Check the 'Energy minimize ESMFold-generated chains' option\n"
                        "2. Select the chains you want to minimize\n"
                        "3. Rebuild the completed structure\n"
                        "4. Then try docking again.\n\n"
                    )
                else:
                    error_msg += (
                        f"Some chains were minimized ({', '.join(minimized_chains)}), but the error persists.\n"
                        "You may need to minimize additional chains or check the structure.\n\n"
                    )

                error_msg += f"STDOUT:\n{result.stdout}\nSTDERR:\n{result.stderr}"
                raise RuntimeError(error_msg)
            else:
                raise RuntimeError(
                    "Failed to prepare receptor PDBQT with Meeko.\n"
                    f"STDOUT:\n{result.stdout}\nSTDERR:\n{result.stderr}"
                )

    return receptor_fixed, receptor_pdbqt
|
|
|
|
|
|
|
|
def _compute_ligand_center(pdb_path: Path):
    """Compute geometric center of all atoms in a ligand PDB using MDAnalysis."""
    # Imported lazily: MDAnalysis is only needed for the docking workflow.
    try:
        import MDAnalysis as mda
        import numpy as np
    except Exception as e:
        raise RuntimeError(
            "MDAnalysis and NumPy are required for docking but could not be imported."
        ) from e

    universe = mda.Universe(str(pdb_path))
    atoms = universe.atoms
    if atoms.n_atoms == 0:
        raise ValueError(f"No atoms found in ligand file {pdb_path}")

    # Unweighted mean over all atom coordinates.
    cx, cy, cz = atoms.positions.astype(float).mean(axis=0)
    return float(cx), float(cy), float(cz)
|
|
|
|
|
|
|
|
def _prepare_docked_pose_as_ligand(original_ligand: Path, pose_pdb: Path):
    """
    Take a docked pose PDB and sanitize it so it can replace the original ligand:
    - Restore original residue name, chain ID, and residue index
    - Run ligand sanity checks (CONECT/MASTER removal, ATOM->HETATM, distinct atom names)
    This updates the pose_pdb in place.
    """
    # Guard clauses: both files must exist before any rewriting happens.
    if not original_ligand.exists():
        raise FileNotFoundError(f"Original ligand file not found: {original_ligand}")
    if not pose_pdb.exists():
        raise FileNotFoundError(f"Docked pose file not found: {pose_pdb}")

    # Carry the original residue identity over to the docked pose.
    info = extract_original_residue_info(str(original_ligand))
    if info:
        restore_residue_info_in_pdb(
            str(pose_pdb),
            info.get("resname", "LIG"),
            info.get("chain_id", ""),
            info.get("resnum", "1"),
        )

    # Final validation pass over the rewritten pose.
    if not sanity_check_ligand_pdb(str(pose_pdb)):
        raise RuntimeError(f"Sanity check failed for docked pose {pose_pdb}")
|
|
|
|
|
|
|
|
def _sanitize_docked_pose_for_antechamber(pose_pdb: Path, original_residue_info: dict):
    """
    Sanitize a docked pose PDB to make it compatible with antechamber:
    1. Remove CONECT/MASTER/REMARK records
    2. Convert all ATOM records to HETATM
    3. Restore original residue name, chain ID, and residue number
    4. Make atom names distinct (C1, C2, N1, N2, O1, O2, H1, H2, etc.)

    PDB Column format (1-indexed):
    1-6:   RECORD (HETATM)
    7-11:  ATOM # (atom serial number, right-justified)
    12:    Blank
    13-16: ATOM NAME (right-justified for 1-2 char elements)
    17:    RES ALT (alternate location indicator, usually blank)
    18-20: RES NAME (right-justified)
    21:    Blank
    22:    CHN ID (chain identifier)
    23-26: RES# (residue sequence number, right-justified)
    27:    Insertion code (usually blank)
    28-30: Blank (3 spaces)
    31-38: X coordinate (8 chars, %8.3f)
    39-46: Y coordinate (8 chars, %8.3f)
    47-54: Z coordinate (8 chars, %8.3f)
    55-60: OCC (occupancy, 6 chars)
    61-66: TEMP (temperature factor, 6 chars)
    67-76: Blank (10 spaces)
    77-78: ELEMENT (right-justified)
    79-80: Charge (e.g., 1+, 1-, 2+)

    Args:
        pose_pdb: Path to the docked pose PDB file (modified in place)
        original_residue_info: Dict with 'resname', 'chain_id', 'resnum' from original ligand

    Raises:
        FileNotFoundError: if pose_pdb does not exist.
    """
    # Hoisted out of the per-line loop (was imported inside the loop body).
    import re

    if not pose_pdb.exists():
        raise FileNotFoundError(f"Docked pose file not found: {pose_pdb}")

    # Fall back to generic ligand identity when no original info is available.
    resname = original_residue_info.get("resname", "LIG") if original_residue_info else "LIG"
    chain_id = original_residue_info.get("chain_id", "A") if original_residue_info else "A"
    resnum = original_residue_info.get("resnum", "1") if original_residue_info else "1"

    # PDB fields: residue name max 3 chars, chain ID is a single char.
    resname = resname[:3].upper()
    chain_id = chain_id[0] if chain_id else "A"

    with open(pose_pdb, 'r') as f:
        lines = f.readlines()

    # Per-element counters used to generate distinct atom names (C1, C2, ...).
    element_counts = defaultdict(int)
    # Compiled once: matches element symbols with a fused charge suffix
    # (e.g. "N1+") as written by some docking tools.
    element_charge_re = re.compile(r'^([A-Za-z]{1,2})(\d*[+-])$')

    processed_lines = []
    atom_serial = 0

    for line in lines:
        # Drop bookkeeping records antechamber does not need (and chokes on).
        if line.startswith(('CONECT', 'MASTER', 'REMARK', 'COMPND', 'AUTHOR', 'TER', 'HEADER', 'TITLE', 'CRYST1')):
            continue

        if line.startswith(('ATOM', 'HETATM')):
            atom_serial += 1

            # Pad so fixed-column slicing below is always in range.
            padded_line = line.ljust(80)

            # Skip records with unparsable coordinates.
            try:
                x = float(padded_line[30:38].strip())
                y = float(padded_line[38:46].strip())
                z = float(padded_line[46:54].strip())
            except ValueError:
                continue

            element = padded_line[76:78].strip()
            charge = padded_line[78:80].strip()

            # Split element symbols with a fused charge suffix (e.g. "N1+").
            if element and len(element) > 2:
                match = element_charge_re.match(element)
                if match:
                    element = match.group(1).upper()
                    charge = match.group(2)

            # No element column: infer it from the atom name.
            if not element:
                atom_name = padded_line[12:16].strip()
                if len(atom_name) >= 1:
                    # Recognize common two-letter elements first.
                    if len(atom_name) >= 2 and atom_name[:2].upper() in ['CL', 'BR', 'MG', 'ZN', 'FE', 'CU', 'MN']:
                        element = atom_name[:2].upper()
                    else:
                        # Otherwise the first alphabetic character is the element.
                        for c in atom_name:
                            if c.isalpha():
                                element = c.upper()
                                break
                if not element:
                    element = 'X'

            element = element.upper()

            # Distinct per-element atom name, capped at the 4-char field width.
            element_counts[element] += 1
            count = element_counts[element]

            atom_name_str = f"{element}{count}"
            if len(atom_name_str) > 4:
                atom_name_str = atom_name_str[:4]

            # Rebuild the record in strict fixed-column PDB format (see docstring).
            new_line = (
                f"HETATM"
                f"{atom_serial:5d}"
                f" "
                f"{atom_name_str:>4}"
                f" "
                f"{resname:>3}"
                f" "
                f"{chain_id}"
                f"{resnum:>4}"
                f"    "
                f"{x:8.3f}"
                f"{y:8.3f}"
                f"{z:8.3f}"
                f"  1.00"
                f"  0.00"
                f"          "
                f"{element:>2}"
                f"{charge:<2}"
                f"\n"
            )
            processed_lines.append(new_line)
        elif line.startswith('END'):
            # A single END is re-appended below.
            continue

    processed_lines.append('END\n')

    with open(pose_pdb, 'w') as f:
        f.writelines(processed_lines)

    logger.info(f"Sanitized docked pose {pose_pdb}: resname={resname}, chain={chain_id}, resnum={resnum}, atoms={atom_serial}")
    logger.info(f"Element counts: {dict(element_counts)}")
|
|
|
|
|
|
|
|
def _parse_vina_config(config_path: Path): |
|
|
""" |
|
|
Parse Vina config file and return a dict with parameters. |
|
|
Returns None if file doesn't exist or can't be parsed. |
|
|
""" |
|
|
if not config_path.exists(): |
|
|
return None |
|
|
|
|
|
config = {} |
|
|
try: |
|
|
for line in config_path.read_text().split('\n'): |
|
|
line = line.strip() |
|
|
|
|
|
if not line or line.startswith('#'): |
|
|
continue |
|
|
|
|
|
|
|
|
if '=' in line: |
|
|
key, value = line.split('=', 1) |
|
|
key = key.strip() |
|
|
value = value.strip() |
|
|
|
|
|
|
|
|
try: |
|
|
if '.' in value: |
|
|
config[key] = float(value) |
|
|
else: |
|
|
config[key] = int(value) |
|
|
except ValueError: |
|
|
config[key] = value |
|
|
|
|
|
return config |
|
|
except Exception as e: |
|
|
logger.warning(f"Error parsing config file {config_path}: {e}") |
|
|
return None |
|
|
|
|
|
|
|
|
def _parse_vina_log(log_path: Path): |
|
|
""" |
|
|
Parse AutoDock Vina log file and extract binding energies per mode. |
|
|
Returns dict: {mode_index: energy_kcal_mol} |
|
|
""" |
|
|
energies = {} |
|
|
if not log_path.exists(): |
|
|
return energies |
|
|
|
|
|
try: |
|
|
import re |
|
|
|
|
|
with log_path.open("r") as f: |
|
|
for line in f: |
|
|
|
|
|
|
|
|
m = re.match(r"^\s*(\d+)\s+(-?\d+\.\d+)", line) |
|
|
if m: |
|
|
mode = int(m.group(1)) |
|
|
energy = float(m.group(2)) |
|
|
energies[mode] = energy |
|
|
except Exception as e: |
|
|
logger.warning(f"Could not parse Vina log {log_path}: {e}") |
|
|
|
|
|
return energies |
|
|
|
|
|
class MDSimulationGenerator: |
|
|
"""Handles MD simulation file generation and protein processing""" |
|
|
|
|
|
    def __init__(self):
        # Biopython helpers: a parser for local PDB files (QUIET suppresses
        # per-structure warnings) and a downloader for fetching from RCSB.
        self.pdb_parser = PDBParser(QUIET=True)
        self.pdb_list = PDBList()
|
|
|
|
|
def fetch_pdb_structure(self, pdb_id): |
|
|
"""Fetch PDB structure from RCSB""" |
|
|
try: |
|
|
|
|
|
pdb_file = self.pdb_list.retrieve_pdb_file(pdb_id, pdir=OUTPUT_DIR, file_format='pdb') |
|
|
return str(pdb_file) |
|
|
except Exception as e: |
|
|
logger.error(f"Error fetching PDB {pdb_id}: {str(e)}") |
|
|
raise |
|
|
|
|
|
def parse_pdb_structure(self, pdb_file): |
|
|
"""Parse PDB file and extract structure information""" |
|
|
try: |
|
|
structure = self.pdb_parser.get_structure('protein', pdb_file) |
|
|
|
|
|
|
|
|
atom_count = 0 |
|
|
chains = set() |
|
|
residues = set() |
|
|
|
|
|
for model in structure: |
|
|
for chain in model: |
|
|
chains.add(chain.id) |
|
|
for residue in chain: |
|
|
if residue.id[0] == ' ': |
|
|
residues.add(f"{residue.resname}{residue.id[1]}") |
|
|
for atom in residue: |
|
|
atom_count += 1 |
|
|
|
|
|
return { |
|
|
'atom_count': atom_count, |
|
|
'chains': list(chains), |
|
|
'residue_count': len(residues), |
|
|
'structure_id': Path(pdb_file).stem.upper() |
|
|
} |
|
|
except Exception as e: |
|
|
logger.error(f"Error parsing PDB file: {str(e)}") |
|
|
raise |
|
|
|
|
|
def generate_mdp_file(self, params, step_type='production'): |
|
|
"""Generate GROMACS MDP file for different simulation steps""" |
|
|
|
|
|
if step_type == 'restrained_min': |
|
|
return f"""; Restrained Minimization Parameters |
|
|
integrator = steep |
|
|
nsteps = {params['steps']['restrainedMin']['steps']} |
|
|
emstep = 0.01 |
|
|
emtol = 1000 |
|
|
|
|
|
; Position restraints |
|
|
define = -DPOSRES |
|
|
refcoord_scaling = com |
|
|
|
|
|
; Output control |
|
|
nstxout = 100 |
|
|
nstenergy = 100 |
|
|
nstlog = 100 |
|
|
|
|
|
; Bond parameters |
|
|
constraint_algorithm = lincs |
|
|
constraints = h-bonds |
|
|
|
|
|
; Neighbor searching |
|
|
cutoff-scheme = Verlet |
|
|
ns_type = grid |
|
|
nstlist = 10 |
|
|
rlist = {params['cutoff']} |
|
|
|
|
|
; Electrostatics |
|
|
coulombtype = PME |
|
|
rcoulomb = {params['cutoff']} |
|
|
pme_order = {params['pmeOrder']} |
|
|
|
|
|
; Van der Waals |
|
|
vdwtype = Cut-off |
|
|
rvdw = {params['cutoff']} |
|
|
""" |
|
|
|
|
|
elif step_type == 'minimization': |
|
|
return f"""; Minimization Parameters |
|
|
integrator = {params['steps']['minimization']['algorithm']} |
|
|
nsteps = {params['steps']['minimization']['steps']} |
|
|
emstep = 0.01 |
|
|
emtol = 1000 |
|
|
|
|
|
; Output control |
|
|
nstxout = 100 |
|
|
nstenergy = 100 |
|
|
nstlog = 100 |
|
|
|
|
|
; Bond parameters |
|
|
constraint_algorithm = lincs |
|
|
constraints = h-bonds |
|
|
|
|
|
; Neighbor searching |
|
|
cutoff-scheme = Verlet |
|
|
ns_type = grid |
|
|
nstlist = 10 |
|
|
rlist = {params['cutoff']} |
|
|
|
|
|
; Electrostatics |
|
|
coulombtype = PME |
|
|
rcoulomb = {params['cutoff']} |
|
|
pme_order = {params['pmeOrder']} |
|
|
|
|
|
; Van der Waals |
|
|
vdwtype = Cut-off |
|
|
rvdw = {params['cutoff']} |
|
|
""" |
|
|
|
|
|
elif step_type == 'nvt': |
|
|
return f"""; NVT Equilibration Parameters |
|
|
integrator = md |
|
|
dt = {params['timestep']} |
|
|
nsteps = {params['steps']['nvt']['steps']} |
|
|
|
|
|
; Output control |
|
|
nstxout = 5000 |
|
|
nstvout = 5000 |
|
|
nstenergy = 1000 |
|
|
nstlog = 1000 |
|
|
|
|
|
; Bond parameters |
|
|
constraint_algorithm = lincs |
|
|
constraints = h-bonds |
|
|
lincs_iter = 1 |
|
|
lincs_order = 4 |
|
|
|
|
|
; Neighbor searching |
|
|
cutoff-scheme = Verlet |
|
|
ns_type = grid |
|
|
nstlist = 40 |
|
|
rlist = {params['cutoff']} |
|
|
|
|
|
; Electrostatics |
|
|
coulombtype = PME |
|
|
rcoulomb = {params['cutoff']} |
|
|
pme_order = {params['pmeOrder']} |
|
|
|
|
|
; Van der Waals |
|
|
vdwtype = Cut-off |
|
|
rvdw = {params['cutoff']} |
|
|
|
|
|
; Temperature coupling |
|
|
tcoupl = {params['couplingType']} |
|
|
tc-grps = Protein Non-Protein |
|
|
tau_t = 0.1 0.1 |
|
|
ref_t = {params['steps']['nvt']['temperature']} {params['steps']['nvt']['temperature']} |
|
|
|
|
|
; Pressure coupling (disabled for NVT) |
|
|
pcoupl = no |
|
|
|
|
|
; Velocity generation |
|
|
gen_vel = yes |
|
|
gen_temp = {params['steps']['nvt']['temperature']} |
|
|
gen_seed = -1 |
|
|
""" |
|
|
|
|
|
elif step_type == 'npt': |
|
|
return f"""; NPT Equilibration Parameters |
|
|
integrator = md |
|
|
dt = {params['timestep']} |
|
|
nsteps = {params['steps']['npt']['steps']} |
|
|
|
|
|
; Output control |
|
|
nstxout = 5000 |
|
|
nstvout = 5000 |
|
|
nstenergy = 1000 |
|
|
nstlog = 1000 |
|
|
|
|
|
; Bond parameters |
|
|
constraint_algorithm = lincs |
|
|
constraints = h-bonds |
|
|
lincs_iter = 1 |
|
|
lincs_order = 4 |
|
|
|
|
|
; Neighbor searching |
|
|
cutoff-scheme = Verlet |
|
|
ns_type = grid |
|
|
nstlist = 40 |
|
|
rlist = {params['cutoff']} |
|
|
|
|
|
; Electrostatics |
|
|
coulombtype = PME |
|
|
rcoulomb = {params['cutoff']} |
|
|
pme_order = {params['pmeOrder']} |
|
|
|
|
|
; Van der Waals |
|
|
vdwtype = Cut-off |
|
|
rvdw = {params['cutoff']} |
|
|
|
|
|
; Temperature coupling |
|
|
tcoupl = {params['couplingType']} |
|
|
tc-grps = Protein Non-Protein |
|
|
tau_t = 0.1 0.1 |
|
|
ref_t = {params['steps']['npt']['temperature']} {params['steps']['npt']['temperature']} |
|
|
|
|
|
; Pressure coupling |
|
|
pcoupl = {params['couplingType']} |
|
|
pcoupltype = isotropic |
|
|
tau_p = 2.0 |
|
|
ref_p = {params['steps']['npt']['pressure']} |
|
|
compressibility = 4.5e-5 |
|
|
|
|
|
; Velocity generation |
|
|
gen_vel = no |
|
|
""" |
|
|
|
|
|
else: |
|
|
return f"""; MD Simulation Parameters |
|
|
; Generated by MD Simulation Pipeline |
|
|
|
|
|
; Run parameters |
|
|
integrator = md |
|
|
dt = {params['timestep']} |
|
|
nsteps = {params['steps']['production']['steps']} |
|
|
|
|
|
; Output control |
|
|
nstxout = 5000 |
|
|
nstvout = 5000 |
|
|
nstenergy = 1000 |
|
|
nstlog = 1000 |
|
|
|
|
|
; Bond parameters |
|
|
constraint_algorithm = lincs |
|
|
constraints = h-bonds |
|
|
lincs_iter = 1 |
|
|
lincs_order = 4 |
|
|
|
|
|
; Neighbor searching |
|
|
cutoff-scheme = Verlet |
|
|
ns_type = grid |
|
|
nstlist = 40 |
|
|
rlist = {params['cutoff']} |
|
|
|
|
|
; Electrostatics |
|
|
coulombtype = PME |
|
|
rcoulomb = {params['cutoff']} |
|
|
pme_order = {params['pmeOrder']} |
|
|
fourierspacing = 0.16 |
|
|
|
|
|
; Van der Waals |
|
|
vdwtype = Cut-off |
|
|
rvdw = {params['cutoff']} |
|
|
|
|
|
; Temperature coupling |
|
|
tcoupl = {params['couplingType']} |
|
|
tc-grps = Protein Non-Protein |
|
|
tau_t = 0.1 0.1 |
|
|
ref_t = {params['temperature']} {params['temperature']} |
|
|
|
|
|
; Pressure coupling |
|
|
pcoupl = {params['couplingType']} |
|
|
pcoupltype = isotropic |
|
|
tau_p = 2.0 |
|
|
ref_p = {params['pressure']} |
|
|
compressibility = 4.5e-5 |
|
|
|
|
|
; Dispersion correction |
|
|
DispCorr = EnerPres |
|
|
|
|
|
; Velocity generation |
|
|
gen_vel = yes |
|
|
gen_temp = {params['temperature']} |
|
|
gen_seed = -1 |
|
|
""" |
|
|
|
|
|
def generate_pbs_script(self, protein_name, params): |
|
|
"""Generate PBS script for HPC submission""" |
|
|
total_steps = params['steps']['production']['steps'] |
|
|
time_in_ns = (total_steps * params['timestep']) / 1000 |
|
|
|
|
|
return f"""#!/bin/bash |
|
|
#PBS -N {protein_name}_md |
|
|
#PBS -l nodes=1:ppn=16 |
|
|
#PBS -l walltime=24:00:00 |
|
|
#PBS -q normal |
|
|
#PBS -j oe |
|
|
|
|
|
# Change to the directory where the job was submitted |
|
|
cd $PBS_O_WORKDIR |
|
|
|
|
|
# Load required modules |
|
|
module load gromacs/2023.2 |
|
|
module load intel/2021.4.0 |
|
|
|
|
|
# Set up environment |
|
|
export OMP_NUM_THREADS=16 |
|
|
export GMX_MAXBACKUP=-1 |
|
|
|
|
|
# Simulation parameters |
|
|
PROTEIN={protein_name} |
|
|
STEPS={total_steps} |
|
|
TIME_NS={time_in_ns:.2f} |
|
|
|
|
|
echo "Starting MD simulation for $PROTEIN" |
|
|
echo "Total simulation time: $TIME_NS ns" |
|
|
echo "Job started at: $(date)" |
|
|
|
|
|
# Run the simulation |
|
|
./run_simulation.sh $PROTEIN |
|
|
|
|
|
echo "Simulation completed at: $(date)" |
|
|
echo "Results saved in output directory" |
|
|
""" |
|
|
|
|
|
def generate_setup_script(self, protein_name, params): |
|
|
"""Generate setup script for MD simulation""" |
|
|
return f"""#!/bin/bash |
|
|
# Setup script for {protein_name} MD simulation |
|
|
# Generated by MD Simulation Pipeline |
|
|
|
|
|
set -e |
|
|
|
|
|
PROTEIN={protein_name} |
|
|
FORCE_FIELD={params['forceField']} |
|
|
WATER_MODEL={params['waterModel']} |
|
|
|
|
|
echo "Setting up MD simulation for $PROTEIN" |
|
|
|
|
|
# Create output directory |
|
|
mkdir -p output |
|
|
|
|
|
# 1. Prepare protein structure |
|
|
echo "Preparing protein structure..." |
|
|
gmx pdb2gmx -f $PROTEIN.pdb -o $PROTEIN_processed.gro -p $PROTEIN.top -ff $FORCE_FIELD -water $WATER_MODEL |
|
|
|
|
|
# 2. Define simulation box |
|
|
echo "Defining simulation box..." |
|
|
gmx editconf -f $PROTEIN_processed.gro -o $PROTEIN_box.gro -c -d {params['boxMargin']} -bt {params['boxType']} |
|
|
|
|
|
# 3. Add solvent |
|
|
echo "Adding solvent..." |
|
|
gmx solvate -cp $PROTEIN_box.gro -cs spc216.gro -o $PROTEIN_solv.gro -p $PROTEIN.top |
|
|
|
|
|
# 4. Add ions |
|
|
echo "Adding ions..." |
|
|
gmx grompp -f $PROTEIN_restrained.mdp -c $PROTEIN_solv.gro -p $PROTEIN.top -o $PROTEIN_ions.tpr |
|
|
echo "SOL" | gmx genion -s $PROTEIN_ions.tpr -o $PROTEIN_final.gro -p $PROTEIN.top -pname NA -nname CL -neutral |
|
|
|
|
|
echo "Setup completed successfully!" |
|
|
echo "Ready to run simulation with: ./run_simulation.sh $PROTEIN" |
|
|
""" |
|
|
|
|
|
def generate_analysis_script(self, protein_name): |
|
|
"""Generate analysis script for MD simulation results""" |
|
|
return f"""#!/bin/bash |
|
|
# Analysis script for {protein_name} MD simulation |
|
|
# Generated by MD Simulation Pipeline |
|
|
|
|
|
PROTEIN={protein_name} |
|
|
|
|
|
echo "Analyzing MD simulation results for $PROTEIN" |
|
|
|
|
|
# Create analysis directory |
|
|
mkdir -p analysis |
|
|
|
|
|
# 1. RMSD analysis |
|
|
echo "Calculating RMSD..." |
|
|
echo "Protein" | gmx rms -s $PROTEIN_final.tpr -f $PROTEIN_prod.xtc -o analysis/$PROTEIN_rmsd.xvg -tu ns |
|
|
|
|
|
# 2. RMSF analysis |
|
|
echo "Calculating RMSF..." |
|
|
echo "Protein" | gmx rmsf -s $PROTEIN_final.tpr -f $PROTEIN_prod.xtc -o analysis/$PROTEIN_rmsf.xvg -res |
|
|
|
|
|
# 3. Radius of gyration |
|
|
echo "Calculating radius of gyration..." |
|
|
echo "Protein" | gmx gyrate -s $PROTEIN_final.tpr -f $PROTEIN_prod.xtc -o analysis/$PROTEIN_gyrate.xvg |
|
|
|
|
|
# 4. Hydrogen bonds |
|
|
echo "Analyzing hydrogen bonds..." |
|
|
echo "Protein" | gmx hbond -s $PROTEIN_final.tpr -f $PROTEIN_prod.xtc -num analysis/$PROTEIN_hbonds.xvg |
|
|
|
|
|
# 5. Energy analysis |
|
|
echo "Analyzing energies..." |
|
|
gmx energy -f $PROTEIN_prod.edr -o analysis/$PROTEIN_energy.xvg |
|
|
|
|
|
# 6. Generate plots |
|
|
echo "Generating analysis plots..." |
|
|
python3 plot_analysis.py $PROTEIN |
|
|
|
|
|
echo "Analysis completed! Results saved in analysis/ directory" |
|
|
""" |
|
|
|
|
|
|
|
|
# Module-level singleton used by all route handlers below.
md_generator = MDSimulationGenerator()
|
|
|
|
|
@app.route('/api/fetch-pdb', methods=['POST'])
def fetch_pdb():
    """Fetch PDB structure from RCSB.

    Expects a JSON body {"pdb_id": "<4-char id>"}; cleans the output folder,
    downloads the entry and returns parsed summary info plus the local path.
    """
    try:
        print("DEBUG: fetch-pdb endpoint called")
        # silent=True: a missing or non-JSON body yields None instead of
        # raising, so malformed requests get a clean 400 below rather than
        # an AttributeError turned into a 500 by the blanket handler.
        data = request.get_json(silent=True) or {}
        pdb_id = data.get('pdb_id', '').upper()
        print(f"DEBUG: pdb_id = {pdb_id}")

        if not pdb_id or len(pdb_id) != 4:
            return jsonify({'error': 'Invalid PDB ID'}), 400

        print("DEBUG: Calling clean_and_create_output_folder()")
        if not clean_and_create_output_folder():
            return jsonify({'error': 'Failed to clean output folder'}), 500
        print("DEBUG: Output folder cleanup completed successfully")

        pdb_file = md_generator.fetch_pdb_structure(pdb_id)

        structure_info = md_generator.parse_pdb_structure(pdb_file)

        return jsonify({
            'success': True,
            'structure_info': structure_info,
            'pdb_file': pdb_file
        })

    except Exception as e:
        logger.error(f"Error fetching PDB: {str(e)}")
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
@app.route('/api/proxy-pdb/<pdb_id>', methods=['GET'])
def proxy_pdb(pdb_id):
    """Proxy endpoint to fetch PDB from RCSB or mirrors (avoids CORS issues)"""
    try:
        pdb_id = pdb_id.upper().strip()
        if not pdb_id or len(pdb_id) != 4:
            return jsonify({'error': 'Invalid PDB ID'}), 400

        # Primary source first, EBI mirror second.
        mirrors = (
            f"https://files.rcsb.org/download/{pdb_id}.pdb",
            f"https://www.ebi.ac.uk/pdbe/entry-files/download/pdb{pdb_id.lower()}.ent",
        )

        for url in mirrors:
            try:
                print(f"DEBUG: Trying to fetch PDB from {url}")
                response = requests.get(url, timeout=30)
                if response.status_code != 200:
                    continue
                content = response.text
                # Sanity check: only accept payloads with coordinate records.
                if 'ATOM' in content or 'HETATM' in content:
                    print(f"DEBUG: Successfully fetched PDB from {url}")
                    return Response(content, mimetype='text/plain')
            except requests.exceptions.RequestException as e:
                print(f"DEBUG: Failed to fetch from {url}: {e}")
                continue

        return jsonify({'error': f'PDB ID {pdb_id} not found or servers unavailable'}), 404
    except Exception as e:
        logger.error(f"Error proxying PDB {pdb_id}: {str(e)}")
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
@app.route('/api/get-pdb-content', methods=['GET'])
def get_pdb_content():
    """Return the content of a PDB file that lives inside OUTPUT_DIR.

    Query args:
        file: path to the file; it must resolve to a location inside
              OUTPUT_DIR, otherwise 400 is returned.
    """
    try:
        file_path = request.args.get('file', '')
        if not file_path:
            return jsonify({'success': False, 'error': 'No file path provided'}), 400

        # Containment check via Path.relative_to(): a plain startswith()
        # prefix test (the previous approach) wrongly accepted sibling
        # directories such as ".../output_evil/x.pdb".
        file_path = Path(file_path).resolve()
        try:
            file_path.relative_to(OUTPUT_DIR.resolve())
        except ValueError:
            return jsonify({'success': False, 'error': 'Invalid file path'}), 400

        if not file_path.exists():
            return jsonify({'success': False, 'error': 'File not found'}), 404

        content = file_path.read_text()
        return jsonify({'success': True, 'content': content})
    except Exception as e:
        logger.error(f"Error reading PDB content: {str(e)}")
        return jsonify({'success': False, 'error': str(e)}), 500
|
|
|
|
|
@app.route('/api/parse-pdb', methods=['POST'])
def parse_pdb():
    """Parse an uploaded PDB file and return summary structure info."""
    try:
        print("DEBUG: parse-pdb endpoint called")
        if 'file' not in request.files:
            return jsonify({'error': 'No file uploaded'}), 400

        file = request.files['file']
        if file.filename == '':
            return jsonify({'error': 'No file selected'}), 400

        print(f"DEBUG: Processing uploaded file: {file.filename}")

        print("DEBUG: Calling clean_and_create_output_folder()")
        if not clean_and_create_output_folder():
            return jsonify({'error': 'Failed to clean output folder'}), 500
        print("DEBUG: Output folder cleanup completed successfully")

        # Persist the upload to a temp file; close our handle so the parser
        # (and Windows) can reopen it, and always unlink it — the original
        # leaked the file (and its fd) whenever parsing raised.
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.pdb')
        try:
            file.save(temp_file.name)
            temp_file.close()
            structure_info = md_generator.parse_pdb_structure(temp_file.name)
        finally:
            try:
                os.unlink(temp_file.name)
            except OSError:
                # Best-effort cleanup; don't mask the original error.
                pass

        return jsonify({
            'success': True,
            'structure_info': structure_info
        })

    except Exception as e:
        logger.error(f"Error parsing PDB: {str(e)}")
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
@app.route('/api/generate-files', methods=['POST'])
def generate_files():
    """Generate MD simulation files"""
    try:
        payload = request.get_json()
        protein_name = payload.get('protein_name', 'protein')
        simulation_params = payload.get('simulation_params', {})

        # Map each MDP output filename to the step type that produces it.
        mdp_outputs = [
            (f'{protein_name}.mdp', 'production'),
            (f'{protein_name}_restrained.mdp', 'restrained_min'),
            (f'{protein_name}_min.mdp', 'minimization'),
            (f'{protein_name}_nvt.mdp', 'nvt'),
            (f'{protein_name}_npt.mdp', 'npt'),
            (f'{protein_name}_prod.mdp', 'production'),
        ]
        files = {
            name: md_generator.generate_mdp_file(simulation_params, step)
            for name, step in mdp_outputs
        }

        # Submission / setup / analysis helper scripts.
        files[f'{protein_name}_simulation.pbs'] = md_generator.generate_pbs_script(protein_name, simulation_params)
        files[f'setup_{protein_name}.sh'] = md_generator.generate_setup_script(protein_name, simulation_params)
        files[f'analyze_{protein_name}.sh'] = md_generator.generate_analysis_script(protein_name)

        return jsonify({
            'success': True,
            'files': files
        })

    except Exception as e:
        logger.error(f"Error generating files: {str(e)}")
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
@app.route('/api/download-zip', methods=['POST'])
def download_zip():
    """Download all generated files as a ZIP archive.

    Expects JSON {"files": {filename: content, ...}} and returns the archive
    as an attachment.
    """
    try:
        # Local import: io is only needed here and is not imported at the
        # top of this module.
        import io

        data = request.get_json()
        files = data.get('files', {})

        # Build the archive in memory: the original wrote a
        # NamedTemporaryFile(delete=False) and never removed it, leaking one
        # temp file per request.
        buffer = io.BytesIO()
        with zipfile.ZipFile(buffer, 'w') as zip_file:
            for filename, content in files.items():
                zip_file.writestr(filename, content)
        buffer.seek(0)

        return send_file(
            buffer,
            as_attachment=True,
            download_name='md_simulation_files.zip',
            mimetype='application/zip'
        )

    except Exception as e:
        logger.error(f"Error creating ZIP file: {str(e)}")
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
@app.route('/api/get-solvated-protein', methods=['GET'])
def get_solvated_protein():
    """Get the solvated protein PDB file content"""
    try:
        # pathlib form, consistent with the other handlers in this module.
        solvated_file = Path(os.path.join(OUTPUT_DIR, 'protein_solvated.pdb'))

        if not solvated_file.exists():
            return jsonify({'success': False, 'error': 'Solvated protein file not found. Please generate files first.'})

        content = solvated_file.read_text()

        return jsonify({'success': True, 'content': content})
    except Exception as e:
        logger.error(f"Error reading solvated protein file: {str(e)}")
        return jsonify({'success': False, 'error': str(e)})
|
|
|
|
|
@app.route('/api/get-viewer-pdb', methods=['GET'])
def get_viewer_pdb():
    """Return a single PDB for viewer: start from protein_solvated.pdb and mark ligand residues as HETATM.

    Ligand residues are detected from 4_ligands_corrected*.pdb files by (resname, chain, resi) tuples; if chains/resi not present, fallback to resname matching.
    """
    try:
        solvated_path = OUTPUT_DIR / 'protein_solvated.pdb'

        # Prefer per-ligand corrected files; skip OpenBabel intermediates.
        lig_paths = sorted([f for f in OUTPUT_DIR.glob('4_ligands_corrected_*.pdb') if "_obabel_" not in f.name])

        # Fall back to the single combined ligand file when no per-ligand
        # files were produced.
        if not lig_paths:
            single_lig_path = OUTPUT_DIR / '4_ligands_corrected.pdb'
            if single_lig_path.exists():
                lig_paths = [single_lig_path]
        viewer_out = OUTPUT_DIR / 'viewer_protein_with_ligand.pdb'

        if not solvated_path.exists():
            return jsonify({'success': False, 'error': 'protein_solvated.pdb not found'}), 400

        # Collect ligand identities: exact (resname, chain, resi) keys plus a
        # resname-only fallback set. Fixed PDB columns: resname 18-20,
        # chain 22, residue number 23-26 (0-based slices below).
        ligand_keys = set()
        ligand_resnames = set()
        for lig_path in lig_paths:
            if lig_path.exists():
                with open(lig_path, 'r') as lf:
                    for line in lf:
                        if line.startswith(('ATOM', 'HETATM')):
                            resn = line[17:20].strip()
                            chain = line[21:22].strip()
                            resi = line[22:26].strip()
                            ligand_resnames.add(resn)
                            if chain and resi:
                                ligand_keys.add((resn, chain, resi))

        # Copy the solvated structure, promoting ligand and ion records to
        # HETATM so viewers render them as hetero atoms.
        out_lines = []
        with open(solvated_path, 'r') as sf:
            for line in sf:
                if line.startswith(('ATOM', 'HETATM')):
                    resn = line[17:20].strip()
                    chain = line[21:22].strip()
                    resi = line[22:26].strip()
                    is_match = False
                    is_ion = resn in { 'NA', 'CL' }
                    if (resn, chain, resi) in ligand_keys:
                        is_match = True
                    elif resn in ligand_resnames:
                        # Fallback: resname-only match (chain/resi absent in
                        # the ligand file).
                        is_match = True
                    if is_match or is_ion:
                        # Keep columns 7+ so atom serials/coords stay aligned.
                        out_lines.append('HETATM' + line[6:])
                    else:
                        out_lines.append(line)
                else:
                    out_lines.append(line)

        # Best-effort cache of the merged file for the /viewer route; the
        # response content is served regardless.
        try:
            with open(viewer_out, 'w') as vf:
                vf.writelines(out_lines)
        except Exception:
            pass

        return jsonify({'success': True, 'content': ''.join(out_lines)})
    except Exception as e:
        logger.error(f"Error generating viewer PDB: {str(e)}")
        return jsonify({'success': False, 'error': str(e)})
|
|
|
|
|
@app.route('/view-pdb')
def view_pdb_html():
    """Serve PDB file as HTML page for instant viewing"""
    try:
        viewer_out = OUTPUT_DIR / 'viewer_protein_with_ligand.pdb'
        solvated_path = OUTPUT_DIR / 'protein_solvated.pdb'

        # Prefer per-ligand corrected files; skip OpenBabel intermediates.
        lig_paths = sorted([f for f in OUTPUT_DIR.glob('4_ligands_corrected_*.pdb') if "_obabel_" not in f.name])

        if not lig_paths:
            single_lig_path = OUTPUT_DIR / '4_ligands_corrected.pdb'
            if single_lig_path.exists():
                lig_paths = [single_lig_path]

        # Regenerate the merged viewer file on demand if it is missing
        # (same merge logic as /api/get-viewer-pdb).
        if not viewer_out.exists():
            if not solvated_path.exists():
                return f"""
<!DOCTYPE html>
<html>
<head>
    <title>Error - PDB Not Found</title>
    <style>
        body {{ font-family: Arial, sans-serif; padding: 40px; text-align: center; }}
        .error {{ color: #dc3545; font-size: 18px; }}
    </style>
</head>
<body>
    <div class="error">
        <h1>PDB File Not Found</h1>
        <p>Please complete the structure preparation steps first.</p>
    </div>
</body>
</html>
""", 404

            try:
                # Collect ligand identities: exact (resname, chain, resi)
                # keys plus a resname-only fallback set.
                ligand_keys = set()
                ligand_resnames = set()
                for lig_path in lig_paths:
                    if lig_path.exists():
                        with open(lig_path, 'r') as lf:
                            for line in lf:
                                if line.startswith(('ATOM', 'HETATM')):
                                    resn = line[17:20].strip()
                                    chain = line[21:22].strip()
                                    resi = line[22:26].strip()
                                    ligand_resnames.add(resn)
                                    if chain and resi:
                                        ligand_keys.add((resn, chain, resi))

                # Copy the solvated structure, promoting ligand/ion records
                # to HETATM.
                out_lines = []
                with open(solvated_path, 'r') as sf:
                    for line in sf:
                        if line.startswith(('ATOM', 'HETATM')):
                            resn = line[17:20].strip()
                            chain = line[21:22].strip()
                            resi = line[22:26].strip()
                            is_match = False
                            is_ion = resn in { 'NA', 'CL' }
                            if (resn, chain, resi) in ligand_keys:
                                is_match = True
                            elif resn in ligand_resnames:
                                # Fallback: resname-only match.
                                is_match = True
                            if is_match or is_ion:
                                # Keep columns 7+ so serials/coords stay aligned.
                                out_lines.append('HETATM' + line[6:])
                            else:
                                out_lines.append(line)
                        else:
                            out_lines.append(line)

                with open(viewer_out, 'w') as vf:
                    vf.writelines(out_lines)
            except Exception as e:
                logger.error(f"Error generating viewer PDB: {str(e)}")
                return f"""
<!DOCTYPE html>
<html>
<head>
    <title>Error</title>
    <style>
        body {{ font-family: Arial, sans-serif; padding: 40px; text-align: center; }}
        .error {{ color: #dc3545; font-size: 18px; }}
    </style>
</head>
<body>
    <div class="error">
        <h1>Error Generating PDB</h1>
        <p>Could not generate viewer PDB file: {html.escape(str(e))}</p>
    </div>
</body>
</html>
""", 500

        with open(viewer_out, 'r') as f:
            pdb_content = f.read()

        # Escape so the raw PDB text renders literally inside the HTML page.
        escaped_content = html.escape(pdb_content)

        html_page = f"""<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Viewer PDB File</title>
    <style>
        * {{
            margin: 0;
            padding: 0;
            box-sizing: border-box;
        }}
        body {{
            font-family: 'Courier New', monospace;
            font-size: 12px;
            line-height: 1.4;
            background: #f8f9fa;
            padding: 20px;
        }}
        .header {{
            background: white;
            padding: 15px 20px;
            margin-bottom: 15px;
            border-radius: 4px;
            box-shadow: 0 2px 4px rgba(0,0,0,0.1);
            display: flex;
            justify-content: space-between;
            align-items: center;
        }}
        .header h1 {{
            font-size: 18px;
            color: #333;
        }}
        .pdb-content {{
            background: white;
            padding: 20px;
            border-radius: 4px;
            box-shadow: 0 2px 4px rgba(0,0,0,0.1);
            overflow-x: auto;
            white-space: pre;
            word-wrap: normal;
        }}
        .info {{
            color: #666;
            font-size: 11px;
        }}
    </style>
</head>
<body>
    <div class="header">
        <h1>📄 Viewer PDB File</h1>
        <div class="info">File: viewer_protein_with_ligand.pdb</div>
    </div>
    <div class="pdb-content">{escaped_content}</div>
</body>
</html>"""

        return html_page, 200, {'Content-Type': 'text/html; charset=utf-8'}
    except Exception as e:
        logger.error(f"Error serving PDB as HTML: {str(e)}")
        return f"""
<!DOCTYPE html>
<html>
<head>
    <title>Error</title>
    <style>
        body {{ font-family: Arial, sans-serif; padding: 40px; text-align: center; }}
        .error {{ color: #dc3545; font-size: 18px; }}
    </style>
</head>
<body>
    <div class="error">
        <h1>Error Loading PDB</h1>
        <p>{html.escape(str(e))}</p>
    </div>
</body>
</html>
""", 500
|
|
|
|
|
@app.route('/api/get-corrected-ligands', methods=['GET'])
def get_corrected_ligands():
    """Get the corrected ligand PDB file content if present (combines all ligands)"""
    try:
        # Prefer per-ligand corrected files; skip OpenBabel intermediates.
        ligand_files = sorted([f for f in OUTPUT_DIR.glob('4_ligands_corrected_*.pdb') if "_obabel_" not in f.name])

        if not ligand_files:
            single_lig_file = OUTPUT_DIR / '4_ligands_corrected.pdb'
            if single_lig_file.exists():
                ligand_files = [single_lig_file]

        if not ligand_files:
            # No ligands in this system: report that explicitly.
            return jsonify({'success': True, 'exists': False, 'content': ''})

        # Concatenate all ligand files, normalizing ATOM -> HETATM, dropping
        # header/crystal records and dropping END lines except in the last file.
        normalized_lines = []
        final_file = ligand_files[-1]
        for ligand_file in ligand_files:
            with open(ligand_file, 'r') as handle:
                for record in handle:
                    if record.startswith('ATOM'):
                        normalized_lines.append('HETATM' + record[6:])
                    elif record.startswith('HETATM'):
                        normalized_lines.append(record)
                    elif record.strip() == 'END' and ligand_file != final_file:
                        # Intermediate END terminators would split the model.
                        continue
                    elif record.strip() and not record.startswith(('CRYST', 'REMARK', 'HEADER')):
                        normalized_lines.append(record)

        # Guarantee a single trailing END record.
        if normalized_lines and normalized_lines[-1].strip() != 'END':
            normalized_lines.append('END\n')

        return jsonify({'success': True, 'exists': True, 'content': ''.join(normalized_lines)})
    except Exception as e:
        logger.error(f"Error reading corrected ligand file: {str(e)}")
        return jsonify({'success': False, 'error': str(e)})
|
|
|
|
|
@app.route('/api/get-aligned-ligands', methods=['GET'])
def get_aligned_ligands():
    """Return ligand coordinates aligned to protein_solvated.pdb frame using PyMOL transforms."""
    try:
        solvated_file = OUTPUT_DIR / 'protein_solvated.pdb'
        tleap_ready = OUTPUT_DIR / 'tleap_ready.pdb'

        # Prefer per-ligand corrected files; skip OpenBabel intermediates.
        ligand_files = sorted([f for f in OUTPUT_DIR.glob('4_ligands_corrected_*.pdb') if "_obabel_" not in f.name])

        if not ligand_files:
            single_lig_file = OUTPUT_DIR / '4_ligands_corrected.pdb'
            if single_lig_file.exists():
                ligand_files = [single_lig_file]

        if not solvated_file.exists():
            return jsonify({'success': False, 'error': 'protein_solvated.pdb not found'}), 400
        if not tleap_ready.exists():
            return jsonify({'success': False, 'error': 'tleap_ready.pdb not found'}), 400
        if not ligand_files:
            return jsonify({'success': True, 'exists': False, 'content': ''})

        # NOTE(review): only the first ligand file is aligned — confirm
        # whether multi-ligand systems need all files processed here.
        ligand_file = ligand_files[0]

        # Remove any stale preview left over from a previous run (best effort).
        aligned_lig = OUTPUT_DIR / 'ligand_aligned_for_preview.pdb'
        try:
            if aligned_lig.exists():
                aligned_lig.unlink()
        except Exception:
            pass

        # Headless PyMOL script: align the prepared protein onto the solvated
        # frame, copy that transform onto the ligand object, and save it.
        pymol_script = f"""
import pymol
pymol.finish_launching(['pymol','-qc'])
from pymol import cmd
cmd.load('{solvated_file.as_posix()}', 'solv')
cmd.load('{tleap_ready.as_posix()}', 'prep')
cmd.load('{ligand_file.as_posix()}', 'lig')
# Align prepared protein to solvated protein; use CA atoms to be robust
cmd.align('prep and polymer.protein and name CA', 'solv and polymer.protein and name CA')
# Apply same transform implicitly affects 'prep' object; we saved ligand as separate object, so match matrices
mat = cmd.get_object_matrix('prep')
cmd.set_object_matrix('lig', mat)
# Save ligand in aligned frame, as HETATM
cmd.alter('lig', 'type="HETATM"')
cmd.save('{aligned_lig.as_posix()}', 'lig')
cmd.quit()
"""

        # Run in a subprocess so a PyMOL crash cannot take down the server;
        # requires pymol importable by 'python3' on PATH.
        result = subprocess.run(['python3', '-c', pymol_script], capture_output=True, text=True, cwd=str(OUTPUT_DIR))
        if result.returncode != 0:
            return jsonify({'success': False, 'error': f'PyMOL alignment failed: {result.stderr}'}), 500

        if not aligned_lig.exists():
            return jsonify({'success': False, 'error': 'Aligned ligand file was not produced'}), 500

        # Normalize ATOM records to HETATM so viewers render the ligand as hetero.
        normalized_lines = []
        with open(aligned_lig, 'r') as f:
            for line in f:
                if line.startswith('ATOM'):
                    normalized_lines.append('HETATM' + line[6:])
                else:
                    normalized_lines.append(line)
        content = ''.join(normalized_lines)
        return jsonify({'success': True, 'exists': True, 'content': content})
    except Exception as e:
        logger.error(f"Error aligning ligands: {str(e)}")
        return jsonify({'success': False, 'error': str(e)}), 500
|
|
|
|
|
@app.route('/viewer/<filename>')
def viewer(filename):
    """Serve NGL viewer page for a file in the output directory.

    Fixes vs. original: the page title and the /output fetch/load URLs now
    interpolate the requested `filename` (they previously contained a literal
    placeholder, so the viewer never loaded the requested file); the dead
    `result[1] == 200` check (a Flask Response is not subscriptable) was
    removed; the bare `except:` was narrowed to `except Exception:`.
    """
    file_path = OUTPUT_DIR / filename
    if not file_path.exists():
        # The merged viewer PDB can be rebuilt on demand: call the API
        # handler purely for its side effect of writing the file.
        if filename == 'viewer_protein_with_ligand.pdb':
            try:
                get_viewer_pdb()
            except Exception:
                pass

    return f"""
<!DOCTYPE html>
<html>
<head>
    <title>NGL Viewer - {filename}</title>
    <script src="https://cdn.jsdelivr.net/npm/ngl@2.0.0-dev.37/dist/ngl.js"></script>
    <style>
        body {{ margin: 0; padding: 0; font-family: Arial, sans-serif; }}
        #viewport {{ width: 100%; height: 100vh; }}
        .header {{ background: #f8f9fa; padding: 10px; border-bottom: 1px solid #ddd; }}
        .controls {{ padding: 10px; background: #f8f9fa; }}
        .btn {{ padding: 8px 16px; margin: 5px; border: none; border-radius: 4px; cursor: pointer; }}
        .btn-primary {{ background: #007bff; color: white; }}
        .btn-secondary {{ background: #6c757d; color: white; }}
    </style>
</head>
<body>
    <div class="header">
        <h3>🧬 3D Structure Viewer - {filename}</h3>
    </div>
    <div id="viewport"></div>
    <div class="controls">
        <button class="btn btn-primary" onclick="resetView()">Reset View</button>
        <button class="btn btn-secondary" onclick="toggleRepresentation()">Toggle Style</button>
        <button class="btn btn-secondary" onclick="toggleSpin()">Toggle Spin</button>
    </div>
    <script>
        let stage;
        let currentRepresentation = 'cartoon';
        let isSpinning = false;

        async function initViewer() {{
            try {{
                // Check if file exists first
                const response = await fetch("/output/{filename}");
                if (!response.ok) {{
                    throw new Error(`File not found: ${{response.status}} ${{response.statusText}}`);
                }}

                stage = new NGL.Stage("viewport", {{ backgroundColor: "white" }});

                const component = await stage.loadFile("/output/{filename}");

                // Add cartoon representation for protein
                component.addRepresentation("cartoon", {{
                    sele: "protein",
                    colorScheme: "chainname",
                    opacity: 0.9
                }});

                // Add ball and stick for water molecules
                component.addRepresentation("ball+stick", {{
                    sele: "water",
                    color: "cyan",
                    colorScheme: "uniform",
                    radius: 0.1
                }});

                // Add ball and stick for ligands
                component.addRepresentation("ball+stick", {{
                    sele: "hetero",
                    color: "element",
                    radius: 0.15
                }});

                stage.autoView();
            }} catch (error) {{
                console.error('Error loading structure:', error);
                document.getElementById('viewport').innerHTML =
                    '<div style="padding: 50px; text-align: center; color: #dc3545;">' +
                    '<h3>Error loading structure</h3><p>' + error.message + '</p>' +
                    '<p>Make sure the file exists in the output directory.</p></div>';
            }}
        }}

        function resetView() {{
            if (stage) stage.autoView();
        }}

        function toggleRepresentation() {{
            if (!stage) return;
            const components = stage.compList;
            if (components.length === 0) return;

            const component = components[0];
            component.removeAllRepresentations();

            if (currentRepresentation === 'cartoon') {{
                component.addRepresentation("ball+stick", {{
                    color: "element",
                    radius: 0.15
                }});
                currentRepresentation = 'ball+stick';
            }} else {{
                component.addRepresentation("cartoon", {{
                    sele: "protein",
                    colorScheme: "chainname",
                    opacity: 0.9
                }});
                component.addRepresentation("ball+stick", {{
                    sele: "water",
                    color: "cyan",
                    colorScheme: "uniform",
                    radius: 0.1
                }});
                component.addRepresentation("ball+stick", {{
                    sele: "hetero",
                    color: "element",
                    radius: 0.15
                }});
                currentRepresentation = 'cartoon';
            }}
        }}

        function toggleSpin() {{
            if (!stage) return;
            isSpinning = !isSpinning;
            stage.setSpin(isSpinning);
        }}

        // Initialize when page loads
        document.addEventListener('DOMContentLoaded', initViewer);
    </script>
</body>
</html>
"""
|
|
|
|
|
@app.route('/output/<path:filename>')
def serve_output(filename):
    """Serve a file from the output directory.

    Args:
        filename: Path (relative to OUTPUT_DIR) of the requested file.

    Returns:
        The file contents via ``send_from_directory``; aborts with 404 when
        the file does not exist.
    """
    # Local import: ``abort`` is not among the module-level flask imports,
    # so the original code raised NameError instead of returning 404.
    from flask import abort

    target = OUTPUT_DIR / filename
    logger.info("Requested output file: %s", filename)
    logger.debug("Full path: %s (exists=%s)", target, target.exists())

    if not target.exists():
        # Log the directory contents to help diagnose missing-file issues.
        contents = list(OUTPUT_DIR.iterdir()) if OUTPUT_DIR.exists() else 'Directory not found'
        logger.warning("File not found: %s. Output dir contents: %s", filename, contents)
        abort(404)

    # send_from_directory also guards against path traversal outside OUTPUT_DIR.
    return send_from_directory(OUTPUT_DIR, filename)
|
|
|
|
|
@app.route('/')
def index():
    """Render the landing page for the MD simulation pipeline UI."""
    page = render_template('index.html')
    return page
|
|
|
|
|
@app.route('/<path:filename>')
def serve_static(filename):
    """Serve static assets (CSS, JS, etc.) from the package directory."""
    base_dir = str(_BASE)
    return send_from_directory(base_dir, filename)
|
|
|
|
|
@app.route('/api/prepare-structure', methods=['POST'])
def prepare_structure_endpoint():
    """Prepare protein structure for AMBER.

    Request JSON:
        pdb_content (str): Raw PDB text to prepare (may be empty when a
            completed structure from a previous step is available on disk).
        options (dict): Options forwarded verbatim to ``prepare_structure``.

    Returns:
        JSON summary of the prepared structure (atom counts, removed
        components, capping, ligand info) or an error payload with 400/500.
    """
    try:
        data = request.get_json()
        pdb_content = data.get('pdb_content', '')
        options = data.get('options', {})

        # If a previous step produced a superimposed, ESMFold-completed
        # structure, prefer it over the client-supplied content so ligands
        # stay in the same coordinate frame.
        flag_file = OUTPUT_DIR / ".use_completed_structure"
        complete_structure_path = OUTPUT_DIR / "0_complete_structure.pdb"

        if flag_file.exists() and complete_structure_path.exists():
            logger.info("Using superimposed completed structure (0_complete_structure.pdb) for preparation so ligands stay in the same coordinate frame")
            with open(complete_structure_path, 'r') as f:
                pdb_content = f.read()
        elif not pdb_content:
            # Neither client content nor an on-disk completed structure.
            return jsonify({'error': 'No PDB content provided and no completed structure found'}), 400

        result = prepare_structure(pdb_content, options, output_dir=str(OUTPUT_DIR))

        if result.get('error'):
            logger.error(f"Structure preparation failed: {result['error']}")
            return jsonify({'error': result['error']}), 400

        # Sanitize numeric ligand residue names written during preparation;
        # returns (old, new, filename) tuples surfaced to the client.
        ligand_name_changes = validate_and_sanitize_all_ligand_files()

        response_data = {
            'success': True,
            'prepared_structure': result['prepared_structure'],
            'original_atoms': result['original_atoms'],
            'prepared_atoms': result['prepared_atoms'],
            'removed_components': result['removed_components'],
            'added_capping': result['added_capping'],
            'preserved_ligands': result['preserved_ligands'],
            'ligand_present': result.get('ligand_present', False),
            'separate_ligands': result.get('separate_ligands', False),
            'ligand_content': result.get('ligand_content', ''),
            'ligand_name_changes': ligand_name_changes
        }

        return jsonify(response_data)

    except Exception as e:
        logger.error(f"Error preparing structure: {str(e)}")
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
@app.route('/api/parse-structure', methods=['POST'])
def parse_structure_endpoint():
    """Parse structure information from submitted PDB text.

    Expects JSON with a ``pdb_content`` field; responds with the parsed
    structure info or an error payload.
    """
    try:
        payload = request.get_json()
        pdb_text = payload.get('pdb_content', '')

        # Reject empty submissions up front.
        if not pdb_text:
            return jsonify({'error': 'No PDB content provided'}), 400

        info = parse_structure_info(pdb_text)
        return jsonify({'success': True, 'structure_info': info})

    except Exception as exc:
        logger.error(f"Error parsing structure: {str(exc)}")
        return jsonify({'error': str(exc)}), 500
|
|
|
|
|
|
|
|
@app.route('/api/docking/run', methods=['POST'])
def run_docking():
    """
    Run ligand docking for preserved ligands using Vina and Meeko with streaming logs.
    All outputs are stored under OUTPUT_DIR/docking.
    Returns a summary of ligands and available poses (file-based, no contents).

    Streams Server-Sent Events: `_format_log` lines during processing, then a
    final ``{'type': 'complete', ...}`` event with the ligand/pose summary.

    Fix: ``stream_with_context`` must wrap the generator passed to ``Response``;
    using it as a view decorator re-wraps the Response object as a bare
    iterator and drops the ``text/event-stream`` mimetype.
    """
    def generate():
        try:
            docking_dir = _ensure_docking_folder()
            yield _format_log(f"Working directory: {os.getcwd()}")
            yield _format_log(f"Output directory: {OUTPUT_DIR}")
            yield _format_log(f"Docking directory: {docking_dir}")

            # Prefer the ESMFold-completed structure so receptor and ligands
            # share one coordinate frame.
            flag_file = OUTPUT_DIR / ".use_completed_structure"
            complete_structure_path = OUTPUT_DIR / "0_complete_structure.pdb"
            if flag_file.exists() and complete_structure_path.exists():
                yield _format_log("ℹ️ Using ESMFold-completed structure for receptor")
                yield _format_log(" (Completed chains from ESMFold + original chains not selected for completion)")

            yield _format_log("Preparing receptor for docking...")
            receptor_fixed, receptor_pdbqt = _prepare_receptor_for_docking()
            yield _format_log(f"✅ Receptor prepared: {receptor_pdbqt.name}")

            # Per-ligand configuration from the client, keyed by 1-based index.
            data = request.get_json(silent=True) or {}
            cfg_list = data.get("ligands", [])
            ligand_configs = {}
            for cfg in cfg_list:
                try:
                    idx = int(cfg.get("index", 0))
                    if idx > 0:
                        ligand_configs[idx] = cfg
                except Exception:
                    continue

            # Prefer OpenBabel-corrected ligand files; fall back to the raw
            # corrected files when none exist.
            ligand_files = sorted(OUTPUT_DIR.glob("4_ligands_corrected_obabel_*.pdb"))
            if not ligand_files:
                ligand_files = sorted(
                    [f for f in OUTPUT_DIR.glob("4_ligands_corrected_*.pdb") if "_obabel_" not in f.name]
                )
            if not ligand_files:
                error_msg = 'No corrected ligand PDB files found. Please run structure preparation with preserved ligands.'
                yield _format_log(error_msg, 'error')
                yield f"data: {json.dumps({'type': 'complete', 'success': False, 'error': error_msg})}\n\n"
                return

            yield _format_log(f"Found {len(ligand_files)} ligand file(s) to process")
            yield _format_log(f"Selected {len(ligand_configs)} ligand(s) for docking")

            ligands_summary = []
            warnings = []
            errors = []

            for idx, lig_pdb in enumerate(ligand_files, start=1):
                # Skip ligands not selected or explicitly disabled by the user.
                cfg = ligand_configs.get(idx)
                if cfg is None:
                    continue
                if cfg.get("enabled") is False:
                    continue

                yield _format_log(f"\n{'='*60}")
                yield _format_log(f"Processing ligand {idx} ({lig_pdb.name})")
                yield _format_log(f"{'='*60}")

                lig_dir = docking_dir / f"ligand_{idx}"
                lig_dir.mkdir(parents=True, exist_ok=True)

                # Keep a pristine copy of the input ligand for later display.
                original_copy = lig_dir / "original_ligand.pdb"
                if not original_copy.exists():
                    original_copy.write_text(lig_pdb.read_text())

                try:
                    # Step 1: PDB -> SDF with OpenBabel (streamed output).
                    yield _format_log(f"Step 1: Converting ligand {idx} from PDB to SDF using OpenBabel...")
                    sdf_path = lig_dir / f"ligand_{idx}.sdf"
                    cmd = [
                        "obabel",
                        "-i",
                        "pdb",
                        str(lig_pdb),
                        "-o",
                        "sdf",
                        "-O",
                        str(sdf_path),
                    ]
                    yield _format_log(f"Running command: {' '.join(cmd)}")

                    process = subprocess.Popen(
                        cmd,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.STDOUT,
                        text=True,
                        bufsize=1,
                        universal_newlines=True
                    )
                    for line in iter(process.stdout.readline, ''):
                        if line:
                            yield _format_log(line.strip())
                    process.wait()
                    if process.returncode != 0 or not sdf_path.exists():
                        raise RuntimeError(
                            f"OpenBabel failed for ligand {idx} ({lig_pdb.name}). Return code: {process.returncode}"
                        )
                    yield _format_log(f"✅ OpenBabel conversion successful: {sdf_path.name}")

                    # Step 2: SDF -> PDBQT with Meeko.
                    yield _format_log(f"Step 2: Preparing ligand {idx} with Meeko...")
                    lig_pdbqt = lig_dir / f"ligand_{idx}.pdbqt"
                    cmd = [
                        "mk_prepare_ligand.py",
                        "-i",
                        str(sdf_path),
                        "-o",
                        str(lig_pdbqt),
                    ]
                    yield _format_log(f"Running command: {' '.join(cmd)}")

                    result = subprocess.run(
                        cmd,
                        capture_output=True,
                        text=True,
                    )
                    if result.stdout:
                        yield _format_log(result.stdout.strip())
                    if result.stderr:
                        yield _format_log(result.stderr.strip(), 'warning')
                    if result.returncode != 0 or not lig_pdbqt.exists():
                        raise RuntimeError(
                            f"Meeko failed for ligand {idx}.\n"
                            f"STDOUT:\n{result.stdout}\nSTDERR:\n{result.stderr}"
                        )
                    yield _format_log(f"✅ Meeko preparation successful: {lig_pdbqt.name}")

                    # Step 3: resolve docking box/search parameters — from a
                    # saved vina_config.txt if present, else from the client
                    # request, else computed from the ligand geometry.
                    yield _format_log(f"Step 3: Reading docking parameters for ligand {idx}...")
                    config_file = lig_dir / "vina_config.txt"
                    config = _parse_vina_config(config_file) if config_file.exists() else None

                    # Defaults matching Vina's documented defaults.
                    exhaustiveness = 8
                    num_modes = 9
                    energy_range = 3
                    cpu = 0
                    seed = 0

                    if config:
                        yield _format_log(f"Reading parameters from config file: {config_file.name}")
                        cx = config.get("center_x", None)
                        cy = config.get("center_y", None)
                        cz = config.get("center_z", None)
                        sx = config.get("size_x", 18.0)
                        sy = config.get("size_y", 18.0)
                        sz = config.get("size_z", 18.0)
                        exhaustiveness = config.get("exhaustiveness", 8)
                        num_modes = config.get("num_modes", 9)
                        energy_range = config.get("energy_range", 3)
                        cpu = config.get("cpu", 0)
                        seed = config.get("seed", 0)
                    else:
                        # No config file: take the center from the client when
                        # fully specified, else compute it from the ligand.
                        user_center = (cfg or {}).get("center", {}) if cfg else {}
                        if (
                            isinstance(user_center, dict)
                            and all(k in user_center for k in ("x", "y", "z"))
                        ):
                            try:
                                cx = float(user_center.get("x"))
                                cy = float(user_center.get("y"))
                                cz = float(user_center.get("z"))
                                yield _format_log(f"Using user-provided center: ({cx:.2f}, {cy:.2f}, {cz:.2f})")
                            except Exception:
                                cx, cy, cz = _compute_ligand_center(lig_pdb)
                                yield _format_log(f"Computed center: ({cx:.2f}, {cy:.2f}, {cz:.2f})")
                        else:
                            cx, cy, cz = _compute_ligand_center(lig_pdb)
                            yield _format_log(f"Computed center: ({cx:.2f}, {cy:.2f}, {cz:.2f})")

                        user_size = (cfg or {}).get("size", {}) if cfg else {}
                        try:
                            sx = float(user_size.get("x", 18.0))
                            sy = float(user_size.get("y", 18.0))
                            sz = float(user_size.get("z", 18.0))
                        except Exception:
                            sx = sy = sz = 18.0

                    # Config files may omit the center; compute it as a last resort.
                    if cx is None or cy is None or cz is None:
                        cx, cy, cz = _compute_ligand_center(lig_pdb)
                        yield _format_log(f"Computed center: ({cx:.2f}, {cy:.2f}, {cz:.2f})")

                    yield _format_log(f"Box center: ({cx:.2f}, {cy:.2f}, {cz:.2f}) Å")
                    yield _format_log(f"Box size: ({sx:.2f}, {sy:.2f}, {sz:.2f}) Å")
                    yield _format_log(f"Exhaustiveness: {exhaustiveness}, Num modes: {num_modes}, Energy range: {energy_range} kcal/mol")

                    # Step 4: AutoDock Vina docking run (streamed output).
                    yield _format_log(f"Step 4: Running AutoDock Vina docking for ligand {idx}...")
                    docked_pdbqt = lig_dir / f"ligand_{idx}_docked.pdbqt"
                    log_file = lig_dir / f"ligand_{idx}_docked.log"
                    cmd = [
                        "vina",
                        "--receptor",
                        str(receptor_pdbqt),
                        "--ligand",
                        str(lig_pdbqt),
                        "--center_x",
                        str(cx),
                        "--center_y",
                        str(cy),
                        "--center_z",
                        str(cz),
                        "--size_x",
                        str(sx),
                        "--size_y",
                        str(sy),
                        "--size_z",
                        str(sz),
                        "--exhaustiveness",
                        str(exhaustiveness),
                        "--num_modes",
                        str(num_modes),
                        "--energy_range",
                        str(energy_range),
                        "--out",
                        str(docked_pdbqt),
                        "--log",
                        str(log_file),
                    ]
                    if cpu > 0:
                        cmd.extend(["--cpu", str(cpu)])
                    if seed > 0:
                        cmd.extend(["--seed", str(seed)])
                    yield _format_log(f"Running command: {' '.join(cmd)}")

                    process = subprocess.Popen(
                        cmd,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.STDOUT,
                        text=True,
                        bufsize=1,
                        universal_newlines=True
                    )
                    for line in iter(process.stdout.readline, ''):
                        if line:
                            yield _format_log(line.strip())
                    process.wait()
                    if process.returncode != 0 or not docked_pdbqt.exists():
                        raise RuntimeError(
                            f"Vina failed for ligand {idx}. Return code: {process.returncode}"
                        )
                    yield _format_log(f"✅ Vina docking completed: {docked_pdbqt.name}")

                    # Step 5: split the multi-model output into per-pose files.
                    yield _format_log(f"Step 5: Splitting docking poses for ligand {idx}...")
                    cmd = [
                        "vina_split",
                        "--input",
                        str(docked_pdbqt),
                        "--ligand",
                        f"ligand_{idx}_mode",
                    ]
                    yield _format_log(f"Running command: {' '.join(cmd)}")

                    result = subprocess.run(
                        cmd,
                        cwd=lig_dir,
                        capture_output=True,
                        text=True,
                    )
                    if result.stdout:
                        yield _format_log(result.stdout.strip())
                    if result.stderr:
                        yield _format_log(result.stderr.strip(), 'warning')

                    # vina_split failures are non-fatal; record as warning.
                    if result.returncode != 0:
                        warnings.append(
                            f"vina_split reported issues for ligand {idx}: {result.stderr.strip()}"
                        )
                        yield _format_log(f"⚠️ Warning: vina_split issues for ligand {idx}", 'warning')
                    else:
                        yield _format_log(f"✅ Poses split successfully")

                    # Binding energies per mode, parsed from the Vina log.
                    mode_energies = _parse_vina_log(log_file)
                    yield _format_log(f"Found {len(mode_energies)} binding mode(s)")

                    # Step 6: convert each pose PDBQT back to PDB, add
                    # hydrogens at pH 7.4, and sanitize for Antechamber.
                    yield _format_log(f"Step 6: Converting poses to PDB format...")
                    pose_entries = []
                    mode_pdbqt_files = sorted(lig_dir.glob(f"ligand_{idx}_mode*.pdbqt"))
                    yield _format_log(f"Processing {len(mode_pdbqt_files)} pose(s)...")

                    for mode_pdbqt in mode_pdbqt_files:
                        mode_name = mode_pdbqt.stem
                        mode_index_str = mode_name.replace(f"ligand_{idx}_mode", "")
                        try:
                            mode_index = int(mode_index_str)
                        except ValueError:
                            mode_index = None

                        yield _format_log(f"Processing pose {mode_index} ({mode_name})...")

                        mode_pdb_noH = lig_dir / f"{mode_name}_noH.pdb"
                        mode_pdb_h = lig_dir / f"{mode_name}_h.pdb"
                        sanitized_pdb = lig_dir / f"{mode_name}_sanitized.pdb"

                        # PDBQT -> PDB, dropping hydrogens (-d) so they can be
                        # re-added consistently below.
                        if not mode_pdb_noH.exists():
                            yield _format_log(f"  Converting {mode_pdbqt.name} to PDB (removing hydrogens)...")
                            cmd = [
                                "obabel",
                                "-i", "pdbqt",
                                str(mode_pdbqt),
                                "-o", "pdb",
                                "-O",
                                str(mode_pdb_noH),
                                "-d",
                            ]
                            result = subprocess.run(
                                cmd,
                                capture_output=True,
                                text=True,
                            )
                            if result.returncode != 0 or not mode_pdb_noH.exists():
                                warnings.append(
                                    f"Failed to convert {mode_pdbqt.name} to PDB for ligand {idx}: "
                                    f"{result.stderr.strip()}"
                                )
                                yield _format_log(f"  ⚠️ Failed to convert {mode_pdbqt.name}", 'warning')
                                continue
                            yield _format_log(f"  ✅ Converted to {mode_pdb_noH.name}")

                        # Protonate at physiological pH; fall back to the
                        # hydrogen-free file if OpenBabel fails.
                        if not mode_pdb_h.exists():
                            yield _format_log(f"  Adding hydrogens at pH 7.4...")
                            cmd = [
                                "obabel",
                                "-i", "pdb",
                                str(mode_pdb_noH),
                                "-o", "pdb",
                                "-O",
                                str(mode_pdb_h),
                                "-p", "7.4",
                            ]
                            result = subprocess.run(
                                cmd,
                                capture_output=True,
                                text=True,
                            )
                            if result.returncode != 0 or not mode_pdb_h.exists():
                                logger.warning(f"OpenBabel h_add failed for {mode_pdb_noH.name}: {result.stderr}")
                                yield _format_log(f"  ⚠️ Failed to add hydrogens, using noH file", 'warning')
                                mode_pdb_h.write_text(mode_pdb_noH.read_text())
                            else:
                                yield _format_log(f"  ✅ Hydrogens added: {mode_pdb_h.name}")

                        # Restore original residue info so Antechamber can
                        # match atoms; on failure keep the unsanitized copy.
                        if not sanitized_pdb.exists():
                            yield _format_log(f"  Sanitizing PDB for Antechamber compatibility...")
                            try:
                                original_residue_info = extract_original_residue_info(str(lig_pdb))
                                sanitized_pdb.write_text(mode_pdb_h.read_text())
                                _sanitize_docked_pose_for_antechamber(sanitized_pdb, original_residue_info)
                                yield _format_log(f"  ✅ Sanitized: {sanitized_pdb.name}")
                            except Exception as e:
                                logger.warning(f"Error sanitizing {mode_pdb_h.name}: {e}")
                                yield _format_log(f"  ⚠️ Sanitization error: {e}, using fallback", 'warning')
                                if not sanitized_pdb.exists():
                                    sanitized_pdb.write_text(mode_pdb_h.read_text())

                        energy = mode_energies.get(mode_index)
                        if energy:
                            yield _format_log(f"  Binding energy: {energy:.2f} kcal/mol")

                        pose_entries.append(
                            {
                                "mode_index": mode_index,
                                "file": str(mode_pdb_h.relative_to(OUTPUT_DIR)),
                                "sanitized_file": str(sanitized_pdb.relative_to(OUTPUT_DIR)),
                                "energy": energy,
                            }
                        )

                    yield _format_log(f"✅ Successfully processed ligand {idx} with {len(pose_entries)} pose(s)", 'success')

                    resname, chain = _get_ligand_info_from_pdb(lig_pdb)
                    ligands_summary.append(
                        {
                            "index": idx,
                            "name": resname,
                            "chain": chain,
                            "original_file": str(original_copy.relative_to(OUTPUT_DIR)),
                            "corrected_file": str(lig_pdb.relative_to(OUTPUT_DIR)),
                            "poses": pose_entries,
                        }
                    )
                except Exception as e:
                    # Per-ligand failures are collected; other ligands proceed.
                    error_msg = f"Ligand {idx} ({lig_pdb.name}): {str(e)}"
                    errors.append(error_msg)
                    yield _format_log(f"❌ Error: {error_msg}", 'error')

            # Disambiguate ligands sharing the same (resname, chain) with a
            # per-pair instance number in the display label.
            resname_chain_count = defaultdict(int)
            for lig in ligands_summary:
                resname_chain_count[(lig["name"], lig["chain"])] += 1
            resname_chain_instance = defaultdict(int)
            for lig in ligands_summary:
                key = (lig["name"], lig["chain"])
                resname_chain_instance[key] += 1
                instance = resname_chain_instance[key]
                count = resname_chain_count[key]
                lig["displayLabel"] = f"{lig['name']}-{lig['chain']}-{instance}" if count > 1 else f"{lig['name']}-{lig['chain']}"

            # Final safety pass over all ligand files on disk.
            validate_and_sanitize_all_ligand_files()

            result_data = {
                'type': 'complete',
                'success': len(errors) == 0,
                'ligands': ligands_summary,
                'warnings': warnings,
                'errors': errors,
            }
            yield f"data: {json.dumps(result_data)}\n\n"

        except Exception as e:
            logger.error(f"Error running docking: {str(e)}")
            yield _format_log(f'Internal server error: {str(e)}', 'error')
            yield f"data: {json.dumps({'type': 'complete', 'success': False, 'error': f'Internal server error: {str(e)}'})}\n\n"

    # Wrap the generator (not the view) so the request context stays alive
    # while the SSE stream is consumed.
    return Response(stream_with_context(generate()), mimetype='text/event-stream')
|
|
|
|
|
|
|
|
@app.route('/api/docking/get-structure', methods=['GET'])
def get_docking_structure():
    """
    Return PDB content for a docking structure (original or a specific pose).
    Query parameters:
    - ligand_index: 1-based index of ligand
    - type: 'original' or 'pose'
    - mode_index: integer (required when type='pose')
    """
    try:
        ligand_index = int(request.args.get("ligand_index", "0"))
        if ligand_index <= 0:
            return jsonify({"success": False, "error": "Invalid ligand_index"}), 400

        lig_dir = OUTPUT_DIR / "docking" / f"ligand_{ligand_index}"
        if not lig_dir.exists():
            return jsonify({"success": False, "error": "Docking results not found for this ligand"}), 404

        # Choose between the pre-docking ligand and a specific docked pose.
        struct_type = request.args.get("type", "original")
        if struct_type == "original":
            pdb_path = lig_dir / "original_ligand.pdb"
        else:
            mode_index = int(request.args.get("mode_index", "0"))
            if mode_index <= 0:
                return jsonify({"success": False, "error": "mode_index must be positive for pose"}), 400
            pdb_path = lig_dir / f"ligand_{ligand_index}_mode{mode_index}_h.pdb"

        if not pdb_path.exists():
            return jsonify({"success": False, "error": f"PDB file not found: {pdb_path.name}"}), 404

        return jsonify({"success": True, "content": pdb_path.read_text()})
    except Exception as e:
        logger.error(f"Error getting docking structure: {str(e)}")
        return jsonify({"success": False, "error": str(e)}), 500
|
|
|
|
|
|
|
|
@app.route('/api/docking/get-config', methods=['GET'])
def get_docking_config():
    """
    Get Vina config file for a ligand.
    Query parameters:
    - ligand_index: 1-based index of ligand

    Creates a default config on first access (seeded with the ligand's
    geometric center when the matching ligand file exists) so the UI always
    has content to display and edit.
    """
    try:
        ligand_index = int(request.args.get("ligand_index", "0"))
        if ligand_index <= 0:
            return jsonify({"success": False, "error": "Invalid ligand_index"}), 400

        # Ensure the per-ligand docking folder exists before reading/writing.
        docking_dir = OUTPUT_DIR / "docking"
        docking_dir.mkdir(parents=True, exist_ok=True)
        lig_dir = docking_dir / f"ligand_{ligand_index}"
        lig_dir.mkdir(parents=True, exist_ok=True)

        config_file = lig_dir / "vina_config.txt"

        # First access: synthesize a default config.
        if not config_file.exists():
            # Locate the corrected ligand file for this index to seed the
            # search-box center from its coordinates.
            ligand_files = sorted(OUTPUT_DIR.glob("4_ligands_corrected_obabel_*.pdb"))
            if not ligand_files:
                ligand_files = sorted(
                    [f for f in OUTPUT_DIR.glob("4_ligands_corrected_*.pdb") if "_obabel_" not in f.name]
                )

            if ligand_index <= len(ligand_files):
                lig_pdb = ligand_files[ligand_index - 1]
                cx, cy, cz = _compute_ligand_center(lig_pdb)
            else:
                # No matching ligand file: fall back to the origin.
                cx, cy, cz = 0.0, 0.0, 0.0

            # Template lines are intentionally unindented so the written
            # file carries no leading whitespace.
            default_config = f"""# AutoDock Vina Configuration File
# Ligand {ligand_index}

# Search space center (Angstroms)
center_x = {cx:.2f}
center_y = {cy:.2f}
center_z = {cz:.2f}

# Search space size (Angstroms)
size_x = 18.0
size_y = 18.0
size_z = 18.0

# Exhaustiveness of the global search (default: 8)
# Higher values give better results but take longer
exhaustiveness = 8

# Number of binding modes to generate (default: 9)
num_modes = 9

# Maximum energy difference between the best binding mode and the worst one displayed (kcal/mol, default: 3)
energy_range = 3

# Optional: CPU usage (default: 0 = use all available CPUs)
cpu = 0

# Optional: Seed for random number generator (default: 0 = random)
seed = 0
"""
            config_file.write_text(default_config)

        content = config_file.read_text()
        return jsonify({"success": True, "content": content})
    except Exception as e:
        logger.error(f"Error getting docking config: {str(e)}")
        return jsonify({"success": False, "error": str(e)}), 500
|
|
|
|
|
|
|
|
@app.route('/api/docking/save-config', methods=['POST'])
def save_docking_config():
    """
    Save Vina config file for a ligand.
    Body: { "ligand_index": int, "content": str }
    """
    try:
        payload = request.get_json()
        ligand_index = int(payload.get("ligand_index", 0))
        content = payload.get("content", "")

        # Validate inputs before touching the filesystem.
        if ligand_index <= 0:
            return jsonify({"success": False, "error": "Invalid ligand_index"}), 400
        if not content:
            return jsonify({"success": False, "error": "Config content is required"}), 400

        # mkdir(parents=True) creates both the docking folder and the
        # per-ligand subfolder in one call.
        lig_dir = OUTPUT_DIR / "docking" / f"ligand_{ligand_index}"
        lig_dir.mkdir(parents=True, exist_ok=True)
        (lig_dir / "vina_config.txt").write_text(content)

        return jsonify({"success": True, "message": f"Config saved for ligand {ligand_index}"})
    except Exception as e:
        logger.error(f"Error saving docking config: {str(e)}")
        return jsonify({"success": False, "error": str(e)}), 500
|
|
|
|
|
|
|
|
@app.route('/api/docking/get-protein', methods=['GET'])
def get_docking_protein():
    """
    Return the prepared protein structure (tleap_ready.pdb) for the poses viewer.
    """
    try:
        structure_path = OUTPUT_DIR / "tleap_ready.pdb"
        if not structure_path.exists():
            return jsonify({"success": False, "error": "Prepared structure not found"}), 404
        return jsonify({"success": True, "content": structure_path.read_text()})
    except Exception as e:
        logger.error(f"Error getting protein structure: {str(e)}")
        return jsonify({"success": False, "error": str(e)}), 500
|
|
|
|
|
|
|
|
def _validate_and_sanitize_ligand_name(resname: str) -> tuple[str, bool]: |
|
|
""" |
|
|
Validate ligand residue name. If it's pure numeric, convert to a 3-letter code. |
|
|
PDB format requires 3-letter residue names (exactly 3 characters). |
|
|
Returns: (sanitized_name, was_changed) |
|
|
""" |
|
|
if not resname: |
|
|
return "LIG", True |
|
|
|
|
|
|
|
|
if resname.isdigit(): |
|
|
|
|
|
|
|
|
num = int(resname) |
|
|
|
|
|
last_two = num % 100 |
|
|
sanitized = f"L{last_two:02d}" |
|
|
|
|
|
return sanitized, True |
|
|
|
|
|
|
|
|
resname_stripped = resname.strip() |
|
|
if len(resname_stripped) != 3: |
|
|
if len(resname_stripped) < 3: |
|
|
|
|
|
sanitized = f"{resname_stripped:>3}" |
|
|
else: |
|
|
|
|
|
sanitized = resname_stripped[:3] |
|
|
|
|
|
if sanitized != resname_stripped: |
|
|
return sanitized, True |
|
|
|
|
|
return resname_stripped, False |
|
|
|
|
|
|
|
|
def _update_pdb_residue_name(pdb_path: Path, old_resname: str, new_resname: str): |
|
|
""" |
|
|
Update all residue names in a PDB file from old_resname to new_resname. |
|
|
Only updates ATOM and HETATM records. |
|
|
""" |
|
|
try: |
|
|
content = pdb_path.read_text() |
|
|
lines = content.split('\n') |
|
|
updated_lines = [] |
|
|
updated = False |
|
|
|
|
|
for line in lines: |
|
|
if line.startswith(('ATOM', 'HETATM')): |
|
|
|
|
|
current_resname = line[17:20].strip() |
|
|
if current_resname == old_resname: |
|
|
|
|
|
new_line = line[:17] + f"{new_resname:>3}" + line[20:] |
|
|
updated_lines.append(new_line) |
|
|
updated = True |
|
|
else: |
|
|
updated_lines.append(line) |
|
|
else: |
|
|
updated_lines.append(line) |
|
|
|
|
|
if updated: |
|
|
pdb_path.write_text('\n'.join(updated_lines)) |
|
|
return updated |
|
|
except Exception as e: |
|
|
logger.warning(f"Failed to update residue name in {pdb_path}: {e}") |
|
|
return False |
|
|
|
|
|
|
|
|
def _first_residue_name(path):
    """Return the residue name of the first ATOM/HETATM record in *path*, or None."""
    with open(path, 'r') as f:
        for line in f:
            if line.startswith(('ATOM', 'HETATM')):
                return line[17:20].strip()
    return None


def validate_and_sanitize_all_ligand_files():
    """
    Validate and sanitize all ligand PDB files in the output directory.
    This should be called early in the workflow to ensure consistency.
    Returns list of warnings about name changes in format: [(old_name, new_name, filename), ...]

    Note: the original implementation duplicated the sanitize/update/append
    sequence across an isdigit/else split; the conditions were equivalent
    (numeric names always change to an 'Lnn' code), so the branches are merged.
    """
    warnings = []
    try:
        # Per-ligand corrected files (skip OpenBabel '_obabel_' variants).
        ligand_files = sorted([f for f in OUTPUT_DIR.glob('4_ligands_corrected_*.pdb') if "_obabel_" not in f.name])

        if not ligand_files:
            # Single-ligand layout uses an unnumbered filename.
            single_lig_file = OUTPUT_DIR / '4_ligands_corrected.pdb'
            if single_lig_file.exists():
                ligand_files = [single_lig_file]

        for lig_file in ligand_files:
            original_resname = _first_residue_name(lig_file)
            if not original_resname:
                continue
            sanitized_name, was_changed = _validate_and_sanitize_ligand_name(original_resname)
            if was_changed and sanitized_name != original_resname:
                _update_pdb_residue_name(lig_file, original_resname, sanitized_name)
                warnings.append((original_resname, sanitized_name, lig_file.name))

        # Also sanitize numeric ligand names inside the combined structure,
        # excluding water/ion residues.
        tleap_ready = OUTPUT_DIR / "tleap_ready.pdb"
        if tleap_ready.exists():
            original_names = {}
            with open(tleap_ready, 'r') as f:
                for line in f:
                    if line.startswith('HETATM'):
                        resname = line[17:20].strip()
                        if resname and resname not in ['HOH', 'WAT', 'TIP', 'SPC', 'NA', 'CL']:
                            if resname not in original_names:
                                original_names[resname] = True

            for original_resname in original_names.keys():
                if original_resname.isdigit():
                    sanitized_name, was_changed = _validate_and_sanitize_ligand_name(original_resname)
                    if was_changed:
                        _update_pdb_residue_name(tleap_ready, original_resname, sanitized_name)
                        warnings.append((original_resname, sanitized_name, tleap_ready.name))

    except Exception as e:
        # Best-effort: sanitization problems must not abort the pipeline.
        logger.warning(f"Error validating ligand files: {e}")

    return warnings
|
|
|
|
|
|
|
|
def _get_ligand_info_from_pdb(pdb_path: Path, sanitize: bool = True):
    """
    Extract residue name and chain ID from a ligand PDB file.
    If sanitize=True, validates and updates numeric residue names in the file.
    Returns (resname, chain), defaulting to ("UNK", "A") when no record is found.
    """
    resname, chain = "UNK", "A"
    with open(pdb_path, 'r') as handle:
        for record in handle:
            if not record.startswith(('ATOM', 'HETATM')):
                continue
            # First coordinate record determines the ligand's identity.
            resname = record[17:20].strip()
            chain = record[21:22].strip() or "A"
            break

    if sanitize:
        fixed, changed = _validate_and_sanitize_ligand_name(resname)
        if changed:
            logger.warning(
                f"Ligand residue name '{resname}' in {pdb_path.name} is pure numeric. "
                f"Changed to '{fixed}' (3-letter code) to avoid errors. "
                f"The PDB file has been updated."
            )
            _update_pdb_residue_name(pdb_path, resname, fixed)
            resname = fixed

    return resname, chain
|
|
|
|
|
|
|
|
@app.route('/api/docking/get-ligand-boxes', methods=['GET'])
def get_ligand_boxes():
    """
    Return default ligand box suggestions (center and size) for each corrected ligand.
    Also returns ligand name (residue name) and chain ID for display.
    Center is computed from 4_ligands_corrected_obabel_*.pdb using MDAnalysis, size defaults to 10 Å cube.
    """
    try:
        # Prefer OpenBabel-corrected ligand files; fall back to the raw
        # corrected files when none exist.
        ligand_files = sorted(OUTPUT_DIR.glob("4_ligands_corrected_obabel_*.pdb"))
        if not ligand_files:
            ligand_files = sorted(
                [f for f in OUTPUT_DIR.glob("4_ligands_corrected_*.pdb") if "_obabel_" not in f.name]
            )

        # Collect distinct chain IDs from the prepared structure in order of
        # first appearance (response returns them sorted).
        chains = []
        tleap_ready = OUTPUT_DIR / "tleap_ready.pdb"
        if tleap_ready.exists():
            seen_chains = set()
            with open(tleap_ready, 'r') as f:
                for line in f:
                    if line.startswith(('ATOM', 'HETATM')):
                        chain = line[21:22].strip() or "A"
                        if chain not in seen_chains:
                            seen_chains.add(chain)
                            chains.append(chain)

        ligands = []
        for idx, lig_pdb in enumerate(ligand_files, start=1):
            try:
                cx, cy, cz = _compute_ligand_center(lig_pdb)
                resname, chain = _get_ligand_info_from_pdb(lig_pdb)
                ligands.append(
                    {
                        "index": idx,
                        "name": resname,
                        "chain": chain,
                        "center": {"x": cx, "y": cy, "z": cz},
                        "size": {"x": 10.0, "y": 10.0, "z": 10.0},
                    }
                )
            except Exception as e:
                # Skip ligands whose center cannot be computed; the rest of
                # the list is still returned.
                logger.warning(f"Failed to compute center for {lig_pdb}: {e}")
                continue

        # Disambiguate ligands sharing the same (resname, chain) pair by
        # appending a per-pair instance number to the display label.
        resname_chain_count = defaultdict(int)
        for lig in ligands:
            resname_chain_count[(lig["name"], lig["chain"])] += 1
        resname_chain_instance = defaultdict(int)
        for lig in ligands:
            key = (lig["name"], lig["chain"])
            resname_chain_instance[key] += 1
            instance = resname_chain_instance[key]
            count = resname_chain_count[key]
            lig["displayLabel"] = f"{lig['name']}-{lig['chain']}-{instance}" if count > 1 else f"{lig['name']}-{lig['chain']}"

        return jsonify({"success": True, "ligands": ligands, "chains": sorted(chains)})
    except Exception as e:
        logger.error(f"Error computing ligand boxes: {str(e)}")
        return jsonify({"success": False, "error": str(e)}), 500
|
|
|
|
|
|
|
|
@app.route('/api/docking/apply', methods=['POST'])
def apply_docking_poses():
    """
    Apply user-selected docked poses by replacing the corresponding
    4_ligands_corrected_*.pdb files and rebuilding tleap_ready.pdb.

    Request JSON:
    {
        "selections": [
            {"ligand_index": 1, "choice": "original"},
            {"ligand_index": 2, "choice": "mode", "mode_index": 1},
            ...
        ]
    }

    Returns JSON {"success", "updated_ligands", "tleap_ready"} on success;
    400 when the request is malformed or the capped protein is missing;
    500 when the protein/ligand merge fails.
    Per-selection errors are logged and skipped, not fatal.
    """
    try:
        data = request.get_json() or {}
        selections = data.get("selections", [])
        if not isinstance(selections, list) or not selections:
            return jsonify({"success": False, "error": "No selections provided"}), 400

        # The capped protein is the base structure every merge starts from.
        protein_capped = OUTPUT_DIR / "2_protein_with_caps.pdb"
        if not protein_capped.exists():
            return jsonify(
                {
                    "success": False,
                    "error": "2_protein_with_caps.pdb not found. Run structure preparation first.",
                }
            ), 400

        updated_indices = []
        for sel in selections:
            try:
                lig_index = int(sel.get("ligand_index", 0))
                choice = sel.get("choice", "original")
                if lig_index <= 0:
                    continue

                # Target file that tleap assembly reads for this ligand.
                corrected_path = OUTPUT_DIR / f"4_ligands_corrected_{lig_index}.pdb"
                if not corrected_path.exists():
                    continue

                if choice == "original":
                    # Keep the existing (pre-docking) coordinates untouched.
                    continue

                if choice == "mode":
                    mode_index = int(sel.get("mode_index", 0))
                    if mode_index <= 0:
                        continue

                    # Prefer the sanitized pose; fall back to the
                    # hydrogen-added ("_h") pose if sanitization was skipped.
                    sanitized_pose = (
                        OUTPUT_DIR
                        / "docking"
                        / f"ligand_{lig_index}"
                        / f"ligand_{lig_index}_mode{mode_index}_sanitized.pdb"
                    )
                    if not sanitized_pose.exists():
                        sanitized_pose = (
                            OUTPUT_DIR
                            / "docking"
                            / f"ligand_{lig_index}"
                            / f"ligand_{lig_index}_mode{mode_index}_h.pdb"
                        )
                    if not sanitized_pose.exists():
                        logger.warning(f"Docking pose not found: {sanitized_pose}")
                        continue

                    # Overwrite the corrected ligand file with the chosen pose.
                    corrected_path.write_text(sanitized_pose.read_text())
                    updated_indices.append(lig_index)
            except Exception as e:
                # A bad selection must not abort the remaining ones.
                logger.warning(f"Error applying selection {sel}: {str(e)}")

        # Rebuild tleap_ready.pdb from ALL corrected ligand files (updated or
        # not), skipping Open Babel intermediates ("_obabel_" in the name).
        tleap_ready = OUTPUT_DIR / "tleap_ready.pdb"
        ligand_groups = []
        ligand_files = sorted(
            [f for f in OUTPUT_DIR.glob("4_ligands_corrected_*.pdb") if "_obabel_" not in f.name]
        )
        for lig_pdb in ligand_files:
            # Keep only coordinate records; newlines are preserved so the
            # merge helper can write the lines verbatim.
            lines = [
                line
                for line in lig_pdb.read_text().splitlines(keepends=True)
                if line.startswith(("ATOM", "HETATM"))
            ]
            if lines:
                ligand_groups.append(lines)

        if ligand_groups:
            ok = merge_protein_and_ligand(
                str(protein_capped), None, str(tleap_ready), ligand_groups=ligand_groups
            )
            if not ok:
                return jsonify(
                    {
                        "success": False,
                        "error": "Failed to merge protein and updated ligands into tleap_ready.pdb",
                    }
                ), 500

        return jsonify(
            {
                "success": True,
                "updated_ligands": updated_indices,
                "tleap_ready": str(tleap_ready.relative_to(OUTPUT_DIR)) if tleap_ready.exists() else None,
            }
        )
    except Exception as e:
        logger.error(f"Error applying docking poses: {str(e)}")
        return jsonify({"success": False, "error": str(e)}), 500
|
|
|
|
|
def _format_log(message, log_type='info'): |
|
|
"""Helper function to format log message for SSE""" |
|
|
data = json.dumps({'type': log_type, 'message': message}) |
|
|
return f"data: {data}\n\n" |
|
|
|
|
|
@app.route('/api/generate-ligand-ff', methods=['POST'])
def generate_ligand_ff():
    """Generate force field parameters for multiple ligands with streaming logs.

    Streams progress as Server-Sent Events. For every *unique* ligand residue
    name found in the 4_ligands_corrected_*.pdb files it runs:
      1. an awk pass over the PDB to estimate the formal net charge from
         trailing element-charge tags (e.g. "N1+", "O1-"),
      2. antechamber with AM1-BCC charges to produce <resname>.mol2,
      3. parmchk2 to produce <resname>.frcmod.
    Duplicate copies of the same residue are parameterized only once.

    Request JSON: {"force_field": "gaff" | "gaff2"} (default "gaff2").
    Response: text/event-stream of {"type", "message"} frames ending with a
    {"type": "complete", ...} frame.

    Fix: the previous version decorated this view with @stream_with_context,
    but Flask's stream_with_context must wrap the *generator* that feeds the
    Response (see the return statement below), otherwise the request context
    is torn down before the client finishes consuming the stream.
    """
    def generate():
        try:
            data = request.get_json()
            force_field = data.get('force_field', 'gaff2')

            # parmchk2 "-s" selects the parameter set: 2 = gaff2, 1 = gaff.
            s_param = 2 if force_field == 'gaff2' else 1

            yield _format_log(f"Working directory: {os.getcwd()}")
            yield _format_log(f"Output directory: {OUTPUT_DIR}")

            # Prefer the numbered per-ligand files; skip Open Babel
            # intermediates ("_obabel_" in the name).
            ligand_files = sorted([f for f in OUTPUT_DIR.glob("4_ligands_corrected_*.pdb") if "_obabel_" not in f.name])

            if not ligand_files:
                # Legacy single-ligand layout.
                single_ligand_pdb = OUTPUT_DIR / "4_ligands_corrected.pdb"
                if single_ligand_pdb.exists():
                    ligand_files = [single_ligand_pdb]
                else:
                    yield _format_log('Ligand PDB file(s) not found. Please prepare structure with ligands first.', 'error')
                    yield f"data: {json.dumps({'type': 'complete', 'success': False, 'error': 'Ligand PDB file(s) not found. Please prepare structure with ligands first.'})}\n\n"
                    return

            yield _format_log(f"Found {len(ligand_files)} ligand file(s) to process")

            # Numeric residue names break antechamber/tleap; normalize first.
            validate_and_sanitize_all_ligand_files()
            yield _format_log("Validated ligand residue names (numeric names converted to LIG{number} format if needed)")

            import re
            processed_ligands = []
            errors = []

            # Group ligand files by residue name so each unique residue is
            # parameterized exactly once; duplicates share the parameters.
            ligand_by_resname = {}

            for i, ligand_pdb in enumerate(ligand_files, 1):
                ligand_num = i
                # Use the index embedded in the filename when present.
                match = re.search(r'_(\d+)\.pdb$', ligand_pdb.name)
                if match:
                    ligand_num = int(match.group(1))

                resname = get_residue_name_from_pdb(ligand_pdb, sanitize=True)
                if not resname:
                    yield _format_log(f"Warning: Could not extract residue name from {ligand_pdb.name}, using LIG{ligand_num}", 'warning')
                    resname = f"LIG{ligand_num}"

                ligand_by_resname.setdefault(resname, []).append((ligand_pdb, ligand_num))

            yield _format_log(f"Found {len(ligand_by_resname)} unique ligand residue name(s): {', '.join(sorted(ligand_by_resname.keys()))}")

            for resname, ligand_list in ligand_by_resname.items():
                # Parameterize using the first file; later copies reuse it.
                ligand_pdb, ligand_num = ligand_list[0]

                if len(ligand_list) > 1:
                    other_nums = [num for _, num in ligand_list[1:]]
                    yield _format_log(f"Residue {resname} appears {len(ligand_list)} times (ligand files: {ligand_num}, {', '.join(map(str, other_nums))})", 'info')
                    yield _format_log(f"Processing {resname} once using ligand file {ligand_num}, skipping duplicates", 'info')

                ligand_mol2 = OUTPUT_DIR / f"{resname}.mol2"
                ligand_frcmod = OUTPUT_DIR / f"{resname}.frcmod"

                yield _format_log(f"\n{'='*60}")
                yield _format_log(f"Processing ligand {resname} (from file {ligand_pdb.name})")
                yield _format_log(f"{'='*60}")

                # Step 1: estimate the formal net charge by counting trailing
                # "+"/"-" charge tags on HETATM element fields.
                yield _format_log(f"Step 1: Calculating net charge for ligand {resname}...")
                awk_cmd = "awk '/^HETATM/ {if($NF ~ /[A-Z][0-9]-$/) charge--; if($NF ~ /[A-Z][0-9]\\+$/) charge++} END {print \"Net charge:\", charge+0}'"
                cmd1 = f"{awk_cmd} {ligand_pdb}"

                try:
                    result = subprocess.run(cmd1, shell=True, capture_output=True, text=True)
                    output = result.stdout.strip()
                    yield _format_log(f"Awk output: '{output}'")

                    net_charge_match = re.search(r'Net charge:\s*(-?\d+)', output)
                    if net_charge_match:
                        net_charge = int(net_charge_match.group(1))
                        yield _format_log(f"Calculated net charge: {net_charge}")
                    else:
                        yield _format_log("Could not extract net charge from awk output, using 0", 'warning')
                        net_charge = 0
                except Exception as e:
                    yield _format_log(f"Error running awk command: {e}, using net charge 0", 'error')
                    net_charge = 0

                # Step 2: antechamber assigns atom types + AM1-BCC charges.
                yield _format_log(f"Step 2: Running antechamber for ligand {resname} with net charge {net_charge}...")
                cmd2 = f"antechamber -i {ligand_pdb.name} -fi pdb -o {ligand_mol2.name} -fo mol2 -c bcc -at {force_field} -nc {net_charge}"
                yield _format_log(f"Running command: {cmd2}")

                # Stream antechamber output line by line so the UI stays live.
                process = subprocess.Popen(cmd2, shell=True, cwd=str(OUTPUT_DIR),
                                           stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                                           text=True, bufsize=1, universal_newlines=True)

                for line in iter(process.stdout.readline, ''):
                    if line:
                        yield _format_log(line.strip())

                process.wait()
                return_code = process.returncode

                yield _format_log(f"antechamber return code: {return_code}")

                if return_code != 0:
                    error_msg = f'antechamber failed for ligand {resname} with net charge {net_charge}'
                    yield _format_log(f"ERROR: {error_msg}", 'error')
                    errors.append(error_msg)
                    continue

                # Step 3: parmchk2 fills in any missing force-field terms.
                yield _format_log(f"Step 3: Running parmchk2 for ligand {resname}...")
                cmd3 = f"parmchk2 -i {ligand_mol2.name} -f mol2 -o {ligand_frcmod.name} -a Y -s {s_param}"
                yield _format_log(f"Running command: {cmd3}")

                process = subprocess.Popen(cmd3, shell=True, cwd=str(OUTPUT_DIR),
                                           stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                                           text=True, bufsize=1, universal_newlines=True)

                for line in iter(process.stdout.readline, ''):
                    if line:
                        yield _format_log(line.strip())

                process.wait()
                return_code = process.returncode

                yield _format_log(f"parmchk2 return code: {return_code}")

                if return_code != 0:
                    error_msg = f'parmchk2 failed for ligand {resname}'
                    yield _format_log(f"ERROR: {error_msg}", 'error')
                    errors.append(error_msg)
                    continue

                # Record the result only when both outputs actually exist.
                if ligand_mol2.exists() and ligand_frcmod.exists():
                    processed_ligands.append({
                        'resname': resname,
                        'ligand_num': ligand_num,
                        'net_charge': net_charge,
                        'files': {
                            'pdb': str(ligand_pdb),
                            'mol2': str(ligand_mol2),
                            'frcmod': str(ligand_frcmod)
                        },
                        'duplicate_files': [str(pdb) for pdb, num in ligand_list[1:]] if len(ligand_list) > 1 else []
                    })
                    yield _format_log(f"✅ Successfully processed ligand {resname}", 'success')
                else:
                    error_msg = f'Force field generation failed for ligand {resname} - output files not created'
                    yield _format_log(f"ERROR: {error_msg}", 'error')
                    errors.append(error_msg)

            if not processed_ligands:
                error_msg = f'Failed to process any ligands. Errors: {"; ".join(errors)}'
                yield _format_log(error_msg, 'error')
                yield f"data: {json.dumps({'type': 'complete', 'success': False, 'error': error_msg})}\n\n"
                return

            result_data = {
                'type': 'complete',
                'success': True,
                'message': f'Successfully processed {len(processed_ligands)} ligand(s) with force field {force_field}',
                'ligands': processed_ligands,
                'errors': errors if errors else None
            }
            yield f"data: {json.dumps(result_data)}\n\n"

        except Exception as e:
            logger.error(f"Error generating ligand force field: {str(e)}")
            yield _format_log(f'Internal server error: {str(e)}', 'error')
            yield f"data: {json.dumps({'type': 'complete', 'success': False, 'error': f'Internal server error: {str(e)}'})}\n\n"

    # stream_with_context keeps the request context alive while the client
    # consumes the generator (the generator reads request.get_json()).
    return Response(stream_with_context(generate()), mimetype='text/event-stream')
|
|
|
|
|
@app.route('/api/calculate-net-charge', methods=['POST'])
def calculate_net_charge():
    """Calculate net charge of the system using tleap.

    Writes calc_charge_on_system.in, runs tleap on tleap_ready.pdb (loading
    any available ligand mol2/frcmod parameters first), parses the
    "Total ... charge" line from tleap's stdout, and returns the charge plus
    a counter-ion suggestion (Na+ / Cl-) as JSON.

    Request JSON (optional): {"force_field": "<protein ff>"}, default ff14SB.
    Responses: 400 if tleap_ready.pdb is missing, 500 if tleap fails or the
    charge cannot be parsed.
    """
    try:
        tleap_ready_file = OUTPUT_DIR / "tleap_ready.pdb"
        if not tleap_ready_file.exists():
            return jsonify({'error': 'Structure not prepared. Please prepare structure first.'}), 400

        # Collect ligand parameter files; ligand_resname_map maps residue
        # name -> (mol2, frcmod) and drives the tleap load commands below.
        ligand_mol2_files = []
        ligand_frcmod_files = []
        ligand_resname_map = {}

        # Primary layout: one <resname>.mol2/<resname>.frcmod pair per
        # unique ligand residue name.
        unique_resnames = get_all_ligand_residue_names()
        for resname in unique_resnames:
            mol2_file = OUTPUT_DIR / f"{resname}.mol2"
            frcmod_file = OUTPUT_DIR / f"{resname}.frcmod"
            if mol2_file.exists() and frcmod_file.exists():
                ligand_resname_map[resname] = (mol2_file, frcmod_file)
                ligand_mol2_files.append(mol2_file)
                ligand_frcmod_files.append(frcmod_file)

        # Fallback 1: older numbered layout 4_ligands_corrected_<n>.mol2.
        if not ligand_mol2_files:
            numbered_mol2 = sorted(OUTPUT_DIR.glob("4_ligands_corrected_*.mol2"))
            numbered_frcmod = sorted(OUTPUT_DIR.glob("4_ligands_corrected_*.frcmod"))
            if numbered_mol2 and numbered_frcmod:
                ligand_mol2_files = numbered_mol2
                ligand_frcmod_files = numbered_frcmod

                # Recover a residue name for each pair: prefer the mol2
                # content, then the known-resname list, then LIG<n>.
                resnames = get_all_ligand_residue_names()
                for i, (mol2_file, frcmod_file) in enumerate(zip(ligand_mol2_files, ligand_frcmod_files)):
                    resname = get_residue_name_from_mol2(mol2_file) if mol2_file.exists() else None
                    if not resname:
                        if resnames and i < len(resnames):
                            resname = resnames[i]
                        else:
                            resname = f"LIG{len(ligand_resname_map) + 1}"
                    if resname not in ligand_resname_map:
                        ligand_resname_map[resname] = (mol2_file, frcmod_file)

        # Fallback 2: legacy single-ligand layout.
        if not ligand_mol2_files:
            single_mol2 = OUTPUT_DIR / "4_ligands_corrected.mol2"
            single_frcmod = OUTPUT_DIR / "4_ligands_corrected.frcmod"
            if single_mol2.exists() and single_frcmod.exists():
                ligand_mol2_files = [single_mol2]
                ligand_frcmod_files = [single_frcmod]
                # NOTE(review): "resname" here is a *list* of names, shadowing
                # the loop variable above — confusing but functional.
                resname = get_all_ligand_residue_names()
                if resname:
                    ligand_resname_map[resname[0]] = (single_mol2, single_frcmod)
                else:
                    ligand_resname_map["LIG"] = (single_mol2, single_frcmod)

        ligand_present = len(ligand_mol2_files) > 0 and len(ligand_frcmod_files) > 0

        # Write the tleap input script into the output folder (tleap runs
        # with cwd=OUTPUT_DIR, so bare filenames resolve there).
        tleap_input = OUTPUT_DIR / "calc_charge_on_system.in"

        # NOTE(review): request.get_json() is called twice here; a single
        # request.get_json(silent=True) or {} would be cleaner.
        data = request.get_json() if request.get_json() else {}
        selected_force_field = data.get('force_field', 'ff14SB')

        with open(tleap_input, 'w') as f:
            f.write(f"source leaprc.protein.{selected_force_field}\n")
            f.write("source leaprc.gaff2\n\n")

            if ligand_present:
                # Load each unique ligand's parameters before the PDB so the
                # residue units are defined when loadpdb encounters them.
                for resname in sorted(ligand_resname_map.keys()):
                    mol2_file, frcmod_file = ligand_resname_map[resname]
                    f.write(f"loadamberparams {frcmod_file.name}\n")
                    f.write(f"{resname} = loadmol2 {mol2_file.name}\n")
                f.write("\n")

            f.write("x = loadpdb tleap_ready.pdb\n\n")
            f.write("charge x\n\n")
            f.write("quit\n")

        print("Running tleap to calculate system charge...")

        try:
            # Resolve the tleap binary: PATH first, then the active conda
            # environment, then hope "tleap" resolves at run time.
            which_result = subprocess.run(['which', 'tleap'], capture_output=True, text=True)
            if which_result.returncode == 0:
                tleap_path = which_result.stdout.strip()
            else:
                conda_env = os.environ.get('CONDA_DEFAULT_ENV', 'MD_pipeline')
                conda_prefix = os.environ.get('CONDA_PREFIX', '')
                if conda_prefix:
                    tleap_path = os.path.join(conda_prefix, 'bin', 'tleap')
                else:
                    tleap_path = 'tleap'

            cmd = f"{tleap_path} -f calc_charge_on_system.in"
            result = subprocess.run(cmd, shell=True, cwd=str(OUTPUT_DIR), capture_output=True, text=True)
        except Exception as e:
            # Last-resort: rely on the shell finding tleap on PATH.
            cmd = f"tleap -f calc_charge_on_system.in"
            result = subprocess.run(cmd, shell=True, cwd=str(OUTPUT_DIR), capture_output=True, text=True)

        print(f"tleap return code: {result.returncode}")
        print(f"tleap stdout: {result.stdout}")
        print(f"tleap stderr: {result.stderr}")

        # tleap may exit 0 even on errors, so success is judged by the
        # presence of the charge line in stdout rather than the return code.
        if 'Total unperturbed charge' not in result.stdout and 'Total charge' not in result.stdout:
            return jsonify({'error': f'tleap failed to calculate charge. Error: {result.stderr}'}), 500

        output_lines = result.stdout.split('\n')
        net_charge = None

        for line in output_lines:
            if 'Total unperturbed charge' in line or 'Total charge' in line:
                import re
                charge_match = re.search(r'charge[:\s]+(-?\d+\.?\d*)', line)
                if charge_match:
                    net_charge = float(charge_match.group(1))
                    break

        if net_charge is None:
            return jsonify({'error': 'Could not extract net charge from tleap output'}), 500

        # Translate the charge into a neutralizing counter-ion suggestion.
        if net_charge > 0:
            suggestion = "The system is positively charged. Add Cl- to neutralize."
            ion_type = "Cl-"
            ion_count = int(round(net_charge))
        elif net_charge < 0:
            suggestion = "The system is negatively charged. Add Na+ to neutralize."
            ion_type = "Na+"
            ion_count = int(round(abs(net_charge)))
        else:
            suggestion = "The system is neutral. No ions needed."
            ion_type = "None"
            ion_count = 0

        return jsonify({
            'success': True,
            'net_charge': net_charge,
            'suggestion': suggestion,
            'ion_type': ion_type,
            'ion_count': ion_count,
            'ligand_present': ligand_present
        })

    except Exception as e:
        logger.error(f"Error calculating net charge: {str(e)}")
        return jsonify({'error': f'Internal server error: {str(e)}'}), 500
|
|
|
|
|
@app.route('/api/generate-all-files', methods=['POST'])
def generate_all_files():
    """Generate all simulation input files based on UI parameters.

    Reads simulation parameters from the request JSON, writes the five AMBER
    input files (min_restrained.in, min.in, HeatNPT.in, mdin_equi.in,
    mdin_prod.in), runs tleap to build the solvated topology, and writes the
    scheduler submission script. Soft problems are collected into a
    ``warnings`` list in the response rather than failing the request.

    Fix vs. previous version: the bare ``except:`` around the ``which tleap``
    lookup is narrowed to ``except Exception`` so KeyboardInterrupt/SystemExit
    are no longer swallowed.
    """
    try:
        data = request.get_json()

        # --- general MD parameters (with UI defaults) ---
        cutoff_distance = data.get('cutoff_distance', 10.0)
        temperature = data.get('temperature', 310.0)
        pressure = data.get('pressure', 1.0)

        # --- per-stage step counts ---
        restrained_steps = data.get('restrained_steps', 10000)
        restrained_force = data.get('restrained_force', 10.0)
        min_steps = data.get('min_steps', 20000)
        npt_heating_steps = data.get('npt_heating_steps', 50000)
        npt_equilibration_steps = data.get('npt_equilibration_steps', 100000)
        production_steps = data.get('production_steps', 1000000)

        dt = data.get('timestep', 0.002)

        # --- force-field / solvation choices for tleap ---
        force_field = data.get('force_field', 'ff14SB')
        water_model = data.get('water_model', 'TIP3P')
        add_ions = data.get('add_ions', 'None')
        distance = data.get('distance', 10.0)

        # Advisory sanity checks only — never blocking.
        warnings = []
        if restrained_steps < 5000:
            warnings.append("Restrained minimization steps should be at least 5000")
        if min_steps < 10000:
            warnings.append("Minimization steps should be at least 10000")

        tleap_ready_file = OUTPUT_DIR / "tleap_ready.pdb"
        if not tleap_ready_file.exists():
            return jsonify({'error': 'tleap_ready.pdb not found. Please prepare structure first.'}), 400

        # Residue count feeds the "RES 1 N" restraint mask.
        total_residues = count_residues_in_pdb(str(tleap_ready_file))

        # Write each stage's AMBER input file.
        generate_min_restrained_file(restrained_steps, restrained_force, total_residues, cutoff_distance)
        generate_min_file(min_steps, cutoff_distance)
        generate_heat_npt_file(npt_heating_steps, temperature, pressure, cutoff_distance, dt)
        generate_npt_equilibration_file(npt_equilibration_steps, temperature, pressure, cutoff_distance, dt)

        # Enable PLUMED in the production run only when the user uploaded
        # a plumed.dat file.
        plumed_file = OUTPUT_DIR / 'plumed.dat'
        use_plumed = plumed_file.exists()

        generate_production_file(production_steps, temperature, pressure, cutoff_distance, dt, use_plumed=use_plumed)

        # Build topology/coordinates via tleap; failures become warnings.
        ff_files_generated = []
        try:
            generate_ff_parameters_file(force_field, water_model, add_ions, distance)

            # Resolve tleap: PATH, then conda prefix, then a known fallback.
            tleap_path = None
            try:
                result = subprocess.run(['which', 'tleap'], capture_output=True, text=True)
                if result.returncode == 0:
                    tleap_path = result.stdout.strip()
            except Exception:
                pass

            if not tleap_path:
                conda_prefix = os.environ.get('CONDA_PREFIX')
                if conda_prefix:
                    tleap_path = os.path.join(conda_prefix, 'bin', 'tleap')
                else:
                    # NOTE(review): machine-specific fallback path; fails
                    # harmlessly into a warning on other hosts.
                    tleap_path = '/home/hn533621/.conda/envs/MD_pipeline/bin/tleap'

            cmd = f"{tleap_path} -f generate_ff_parameters.in"
            result = subprocess.run(cmd, shell=True, cwd=str(OUTPUT_DIR),
                                    capture_output=True, text=True, timeout=300)

            if result.returncode != 0:
                warnings.append(f"Force field generation failed: {result.stderr}")
            else:
                # Confirm which expected outputs tleap actually produced.
                ff_output_files = ['protein.prmtop', 'protein.inpcrd', 'protein_solvated.pdb']
                for ff_file in ff_output_files:
                    if (OUTPUT_DIR / ff_file).exists():
                        ff_files_generated.append(ff_file)

                if len(ff_files_generated) == 0:
                    warnings.append("Force field parameter files were not generated")

        except Exception as ff_error:
            warnings.append(f"Force field generation error: {str(ff_error)}")

        # Scheduler submission script (loads plumed module when needed).
        pbs_generated = generate_submit_pbs_file(use_plumed=use_plumed)

        all_files = [
            'min_restrained.in',
            'min.in',
            'HeatNPT.in',
            'mdin_equi.in',
            'mdin_prod.in'
        ] + ff_files_generated

        if pbs_generated:
            all_files.append('submit_job.pbs')

        return jsonify({
            'success': True,
            'message': f'All simulation files generated successfully ({len(all_files)} files)',
            'warnings': warnings,
            'files_generated': all_files
        })

    except Exception as e:
        logger.error(f"Error generating simulation files: {str(e)}")
        return jsonify({'error': f'Internal server error: {str(e)}'}), 500
|
|
|
|
|
def count_residues_in_pdb(pdb_file):
    """Count the number of unique residues in a PDB file.

    A residue is identified by (chain ID, residue sequence number, insertion
    code). The previous implementation keyed on the sequence number alone, so
    residue 1 of chain A and residue 1 of chain B were counted once —
    undercounting every multi-chain system (this count feeds the
    "RES 1 N" restraint mask in min_restrained.in).

    Args:
        pdb_file: path to a PDB file.

    Returns:
        Number of unique residues, or 607 if the file cannot be read
        (legacy fallback kept for backward compatibility).
    """
    try:
        residues = set()
        with open(pdb_file, 'r') as f:
            for line in f:
                if line.startswith(('ATOM', 'HETATM')):
                    # PDB fixed columns: chain ID at col 22 (index 21),
                    # resSeq at cols 23-26 (22:26), iCode at col 27 (26:27).
                    chain_id = line[21:22]
                    residue_num = line[22:26].strip()
                    insertion_code = line[26:27].strip()
                    if residue_num:
                        residues.add((chain_id, residue_num, insertion_code))

        return len(residues)
    except Exception as e:
        logger.error(f"Error counting residues: {str(e)}")
        # Historical default used before this helper existed; callers treat
        # it as a best-effort residue count, not an error signal.
        return 607
|
|
|
|
|
def generate_min_restrained_file(steps, force_constant, total_residues, cutoff):
    """Generate min_restrained.in file.

    Writes an AMBER minimization input that restrains residues 1..N
    (N = total_residues) with the given force constant, so only solvent and
    ions relax in this first stage. The steepest-descent portion (ncyc) is
    set to half of maxcyc.

    Args:
        steps: total minimization cycles (maxcyc).
        force_constant: positional restraint weight (kcal/mol/A^2).
        total_residues: last residue index included in the restraint mask.
        cutoff: nonbonded cutoff in Angstrom.
    """
    content = f"""initial minimization solvent + ions
&cntrl
  imin = 1,
  maxcyc = {steps},
  ncyc = {steps // 2},
  ntb = 1,
  ntr = 1,
  ntxo = 1,
  cut = {cutoff}
/
Restrain
{force_constant}
RES 1 {total_residues}
END
END

"""

    with open(OUTPUT_DIR / "min_restrained.in", 'w') as f:
        f.write(content)
|
|
def generate_min_file(steps, cutoff):
    """Generate min.in file.

    Writes the unrestrained full-system minimization input. The
    steepest-descent portion (ncyc) is a quarter of maxcyc before switching
    to conjugate gradient.

    Args:
        steps: total minimization cycles (maxcyc).
        cutoff: nonbonded cutoff in Angstrom.
    """
    content = f"""Minimization
&cntrl
  imin=1,
  maxcyc={steps},
  ncyc={steps // 4},
  ntb=1,
  cut={cutoff},
  igb=0,
  ntr=0,
/

"""

    with open(OUTPUT_DIR / "min.in", 'w') as f:
        f.write(content)
|
|
def generate_heat_npt_file(steps, temperature, pressure, cutoff, dt=0.002):
    """Generate HeatNPT.in file with temperature ramping.

    Builds a four-segment TEMP0 schedule via &wt records: three 20%-of-steps
    ramps (to 3%, 66%, then 100% of the target temperature) followed by a
    40% hold at the target. Langevin thermostat (ntt=3) and isotropic
    pressure coupling (ntp=1) are used; SHAKE on H-bonds (ntc=2/ntf=2).

    Args:
        steps: total MD steps for the heating stage (nstlim).
        temperature: target temperature in K (temp0).
        pressure: reference pressure in bar (pres0).
        cutoff: nonbonded cutoff in Angstrom.
        dt: timestep in ps.
    """
    # Segment lengths: 20/20/20/40 percent of the total step count.
    step1 = int(steps * 0.2)
    step2 = int(steps * 0.2)
    step3 = int(steps * 0.2)
    step4 = int(steps * 0.4)

    # Ramp targets as fractions of the final temperature.
    temp1 = temperature * 0.03
    temp2 = temperature * 0.66
    temp3 = temperature
    temp4 = temperature

    content = f"""Heat
&cntrl
  imin = 0, irest = 0, ntx = 1,
  ntb = 2, pres0 = {pressure}, ntp = 1,
  taup = 2.0,
  cut = {cutoff}, ntr = 0,
  ntc = 2, ntf = 2,
  tempi = 0, temp0 = {temperature},
  ntt = 3, gamma_ln = 1.0,
  nstlim = {steps}, dt = {dt},
  ntpr = 2000, ntwx = 2000, ntwr = 2000
/
&wt type='TEMP0', istep1=0, istep2={step1}, value1=0.0, value2={temp1} /
&wt type='TEMP0', istep1={step1+1}, istep2={step1+step2}, value1={temp1}, value2={temp2} /
&wt type='TEMP0', istep1={step1+step2+1}, istep2={step1+step2+step3}, value1={temp2}, value2={temp3} /
&wt type='TEMP0', istep1={step1+step2+step3+1}, istep2={steps}, value1={temp3}, value2={temp4} /
&wt type='END' /

"""

    with open(OUTPUT_DIR / "HeatNPT.in", 'w') as f:
        f.write(content)
|
|
def generate_npt_equilibration_file(steps, temperature, pressure, cutoff, dt=0.002):
    """Generate mdin_equi.in file for NPT equilibration.

    Constant temperature (Langevin, ntt=3, gamma_ln=3.0) and pressure
    (ntp=1) run at the target temperature, with SHAKE on H-bonds and
    coordinate wrapping (iwrap=1). Output is written every 500 steps.

    Args:
        steps: number of MD steps (nstlim).
        temperature: target temperature in K (temp0 and tempi).
        pressure: reference pressure in bar (pres0).
        cutoff: nonbonded cutoff in Angstrom.
        dt: timestep in ps.
    """
    content = f"""NPT Equilibration
&cntrl
  imin=0,
  ntx=1,
  irest=0,
  pres0={pressure},
  taup=1.0,
  temp0={temperature},
  tempi={temperature},
  nstlim={steps},
  dt={dt},
  ntf=2,
  ntc=2,
  ntpr=500,
  ntwx=500,
  ntwr=500,
  cut={cutoff},
  ntb=2,
  ntp=1,
  ntt=3,
  gamma_ln=3.0,
  ig=-1,
  iwrap=1,
  ntr=0,
/

"""

    with open(OUTPUT_DIR / "mdin_equi.in", 'w') as f:
        f.write(content)
|
|
def generate_production_file(steps, temperature, pressure, cutoff, dt=0.002, use_plumed=False):
    """Generate mdin_prod.in file for production run.

    Same NPT setup as equilibration (Langevin thermostat, isotropic
    barostat, SHAKE, wrapping) with output every 1000 steps. When
    use_plumed is True, PLUMED directives referencing plumed.dat are
    appended inside the &cntrl namelist before it is closed.

    Args:
        steps: number of MD steps (nstlim).
        temperature: target temperature in K.
        pressure: reference pressure in bar.
        cutoff: nonbonded cutoff in Angstrom.
        dt: timestep in ps.
        use_plumed: append plumed=1 / plumedfile='plumed.dat' when True.
    """
    content = f"""Production Run
&cntrl
  imin=0,
  ntx=1,
  irest=0,
  pres0={pressure},
  taup=1.0,
  temp0={temperature},
  tempi={temperature},
  nstlim={steps},
  dt={dt},
  ntf=2,
  ntc=2,
  ntpr=1000,
  ntwx=1000,
  ntwr=1000,
  cut={cutoff},
  ntb=2,
  ntp=1,
  ntt=3,
  gamma_ln=3.0,
  ig=-1,
  iwrap=1,
  ntr=0,
"""

    # The &cntrl namelist is still open here; PLUMED keys must be added
    # before the closing "/".
    if use_plumed:
        content += "  plumed=1,\n"
        content += "  plumedfile='plumed.dat'\n"

    content += "/\n\n"

    with open(OUTPUT_DIR / "mdin_prod.in", 'w') as f:
        f.write(content)
|
|
def generate_submit_pbs_file(use_plumed=False):
    """Generate submit_job.pbs file for SLURM job submission.

    NOTE: despite the .pbs extension the script uses #SBATCH directives
    (SLURM). It chains the five pmemd.cuda stages on one GPU, with the
    working directory pinned to the absolute output folder via -D. When
    use_plumed is True, the plumed module is loaded after amber.

    Args:
        use_plumed: add "module load plumed/2.9.1" when True.

    Returns:
        True on success, False if writing the script failed.
    """
    try:
        # -D requires an absolute path; resolve symlinks as well.
        output_dir_abs = OUTPUT_DIR.resolve()

        content = """#!/bin/bash
#SBATCH -D {working_dir}             # Critical: Sets working dir
#SBATCH --job-name=job_name
#SBATCH --partition=defq
#SBATCH --get-user-env
#SBATCH --nodes=1
#SBATCH --tasks-per-node=1
#SBATCH --cpus-per-task=1
#SBATCH --gres=gpu:1
#SBATCH --time=168:00:00

module load amber/24
""".format(working_dir=str(output_dir_abs))

        if use_plumed:
            content += "module load plumed/2.9.1\n"

        content += """
pmemd.cuda -O -i min_restrained.in -o min_restrained.out -p protein.prmtop -c protein.inpcrd -r min_res.ncrst -x min_res.nc -ref protein.inpcrd -inf min_res.mdinfo
pmemd.cuda -O -i min.in -o min.out -p protein.prmtop -c min_res.ncrst -r min.ncrst -x min.nc -inf min.mdinfo
pmemd.cuda -O -i HeatNPT.in -o HeatNPT.out -p protein.prmtop -c min.ncrst -r HeatNPT.ncrst -x HeatNPT.nc -inf HeatNPT.mdinfo
pmemd.cuda -O -i mdin_equi.in -o mdin_equi.out -p protein.prmtop -c HeatNPT.ncrst -r mdin_equi.ncrst -x mdin_equi.nc -inf mdin_equi.mdinfo -ref protein.inpcrd
pmemd.cuda -O -i mdin_prod.in -o mdin_prod.out -p protein.prmtop -c mdin_equi.ncrst -r mdin_prod.ncrst -x mdin_prod.nc -inf mdin_prod.mdinfo -ref protein.inpcrd
"""

        with open(OUTPUT_DIR / "submit_job.pbs", 'w') as f:
            f.write(content)

        logger.info(f"Generated submit_job.pbs in {OUTPUT_DIR}")
        return True
    except Exception as e:
        logger.error(f"Error generating submit_job.pbs: {e}")
        return False
|
|
|
|
|
@app.route('/api/health', methods=['GET'])
def health_check():
    """Liveness probe: report that the API server is up and responding."""
    status_payload = {'status': 'healthy', 'message': 'MD Simulation Pipeline API is running'}
    return jsonify(status_payload)
|
|
|
|
|
@app.route('/api/clean-output', methods=['POST'])
def clean_output():
    """Wipe and recreate the output folder, reporting success or failure."""
    try:
        print("DEBUG: clean-output endpoint called")
        # Guard clause: surface the failure immediately, success otherwise.
        if not clean_and_create_output_folder():
            return jsonify({'success': False, 'error': 'Failed to clean output folder'}), 500
        return jsonify({'success': True, 'message': 'Output folder cleaned successfully'})
    except Exception as e:
        print(f"DEBUG: Error in clean-output: {str(e)}")
        return jsonify({'success': False, 'error': str(e)}), 500
|
|
|
|
|
@app.route('/api/save-pdb-file', methods=['POST'])
def save_pdb_file():
    """Save PDB file to output directory.

    The upload is always written to the canonical name
    ``0_original_input.pdb`` inside OUTPUT_DIR — the client-supplied
    "filename" field was read but never used by the previous version, so the
    dead local has been removed (the request contract is unchanged; a
    "filename" key is simply ignored).

    Request JSON: {"pdb_content": "<text>"}.
    Returns 400 when pdb_content is empty.
    """
    try:
        data = request.get_json()
        pdb_content = data.get('pdb_content', '')

        if not pdb_content:
            return jsonify({'success': False, 'error': 'No PDB content provided'}), 400

        # Fixed destination: downstream steps look for this exact name.
        output_file = OUTPUT_DIR / "0_original_input.pdb"
        with open(output_file, 'w') as f:
            f.write(pdb_content)

        logger.info(f"Saved PDB file to {output_file}")
        return jsonify({
            'success': True,
            'message': f'PDB file saved successfully',
            'file_path': str(output_file)
        })
    except Exception as e:
        logger.error(f"Error saving PDB file: {str(e)}")
        return jsonify({'success': False, 'error': str(e)}), 500
|
|
|
|
|
@app.route('/api/save-plumed-file', methods=['POST'])
def save_plumed_file():
    """Save PLUMED file to output directory.

    Request JSON: {"plumed_content": "<text>", "filename": "plumed.dat"}.
    The filename defaults to plumed.dat; a ".dat" extension is appended
    when the name has no extension at all.

    Fixes vs. previous version:
      * the literal placeholder string "(unknown)" (a corrupted f-string)
        is replaced with the actual {filename} interpolation in both the
        extension fallback and the success message;
      * the client-supplied filename is rejected if it contains path
        separators or "..", matching the guard already used in save_file,
        so a caller cannot write outside OUTPUT_DIR.
    """
    try:
        data = request.get_json()
        plumed_content = data.get('plumed_content', '')
        filename = data.get('filename', 'plumed.dat')

        if not plumed_content:
            return jsonify({'success': False, 'error': 'No PLUMED content provided'}), 400

        # Security: the filename comes from the client — forbid traversal.
        if '/' in filename or '\\' in filename or '..' in filename:
            return jsonify({'success': False, 'error': 'Invalid filename'}), 400

        # Normalize the extension: only append ".dat" when the name has no
        # extension at all (names with some other extension are kept as-is).
        if not filename.endswith('.dat'):
            filename = filename if '.' in filename else f"{filename}.dat"

        output_file = OUTPUT_DIR / filename
        with open(output_file, 'w') as f:
            f.write(plumed_content)

        logger.info(f"Saved PLUMED file to {output_file}")
        return jsonify({
            'success': True,
            'message': f'PLUMED file saved successfully to output/{filename}',
            'file_path': str(output_file),
            'filename': filename
        })
    except Exception as e:
        logger.error(f"Error saving PLUMED file: {str(e)}")
        return jsonify({'success': False, 'error': str(e)}), 500
|
|
|
|
|
@app.route('/api/download-output-zip', methods=['GET'])
def download_output_zip():
    """Create a ZIP of the output folder and return it for download.

    Archives the entire OUTPUT_DIR into a temporary directory and streams
    it back as an attachment named output.zip.

    TODO(review): the temporary directory (and the zip inside it) is never
    deleted — each download leaks one tmp dir. Cleaning it requires an
    after-request hook because send_file streams the file lazily.
    """
    try:
        if not OUTPUT_DIR.exists():
            return jsonify({'error': 'Output directory not found'}), 404

        import tempfile
        import shutil

        # Build the archive outside OUTPUT_DIR so the zip does not try to
        # include itself.
        tmp_dir = tempfile.mkdtemp()
        zip_base = os.path.join(tmp_dir, 'output')
        zip_path = shutil.make_archive(zip_base, 'zip', root_dir=str(OUTPUT_DIR))

        return send_file(zip_path, as_attachment=True, download_name='output.zip')
    except Exception as e:
        logger.error(f"Error creating output ZIP: {str(e)}")
        return jsonify({'error': f'Failed to create ZIP: {str(e)}'}), 500
|
|
|
|
|
@app.route('/api/get-generated-files', methods=['GET'])
def get_generated_files():
    """Return contents of known generated input files for preview.

    The response maps filename -> file text for the five standard simulation
    inputs plus submit_job.pbs, followed by any extra user-created *.in
    files found in OUTPUT_DIR (internal/tool-generated .in files are
    filtered out). Missing files map to "<file not found>".
    """
    try:
        # Canonical pipeline files, always listed first and in this order.
        files_to_read = [
            'min_restrained.in',
            'min.in',
            'HeatNPT.in',
            'mdin_equi.in',
            'mdin_prod.in',
            'submit_job.pbs'
        ]

        # Internal .in files that must never appear in the preview.
        excluded_files = [
            'calc_charge_on_system.in',
            'generate_ff_parameters.in',
            'sqm.in'
        ]

        def _is_esmfold_minimization_file(name):
            # tleap_*.in scripts come from the ESMFold rebuild step.
            if name.startswith('tleap_') and name.endswith('.in'):
                return True
            # len == 8 matches exactly "min_X.in" (one-character chain ID),
            # i.e. the per-chain ESMFold minimization inputs — while longer
            # names such as "min_restrained.in" are NOT matched.
            if name.startswith('min_') and name.endswith('.in') and len(name) == 8:
                return True
            return False

        # Any remaining *.in file is assumed to be user-created.
        user_created_files = []
        try:
            for file_path in OUTPUT_DIR.glob("*.in"):
                filename = file_path.name
                if (filename not in files_to_read and filename not in excluded_files
                        and not _is_esmfold_minimization_file(filename)):
                    user_created_files.append(filename)
        except Exception as e:
            logger.warning(f"Error scanning for user-created files: {e}")

        all_files = files_to_read + sorted(user_created_files)

        # Read each file; individual read errors are reported inline so one
        # bad file does not fail the whole preview.
        result = {}
        for name in all_files:
            path = OUTPUT_DIR / name
            if path.exists():
                try:
                    with open(path, 'r') as f:
                        result[name] = f.read()
                except Exception as fe:
                    result[name] = f"<error reading file: {fe}>"
            else:
                result[name] = "<file not found>"
        return jsonify({'success': True, 'files': result})
    except Exception as e:
        logger.error(f"Error reading generated files: {str(e)}")
        return jsonify({'error': f'Failed to read files: {str(e)}'}), 500
|
|
|
|
|
@app.route('/api/save-file', methods=['POST'])
def save_file():
    """Save edited file content back to the output directory.

    Expects JSON {"filename": str, "content": str}. Only a fixed whitelist
    of simulation input files may be overwritten. Returns 400 for missing
    fields or traversal attempts, 403 for non-whitelisted names, 500 on
    write failure.
    """
    try:
        data = request.get_json()
        filename = data.get('filename')
        content = data.get('content')

        if not filename:
            return jsonify({'success': False, 'error': 'Filename is required'}), 400

        # Explicit None check: an empty string is a valid (clearing) edit.
        if content is None:
            return jsonify({'success': False, 'error': 'Content is required'}), 400

        # Only the known, user-editable pipeline inputs may be overwritten.
        allowed_files = [
            'min_restrained.in',
            'min.in',
            'HeatNPT.in',
            'mdin_equi.in',
            'mdin_prod.in',
            'submit_job.pbs',
            'plumed.dat'
        ]

        if filename not in allowed_files:
            # BUG FIX: interpolate the actual filename; the message previously
            # contained a literal placeholder instead of the offending name.
            return jsonify({'success': False,
                            'error': f'File "{filename}" is not allowed to be edited'}), 403

        # Defense in depth: reject any path-traversal component even though
        # the whitelist above should already exclude them.
        if '/' in filename or '\\' in filename or '..' in filename:
            return jsonify({'success': False, 'error': 'Invalid filename'}), 400

        file_path = OUTPUT_DIR / filename
        try:
            with open(file_path, 'w') as f:
                f.write(content)

            logger.info(f"File {filename} saved successfully")
            return jsonify({'success': True, 'message': f'File {filename} saved successfully'})
        except Exception as e:
            logger.error(f"Error writing file {filename}: {str(e)}")
            return jsonify({'success': False, 'error': f'Failed to write file: {str(e)}'}), 500

    except Exception as e:
        logger.error(f"Error saving file: {str(e)}")
        return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500
|
|
|
|
|
@app.route('/api/save-new-file', methods=['POST'])
def save_new_file():
    """Save a new simulation file created by the user.

    Expects JSON {"filename": str, "content": str}. The file must end in
    '.in' and may not contain path-traversal components; it is written into
    OUTPUT_DIR.
    """
    try:
        data = request.get_json()
        filename = data.get('filename')
        content = data.get('content')

        if not filename:
            return jsonify({'success': False, 'error': 'Filename is required'}), 400

        # Explicit None check: an empty string is valid content.
        if content is None:
            return jsonify({'success': False, 'error': 'Content is required'}), 400

        # User-created files must be tleap/MD input files.
        if not filename.endswith('.in'):
            return jsonify({'success': False, 'error': 'File name must end with .in extension'}), 400

        # Reject any path-traversal attempt.
        if '/' in filename or '\\' in filename or '..' in filename:
            return jsonify({'success': False, 'error': 'Invalid filename'}), 400

        file_path = OUTPUT_DIR / filename
        try:
            with open(file_path, 'w') as f:
                f.write(content)

            # BUG FIX: interpolate the actual filename; the log/response
            # previously contained a literal placeholder.
            logger.info(f"New file {filename} saved successfully")
            return jsonify({'success': True, 'message': f'File {filename} saved successfully'})
        except Exception as e:
            logger.error(f"Error writing new file {filename}: {str(e)}")
            return jsonify({'success': False, 'error': f'Failed to write file: {str(e)}'}), 500

    except Exception as e:
        logger.error(f"Error saving new file: {str(e)}")
        return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500
|
|
|
|
|
def get_ligand_residue_name():
    """Extract first ligand residue name from tleap_ready.pdb (for backward compatibility)"""
    names = get_all_ligand_residue_names()
    if names:
        return names[0]
    # Historical default when no ligand is present in tleap_ready.pdb.
    return "LIG"
|
|
|
|
|
def generate_ff_parameters_file(force_field, water_model, add_ions, distance):
    """Generate the final force field parameters file with dynamic values.

    Writes 'generate_ff_parameters.in' (a tleap input script) into OUTPUT_DIR.

    Args:
        force_field: Amber protein force field name (e.g. 'ff14SB').
        water_model: water model name; 'tip3p' and 'spce' are recognized
            case-insensitively.
        add_ions: 'Na+' or 'Cl-' to neutralize the system; anything else skips
            the addions step.
        distance: solvation box padding passed to solvateBox (Angstroms).
    """
    print(f"DEBUG: force_field={force_field}, water_model={water_model}, add_ions={add_ions}, distance={distance}")

    ligand_mol2_files = []
    ligand_frcmod_files = []
    ligand_resname_map = {}  # residue name -> (mol2 path, frcmod path)

    # Preferred source: per-residue <RES>.mol2 / <RES>.frcmod pairs named
    # after the ligand residues found in tleap_ready.pdb.
    unique_resnames = get_all_ligand_residue_names()
    for resname in unique_resnames:
        mol2_file = OUTPUT_DIR / f"{resname}.mol2"
        frcmod_file = OUTPUT_DIR / f"{resname}.frcmod"
        if mol2_file.exists() and frcmod_file.exists():
            ligand_resname_map[resname] = (mol2_file, frcmod_file)
            ligand_mol2_files.append(mol2_file)
            ligand_frcmod_files.append(frcmod_file)

    # Fallback 1: numbered legacy files 4_ligands_corrected_<i>.mol2/.frcmod.
    if not ligand_mol2_files:
        numbered_mol2 = sorted(OUTPUT_DIR.glob("4_ligands_corrected_*.mol2"))
        numbered_frcmod = sorted(OUTPUT_DIR.glob("4_ligands_corrected_*.frcmod"))
        if numbered_mol2 and numbered_frcmod:
            ligand_mol2_files = numbered_mol2
            ligand_frcmod_files = numbered_frcmod

            resnames = get_all_ligand_residue_names()
            for i, (mol2_file, frcmod_file) in enumerate(zip(ligand_mol2_files, ligand_frcmod_files)):
                # Prefer the name stored in the mol2 itself, then the PDB
                # ligand names, finally a synthetic LIG<n> placeholder.
                resname = get_residue_name_from_mol2(mol2_file) if mol2_file.exists() else None
                if not resname:
                    if resnames and i < len(resnames):
                        resname = resnames[i]
                    else:
                        resname = f"LIG{len(ligand_resname_map) + 1}"

                if resname not in ligand_resname_map:
                    ligand_resname_map[resname] = (mol2_file, frcmod_file)

    # Fallback 2: the single-ligand legacy pair 4_ligands_corrected.mol2/.frcmod.
    if not ligand_mol2_files:
        single_mol2 = OUTPUT_DIR / "4_ligands_corrected.mol2"
        single_frcmod = OUTPUT_DIR / "4_ligands_corrected.frcmod"
        if single_mol2.exists() and single_frcmod.exists():
            ligand_mol2_files = [single_mol2]
            ligand_frcmod_files = [single_frcmod]
            resnames = get_all_ligand_residue_names()
            if resnames:
                ligand_resname_map[resnames[0]] = (single_mol2, single_frcmod)
            else:
                ligand_resname_map["LIG"] = (single_mol2, single_frcmod)

    ligand_present = len(ligand_mol2_files) > 0 and len(ligand_frcmod_files) > 0

    content = f"source leaprc.protein.{force_field}\n"

    print(f"DEBUG: water_model={water_model}")
    if water_model.lower() == "tip3p":
        content += "source leaprc.water.tip3p\n"
    # BUG FIX: compare case-insensitively like every other water_model check.
    # Previously 'SPCE'/'Spce' skipped the water leaprc entirely while the
    # solvateBox step below still used SPCBOX, producing a broken tleap input.
    elif water_model.lower() == "spce":
        content += "source leaprc.water.spce\n"

    if ligand_present:
        # GAFF2 parameters plus one loadamberparams/loadmol2 stanza per ligand.
        content += "source leaprc.gaff2\n\n"
        for resname in sorted(ligand_resname_map.keys()):
            mol2_file, frcmod_file = ligand_resname_map[resname]
            content += f"loadamberparams {frcmod_file.name}\n"
            content += f"{resname} = loadmol2 {mol2_file.name}\n"
            content += "\n"
    else:
        content += "\n"

    content += "x = loadpdb tleap_ready.pdb\n\n"
    content += "charge x\n\n"

    # Neutralize with the requested counter-ion (0.0 => add until neutral).
    if add_ions == "Na+":
        content += "addions x Na+ 0.0\n\n"
    elif add_ions == "Cl-":
        content += "addions x Cl- 0.0\n\n"

    if water_model.lower() == "tip3p":
        content += f"solvateBox x TIP3PBOX {distance}\n\n"
    elif water_model.lower() == "spce":
        content += f"solvateBox x SPCBOX {distance}\n\n"

    content += "saveamberparm x protein.prmtop protein.inpcrd\n\n"
    content += "savepdb x protein_solvated.pdb\n\n"
    content += "quit\n"

    print("DEBUG: Generated content:")
    print(content)

    with open(OUTPUT_DIR / "generate_ff_parameters.in", 'w') as f:
        f.write(content)
|
|
|
|
|
def get_residue_name_from_pdb(pdb_file, sanitize: bool = True):
    """
    Extract residue name from a ligand PDB file.
    If sanitize=True, validates and updates numeric residue names in the file.
    """
    try:
        # Solvent / counter-ion residues are never the ligand we want.
        skip_names = ['HOH', 'WAT', 'TIP', 'SPC', 'NA', 'CL']
        residue_name = None
        with open(pdb_file, 'r') as handle:
            for record in handle:
                if not record.startswith(('ATOM', 'HETATM')):
                    continue
                # PDB resName lives in columns 18-20 (0-based slice 17:20).
                residue_name = record[17:20].strip()
                if residue_name and residue_name not in skip_names:
                    break

        if not residue_name:
            return None

        if sanitize:
            fixed_name, was_changed = _validate_and_sanitize_ligand_name(residue_name)
            if was_changed:
                logger.warning(
                    f"Ligand residue name '{residue_name}' in {Path(pdb_file).name} is pure numeric. "
                    f"Changed to '{fixed_name}' (3-letter code) to avoid errors. "
                    f"The PDB file has been updated."
                )
                _update_pdb_residue_name(Path(pdb_file), residue_name, fixed_name)
                residue_name = fixed_name

        return residue_name
    except Exception as e:
        logger.warning(f"Could not extract residue name from {pdb_file}: {e}")
        return None
|
|
|
|
|
def get_residue_name_from_mol2(mol2_file):
    """Extract residue name from a mol2 file (from @<TRIPOS>MOLECULE section).

    Returns the first token of the line immediately following the
    @<TRIPOS>MOLECULE header, or None when the header is absent (or is the
    last line) or the file cannot be read.
    """
    try:
        with open(mol2_file, 'r') as f:
            lines = f.readlines()

        for i, line in enumerate(lines):
            if '@<TRIPOS>MOLECULE' in line:
                # The molecule name is on the line right after the header.
                # (Removed an unused 'in_molecule' flag that was set here but
                # never read, and the redundant double .split() call.)
                if i + 1 < len(lines):
                    name_line = lines[i + 1].strip()
                    tokens = name_line.split()
                    # Preserve original behavior: an empty name line yields "".
                    return tokens[0] if tokens else name_line
        return None
    except Exception as e:
        logger.warning(f"Could not extract residue name from {mol2_file}: {e}")
        return None
|
|
|
|
|
def get_all_ligand_residue_names(sanitize: bool = True):
    """
    Extract all unique ligand residue names from tleap_ready.pdb.
    If sanitize=True, validates and updates numeric residue names in the file.

    Returns the unique ligand residue names (post-sanitization) in order of
    first appearance, or [] when the file is missing or unreadable.
    """
    ligand_names = []
    try:
        tleap_ready_path = OUTPUT_DIR / "tleap_ready.pdb"
        if not tleap_ready_path.exists():
            return []

        # Solvent and counter-ion residues are not ligands.
        non_ligand = ['HOH', 'WAT', 'TIP', 'SPC', 'NA', 'CL']
        seen_residues = set()    # raw AND sanitized names already handled
        residues_to_update = {}  # raw numeric name -> sanitized replacement

        with open(tleap_ready_path, 'r') as f:
            for line in f:
                if not line.startswith('HETATM'):
                    continue
                # PDB resName lives in columns 18-20 (0-based slice 17:20).
                raw_name = line[17:20].strip()
                if not raw_name or raw_name in non_ligand:
                    continue
                if raw_name in seen_residues:
                    continue
                # BUG FIX: record the raw name as seen. Previously only the
                # sanitized name was recorded, so every subsequent HETATM line
                # carrying the same numeric residue name re-triggered the
                # sanitize path and appended a duplicate to ligand_names.
                seen_residues.add(raw_name)

                residue_name = raw_name
                if sanitize:
                    sanitized_name, was_changed = _validate_and_sanitize_ligand_name(raw_name)
                    if was_changed:
                        residues_to_update[raw_name] = sanitized_name
                        residue_name = sanitized_name

                seen_residues.add(residue_name)
                # Guard against two distinct raw names sanitizing to the
                # same replacement.
                if residue_name not in ligand_names:
                    ligand_names.append(residue_name)

        # Rewrite the PDB once per renamed residue, after scanning.
        if sanitize and residues_to_update:
            for old_name, new_name in residues_to_update.items():
                logger.warning(
                    f"Ligand residue name '{old_name}' in tleap_ready.pdb is pure numeric. "
                    f"Changed to '{new_name}' (3-letter code) to avoid errors. "
                    f"The PDB file has been updated."
                )
                _update_pdb_residue_name(tleap_ready_path, old_name, new_name)

        return ligand_names
    except Exception as e:
        logger.warning(f"Could not extract ligand residue names: {e}")
        return []
|
|
|
|
|
@app.route('/api/generate-ff-parameters', methods=['POST'])
def generate_ff_parameters():
    """Generate final force field parameters using tleap.

    Writes generate_ff_parameters.in from the request's force_field /
    water_model / add_ions / distance options, runs tleap on it, and verifies
    the expected prmtop/inpcrd/solvated-PDB artifacts exist.
    """
    try:
        data = request.get_json()
        force_field = data.get('force_field', 'ff14SB')
        water_model = data.get('water_model', 'TIP3P')
        add_ions = data.get('add_ions', 'None')
        distance = data.get('distance', 10.0)

        # Write generate_ff_parameters.in into OUTPUT_DIR.
        generate_ff_parameters_file(force_field, water_model, add_ions, distance)

        # Locate tleap: PATH first (shutil.which replaces the old
        # subprocess 'which' probe and its bare 'except:'), then the active
        # conda environment, finally a hard-coded cluster fallback.
        import shutil
        tleap_path = shutil.which('tleap')
        if not tleap_path:
            conda_prefix = os.environ.get('CONDA_PREFIX')
            if conda_prefix:
                tleap_path = os.path.join(conda_prefix, 'bin', 'tleap')
            else:
                tleap_path = '/home/hn533621/.conda/envs/MD_pipeline/bin/tleap'

        # Run tleap without a shell: an argument list avoids any quoting or
        # injection issues with the executable path.
        result = subprocess.run(
            [tleap_path, '-f', 'generate_ff_parameters.in'],
            cwd=str(OUTPUT_DIR),
            capture_output=True, text=True, timeout=300)

        if result.returncode != 0:
            logger.error(f"tleap failed: {result.stderr}")
            return jsonify({
                'success': False,
                'error': f'tleap failed: {result.stderr}'
            }), 500

        # tleap must have produced all three artifacts.
        output_files = ['protein.prmtop', 'protein.inpcrd', 'protein_solvated.pdb']
        missing_files = [f for f in output_files if not (OUTPUT_DIR / f).exists()]

        if missing_files:
            return jsonify({
                'success': False,
                'error': f'Missing output files: {", ".join(missing_files)}'
            }), 500

        return jsonify({
            'success': True,
            'message': 'Force field parameters generated successfully',
            'files_generated': output_files
        })

    except subprocess.TimeoutExpired:
        return jsonify({
            'success': False,
            'error': 'tleap command timed out after 5 minutes'
        }), 500
    except Exception as e:
        logger.error(f"Error generating FF parameters: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Failed to generate force field parameters: {str(e)}'
        }), 500
|
|
|
|
|
@app.route('/api/detect-missing-residues', methods=['POST'])
def detect_missing_residues_endpoint():
    """Detect missing residues in the loaded PDB structure.

    Uses the PDB ID embedded in 0_original_input.pdb to query missing
    residues and full chain sequences, and additionally reports the first
    resolved residue number per chain plus the inferred sequence start.
    """
    try:
        original_pdb_path = OUTPUT_DIR / "0_original_input.pdb"
        if not original_pdb_path.exists():
            return jsonify({
                'success': False,
                'error': 'No PDB file loaded. Please load a PDB file first.'
            }), 400

        try:
            pdb_id = get_pdb_id_from_pdb_file(str(original_pdb_path))
        except ValueError as e:
            return jsonify({
                'success': False,
                'error': f'Could not determine PDB ID: {str(e)}'
            }), 400

        missing = detect_missing_residues(pdb_id)
        chain_sequences = get_chain_sequences(pdb_id)

        # Chains that both have missing residues and a known full sequence.
        chains_with_missing = {
            chain: chain_sequences[chain]
            for chain in missing
            if chain in chain_sequences
        }

        missing_info = {}
        for chain, missing_list in missing.items():
            missing_info[chain] = {
                'count': len(missing_list),
                'residues': missing_list
            }

        # First observed residue number per chain in the input PDB, and the
        # residue number the chain's full sequence effectively starts at.
        chain_first_residue = {}
        chain_sequence_start = {}
        try:
            with open(original_pdb_path, 'r') as f:
                pdb_lines = f.readlines()

            # BUG FIX: the regex module was imported (and the pattern
            # rebuilt) inside the per-line loop; hoist both out of the loop.
            import re
            resnum_pattern = re.compile(r'(-?\d+)')

            for line in pdb_lines:
                if line.startswith('ATOM') or line.startswith('HETATM'):
                    chain_id = line[21:22].strip()
                    if chain_id and chain_id not in chain_first_residue:
                        # Columns 23-26 hold the residue number, possibly
                        # followed by an insertion code; keep the leading int.
                        residue_str = line[22:26].strip()
                        match = resnum_pattern.match(residue_str)
                        if match:
                            try:
                                chain_first_residue[chain_id] = int(match.group(1))
                            except ValueError:
                                # Narrowed from a bare 'except:': only the
                                # int() conversion can realistically fail.
                                pass

            for chain_id, first_pdb_residue in chain_first_residue.items():
                # If residues are missing before the first resolved one, the
                # sequence numbering starts at the earliest missing residue;
                # otherwise it starts at the first resolved residue.
                min_missing_before = None
                if chain_id in missing_info:
                    for resname, resnum in missing_info[chain_id]['residues']:
                        if resnum < first_pdb_residue:
                            if min_missing_before is None or resnum < min_missing_before:
                                min_missing_before = resnum

                if min_missing_before is not None:
                    sequence_start = min_missing_before
                else:
                    sequence_start = first_pdb_residue

                chain_sequence_start[chain_id] = sequence_start
        except Exception as e:
            logger.warning(f"Could not determine first residue numbers: {str(e)}")

        return jsonify({
            'success': True,
            'pdb_id': pdb_id,
            'missing_residues': missing_info,
            'chains_with_missing': list(chains_with_missing.keys()),
            'chain_sequences': chain_sequences,
            'chain_first_residue': chain_first_residue,
            'chain_sequence_start': chain_sequence_start
        })

    except Exception as e:
        logger.error(f"Error detecting missing residues: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Failed to detect missing residues: {str(e)}'
        }), 500
|
|
|
|
|
@app.route('/api/trim-residues', methods=['POST'])
def trim_residues_endpoint():
    """Trim residues from edges of chain sequences"""
    try:
        payload = request.get_json()
        chain_sequences = payload.get('chain_sequences', {})
        trim_specs = payload.get('trim_specs', {})
        pdb_id = payload.get('pdb_id')

        # Both inputs are mandatory; bail out early with a 400 otherwise.
        if not chain_sequences:
            return jsonify({
                'success': False,
                'error': 'No chain sequences provided'
            }), 400
        if not trim_specs:
            return jsonify({
                'success': False,
                'error': 'No trim specifications provided'
            }), 400

        try:
            trimmed_sequences = trim_chains_sequences(chain_sequences, trim_specs)
        except ValueError as e:
            return jsonify({
                'success': False,
                'error': str(e)
            }), 400

        # Best effort: refresh the FASTA file with the trimmed sequences.
        if pdb_id:
            try:
                write_fasta_for_missing_chains(
                    pdb_id,
                    trimmed_sequences,
                    output_dir=str(OUTPUT_DIR)
                )
                logger.info(f"Wrote trimmed FASTA file for PDB {pdb_id}")
            except Exception as e:
                logger.warning(f"Could not write trimmed FASTA file: {str(e)}")

        # Per-chain before/after summary for the UI.
        trim_info = {
            chain: {
                'original_length': len(chain_sequences.get(chain, '')),
                'trimmed_length': len(trimmed_sequences.get(chain, '')),
                'n_terminal_trimmed': spec.get('n_terminal', 0),
                'c_terminal_trimmed': spec.get('c_terminal', 0),
            }
            for chain, spec in trim_specs.items()
        }

        return jsonify({
            'success': True,
            'trimmed_sequences': trimmed_sequences,
            'trim_info': trim_info,
            'message': f'Successfully trimmed residues from {len(trim_specs)} chain(s)'
        })

    except Exception as e:
        logger.error(f"Error trimming residues: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Failed to trim residues: {str(e)}'
        }), 500
|
|
|
|
|
@app.route('/api/build-completed-structure', methods=['POST'])
def build_completed_structure_endpoint():
    """Build completed structure using ESMFold for selected chains with streaming logs.

    Streams server-sent events: log lines via _format_log plus one final
    'complete' event carrying success/error and (on success) the rebuilt PDB.
    """
    def generate():
        try:
            data = request.get_json()
            selected_chains = data.get('selected_chains', [])

            if not selected_chains:
                yield _format_log('❌ No chains selected for completion', 'error')
                yield f"data: {json.dumps({'type': 'complete', 'success': False, 'error': 'No chains selected for completion'})}\n\n"
                return

            yield _format_log(f"Starting ESMFold structure completion for chains: {', '.join(selected_chains)}")

            original_pdb_path = OUTPUT_DIR / "0_original_input.pdb"
            if not original_pdb_path.exists():
                yield _format_log('❌ No PDB file loaded. Please load a PDB file first.', 'error')
                yield f"data: {json.dumps({'type': 'complete', 'success': False, 'error': 'No PDB file loaded. Please load a PDB file first.'})}\n\n"
                return

            # Prefer the pristine backup for superposition when a completed
            # structure has already replaced the original input.
            original_for_align = OUTPUT_DIR / "0_original_input_backup.pdb"
            original_for_align = original_for_align if original_for_align.exists() else original_pdb_path

            try:
                pdb_id = get_pdb_id_from_pdb_file(str(original_pdb_path))
                yield _format_log(f"Detected PDB ID: {pdb_id}")
            except ValueError as e:
                yield _format_log(f'❌ Could not determine PDB ID: {str(e)}', 'error')
                yield f"data: {json.dumps({'type': 'complete', 'success': False, 'error': f'Could not determine PDB ID: {str(e)}'})}\n\n"
                return

            # The client may supply (possibly trimmed) sequences; otherwise
            # fetch them from the PDB database.
            provided_sequences = data.get('chain_sequences', None)
            if provided_sequences:
                chain_sequences = provided_sequences
                yield _format_log("Using provided chain sequences (may be trimmed)")
            else:
                yield _format_log("Fetching chain sequences from PDB database...")
                chain_sequences = get_chain_sequences(pdb_id)

            chains_to_process = []
            for chain in selected_chains:
                if chain in chain_sequences:
                    chains_to_process.append(chain)
                else:
                    yield _format_log(f"⚠️ Chain {chain} not found in chain sequences", 'warning')

            if not chains_to_process:
                yield _format_log('❌ None of the selected chains have sequences available', 'error')
                yield f"data: {json.dumps({'type': 'complete', 'success': False, 'error': 'None of the selected chains have sequences available'})}\n\n"
                return

            chains_with_missing = {
                chain: chain_sequences[chain]
                for chain in chains_to_process
            }

            # Best effort: FASTA output for the chains being completed.
            try:
                write_fasta_for_missing_chains(pdb_id, chains_with_missing, output_dir=str(OUTPUT_DIR))
                yield _format_log(f"Wrote FASTA file for chains: {chains_to_process}")
            except Exception as e:
                yield _format_log(f"⚠️ Could not write FASTA file: {str(e)}", 'warning')

            # Fold every selected chain; abort on the first failure.
            esmfold_results = {}
            for chain in chains_to_process:
                yield _format_log(f"Running ESMFold for chain {chain}...")
                seq = chain_sequences[chain]
                try:
                    pdb_text = run_esmfold(seq)
                    esmfold_results[chain] = pdb_text

                    esm_pdb_filename = OUTPUT_DIR / f"{pdb_id}_chain_{chain}_esmfold.pdb"
                    with open(esm_pdb_filename, 'w') as f:
                        f.write(pdb_text)
                    yield _format_log(f"✅ ESMFold completed for chain {chain}: {esm_pdb_filename.name}")
                except Exception as e:
                    yield _format_log(f'❌ ESMFold failed for chain {chain}: {str(e)}', 'error')
                    yield f"data: {json.dumps({'type': 'complete', 'success': False, 'error': f'ESMFold failed for chain {chain}: {str(e)}'})}\n\n"
                    return

            # Optional per-chain energy minimization of the ESMFold models.
            minimize_chains = data.get('minimize_chains', False)
            chains_to_minimize = data.get('chains_to_minimize', [])
            minimized_chains = []
            if minimize_chains and chains_to_minimize:
                yield _format_log(f"\n{'='*60}")
                yield _format_log(f"Starting energy minimization for chains: {', '.join(chains_to_minimize)}")
                yield _format_log(f"{'='*60}")
                try:
                    for chain in chains_to_minimize:
                        yield _format_log(f"\nMinimizing chain {chain}...")
                        for log_line in _minimize_esmfold_chains_streaming(pdb_id, [chain], original_for_align=original_for_align):
                            yield log_line
                        minimized_chains.append(chain)
                        yield _format_log(f"✅ Chain {chain} minimization completed")
                    # Record which chains were minimized for later steps.
                    min_status_file = OUTPUT_DIR / ".chains_minimized"
                    with open(min_status_file, 'w') as f:
                        f.write(','.join(minimized_chains))
                    yield _format_log(f"\n✅ All chains minimized successfully: {', '.join(minimized_chains)}")
                except Exception as e:
                    # Minimization failure is non-fatal; rebuild continues
                    # with whichever chains (if any) finished.
                    yield _format_log(f'❌ Error during minimization: {str(e)}', 'error')

            # Rebuild the full structure with PyMOL in a subprocess (PyMOL
            # cannot be initialized safely inside the Flask worker thread).
            output_pdb = OUTPUT_DIR / "0_complete_structure.pdb"
            yield _format_log("Rebuilding structure with PyMOL (superimposing to original)...")
            try:
                import tempfile
                import os
                chains_use_min_arg = repr(minimized_chains) if minimized_chains else "None"
                script_content = f"""#!/usr/bin/env python3
import sys
import os

# Add amberprep package to path (Fill_missing_residues is in amberprep/)
sys.path.insert(0, r'{str(Path(__file__).parent)}')

# Change to output directory
os.chdir(r'{str(OUTPUT_DIR)}')

# Import and run rebuild
from Fill_missing_residues import rebuild_pdb_with_esmfold

try:
    rebuild_pdb_with_esmfold(
        r'{pdb_id}',
        {repr(chains_to_process)},
        output_pdb=r'{output_pdb.name}',
        original_pdb_path=r'{Path(original_for_align).name}',
        chains_use_minimized={chains_use_min_arg}
    )
    print("SUCCESS: Rebuild completed")
except Exception as e:
    print(f"ERROR: {{e}}")
    import traceback
    traceback.print_exc()
    sys.exit(1)
"""

                with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as script_file:
                    script_file.write(script_content)
                    script_path = script_file.name

                try:
                    os.chmod(script_path, 0o755)

                    result = subprocess.run(
                        [sys.executable, script_path],
                        capture_output=True,
                        text=True,
                        timeout=300,
                        cwd=str(OUTPUT_DIR)
                    )

                    if result.returncode != 0:
                        error_msg = result.stderr or result.stdout
                        yield _format_log(f"❌ PyMOL rebuild failed: {error_msg}", 'error')
                        if "pymol" in error_msg.lower() or "import" in error_msg.lower():
                            error_msg = f"PyMOL initialization failed. Make sure PyMOL is installed and accessible. Error: {error_msg}"
                        else:
                            error_msg = f"Rebuild failed: {error_msg}"
                        yield _format_log(f"❌ {error_msg}", 'error')
                        yield f"data: {json.dumps({'type': 'complete', 'success': False, 'error': error_msg})}\n\n"
                        return

                    if "ERROR:" in result.stdout:
                        # BUG FIX: split on real newlines. The old code split
                        # on the literal two-character sequence '\\n', which
                        # never occurs in subprocess text output, so the whole
                        # stdout blob was reported as the error line.
                        error_line = [line for line in result.stdout.splitlines() if 'ERROR:' in line]
                        if error_line:
                            error_msg = error_line[0].replace('ERROR:', '').strip()
                            yield _format_log(f"❌ {error_msg}", 'error')
                            yield f"data: {json.dumps({'type': 'complete', 'success': False, 'error': error_msg})}\n\n"
                            return

                    if not output_pdb.exists():
                        error_msg = "Output file was not created"
                        yield _format_log(f"❌ {error_msg}", 'error')
                        yield f"data: {json.dumps({'type': 'complete', 'success': False, 'error': error_msg})}\n\n"
                        return

                    yield _format_log(f"✅ Completed structure saved to {output_pdb.name}")

                except subprocess.TimeoutExpired:
                    yield _format_log("❌ PyMOL rebuild timed out after 5 minutes", 'error')
                    yield f"data: {json.dumps({'type': 'complete', 'success': False, 'error': 'PyMOL rebuild timed out. The structure might be too large. Please try again.'})}\n\n"
                    return
                except Exception as e:
                    yield _format_log(f"❌ Error rebuilding PDB: {str(e)}", 'error')
                    import traceback
                    logger.error(traceback.format_exc())
                    yield f"data: {json.dumps({'type': 'complete', 'success': False, 'error': f'Failed to rebuild structure: {str(e)}'})}\n\n"
                    return
                finally:
                    # Always remove the temporary helper script.
                    try:
                        os.unlink(script_path)
                    except Exception:
                        pass
            except Exception as e:
                yield _format_log(f"❌ Error in PyMOL rebuild: {str(e)}", 'error')
                yield f"data: {json.dumps({'type': 'complete', 'success': False, 'error': f'PyMOL rebuild failed: {str(e)}'})}\n\n"
                return

            with open(output_pdb, 'r') as f:
                completed_content = f.read()

            chains_str = ', '.join(chains_to_process)
            yield _format_log(f"\n✅ Structure completion finished for chains: {chains_str}")

            result_message = f'Successfully completed structure for chains: {chains_str}'
            result_data = {
                'type': 'complete',
                'success': True,
                'message': result_message,
                'completed_chains': chains_to_process,
                'completed_structure': completed_content,
                'minimized_chains': minimized_chains
            }
            yield f"data: {json.dumps(result_data)}\n\n"

        except Exception as e:
            logger.error(f"Error building completed structure: {str(e)}")
            yield _format_log(f'❌ Error: {str(e)}', 'error')
            yield f"data: {json.dumps({'type': 'complete', 'success': False, 'error': f'Failed to build completed structure: {str(e)}'})}\n\n"

    # BUG FIX: stream_with_context must wrap the *generator* handed to
    # Response so the request context outlives the view function; decorating
    # the view itself (as before) is not how Flask documents its use and the
    # view returns a Response, not a generator.
    return Response(stream_with_context(generate()), mimetype='text/event-stream')
|
|
|
|
|
@app.route('/api/set-use-completed-structure', methods=['POST'])
def set_use_completed_structure():
    """Set user preference to use completed structure (ESMFold) instead of original"""
    try:
        payload = request.get_json()
        use_completed = payload.get('use_completed', False)

        # The preference is persisted as the existence of a marker file.
        flag_file = OUTPUT_DIR / ".use_completed_structure"
        completed_pdb_path = OUTPUT_DIR / "0_complete_structure.pdb"
        original_pdb_path = OUTPUT_DIR / "0_original_input.pdb"
        backup_path = OUTPUT_DIR / "0_original_input_backup.pdb"

        if use_completed:
            flag_file.touch()
            logger.info("User chose to use ESMFold-completed structure")

            if completed_pdb_path.exists():
                import shutil
                # Keep a one-time backup of the untouched input, then swap
                # the completed structure in as the working input.
                if original_pdb_path.exists() and not backup_path.exists():
                    shutil.copy2(original_pdb_path, backup_path)
                shutil.copy2(completed_pdb_path, original_pdb_path)
                logger.info(f"Replaced {original_pdb_path} with completed structure")
        else:
            if flag_file.exists():
                flag_file.unlink()

            if backup_path.exists() and original_pdb_path.exists():
                import shutil
                # Only restore when a completed structure actually replaced
                # the original earlier in the session.
                if completed_pdb_path.exists():
                    shutil.copy2(backup_path, original_pdb_path)
                    logger.info(f"Restored original structure from backup")

            logger.info("User chose to use original structure")

        return jsonify({
            'success': True,
            'use_completed': use_completed
        })

    except Exception as e:
        logger.error(f"Error setting use completed structure preference: {str(e)}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
|
|
|
|
|
@app.route('/api/get-use-completed-structure', methods=['GET'])
def get_use_completed_structure():
    """Get user preference for using completed structure"""
    try:
        # The preference is simply the existence of the marker file.
        marker = OUTPUT_DIR / ".use_completed_structure"
        return jsonify({
            'success': True,
            'use_completed': marker.exists()
        })
    except Exception as e:
        logger.error(f"Error getting use completed structure preference: {str(e)}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
|
|
|
|
|
@app.route('/api/get-completed-structure', methods=['GET'])
def get_completed_structure():
    """Get the completed structure PDB file if it exists"""
    try:
        completed_path = OUTPUT_DIR / "0_complete_structure.pdb"
        if not completed_path.exists():
            return jsonify({
                'success': False,
                'exists': False,
                'error': 'Completed structure not found'
            }), 404

        with open(completed_path, 'r') as handle:
            pdb_text = handle.read()

        return jsonify({
            'success': True,
            'exists': True,
            'content': pdb_text
        })
    except Exception as e:
        logger.error(f"Error reading completed structure: {str(e)}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
|
|
|
|
|
@app.route('/api/get-file', methods=['GET'])
def get_file():
    """Get a file from the output directory.

    Query parameter 'filename' names a file inside OUTPUT_DIR; the contents
    are returned as text/plain. Traversal outside OUTPUT_DIR is rejected.
    """
    try:
        filename = request.args.get('filename')
        if not filename:
            return jsonify({
                'success': False,
                'error': 'Filename parameter required'
            }), 400

        # Reject traversal components outright (consistent with the checks
        # in /api/save-file and /api/save-new-file).
        if '/' in filename or '\\' in filename or '..' in filename:
            return jsonify({
                'success': False,
                'error': 'Invalid file path'
            }), 400

        # SECURITY FIX: resolve the path before the containment check. The
        # old prefix test on the unresolved string could be escaped with
        # '..' segments and also matched sibling directories whose names
        # merely start with the output directory's path.
        file_path = (OUTPUT_DIR / filename).resolve()
        output_root = OUTPUT_DIR.resolve()
        if output_root != file_path and output_root not in file_path.parents:
            return jsonify({
                'success': False,
                'error': 'Invalid file path'
            }), 400

        if not file_path.exists():
            return jsonify({
                'success': False,
                'error': f'File {filename} not found'
            }), 404

        with open(file_path, 'r') as f:
            content = f.read()

        return content, 200, {'Content-Type': 'text/plain'}
    except Exception as e:
        logger.error(f"Error reading file: {str(e)}")
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
|
|
|
|
|
if __name__ == '__main__':
    # Startup banner.
    for banner_line in (
        "🧬 MD Simulation Pipeline",
        "=========================",
        "🌐 Starting Flask server...",
        "📡 Backend API: http://localhost:5000",
        "🔗 Web Interface: http://localhost:5000",
        "",
        "Press Ctrl+C to stop the server",
        "",
    ):
        print(banner_line)

    # Start every session from a clean output directory.
    print("🧹 Cleaning output folder...")
    clean_and_create_output_folder()
    print("✅ Output folder ready!")
    print("")

    app.run(debug=False, host='0.0.0.0', port=5000)
|
|
|