#!/usr/bin/env python3
"""
MD Simulation Pipeline - Flask Backend
Provides API endpoints for protein processing and file generation
"""
from flask import (
    Flask, request, jsonify, send_file, render_template,
    send_from_directory, Response, stream_with_context, abort,
)
from flask_cors import CORS
import os
import sys
import json
import tempfile
import zipfile
from pathlib import Path
import requests
import subprocess
import time
from Bio.PDB import PDBParser, PDBList
import logging
import html
from collections import defaultdict

from .structure_preparation import (
    prepare_structure,
    parse_structure_info,
    extract_original_residue_info,
    restore_residue_info_in_pdb,
    sanity_check_ligand_pdb,
    merge_protein_and_ligand,
)
from .Fill_missing_residues import (
    get_pdb_id_from_pdb_file,
    detect_missing_residues,
    get_chain_sequences,
    run_esmfold,
    rebuild_pdb_with_esmfold,
    write_fasta_for_missing_chains,
    trim_residues_from_edges,
    trim_chains_sequences,
)

_BASE = Path(__file__).parent
app = Flask(
    __name__,
    template_folder=str(_BASE / "html"),
    static_folder=str(_BASE),
    static_url_path="",
)
CORS(app)

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Output directory (cwd/output when run as package)
OUTPUT_DIR = Path.cwd() / "output"


def clean_and_create_output_folder():
    """Clean the existing output folder and create a new one."""
    try:
        print(f"DEBUG: Starting cleanup. OUTPUT_DIR = {OUTPUT_DIR}")
        print(f"DEBUG: OUTPUT_DIR.exists() = {OUTPUT_DIR.exists()}")
        # Remove the existing output folder if it exists
        if OUTPUT_DIR.exists():
            import shutil
            print(f"DEBUG: Removing existing output folder: {OUTPUT_DIR}")
            shutil.rmtree(OUTPUT_DIR)
            print("DEBUG: Successfully removed output folder")
            logger.info(f"Removed existing output folder: {OUTPUT_DIR}")
        # Create a fresh output folder
        print(f"DEBUG: Creating new output folder: {OUTPUT_DIR}")
        OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
        print("DEBUG: Successfully created output folder")
        logger.info(f"Created new output folder: {OUTPUT_DIR}")
        return True
    except Exception as e:
        print(f"DEBUG: Error in cleanup: {str(e)}")
        logger.error(f"Error cleaning output folder: {str(e)}")
        return False


def _ensure_docking_folder():
    """Ensure the docking output folder exists and return its Path."""
    docking_dir = OUTPUT_DIR / "docking"
    docking_dir.mkdir(parents=True, exist_ok=True)
    return docking_dir
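# NOTE: _format_log is used by the streaming functions below but is defined
# elsewhere in this module (outside this excerpt). A minimal sketch of what it
# is assumed to do -- wrap a log message as a Server-Sent Events data frame;
# the exact payload keys are an assumption based on how the stream is consumed:
#
#     def _format_log(message, level='info'):
#         payload = {'type': 'log', 'message': message, 'level': level}
#         return f"data: {json.dumps(payload)}\n\n"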
def _minimize_esmfold_chains_streaming(pdb_id, chains_to_minimize, original_for_align=None):
    """
    Minimize ESMFold-generated chains using AMBER with streaming logs.
    Yields log messages in real-time.

    After removing hydrogens, the minimized chain is superimposed to the
    original (true crystal) structure so it stays in the same coordinate frame
    as the ligand and the rest of the system.

    Args:
        pdb_id: PDB ID (e.g., '1KE5')
        chains_to_minimize: List of chain IDs to minimize (e.g., ['A', 'B'])
        original_for_align: Path to the true original PDB for superimposition.
            Use 0_original_input_backup.pdb when it exists (true crystal),
            else 0_original_input.pdb. If None, this is computed automatically.

    Yields:
        Log messages as formatted SSE strings
    """
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    if original_for_align is None:
        backup = OUTPUT_DIR / "0_original_input_backup.pdb"
        original_for_align = backup if backup.exists() else (OUTPUT_DIR / "0_original_input.pdb")

    for chain in chains_to_minimize:
        try:
            yield _format_log(f"  Preparing minimization for chain {chain}...")

            # Step 1: Prepare the tleap input file (all minimization files live in output/, not docking/)
            esmfold_pdb = OUTPUT_DIR / f"{pdb_id}_chain_{chain}_esmfold.pdb"
            if not esmfold_pdb.exists():
                yield _format_log(f"  ❌ ESMFold PDB not found for chain {chain}: {esmfold_pdb}", 'error')
                continue

            tleap_in = OUTPUT_DIR / f"tleap_{chain}.in"
            with open(tleap_in, 'w') as f:
                f.write("source leaprc.protein.ff14SB\n")
                f.write(f"protein = loadpdb {esmfold_pdb.resolve()}\n")
                f.write(f"saveamberparm protein {pdb_id}_chain_{chain}_esmfold.prmtop {pdb_id}_chain_{chain}_esmfold.inpcrd\n")
                f.write("quit\n")

            # Step 2: Run tleap
            yield _format_log(f"  Running tleap for chain {chain}...")
            prmtop = OUTPUT_DIR / f"{pdb_id}_chain_{chain}_esmfold.prmtop"
            inpcrd = OUTPUT_DIR / f"{pdb_id}_chain_{chain}_esmfold.inpcrd"
            if not prmtop.exists() or not inpcrd.exists():
                cmd = ["tleap", "-f", str(tleap_in)]
                process = subprocess.Popen(
                    cmd, cwd=str(OUTPUT_DIR),
                    stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                    text=True, bufsize=1
                )
                for line in iter(process.stdout.readline, ''):
                    if line.strip():
                        yield _format_log(f"    {line.strip()}")
                process.wait()
                if process.returncode != 0 or not prmtop.exists():
                    yield _format_log(f"  ❌ tleap failed for chain {chain}", 'error')
                    continue
            yield _format_log(f"  ✅ tleap completed for chain {chain}")

            # Step 3: Prepare the min.in file
            min_in = OUTPUT_DIR / f"min_{chain}.in"
            with open(min_in, 'w') as f:
                f.write("#Two-stage minimization: sidechains first\n")
                f.write(" &cntrl\n")
                f.write("  imin=1, maxcyc=300, ncyc=150,\n")
                f.write("  ntb=0, cut=10.0, igb=1\n")
                f.write(" /\n")

            # Step 4: Run the sander minimization and stream min_*.out in real-time
            yield _format_log(f"  Running energy minimization (sander) for chain {chain}...")
            min_out = OUTPUT_DIR / f"min_{chain}.out"
            min_rst = OUTPUT_DIR / f"{pdb_id}_chain_{chain}_esmfold_minimized.rst"
            cmd = [
                "sander", "-O",
                "-i", str(min_in),
                "-o", str(min_out),
                "-p", str(prmtop),
                "-c", str(inpcrd),
                "-r", str(min_rst)
            ]
            # sander writes to the -o file, not stdout: tail min_*.out in real-time
            process = subprocess.Popen(
                cmd, cwd=str(OUTPUT_DIR),
                stdout=subprocess.DEVNULL, stderr=subprocess.PIPE,
                text=True,
            )
            # Wait for min_*.out to be created
            for _ in range(100):
                if min_out.exists():
                    break
                time.sleep(0.1)

            last_pos = 0
            buffer = ""
            while True:
                if min_out.exists():
                    try:
                        with open(min_out, "r") as f:
                            f.seek(last_pos)
                            new = f.read()
                            last_pos = f.tell()
                        buffer += new
                        while "\n" in buffer:
                            line, buffer = buffer.split("\n", 1)
                            if line.strip():
                                yield _format_log(f"    {line.strip()}")
                    except (IOError, OSError):
                        pass
                ret = process.poll()
                if ret is not None:
                    # Read any remaining output
                    if min_out.exists():
                        try:
                            with open(min_out, "r") as f:
                                f.seek(last_pos)
                                new = f.read()
                            buffer += new
                            while "\n" in buffer:
                                line, buffer = buffer.split("\n", 1)
                                if line.strip():
                                    yield _format_log(f"    {line.strip()}")
                            if buffer.strip():
                                yield _format_log(f"    {buffer.strip()}")
                        except (IOError, OSError):
                            pass
                    break
                time.sleep(0.2)

            process.wait()
            if process.returncode != 0 and process.stderr:
                err = process.stderr.read()
                if err.strip():
                    yield _format_log(f"    stderr: {err.strip()}", "error")
            if process.returncode != 0 or not min_rst.exists():
                yield _format_log(f"  ❌ sander minimization failed for chain {chain}", 'error')
                continue
            yield _format_log(f"  ✅ Minimization completed for chain {chain}")

            # Step 5: Convert back to PDB using ambpdb
            yield _format_log(f"  Converting minimized structure to PDB (ambpdb) for chain {chain}...")
            min_pdb = OUTPUT_DIR / f"{pdb_id}_chain_{chain}_esmfold_minimized.pdb"
            with open(min_pdb, 'w') as f:
                cmd = ["ambpdb", "-p", str(prmtop), "-c", str(min_rst)]
                result = subprocess.run(
                    cmd, stdout=f, stderr=subprocess.PIPE, text=True,
                )
            if result.returncode != 0 or not min_pdb.exists():
                yield _format_log(f"  ❌ ambpdb failed for chain {chain}: {result.stderr}", 'error')
                continue
            yield _format_log(f"  ✅ PDB conversion completed for chain {chain}")

            # Step 6: Remove hydrogens using PyMOL, then superimpose to the original (true crystal) frame
            yield _format_log(f"  Removing hydrogens using PyMOL for chain {chain}...")
            min_pdb_noH = OUTPUT_DIR / f"{pdb_id}_chain_{chain}_esmfold_minimized_noH.pdb"
            do_superimpose = original_for_align.exists()
            if do_superimpose:
                yield _format_log(f"  Superimposing minimized chain to original (true crystal) frame...")
            try:
                import tempfile
                # Build the superimposition block: align the minimized CA atoms to the
                # original's chain CA atoms so the ligand stays in frame
                superimpose_block = ""
                if do_superimpose:
                    superimpose_block = f"""
cmd.load("{original_for_align.resolve()}", "orig_ref")
cmd.align("min_chain_{chain} and name CA", "orig_ref and chain {chain} and name CA")
cmd.delete("orig_ref")
"""
                pymol_script = tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False)
                pymol_script.write(f"""
from pymol import cmd
cmd.load("{min_pdb.resolve()}", "min_chain_{chain}")
cmd.remove("hydrogens")
{superimpose_block}
cmd.save("{min_pdb_noH.resolve()}", "min_chain_{chain}")
cmd.quit()
""")
                pymol_script.close()
                result = subprocess.run(
                    ["pymol", "-c", "-Q", pymol_script.name],
                    capture_output=True, text=True, timeout=60
                )
                if result.returncode != 0 or not min_pdb_noH.exists():
                    raise Exception(f"PyMOL failed: {result.stderr}")
                os.unlink(pymol_script.name)
                yield _format_log(f"  ✅ Hydrogens removed for chain {chain}")
                if do_superimpose:
                    yield _format_log(f"  ✅ Minimized chain {chain} superimposed to original frame")
            except Exception as e:
                yield _format_log(f"  ⚠️ PyMOL hydrogen removal failed, using original: {e}", 'warning')
                min_pdb_noH = min_pdb

            # The minimized noH chain is written to output/; it will be merged into
            # 1_protein_no_hydrogens.pdb when the user runs Prepare Structure
            # (1_protein_no_hydrogens is created there).
            yield _format_log(
                f"  ✅ Chain {chain} minimization saved to {min_pdb_noH.name}. "
                "It will be merged into 1_protein_no_hydrogens.pdb when you run Prepare Structure."
            )
        except Exception as e:
            yield _format_log(f"  ❌ Error minimizing chain {chain}: {str(e)}", 'error')
            import traceback
            logger.error(traceback.format_exc())
            continue
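# The generator above is meant to be wired to a streaming Flask response. A
# minimal sketch of how that wiring typically looks (the endpoint name and
# mimetype here are assumptions; the real route lives elsewhere in this module):
#
#     @app.route('/api/minimize-chains', methods=['POST'])
#     def minimize_chains():
#         gen = _minimize_esmfold_chains_streaming('1KE5', ['A', 'B'])
#         return Response(stream_with_context(gen), mimetype='text/event-stream')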
def _minimize_esmfold_chains(pdb_id, chains_to_minimize):
    """
    Minimize ESMFold-generated chains using AMBER.

    Args:
        pdb_id: PDB ID (e.g., '1KE5')
        chains_to_minimize: List of chain IDs to minimize (e.g., ['A', 'B'])

    Returns:
        List of successfully minimized chain IDs
    """
    minimized_chains = []
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    for chain in chains_to_minimize:
        try:
            # Step 1: Prepare the tleap input file (all minimization files live in output/, not docking/)
            esmfold_pdb = OUTPUT_DIR / f"{pdb_id}_chain_{chain}_esmfold.pdb"
            if not esmfold_pdb.exists():
                logger.warning(f"ESMFold PDB not found for chain {chain}: {esmfold_pdb}")
                continue

            tleap_in = OUTPUT_DIR / f"tleap_{chain}.in"
            with open(tleap_in, 'w') as f:
                f.write("source leaprc.protein.ff14SB\n")
                f.write(f"protein = loadpdb {esmfold_pdb.resolve()}\n")
                f.write(f"saveamberparm protein {pdb_id}_chain_{chain}_esmfold.prmtop {pdb_id}_chain_{chain}_esmfold.inpcrd\n")
                f.write("quit\n")

            # Step 2: Run tleap
            prmtop = OUTPUT_DIR / f"{pdb_id}_chain_{chain}_esmfold.prmtop"
            inpcrd = OUTPUT_DIR / f"{pdb_id}_chain_{chain}_esmfold.inpcrd"
            if not prmtop.exists() or not inpcrd.exists():
                cmd = ["tleap", "-f", str(tleap_in)]
                result = subprocess.run(
                    cmd, cwd=str(OUTPUT_DIR), capture_output=True, text=True,
                )
                if result.returncode != 0 or not prmtop.exists():
                    logger.error(f"tleap failed for chain {chain}: {result.stderr}")
                    continue

            # Step 3: Prepare the min.in file
            min_in = OUTPUT_DIR / f"min_{chain}.in"
            with open(min_in, 'w') as f:
                f.write("#Two-stage minimization: sidechains first\n")
                f.write(" &cntrl\n")
                f.write("  imin=1, maxcyc=300, ncyc=150,\n")
                f.write("  ntb=0, cut=10.0, igb=1\n")
                f.write(" /\n")

            # Step 4: Run the sander minimization
            min_out = OUTPUT_DIR / f"min_{chain}.out"
            min_rst = OUTPUT_DIR / f"{pdb_id}_chain_{chain}_esmfold_minimized.rst"
            cmd = [
                "sander", "-O",
                "-i", str(min_in),
                "-o", str(min_out),
                "-p", str(prmtop),
                "-c", str(inpcrd),
                "-r", str(min_rst)
            ]
            result = subprocess.run(
                cmd, cwd=str(OUTPUT_DIR), capture_output=True, text=True,
            )
            if result.returncode != 0 or not min_rst.exists():
                logger.error(f"sander minimization failed for chain {chain}: {result.stderr}")
                continue

            # Step 5: Convert back to PDB using ambpdb
            min_pdb = OUTPUT_DIR / f"{pdb_id}_chain_{chain}_esmfold_minimized.pdb"
            with open(min_pdb, 'w') as f:
                cmd = ["ambpdb", "-p", str(prmtop), "-c", str(min_rst)]
                result = subprocess.run(
                    cmd, stdout=f, stderr=subprocess.PIPE, text=True,
                )
            if result.returncode != 0 or not min_pdb.exists():
                logger.error(f"ambpdb failed for chain {chain}: {result.stderr}")
                continue

            # Step 6: Remove hydrogens using PyMOL (run in a subprocess to avoid conflicts)
            min_pdb_noH = OUTPUT_DIR / f"{pdb_id}_chain_{chain}_esmfold_minimized_noH.pdb"
            try:
                import tempfile
                pymol_script = tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False)
                pymol_script.write(f"""
from pymol import cmd
cmd.load("{min_pdb.resolve()}", "min_chain_{chain}")
cmd.remove("hydrogens")
cmd.save("{min_pdb_noH.resolve()}", "min_chain_{chain}")
cmd.quit()
""")
                pymol_script.close()
                result = subprocess.run(
                    ["pymol", "-c", "-Q", pymol_script.name],
                    capture_output=True, text=True, timeout=60
                )
                if result.returncode != 0 or not min_pdb_noH.exists():
                    raise Exception(f"PyMOL failed: {result.stderr}")
                os.unlink(pymol_script.name)
            except Exception as e:
                logger.warning(f"PyMOL hydrogen removal failed for chain {chain}, using original: {e}")
                # Fallback: use the minimized PDB as-is
                min_pdb_noH = min_pdb

            # The minimized noH chain is in output/; it will be merged into
            # 1_protein_no_hydrogens.pdb when the user runs Prepare Structure
            logger.info(
                f"Minimized chain {chain} saved to {min_pdb_noH.name}. "
                "It will be merged into 1_protein_no_hydrogens.pdb when you run Prepare Structure."
            )
            minimized_chains.append(chain)
        except Exception as e:
            logger.error(f"Error minimizing chain {chain}: {str(e)}")
            import traceback
            logger.error(traceback.format_exc())
            continue

    return minimized_chains


def _replace_chain_in_pdb(target_pdb, chain_id, source_pdb):
    """
    Replace a specific chain in target_pdb with the chain from source_pdb.

    Args:
        target_pdb: Path to target PDB file (will be modified)
        chain_id: Chain ID to replace
        source_pdb: Path to source PDB file containing the new chain
    """
    # Read the target PDB
    with open(target_pdb, 'r') as f:
        target_lines = f.readlines()
    # Read the source PDB
    with open(source_pdb, 'r') as f:
        source_lines = f.readlines()

    # Filter target: keep all lines except those with the specified chain
    filtered_target = []
    for line in target_lines:
        if line.startswith(('ATOM', 'HETATM')):
            if len(line) >= 21:
                chain = line[21]
                if chain != chain_id:
                    filtered_target.append(line)
        else:
            # Keep non-ATOM lines
            filtered_target.append(line)

    # Extract the chain from the source
    source_chain_lines = []
    for line in source_lines:
        if line.startswith(('ATOM', 'HETATM')):
            if len(line) >= 21:
                chain = line[21]
                if chain == 'A' or chain == chain_id:  # ESMFold outputs as chain A
                    # Update the chain ID to match
                    new_line = line[:21] + chain_id + line[22:]
                    source_chain_lines.append(new_line)

    # Combine: target (without the old chain) + new chain
    combined = []
    for line in filtered_target:
        if line.startswith('END'):
            # Insert the new chain before END
            combined.extend(source_chain_lines)
        combined.append(line)

    # Write back
    with open(target_pdb, 'w') as f:
        f.writelines(combined)
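# The splice above relies on the fixed-column PDB format: the chain ID lives in
# column 22 (0-indexed position 21). A tiny illustration with a made-up ATOM
# record (hypothetical data, for reference only):
#
#     line = "ATOM      1  N   ALA A   1      11.104  13.207   2.100  1.00  0.00           N"
#     line[21]                       # -> 'A'
#     line[:21] + 'B' + line[22:]    # the same record re-labelled as chain B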
def _prepare_receptor_for_docking():
    """
    Prepare receptor files for docking using the procedure in python/docking.py:
    1. Run tleap on 1_protein_no_hydrogens.pdb to add hydrogens -> protein.pdb
    2. Run pdb4amber on receptor.pdb -> receptor_fixed.pdb
    3. Prepare the receptor PDBQT with Meeko (mk_prepare_receptor.py)

    If an ESMFold-completed structure is being used, the receptor will include:
    - Completed chains from ESMFold (for chains that were selected for completion)
    - Original chains (for chains that were not selected for completion)

    Returns paths (as Path objects) to the receptor PDB and PDBQT.
    """
    docking_dir = _ensure_docking_folder()

    protein_no_h = OUTPUT_DIR / "1_protein_no_hydrogens.pdb"
    if not protein_no_h.exists():
        raise FileNotFoundError(
            f"1_protein_no_hydrogens.pdb not found in {OUTPUT_DIR}. "
            "Please run structure preparation first."
        )

    # Check if the completed structure is being used
    flag_file = OUTPUT_DIR / ".use_completed_structure"
    complete_structure_path = OUTPUT_DIR / "0_complete_structure.pdb"
    use_completed = flag_file.exists() and complete_structure_path.exists()
    if use_completed:
        logger.info("ESMFold-completed structure is being used for docking receptor preparation")
        logger.info("Completed structure includes: ESMFold-completed chains + original chains not selected for completion")

    # Step 1: tleap -> protein.pdb (receptor.pdb)
    tleap_in = docking_dir / "prepare_receptor.in"
    receptor_pdb = docking_dir / "receptor.pdb"

    # Check whether the receptor needs to be regenerated
    # (the completed structure is newer, or the receptor doesn't exist)
    regenerate_receptor = False
    if not receptor_pdb.exists():
        regenerate_receptor = True
    elif use_completed and complete_structure_path.exists():
        # If using the completed structure, check whether it is newer than the receptor
        receptor_mtime = receptor_pdb.stat().st_mtime
        completed_mtime = complete_structure_path.stat().st_mtime
        protein_mtime = protein_no_h.stat().st_mtime
        # Regenerate if the completed structure or the protein file is newer
        if completed_mtime > receptor_mtime or protein_mtime > receptor_mtime:
            logger.info("Regenerating receptor: completed structure or protein file is newer")
            regenerate_receptor = True

    if regenerate_receptor:
        # Delete old receptor files to force regeneration
        if receptor_pdb.exists():
            receptor_pdb.unlink()
        receptor_fixed_path = docking_dir / "receptor_fixed.pdb"
        if receptor_fixed_path.exists():
            receptor_fixed_path.unlink()
        receptor_pdbqt_path = docking_dir / "receptor.pdbqt"
        if receptor_pdbqt_path.exists():
            receptor_pdbqt_path.unlink()

        # Use an absolute path to the protein file since tleap runs from the docking dir
        protein_no_h_abs = str(protein_no_h.resolve())
        with open(tleap_in, "w") as f:
            f.write("source leaprc.protein.ff14SB\n")
            f.write(f"protein = loadpdb {protein_no_h_abs}\n")
            f.write("savepdb protein receptor.pdb\n")
            f.write("quit\n")

        # Run tleap in the docking directory
        cmd = ["tleap", "-f", tleap_in.name]
        result = subprocess.run(
            cmd, cwd=docking_dir, capture_output=True, text=True,
        )
        if result.returncode != 0 or not receptor_pdb.exists():
            raise RuntimeError(
                "Failed to prepare receptor with tleap.\n"
                f"STDOUT:\n{result.stdout}\nSTDERR:\n{result.stderr}"
            )

    # Step 2: pdb4amber -> receptor_fixed.pdb
    receptor_fixed = docking_dir / "receptor_fixed.pdb"
    if regenerate_receptor or not receptor_fixed.exists():
        cmd = ["pdb4amber", "-i", str(receptor_pdb), "-o", str(receptor_fixed)]
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0 or not receptor_fixed.exists():
            raise RuntimeError(
                "Failed to run pdb4amber on receptor.\n"
                f"STDOUT:\n{result.stdout}\nSTDERR:\n{result.stderr}"
            )

    # Step 3: Meeko receptor preparation -> receptor.pdbqt
    receptor_pdbqt = docking_dir / "receptor.pdbqt"
    if regenerate_receptor or not receptor_pdbqt.exists():
        cmd = [
            "mk_prepare_receptor.py",
            "-i", str(receptor_fixed),
            "-o", "receptor",  # Meeko will append .pdbqt
            "-p",
        ]
        result = subprocess.run(
            cmd, cwd=docking_dir, capture_output=True, text=True,
        )
        if result.returncode != 0 or not receptor_pdbqt.exists():
            # Check whether the error is due to clashes/disulfide bonds
            error_text = result.stderr + result.stdout
            needs_minimization = (
                "excess inter-residue bond" in error_text
                or ("Expected" in error_text and "paddings" in error_text)
            )
            if needs_minimization:
                # Check whether chains were minimized
                min_status_file = OUTPUT_DIR / ".chains_minimized"
                minimized_chains = []
                if min_status_file.exists():
                    with open(min_status_file, 'r') as f:
                        content = f.read().strip()
                        minimized_chains = content.split(',') if content else []

                error_msg = (
                    "Failed to prepare receptor PDBQT with Meeko due to clashes/disulfide bonds.\n\n"
                )
                if not minimized_chains:
                    error_msg += (
                        "⚠️ ESMFold-generated chains need energy minimization.\n"
                        "Please go back to the 'Fill Missing Residues' step and:\n"
                        "1. Check the 'Energy minimize ESMFold-generated chains' option\n"
                        "2. Select the chains you want to minimize\n"
                        "3. Rebuild the completed structure\n"
                        "4. Then try docking again.\n\n"
                    )
                else:
                    error_msg += (
                        f"Some chains were minimized ({', '.join(minimized_chains)}), but the error persists.\n"
                        "You may need to minimize additional chains or check the structure.\n\n"
                    )
                error_msg += f"STDOUT:\n{result.stdout}\nSTDERR:\n{result.stderr}"
                raise RuntimeError(error_msg)
            else:
                raise RuntimeError(
                    "Failed to prepare receptor PDBQT with Meeko.\n"
                    f"STDOUT:\n{result.stdout}\nSTDERR:\n{result.stderr}"
                )

    return receptor_fixed, receptor_pdbqt


def _compute_ligand_center(pdb_path: Path):
    """Compute the geometric center of all atoms in a ligand PDB using MDAnalysis."""
    try:
        import MDAnalysis as mda
        import numpy as np
    except Exception as e:
        raise RuntimeError(
            "MDAnalysis and NumPy are required for docking but could not be imported."
        ) from e

    u = mda.Universe(str(pdb_path))
    if u.atoms.n_atoms == 0:
        raise ValueError(f"No atoms found in ligand file {pdb_path}")
    coords = u.atoms.positions.astype(float)
    center = coords.mean(axis=0)
    return float(center[0]), float(center[1]), float(center[2])


def _prepare_docked_pose_as_ligand(original_ligand: Path, pose_pdb: Path):
    """
    Take a docked pose PDB and sanitize it so it can replace the original ligand:
    - Restore the original residue name, chain ID, and residue index
    - Run ligand sanity checks (CONECT/MASTER removal, ATOM->HETATM, distinct atom names)

    This updates pose_pdb in place.
    """
    if not original_ligand.exists():
        raise FileNotFoundError(f"Original ligand file not found: {original_ligand}")
    if not pose_pdb.exists():
        raise FileNotFoundError(f"Docked pose file not found: {pose_pdb}")

    residue_info = extract_original_residue_info(str(original_ligand))
    if residue_info:
        restore_residue_info_in_pdb(
            str(pose_pdb),
            residue_info.get("resname", "LIG"),
            residue_info.get("chain_id", ""),
            residue_info.get("resnum", "1"),
        )
    # Run the existing ligand sanity checks
    if not sanity_check_ligand_pdb(str(pose_pdb)):
        raise RuntimeError(f"Sanity check failed for docked pose {pose_pdb}")
def _sanitize_docked_pose_for_antechamber(pose_pdb: Path, original_residue_info: dict):
    """
    Sanitize a docked pose PDB to make it compatible with antechamber:
    1. Remove CONECT/MASTER/REMARK records
    2. Convert all ATOM records to HETATM
    3. Restore the original residue name, chain ID, and residue number
    4. Make atom names distinct (C1, C2, N1, N2, O1, O2, H1, H2, etc.)

    PDB column format (1-indexed):
        1-6:   record type (HETATM)
        7-11:  atom serial number (right-justified)
        12:    blank
        13-16: atom name (right-justified for 1-2 char elements)
        17:    alternate location indicator (usually blank)
        18-20: residue name (right-justified)
        21:    blank
        22:    chain identifier
        23-26: residue sequence number (right-justified)
        27:    insertion code (usually blank)
        28-30: blank (3 spaces)
        31-38: X coordinate (8 chars, %8.3f)
        39-46: Y coordinate (8 chars, %8.3f)
        47-54: Z coordinate (8 chars, %8.3f)
        55-60: occupancy (6 chars)
        61-66: temperature factor (6 chars)
        67-76: blank (10 spaces)
        77-78: element (right-justified)
        79-80: charge (e.g., 1+, 1-, 2+)

    Args:
        pose_pdb: Path to the docked pose PDB file (modified in place)
        original_residue_info: Dict with 'resname', 'chain_id', 'resnum' from the original ligand
    """
    if not pose_pdb.exists():
        raise FileNotFoundError(f"Docked pose file not found: {pose_pdb}")

    # Get the residue info (use the provided values or defaults)
    resname = original_residue_info.get("resname", "LIG") if original_residue_info else "LIG"
    chain_id = original_residue_info.get("chain_id", "A") if original_residue_info else "A"
    resnum = original_residue_info.get("resnum", "1") if original_residue_info else "1"

    # Ensure resname is at most 3 chars and chain_id is 1 char
    resname = resname[:3].upper()
    chain_id = chain_id[0] if chain_id else "A"

    # Read the file
    with open(pose_pdb, 'r') as f:
        lines = f.readlines()

    # Track element counts for distinct atom naming
    from collections import defaultdict
    element_counts = defaultdict(int)

    processed_lines = []
    atom_serial = 0

    for line in lines:
        # Skip CONECT, MASTER, REMARK, COMPND, AUTHOR, TER, HEADER, TITLE, CRYST1 lines
        if line.startswith(('CONECT', 'MASTER', 'REMARK', 'COMPND', 'AUTHOR', 'TER', 'HEADER', 'TITLE', 'CRYST1')):
            continue

        if line.startswith(('ATOM', 'HETATM')):
            atom_serial += 1
            # Pad the line to ensure it is long enough
            padded_line = line.ljust(80)

            # Extract the X, Y, Z coordinates (columns 31-54, 0-indexed: 30-54)
            try:
                x = float(padded_line[30:38].strip())
                y = float(padded_line[38:46].strip())
                z = float(padded_line[46:54].strip())
            except ValueError:
                continue  # Skip lines with invalid coordinates

            # Extract the element from columns 77-78 and the charge from columns 79-80
            element = padded_line[76:78].strip()
            charge = padded_line[78:80].strip()

            # Handle cases where element and charge are combined (e.g., "N1+")
            if element and len(element) > 2:
                import re
                match = re.match(r'^([A-Za-z]{1,2})(\d*[+-])$', element)
                if match:
                    element = match.group(1).upper()
                    charge = match.group(2)

            # If no element was found, extract it from the atom name
            if not element:
                atom_name = padded_line[12:16].strip()
                if len(atom_name) >= 1:
                    # Check for two-letter elements
                    if len(atom_name) >= 2 and atom_name[:2].upper() in ['CL', 'BR', 'MG', 'ZN', 'FE', 'CU', 'MN']:
                        element = atom_name[:2].upper()
                    else:
                        # Take the first alphabetic character
                        for c in atom_name:
                            if c.isalpha():
                                element = c.upper()
                                break
                if not element:
                    element = 'X'

            # Normalize the element to uppercase
            element = element.upper()

            # Create a distinct atom name (e.g., C1, C2, N1, H1, H2, etc.)
            element_counts[element] += 1
            count = element_counts[element]

            # Format the atom name: right-justify within 4 chars
            atom_name_str = f"{element}{count}"
            if len(atom_name_str) > 4:
                atom_name_str = atom_name_str[:4]

            # Build a properly formatted PDB line following the standard format, e.g.:
            # HETATM    1  N1  MKW A 203       7.216   9.776  -4.013  1.00  0.00           N
            new_line = (
                f"HETATM"              # 1-6:   record type (6 chars)
                f"{atom_serial:5d}"    # 7-11:  atom serial (5 chars, right-justified)
                f" "                   # 12:    blank (1 char)
                f"{atom_name_str:>4}"  # 13-16: atom name (4 chars, right-justified)
                f" "                   # 17:    alt loc indicator (1 char, blank)
                f"{resname:>3}"        # 18-20: residue name (3 chars, right-justified)
                f" "                   # 21:    blank (1 char)
                f"{chain_id}"          # 22:    chain ID (1 char)
                f"{resnum:>4}"         # 23-26: residue number (4 chars, right-justified)
                f"    "                # 27-30: insertion code + blank (4 chars)
                f"{x:8.3f}"            # 31-38: X coordinate (8 chars)
                f"{y:8.3f}"            # 39-46: Y coordinate (8 chars)
                f"{z:8.3f}"            # 47-54: Z coordinate (8 chars)
                f"  1.00"              # 55-60: occupancy (6 chars)
                f"  0.00"              # 61-66: temperature factor (6 chars)
                f"          "          # 67-76: blank (10 chars)
                f"{element:>2}"        # 77-78: element symbol (2 chars, right-justified)
                f"{charge:<2}"         # 79-80: charge (2 chars, left-justified)
                f"\n"
            )
            processed_lines.append(new_line)
        elif line.startswith('END'):
            continue  # We'll add END at the end

    # Add the END record
    processed_lines.append('END\n')

    # Write back
    with open(pose_pdb, 'w') as f:
        f.writelines(processed_lines)

    logger.info(f"Sanitized docked pose {pose_pdb}: resname={resname}, chain={chain_id}, resnum={resnum}, atoms={atom_serial}")
    logger.info(f"Element counts: {dict(element_counts)}")


def _parse_vina_config(config_path: Path):
    """
    Parse a Vina config file and return a dict with parameters.
    Returns None if the file doesn't exist or can't be parsed.
    """
    if not config_path.exists():
        return None
    config = {}
    try:
        for line in config_path.read_text().split('\n'):
            line = line.strip()
            # Skip comments and empty lines
            if not line or line.startswith('#'):
                continue
            # Parse "key = value" format
            if '=' in line:
                key, value = line.split('=', 1)
                key = key.strip()
                value = value.strip()
                # Try to convert to an appropriate type
                try:
                    if '.' in value:
                        config[key] = float(value)
                    else:
                        config[key] = int(value)
                except ValueError:
                    config[key] = value
        return config
    except Exception as e:
        logger.warning(f"Error parsing config file {config_path}: {e}")
        return None
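# A config file that _parse_vina_config() accepts is plain "key = value" text.
# A hypothetical example of what output/docking/ligand_1/vina_config.txt might
# contain (the keys match those read in run_docking below; the values are
# illustrative only):
#
#     # docking box
#     center_x = 12.5
#     center_y = -3.2
#     center_z = 7.8
#     size_x = 18.0
#     size_y = 18.0
#     size_z = 18.0
#     # search settings
#     exhaustiveness = 8
#     num_modes = 9
#     energy_range = 3
#     cpu = 0
#     seed = 0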
def _parse_vina_log(log_path: Path):
    """
    Parse an AutoDock Vina log file and extract the binding energy per mode.

    Returns dict: {mode_index: energy_kcal_mol}
    """
    energies = {}
    if not log_path.exists():
        return energies
    try:
        import re
        with log_path.open("r") as f:
            for line in f:
                # Typical Vina line:
                #    1       -7.3      0.000      0.000
                m = re.match(r"^\s*(\d+)\s+(-?\d+\.\d+)", line)
                if m:
                    mode = int(m.group(1))
                    energy = float(m.group(2))
                    energies[mode] = energy
    except Exception as e:
        logger.warning(f"Could not parse Vina log {log_path}: {e}")
    return energies
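# For reference, the result table in a Vina log looks roughly like the excerpt
# below (the numbers are illustrative); the regex above reduces it to
# {1: -7.3, 2: -7.1, 3: -6.8}:
#
#     mode |   affinity | dist from best mode
#          | (kcal/mol) | rmsd l.b.| rmsd u.b.
#     -----+------------+----------+----------
#        1       -7.3      0.000      0.000
#        2       -7.1      1.402      2.115
#        3       -6.8      2.334      5.607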
class MDSimulationGenerator:
    """Handles MD simulation file generation and protein processing"""

    def __init__(self):
        self.pdb_parser = PDBParser(QUIET=True)
        self.pdb_list = PDBList()

    def fetch_pdb_structure(self, pdb_id):
        """Fetch a PDB structure from RCSB"""
        try:
            # Download the PDB file
            pdb_file = self.pdb_list.retrieve_pdb_file(pdb_id, pdir=OUTPUT_DIR, file_format='pdb')
            return str(pdb_file)
        except Exception as e:
            logger.error(f"Error fetching PDB {pdb_id}: {str(e)}")
            raise

    def parse_pdb_structure(self, pdb_file):
        """Parse a PDB file and extract structure information"""
        try:
            structure = self.pdb_parser.get_structure('protein', pdb_file)
            # Extract basic information
            atom_count = 0
            chains = set()
            residues = set()
            for model in structure:
                for chain in model:
                    chains.add(chain.id)
                    for residue in chain:
                        if residue.id[0] == ' ':  # Standard residues
                            residues.add(f"{residue.resname}{residue.id[1]}")
                        for atom in residue:
                            atom_count += 1
            return {
                'atom_count': atom_count,
                'chains': list(chains),
                'residue_count': len(residues),
                'structure_id': Path(pdb_file).stem.upper()
            }
        except Exception as e:
            logger.error(f"Error parsing PDB file: {str(e)}")
            raise

    def generate_mdp_file(self, params, step_type='production'):
        """Generate a GROMACS MDP file for the different simulation steps"""
        if step_type == 'restrained_min':
            return f"""; Restrained Minimization Parameters
integrator = steep
nsteps     = {params['steps']['restrainedMin']['steps']}
emstep     = 0.01
emtol      = 1000

; Position restraints
define           = -DPOSRES
refcoord_scaling = com

; Output control
nstxout   = 100
nstenergy = 100
nstlog    = 100

; Bond parameters
constraint_algorithm = lincs
constraints          = h-bonds

; Neighbor searching
cutoff-scheme = Verlet
ns_type       = grid
nstlist       = 10
rlist         = {params['cutoff']}

; Electrostatics
coulombtype = PME
rcoulomb    = {params['cutoff']}
pme_order   = {params['pmeOrder']}

; Van der Waals
vdwtype = Cut-off
rvdw    = {params['cutoff']}
"""
        elif step_type == 'minimization':
            return f"""; Minimization Parameters
integrator = {params['steps']['minimization']['algorithm']}
nsteps     = {params['steps']['minimization']['steps']}
emstep     = 0.01
emtol      = 1000

; Output control
nstxout   = 100
nstenergy = 100
nstlog    = 100

; Bond parameters
constraint_algorithm = lincs
constraints          = h-bonds

; Neighbor searching
cutoff-scheme = Verlet
ns_type       = grid
nstlist       = 10
rlist         = {params['cutoff']}

; Electrostatics
coulombtype = PME
rcoulomb    = {params['cutoff']}
pme_order   = {params['pmeOrder']}

; Van der Waals
vdwtype = Cut-off
rvdw    = {params['cutoff']}
"""
        elif step_type == 'nvt':
            return f"""; NVT Equilibration Parameters
integrator = md
dt         = {params['timestep']}
nsteps     = {params['steps']['nvt']['steps']}

; Output control
nstxout   = 5000
nstvout   = 5000
nstenergy = 1000
nstlog    = 1000

; Bond parameters
constraint_algorithm = lincs
constraints          = h-bonds
lincs_iter           = 1
lincs_order          = 4

; Neighbor searching
cutoff-scheme = Verlet
ns_type       = grid
nstlist       = 40
rlist         = {params['cutoff']}

; Electrostatics
coulombtype = PME
rcoulomb    = {params['cutoff']}
pme_order   = {params['pmeOrder']}

; Van der Waals
vdwtype = Cut-off
rvdw    = {params['cutoff']}

; Temperature coupling
tcoupl  = {params['couplingType']}
tc-grps = Protein Non-Protein
tau_t   = 0.1 0.1
ref_t   = {params['steps']['nvt']['temperature']} {params['steps']['nvt']['temperature']}

; Pressure coupling (disabled for NVT)
pcoupl = no

; Velocity generation
gen_vel  = yes
gen_temp = {params['steps']['nvt']['temperature']}
gen_seed = -1
"""
        elif step_type == 'npt':
            return f"""; NPT Equilibration Parameters
integrator = md
dt         = {params['timestep']}
nsteps     = {params['steps']['npt']['steps']}

; Output control
nstxout   = 5000
nstvout   = 5000
nstenergy = 1000
nstlog    = 1000

; Bond parameters
constraint_algorithm = lincs
constraints          = h-bonds
lincs_iter           = 1
lincs_order          = 4

; Neighbor searching
cutoff-scheme = Verlet
ns_type       = grid
nstlist       = 40
rlist         = {params['cutoff']}

; Electrostatics
coulombtype = PME
rcoulomb    = {params['cutoff']}
pme_order   = {params['pmeOrder']}

; Van der Waals
vdwtype = Cut-off
rvdw    = {params['cutoff']}

; Temperature coupling
tcoupl  = {params['couplingType']}
tc-grps = Protein Non-Protein
tau_t   = 0.1 0.1
ref_t   = {params['steps']['npt']['temperature']} {params['steps']['npt']['temperature']}

; Pressure coupling
pcoupl          = {params['couplingType']}
pcoupltype      = isotropic
tau_p           = 2.0
ref_p           = {params['steps']['npt']['pressure']}
compressibility = 4.5e-5

; Velocity generation
gen_vel = no
"""
        else:  # production
            return f"""; MD Simulation Parameters
; Generated by MD Simulation Pipeline

; Run parameters
integrator = md
dt         = {params['timestep']}
nsteps     = {params['steps']['production']['steps']}

; Output control
nstxout   = 5000
nstvout   = 5000
nstenergy = 1000
nstlog    = 1000

; Bond parameters
constraint_algorithm = lincs
constraints          = h-bonds
lincs_iter           = 1
lincs_order          = 4

; Neighbor searching
cutoff-scheme = Verlet
ns_type       = grid
nstlist       = 40
rlist         = {params['cutoff']}

; Electrostatics
coulombtype    = PME
rcoulomb       = {params['cutoff']}
pme_order      = {params['pmeOrder']}
fourierspacing = 0.16

; Van der Waals
vdwtype = Cut-off
rvdw    = {params['cutoff']}

; Temperature coupling
tcoupl  = {params['couplingType']}
tc-grps = Protein Non-Protein
tau_t   = 0.1 0.1
ref_t   = {params['temperature']} {params['temperature']}

; Pressure coupling
pcoupl          = {params['couplingType']}
pcoupltype      = isotropic
tau_p           = 2.0
ref_p           = {params['pressure']}
compressibility = 4.5e-5

; Dispersion correction
DispCorr = EnerPres

; Velocity generation
gen_vel  = yes
gen_temp = {params['temperature']}
gen_seed = -1
"""

    def generate_pbs_script(self, protein_name, params):
        """Generate a PBS script for HPC submission"""
        total_steps = params['steps']['production']['steps']
        time_in_ns = (total_steps * params['timestep']) / 1000
        return f"""#!/bin/bash
#PBS -N {protein_name}_md
#PBS -l nodes=1:ppn=16
#PBS -l walltime=24:00:00
#PBS -q normal
#PBS -j oe

# Change to the directory where the job was submitted
cd $PBS_O_WORKDIR

# Load required modules
module load gromacs/2023.2
module load intel/2021.4.0

# Set up environment
export OMP_NUM_THREADS=16
export GMX_MAXBACKUP=-1

# Simulation parameters
PROTEIN={protein_name}
STEPS={total_steps}
TIME_NS={time_in_ns:.2f}

echo "Starting MD simulation for $PROTEIN"
echo "Total simulation time: $TIME_NS ns"
echo "Job started at: $(date)"

# Run the simulation
./run_simulation.sh $PROTEIN

echo "Simulation completed at: $(date)"
echo "Results saved in output directory"
"""
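    # The generated file is a standard PBS/Torque job script; on a cluster it
    # would typically be submitted with, e.g., `qsub protein_md.pbs`. The
    # module names above (gromacs/2023.2, intel/2021.4.0) are site-specific
    # and would need to match the target HPC environment.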
    def generate_setup_script(self, protein_name, params):
        """Generate the setup script for an MD simulation"""
        return f"""#!/bin/bash
# Setup script for {protein_name} MD simulation
# Generated by MD Simulation Pipeline

set -e

PROTEIN={protein_name}
FORCE_FIELD={params['forceField']}
WATER_MODEL={params['waterModel']}

echo "Setting up MD simulation for $PROTEIN"

# Create output directory
mkdir -p output

# 1. Prepare protein structure
echo "Preparing protein structure..."
gmx pdb2gmx -f $PROTEIN.pdb -o ${{PROTEIN}}_processed.gro -p $PROTEIN.top -ff $FORCE_FIELD -water $WATER_MODEL

# 2. Define simulation box
echo "Defining simulation box..."
gmx editconf -f ${{PROTEIN}}_processed.gro -o ${{PROTEIN}}_box.gro -c -d {params['boxMargin']} -bt {params['boxType']}

# 3. Add solvent
echo "Adding solvent..."
gmx solvate -cp ${{PROTEIN}}_box.gro -cs spc216.gro -o ${{PROTEIN}}_solv.gro -p $PROTEIN.top

# 4. Add ions
echo "Adding ions..."
gmx grompp -f ${{PROTEIN}}_restrained.mdp -c ${{PROTEIN}}_solv.gro -p $PROTEIN.top -o ${{PROTEIN}}_ions.tpr
echo "SOL" | gmx genion -s ${{PROTEIN}}_ions.tpr -o ${{PROTEIN}}_final.gro -p $PROTEIN.top -pname NA -nname CL -neutral

echo "Setup completed successfully!"
echo "Ready to run simulation with: ./run_simulation.sh $PROTEIN"
"""

    def generate_analysis_script(self, protein_name):
        """Generate the analysis script for MD simulation results"""
        return f"""#!/bin/bash
# Analysis script for {protein_name} MD simulation
# Generated by MD Simulation Pipeline

PROTEIN={protein_name}

echo "Analyzing MD simulation results for $PROTEIN"

# Create analysis directory
mkdir -p analysis

# 1. RMSD analysis
echo "Calculating RMSD..."
echo "Protein" | gmx rms -s ${{PROTEIN}}_final.tpr -f ${{PROTEIN}}_prod.xtc -o analysis/${{PROTEIN}}_rmsd.xvg -tu ns

# 2. RMSF analysis
echo "Calculating RMSF..."
echo "Protein" | gmx rmsf -s ${{PROTEIN}}_final.tpr -f ${{PROTEIN}}_prod.xtc -o analysis/${{PROTEIN}}_rmsf.xvg -res

# 3. Radius of gyration
echo "Calculating radius of gyration..."
echo "Protein" | gmx gyrate -s ${{PROTEIN}}_final.tpr -f ${{PROTEIN}}_prod.xtc -o analysis/${{PROTEIN}}_gyrate.xvg

# 4. Hydrogen bonds
echo "Analyzing hydrogen bonds..."
echo "Protein" | gmx hbond -s ${{PROTEIN}}_final.tpr -f ${{PROTEIN}}_prod.xtc -num analysis/${{PROTEIN}}_hbonds.xvg

# 5. Energy analysis
echo "Analyzing energies..."
gmx energy -f ${{PROTEIN}}_prod.edr -o analysis/${{PROTEIN}}_energy.xvg

# 6. Generate plots
echo "Generating analysis plots..."
python3 plot_analysis.py $PROTEIN

echo "Analysis completed! Results saved in analysis/ directory"
"""
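# The generator methods above all read from one nested parameter dict. A
# hypothetical example of the shape they expect, assembled from the keys
# actually referenced in generate_mdp_file / generate_setup_script (the values
# are illustrative, not recommendations):
#
#     simulation_params = {
#         'timestep': 0.002,            # ps
#         'temperature': 300,           # K
#         'pressure': 1.0,              # bar
#         'cutoff': 1.0,                # nm
#         'pmeOrder': 4,
#         'couplingType': 'berendsen',
#         'forceField': 'amber99sb',
#         'waterModel': 'tip3p',
#         'boxType': 'cubic',
#         'boxMargin': 1.0,             # nm
#         'steps': {
#             'restrainedMin': {'steps': 5000},
#             'minimization': {'steps': 5000, 'algorithm': 'steep'},
#             'nvt': {'steps': 50000, 'temperature': 300},
#             'npt': {'steps': 50000, 'temperature': 300, 'pressure': 1.0},
#             'production': {'steps': 500000},
#         },
#     }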
# Initialize the MD simulation generator
md_generator = MDSimulationGenerator()


@app.route('/api/fetch-pdb', methods=['POST'])
def fetch_pdb():
    """Fetch a PDB structure from RCSB"""
    try:
        print("DEBUG: fetch-pdb endpoint called")
        data = request.get_json()
        pdb_id = data.get('pdb_id', '').upper()
        print(f"DEBUG: pdb_id = {pdb_id}")
        if not pdb_id or len(pdb_id) != 4:
            return jsonify({'error': 'Invalid PDB ID'}), 400

        # Clean and create a new output folder for a fresh start
        print("DEBUG: Calling clean_and_create_output_folder()")
        if not clean_and_create_output_folder():
            return jsonify({'error': 'Failed to clean output folder'}), 500
        print("DEBUG: Output folder cleanup completed successfully")

        # Fetch the PDB structure
        pdb_file = md_generator.fetch_pdb_structure(pdb_id)
        # Parse structure information
        structure_info = md_generator.parse_pdb_structure(pdb_file)

        return jsonify({
            'success': True,
            'structure_info': structure_info,
            'pdb_file': pdb_file
        })
    except Exception as e:
        logger.error(f"Error fetching PDB: {str(e)}")
        return jsonify({'error': str(e)}), 500


@app.route('/api/proxy-pdb/<pdb_id>', methods=['GET'])
def proxy_pdb(pdb_id):
    """Proxy endpoint to fetch a PDB from RCSB or mirrors (avoids CORS issues)"""
    try:
        pdb_id = pdb_id.upper().strip()
        if not pdb_id or len(pdb_id) != 4:
            return jsonify({'error': 'Invalid PDB ID'}), 400

        # Try multiple sources in order of preference
        urls = [
            f"https://files.rcsb.org/download/{pdb_id}.pdb",  # Primary RCSB
            f"https://www.ebi.ac.uk/pdbe/entry-files/download/pdb{pdb_id.lower()}.ent",  # PDBe (European mirror)
        ]
        for url in urls:
            try:
                print(f"DEBUG: Trying to fetch PDB from {url}")
                response = requests.get(url, timeout=30)
                if response.status_code == 200:
                    content = response.text
                    # Validate that it looks like a PDB file
                    if 'ATOM' in content or 'HETATM' in content:
                        print(f"DEBUG: Successfully fetched PDB from {url}")
                        return Response(content, mimetype='text/plain')
            except requests.exceptions.RequestException as e:
                print(f"DEBUG: Failed to fetch from {url}: {e}")
                continue

        return jsonify({'error': f'PDB ID {pdb_id} not found or servers unavailable'}), 404
    except Exception as e:
        logger.error(f"Error proxying PDB {pdb_id}: {str(e)}")
        return jsonify({'error': str(e)}), 500


@app.route('/api/get-pdb-content', methods=['GET'])
def get_pdb_content():
    """Return the content of a PDB file"""
    try:
        file_path = request.args.get('file', '')
        if not file_path:
            return jsonify({'success': False, 'error': 'No file path provided'}), 400

        # Security check: ensure the file is within the output directory
        file_path = Path(file_path)
        if not str(file_path.resolve()).startswith(str(OUTPUT_DIR.resolve())):
            return jsonify({'success': False, 'error': 'Invalid file path'}), 400
        if not file_path.exists():
            return jsonify({'success': False, 'error': 'File not found'}), 404

        content = file_path.read_text()
        return jsonify({'success': True, 'content': content})
    except Exception as e:
        logger.error(f"Error reading PDB content: {str(e)}")
        return jsonify({'success': False, 'error': str(e)}), 500
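# Example client calls for the endpoints above (the host/port are hypothetical;
# the payload shape matches what the handlers read):
#
#     curl -X POST http://localhost:5000/api/fetch-pdb \
#          -H 'Content-Type: application/json' -d '{"pdb_id": "1KE5"}'
#     curl http://localhost:5000/api/proxy-pdb/1KE5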
@app.route('/api/parse-pdb', methods=['POST'])
def parse_pdb():
    """Parse an uploaded PDB file"""
    try:
        print("DEBUG: parse-pdb endpoint called")
        if 'file' not in request.files:
            return jsonify({'error': 'No file uploaded'}), 400
        file = request.files['file']
        if file.filename == '':
            return jsonify({'error': 'No file selected'}), 400
        print(f"DEBUG: Processing uploaded file: {file.filename}")

        # Clean and create a new output folder for a fresh start
        print("DEBUG: Calling clean_and_create_output_folder()")
        if not clean_and_create_output_folder():
            return jsonify({'error': 'Failed to clean output folder'}), 500
        print("DEBUG: Output folder cleanup completed successfully")

        # Save the uploaded file temporarily
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.pdb')
        file.save(temp_file.name)
        # Parse structure information
        structure_info = md_generator.parse_pdb_structure(temp_file.name)
        # Clean up the temporary file
        os.unlink(temp_file.name)

        return jsonify({
            'success': True,
            'structure_info': structure_info
        })
    except Exception as e:
        logger.error(f"Error parsing PDB: {str(e)}")
        return jsonify({'error': str(e)}), 500


@app.route('/api/generate-files', methods=['POST'])
def generate_files():
    """Generate MD simulation files"""
    try:
        data = request.get_json()
        protein_name = data.get('protein_name', 'protein')
        simulation_params = data.get('simulation_params', {})

        # Generate all files
        files = {}
        # MDP files
        files[f'{protein_name}.mdp'] = md_generator.generate_mdp_file(simulation_params, 'production')
        files[f'{protein_name}_restrained.mdp'] = md_generator.generate_mdp_file(simulation_params, 'restrained_min')
        files[f'{protein_name}_min.mdp'] = md_generator.generate_mdp_file(simulation_params, 'minimization')
        files[f'{protein_name}_nvt.mdp'] = md_generator.generate_mdp_file(simulation_params, 'nvt')
        files[f'{protein_name}_npt.mdp'] = md_generator.generate_mdp_file(simulation_params, 'npt')
        files[f'{protein_name}_prod.mdp'] = md_generator.generate_mdp_file(simulation_params, 'production')
        # Scripts
        files[f'{protein_name}_simulation.pbs'] = md_generator.generate_pbs_script(protein_name, simulation_params)
        files[f'setup_{protein_name}.sh'] = md_generator.generate_setup_script(protein_name, simulation_params)
        files[f'analyze_{protein_name}.sh'] = md_generator.generate_analysis_script(protein_name)

        return jsonify({
            'success': True,
            'files': files
        })
    except Exception as e:
        logger.error(f"Error generating files: {str(e)}")
        return jsonify({'error': str(e)}), 500


@app.route('/api/download-zip', methods=['POST'])
def download_zip():
    """Download all generated files as a ZIP archive"""
    try:
        data = request.get_json()
        files = data.get('files', {})

        # Create a temporary ZIP file
        temp_zip = tempfile.NamedTemporaryFile(delete=False, suffix='.zip')
        with zipfile.ZipFile(temp_zip.name, 'w') as zip_file:
            for filename, content in files.items():
                zip_file.writestr(filename, content)

        return send_file(
            temp_zip.name,
            as_attachment=True,
            download_name='md_simulation_files.zip',
            mimetype='application/zip'
        )
    except Exception as e:
        logger.error(f"Error creating ZIP file: {str(e)}")
        return jsonify({'error': str(e)}), 500


@app.route('/api/get-solvated-protein', methods=['GET'])
def get_solvated_protein():
    """Get the solvated protein PDB file content"""
    try:
        solvated_file = os.path.join(OUTPUT_DIR, 'protein_solvated.pdb')
        if not os.path.exists(solvated_file):
            return jsonify({'success': False, 'error': 'Solvated protein file not found. Please generate files first.'})
        with open(solvated_file, 'r') as f:
            content = f.read()
        return jsonify({'success': True, 'content': content})
    except Exception as e:
        logger.error(f"Error reading solvated protein file: {str(e)}")
        return jsonify({'success': False, 'error': str(e)})
""" try: solvated_path = OUTPUT_DIR / 'protein_solvated.pdb' # Find all corrected ligand files (support multiple ligands) # Exclude OpenBabel output files (4_ligands_corrected_obabel_*.pdb) lig_paths = sorted([f for f in OUTPUT_DIR.glob('4_ligands_corrected_*.pdb') if "_obabel_" not in f.name]) # Fallback to single file for backward compatibility if not lig_paths: single_lig_path = OUTPUT_DIR / '4_ligands_corrected.pdb' if single_lig_path.exists(): lig_paths = [single_lig_path] viewer_out = OUTPUT_DIR / 'viewer_protein_with_ligand.pdb' if not solvated_path.exists(): return jsonify({'success': False, 'error': 'protein_solvated.pdb not found'}), 400 # Build ligand index from all corrected ligand PDB files if present ligand_keys = set() ligand_resnames = set() for lig_path in lig_paths: if lig_path.exists(): with open(lig_path, 'r') as lf: for line in lf: if line.startswith(('ATOM', 'HETATM')): resn = line[17:20].strip() chain = line[21:22].strip() resi = line[22:26].strip() ligand_resnames.add(resn) if chain and resi: ligand_keys.add((resn, chain, resi)) # Rewrite solvated file marking matching ligand residues and ions (NA/CL) as HETATM out_lines = [] with open(solvated_path, 'r') as sf: for line in sf: if line.startswith(('ATOM', 'HETATM')): resn = line[17:20].strip() chain = line[21:22].strip() resi = line[22:26].strip() is_match = False is_ion = resn in { 'NA', 'CL' } if (resn, chain, resi) in ligand_keys: is_match = True elif resn in ligand_resnames: # Fallback by residue name only is_match = True if is_match or is_ion: # Force to HETATM out_lines.append('HETATM' + line[6:]) else: out_lines.append(line) else: out_lines.append(line) # Save combined viewer file (optional but useful for debugging) try: with open(viewer_out, 'w') as vf: vf.writelines(out_lines) except Exception: pass return jsonify({'success': True, 'content': ''.join(out_lines)}) except Exception as e: logger.error(f"Error generating viewer PDB: {str(e)}") return jsonify({'success': False, 'error': str(e)}) @app.route('/view-pdb') def view_pdb_html(): """Serve PDB file as HTML page for instant viewing""" try: viewer_out = OUTPUT_DIR / 'viewer_protein_with_ligand.pdb' solvated_path = OUTPUT_DIR / 'protein_solvated.pdb' # Find all corrected ligand files (support multiple ligands) # Exclude OpenBabel output files (4_ligands_corrected_obabel_*.pdb) lig_paths = sorted([f for f in OUTPUT_DIR.glob('4_ligands_corrected_*.pdb') if "_obabel_" not in f.name]) # Fallback to single file for backward compatibility if not lig_paths: single_lig_path = OUTPUT_DIR / '4_ligands_corrected.pdb' if single_lig_path.exists(): lig_paths = [single_lig_path] # If viewer file doesn't exist, generate it first if not viewer_out.exists(): if not solvated_path.exists(): return f""" Error - PDB Not Found

PDB File Not Found

Please complete the structure preparation steps first.

""", 404 # Generate the file directly (same logic as get_viewer_pdb but without JSON response) try: # Build ligand index from all corrected ligand PDB files if present ligand_keys = set() ligand_resnames = set() for lig_path in lig_paths: if lig_path.exists(): with open(lig_path, 'r') as lf: for line in lf: if line.startswith(('ATOM', 'HETATM')): resn = line[17:20].strip() chain = line[21:22].strip() resi = line[22:26].strip() ligand_resnames.add(resn) if chain and resi: ligand_keys.add((resn, chain, resi)) # Rewrite solvated file marking matching ligand residues and ions (NA/CL) as HETATM out_lines = [] with open(solvated_path, 'r') as sf: for line in sf: if line.startswith(('ATOM', 'HETATM')): resn = line[17:20].strip() chain = line[21:22].strip() resi = line[22:26].strip() is_match = False is_ion = resn in { 'NA', 'CL' } if (resn, chain, resi) in ligand_keys: is_match = True elif resn in ligand_resnames: # Fallback by residue name only is_match = True if is_match or is_ion: # Force to HETATM out_lines.append('HETATM' + line[6:]) else: out_lines.append(line) else: out_lines.append(line) # Save combined viewer file with open(viewer_out, 'w') as vf: vf.writelines(out_lines) except Exception as e: logger.error(f"Error generating viewer PDB: {str(e)}") return f""" Error

Error Generating PDB

Could not generate viewer PDB file: {html.escape(str(e))}

""", 500 # Read PDB content with open(viewer_out, 'r') as f: pdb_content = f.read() # Escape HTML special characters escaped_content = html.escape(pdb_content) # Create HTML page html_page = f""" Viewer PDB File

📄 Viewer PDB File

File: viewer_protein_with_ligand.pdb
{escaped_content}
""" return html_page, 200, {'Content-Type': 'text/html; charset=utf-8'} except Exception as e: logger.error(f"Error serving PDB as HTML: {str(e)}") return f""" Error

Error Loading PDB

{html.escape(str(e))}

""", 500 @app.route('/api/get-corrected-ligands', methods=['GET']) def get_corrected_ligands(): """Get the corrected ligand PDB file content if present (combines all ligands)""" try: # Find all corrected ligand files (support multiple ligands) # Exclude OpenBabel output files (4_ligands_corrected_obabel_*.pdb) ligand_files = sorted([f for f in OUTPUT_DIR.glob('4_ligands_corrected_*.pdb') if "_obabel_" not in f.name]) # Fallback to single file for backward compatibility if not ligand_files: single_lig_file = OUTPUT_DIR / '4_ligands_corrected.pdb' if single_lig_file.exists(): ligand_files = [single_lig_file] if not ligand_files: # Return success with exists flag false so frontend can decide gracefully return jsonify({'success': True, 'exists': False, 'content': ''}) # Read and normalize records to HETATM for viewer compatibility, combine all ligands normalized_lines = [] for ligand_file in ligand_files: with open(ligand_file, 'r') as f: for line in f: if line.startswith('ATOM'): # Replace record name to HETATM, preserve fixed-width columns normalized_lines.append('HETATM' + line[6:]) elif line.startswith('HETATM'): normalized_lines.append(line) elif line.strip() == 'END' and ligand_file != ligand_files[-1]: # Skip END for intermediate ligands, keep only for last continue elif line.strip() and not line.startswith(('CRYST', 'REMARK', 'HEADER')): normalized_lines.append(line) # Ensure we have an END at the end if normalized_lines and not normalized_lines[-1].strip() == 'END': normalized_lines.append('END\n') content = ''.join(normalized_lines) return jsonify({'success': True, 'exists': True, 'content': content}) except Exception as e: logger.error(f"Error reading corrected ligand file: {str(e)}") return jsonify({'success': False, 'error': str(e)}) @app.route('/api/get-aligned-ligands', methods=['GET']) def get_aligned_ligands(): """Return ligand coordinates aligned to protein_solvated.pdb frame using PyMOL transforms.""" try: solvated_file = OUTPUT_DIR / 'protein_solvated.pdb' tleap_ready = OUTPUT_DIR / 'tleap_ready.pdb' # Find all corrected ligand files (support multiple ligands) # Exclude OpenBabel output files (4_ligands_corrected_obabel_*.pdb) ligand_files = sorted([f for f in OUTPUT_DIR.glob('4_ligands_corrected_*.pdb') if "_obabel_" not in f.name]) # Fallback to single file for backward compatibility if not ligand_files: single_lig_file = OUTPUT_DIR / '4_ligands_corrected.pdb' if single_lig_file.exists(): ligand_files = [single_lig_file] if not solvated_file.exists(): return jsonify({'success': False, 'error': 'protein_solvated.pdb not found'}), 400 if not tleap_ready.exists(): return jsonify({'success': False, 'error': 'tleap_ready.pdb not found'}), 400 if not ligand_files: return jsonify({'success': True, 'exists': False, 'content': ''}) # Use first ligand file for PyMOL alignment (or combine them if needed) ligand_file = ligand_files[0] # Create temp output path aligned_lig = OUTPUT_DIR / 'ligand_aligned_for_preview.pdb' try: if aligned_lig.exists(): aligned_lig.unlink() except Exception: pass # PyMOL script: load solvated, load tlready (protein+lig), align tlready protein to solvated protein, then save transformed ligand pymol_script = f""" import pymol pymol.finish_launching(['pymol','-qc']) from pymol import cmd cmd.load('{solvated_file.as_posix()}', 'solv') cmd.load('{tleap_ready.as_posix()}', 'prep') cmd.load('{ligand_file.as_posix()}', 'lig') # Align prepared protein to solvated protein; use CA atoms to be robust cmd.align('prep and polymer.protein and name CA', 'solv and 
        pymol_script = f"""
import pymol
pymol.finish_launching(['pymol','-qc'])
from pymol import cmd
cmd.load('{solvated_file.as_posix()}', 'solv')
cmd.load('{tleap_ready.as_posix()}', 'prep')
cmd.load('{ligand_file.as_posix()}', 'lig')
# Align the prepared protein to the solvated protein; use CA atoms to be robust
cmd.align('prep and polymer.protein and name CA', 'solv and polymer.protein and name CA')
# The alignment transform is applied to the 'prep' object only; the ligand was
# loaded as a separate object, so copy the object matrix across
mat = cmd.get_object_matrix('prep')
cmd.set_object_matrix('lig', mat)
# Save the ligand in the aligned frame, as HETATM
cmd.alter('lig', 'type="HETATM"')
cmd.save('{aligned_lig.as_posix()}', 'lig')
cmd.quit()
"""
        # Run PyMOL inline
        result = subprocess.run(['python3', '-c', pymol_script],
                                capture_output=True, text=True, cwd=str(OUTPUT_DIR))
        if result.returncode != 0:
            return jsonify({'success': False, 'error': f'PyMOL alignment failed: {result.stderr}'}), 500
        if not aligned_lig.exists():
            return jsonify({'success': False, 'error': 'Aligned ligand file was not produced'}), 500

        # Read and return the content
        normalized_lines = []
        with open(aligned_lig, 'r') as f:
            for line in f:
                if line.startswith('ATOM'):
                    normalized_lines.append('HETATM' + line[6:])
                else:
                    normalized_lines.append(line)
        content = ''.join(normalized_lines)
        return jsonify({'success': True, 'exists': True, 'content': content})
    except Exception as e:
        logger.error(f"Error aligning ligands: {str(e)}")
        return jsonify({'success': False, 'error': str(e)}), 500


@app.route('/viewer/<filename>')
def viewer(filename):
    """Serve the NGL viewer page"""
    # Check if the file exists; if not, try to generate it
    file_path = OUTPUT_DIR / filename
    if not file_path.exists():
        # Try to generate the viewer PDB if it's the specific file we need
        if filename == 'viewer_protein_with_ligand.pdb':
            try:
                # get_viewer_pdb() writes viewer_protein_with_ligand.pdb on success
                get_viewer_pdb()
            except Exception:
                pass  # Continue anyway
    return f"""<!DOCTYPE html>
<html>
<head><title>NGL Viewer - {filename}</title></head>
<body>
<h1>🧬 3D Structure Viewer - {filename}</h1>
</body>
</html>
"""
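# A hypothetical client-side check of the aligned-ligand endpoint above (the
# field names match the jsonify payloads; the host/port are assumptions):
#
#     import requests
#     r = requests.get('http://localhost:5000/api/get-aligned-ligands').json()
#     if r.get('success') and r.get('exists'):
#         open('ligand_preview.pdb', 'w').write(r['content'])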
""" @app.route('/output/') def serve_output(filename): """Serve output files""" # Debug: print available files print(f"Requested file: {filename}") print(f"Full path: {OUTPUT_DIR / filename}") print(f"File exists: {(OUTPUT_DIR / filename).exists()}") print(f"Files in output dir: {list(OUTPUT_DIR.iterdir()) if OUTPUT_DIR.exists() else 'Directory not found'}") if not (OUTPUT_DIR / filename).exists(): abort(404) return send_from_directory(OUTPUT_DIR, filename) @app.route('/') def index(): """Serve the main HTML page""" return render_template('index.html') @app.route('/') def serve_static(filename): """Serve static files (CSS, JS, etc.)""" return send_from_directory(str(_BASE), filename) @app.route('/api/prepare-structure', methods=['POST']) def prepare_structure_endpoint(): """Prepare protein structure for AMBER""" try: data = request.get_json() pdb_content = data.get('pdb_content', '') options = data.get('options', {}) # Check if user wants to use completed structure (ESMFold) flag_file = OUTPUT_DIR / ".use_completed_structure" complete_structure_path = OUTPUT_DIR / "0_complete_structure.pdb" if flag_file.exists() and complete_structure_path.exists(): logger.info("Using superimposed completed structure (0_complete_structure.pdb) for preparation so ligands stay in the same coordinate frame") with open(complete_structure_path, 'r') as f: pdb_content = f.read() elif not pdb_content: return jsonify({'error': 'No PDB content provided and no completed structure found'}), 400 # Prepare structure (use OUTPUT_DIR so paths match app's output folder) result = prepare_structure(pdb_content, options, output_dir=str(OUTPUT_DIR)) # Check if prepare_structure returned an error if result.get('error'): logger.error(f"Structure preparation failed: {result['error']}") return jsonify({'error': result['error']}), 400 # Validate and sanitize ligand names early (after structure preparation) # This ensures numeric ligand names are converted to 3-letter codes ligand_name_changes = validate_and_sanitize_all_ligand_files() # Build response response_data = { 'success': True, 'prepared_structure': result['prepared_structure'], 'original_atoms': result['original_atoms'], 'prepared_atoms': result['prepared_atoms'], 'removed_components': result['removed_components'], 'added_capping': result['added_capping'], 'preserved_ligands': result['preserved_ligands'], 'ligand_present': result.get('ligand_present', False), 'separate_ligands': result.get('separate_ligands', False), 'ligand_content': result.get('ligand_content', ''), 'ligand_name_changes': ligand_name_changes # List of (old_name, new_name, filename) tuples } return jsonify(response_data) except Exception as e: logger.error(f"Error preparing structure: {str(e)}") return jsonify({'error': str(e)}), 500 @app.route('/api/parse-structure', methods=['POST']) def parse_structure_endpoint(): """Parse structure information""" try: data = request.get_json() pdb_content = data.get('pdb_content', '') if not pdb_content: return jsonify({'error': 'No PDB content provided'}), 400 # Parse structure structure_info = parse_structure_info(pdb_content) return jsonify({ 'success': True, 'structure_info': structure_info }) except Exception as e: logger.error(f"Error parsing structure: {str(e)}") return jsonify({'error': str(e)}), 500 @app.route('/api/docking/run', methods=['POST']) @stream_with_context def run_docking(): """ Run ligand docking for preserved ligands using Vina and Meeko with streaming logs. All outputs are stored under OUTPUT_DIR/docking. 
@app.route('/api/docking/run', methods=['POST'])
def run_docking():
    """
    Run ligand docking for preserved ligands using Vina and Meeko with streaming logs.
    All outputs are stored under OUTPUT_DIR/docking.
    Returns a summary of ligands and available poses (file-based, no contents).
    """
    def generate():
        try:
            docking_dir = _ensure_docking_folder()
            yield _format_log(f"Working directory: {os.getcwd()}")
            yield _format_log(f"Output directory: {OUTPUT_DIR}")
            yield _format_log(f"Docking directory: {docking_dir}")

            # Check if using ESMFold-completed structure
            flag_file = OUTPUT_DIR / ".use_completed_structure"
            complete_structure_path = OUTPUT_DIR / "0_complete_structure.pdb"
            if flag_file.exists() and complete_structure_path.exists():
                yield _format_log("ℹ️ Using ESMFold-completed structure for receptor")
                yield _format_log("   (Completed chains from ESMFold + original chains not selected for completion)")

            yield _format_log("Preparing receptor for docking...")
            receptor_fixed, receptor_pdbqt = _prepare_receptor_for_docking()
            yield _format_log(f"✅ Receptor prepared: {receptor_pdbqt.name}")

            # Optional per-ligand configuration from frontend
            data = request.get_json(silent=True) or {}
            cfg_list = data.get("ligands", [])
            ligand_configs = {}
            for cfg in cfg_list:
                try:
                    idx = int(cfg.get("index", 0))
                    if idx > 0:
                        ligand_configs[idx] = cfg
                except Exception:
                    continue

            # Find all individual ligand files (use obabel versions for better PDB->SDF conversion)
            ligand_files = sorted(OUTPUT_DIR.glob("4_ligands_corrected_obabel_*.pdb"))
            if not ligand_files:
                # Fallback to non-obabel files if obabel files don't exist
                ligand_files = sorted(
                    [f for f in OUTPUT_DIR.glob("4_ligands_corrected_*.pdb") if "_obabel_" not in f.name]
                )
            if not ligand_files:
                error_msg = 'No corrected ligand PDB files found. Please run structure preparation with preserved ligands.'
                yield _format_log(error_msg, 'error')
                yield f"data: {json.dumps({'type': 'complete', 'success': False, 'error': error_msg})}\n\n"
                return

            yield _format_log(f"Found {len(ligand_files)} ligand file(s) to process")
            yield _format_log(f"Selected {len(ligand_configs)} ligand(s) for docking")

            ligands_summary = []
            warnings = []
            errors = []
            for idx, lig_pdb in enumerate(ligand_files, start=1):
                # Only dock ligands that are explicitly enabled in the config.
                # If no config exists for this ligand, skip it (user didn't select it).
                cfg = ligand_configs.get(idx)
                if cfg is None:
                    # No config sent = ligand was not selected for docking
                    continue
                if cfg.get("enabled") is False:
                    # Explicitly disabled
                    continue

                yield _format_log(f"\n{'='*60}")
                yield _format_log(f"Processing ligand {idx} ({lig_pdb.name})")
                yield _format_log(f"{'='*60}")

                lig_dir = docking_dir / f"ligand_{idx}"
                lig_dir.mkdir(parents=True, exist_ok=True)

                # Copy original corrected ligand for reference
                original_copy = lig_dir / "original_ligand.pdb"
                if not original_copy.exists():
                    original_copy.write_text(lig_pdb.read_text())

                try:
                    # Step 1: obabel to SDF
                    yield _format_log(f"Step 1: Converting ligand {idx} from PDB to SDF using OpenBabel...")
                    sdf_path = lig_dir / f"ligand_{idx}.sdf"
                    cmd = [
                        "obabel",
                        "-i", "pdb", str(lig_pdb),
                        "-o", "sdf",
                        "-O", str(sdf_path),
                    ]
                    yield _format_log(f"Running command: {' '.join(cmd)}")
                    # Stream obabel output
                    process = subprocess.Popen(
                        cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                        text=True, bufsize=1
                    )
                    for line in iter(process.stdout.readline, ''):
                        if line:
                            yield _format_log(line.strip())
                    process.wait()
                    if process.returncode != 0 or not sdf_path.exists():
                        raise RuntimeError(
                            f"OpenBabel failed for ligand {idx} ({lig_pdb.name}). "
                            f"Return code: {process.returncode}"
                        )
                    yield _format_log(f"✅ OpenBabel conversion successful: {sdf_path.name}")

                    # Step 2: Meeko ligand preparation -> PDBQT
                    yield _format_log(f"Step 2: Preparing ligand {idx} with Meeko...")
                    lig_pdbqt = lig_dir / f"ligand_{idx}.pdbqt"
                    cmd = [
                        "mk_prepare_ligand.py",
                        "-i", str(sdf_path),
                        "-o", str(lig_pdbqt),
                    ]
                    yield _format_log(f"Running command: {' '.join(cmd)}")
                    result = subprocess.run(cmd, capture_output=True, text=True)
                    if result.stdout:
                        yield _format_log(result.stdout.strip())
                    if result.stderr:
                        yield _format_log(result.stderr.strip(), 'warning')
                    if result.returncode != 0 or not lig_pdbqt.exists():
                        raise RuntimeError(
                            f"Meeko failed for ligand {idx}.\n"
                            f"STDOUT:\n{result.stdout}\nSTDERR:\n{result.stderr}"
                        )
                    yield _format_log(f"✅ Meeko preparation successful: {lig_pdbqt.name}")

                    # Step 3: Read docking parameters from config file or use provided values
                    yield _format_log(f"Step 3: Reading docking parameters for ligand {idx}...")
                    config_file = lig_dir / "vina_config.txt"
                    config = _parse_vina_config(config_file) if config_file.exists() else None

                    # Initialize defaults
                    exhaustiveness = 8
                    num_modes = 9
                    energy_range = 3
                    cpu = 0
                    seed = 0

                    # Priority: config file > user-provided > computed
                    if config:
                        yield _format_log(f"Reading parameters from config file: {config_file.name}")
                        cx = config.get("center_x", None)
                        cy = config.get("center_y", None)
                        cz = config.get("center_z", None)
                        sx = config.get("size_x", 18.0)
                        sy = config.get("size_y", 18.0)
                        sz = config.get("size_z", 18.0)
                        exhaustiveness = config.get("exhaustiveness", 8)
                        num_modes = config.get("num_modes", 9)
                        energy_range = config.get("energy_range", 3)
                        cpu = config.get("cpu", 0)
                        seed = config.get("seed", 0)
                    else:
                        # Fallback to user-provided or computed
                        user_center = (cfg or {}).get("center", {}) if cfg else {}
                        if (
                            isinstance(user_center, dict)
                            and all(k in user_center for k in ("x", "y", "z"))
                        ):
                            try:
                                cx = float(user_center.get("x"))
                                cy = float(user_center.get("y"))
                                cz = float(user_center.get("z"))
                                yield _format_log(f"Using user-provided center: ({cx:.2f}, {cy:.2f}, {cz:.2f})")
                            except Exception:
                                cx, cy, cz = _compute_ligand_center(lig_pdb)
                                yield _format_log(f"Computed center: ({cx:.2f}, {cy:.2f}, {cz:.2f})")
                        else:
                            cx, cy, cz = _compute_ligand_center(lig_pdb)
                            yield _format_log(f"Computed center: ({cx:.2f}, {cy:.2f}, {cz:.2f})")
                        user_size = (cfg or {}).get("size", {}) if cfg else {}
                        try:
                            sx = float(user_size.get("x", 18.0))
                            sy = float(user_size.get("y", 18.0))
                            sz = float(user_size.get("z", 18.0))
                        except Exception:
                            sx = sy = sz = 18.0

                    # If center not in config, compute it
                    if cx is None or cy is None or cz is None:
                        cx, cy, cz = _compute_ligand_center(lig_pdb)
                        yield _format_log(f"Computed center: ({cx:.2f}, {cy:.2f}, {cz:.2f})")

                    yield _format_log(f"Box center: ({cx:.2f}, {cy:.2f}, {cz:.2f}) Å")
                    yield _format_log(f"Box size: ({sx:.2f}, {sy:.2f}, {sz:.2f}) Å")
                    yield _format_log(f"Exhaustiveness: {exhaustiveness}, Num modes: {num_modes}, Energy range: {energy_range} kcal/mol")

                    # Step 4: Run Vina docking
                    yield _format_log(f"Step 4: Running AutoDock Vina docking for ligand {idx}...")
                    docked_pdbqt = lig_dir / f"ligand_{idx}_docked.pdbqt"
                    log_file = lig_dir / f"ligand_{idx}_docked.log"
                    cmd = [
                        "vina",
                        "--receptor", str(receptor_pdbqt),
                        "--ligand", str(lig_pdbqt),
                        "--center_x", str(cx),
                        "--center_y", str(cy),
                        "--center_z", str(cz),
                        "--size_x", str(sx),
                        "--size_y", str(sy),
                        "--size_z", str(sz),
                        "--exhaustiveness", str(exhaustiveness),
                        "--num_modes", str(num_modes),
                        "--energy_range", str(energy_range),
                        "--out", str(docked_pdbqt),
                        "--log", str(log_file),
                    ]
                    if cpu > 0:
                        cmd.extend(["--cpu", str(cpu)])
                    if seed > 0:
                        cmd.extend(["--seed", str(seed)])
                    yield _format_log(f"Running command: {' '.join(cmd)}")
                    # Stream Vina output
                    process = subprocess.Popen(
                        cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                        text=True, bufsize=1
                    )
                    for line in iter(process.stdout.readline, ''):
                        if line:
                            yield _format_log(line.strip())
                    process.wait()
                    if process.returncode != 0 or not docked_pdbqt.exists():
                        raise RuntimeError(
                            f"Vina failed for ligand {idx}. Return code: {process.returncode}"
                        )
                    yield _format_log(f"✅ Vina docking completed: {docked_pdbqt.name}")

                    # Step 5: Split poses
                    yield _format_log(f"Step 5: Splitting docking poses for ligand {idx}...")
                    cmd = [
                        "vina_split",
                        "--input", str(docked_pdbqt),
                        "--ligand", f"ligand_{idx}_mode",
                    ]
                    yield _format_log(f"Running command: {' '.join(cmd)}")
                    result = subprocess.run(cmd, cwd=lig_dir, capture_output=True, text=True)
                    if result.stdout:
                        yield _format_log(result.stdout.strip())
                    if result.stderr:
                        yield _format_log(result.stderr.strip(), 'warning')
                    if result.returncode != 0:
                        warnings.append(
                            f"vina_split reported issues for ligand {idx}: {result.stderr.strip()}"
                        )
                        yield _format_log(f"⚠️ Warning: vina_split issues for ligand {idx}", 'warning')
                    else:
                        yield _format_log("✅ Poses split successfully")

                    # Parse binding energies from Vina log (per mode)
                    mode_energies = _parse_vina_log(log_file)
                    yield _format_log(f"Found {len(mode_energies)} binding mode(s)")

                    # Step 6: Convert each mode back to PDB with OpenBabel
                    yield _format_log("Step 6: Converting poses to PDB format...")
                    pose_entries = []
                    mode_pdbqt_files = sorted(lig_dir.glob(f"ligand_{idx}_mode*.pdbqt"))
                    yield _format_log(f"Processing {len(mode_pdbqt_files)} pose(s)...")
                    for mode_pdbqt in mode_pdbqt_files:
                        mode_name = mode_pdbqt.stem  # e.g., ligand_1_mode1
                        mode_index_str = mode_name.replace(f"ligand_{idx}_mode", "")
                        try:
                            mode_index = int(mode_index_str)
                        except ValueError:
                            mode_index = None
                        yield _format_log(f"Processing pose {mode_index} ({mode_name})...")

                        mode_pdb_noH = lig_dir / f"{mode_name}_noH.pdb"
                        mode_pdb_h = lig_dir / f"{mode_name}_h.pdb"
                        sanitized_pdb = lig_dir / f"{mode_name}_sanitized.pdb"

                        # Step 6a: Convert PDBQT to PDB without hydrogens
                        if not mode_pdb_noH.exists():
                            yield _format_log(f"  Converting {mode_pdbqt.name} to PDB (removing hydrogens)...")
                            cmd = [
                                "obabel",
                                "-i", "pdbqt", str(mode_pdbqt),
                                "-o", "pdb",
                                "-O", str(mode_pdb_noH),
                                "-d",  # Delete existing hydrogens
                            ]
                            result = subprocess.run(cmd, capture_output=True, text=True)
                            if result.returncode != 0 or not mode_pdb_noH.exists():
                                warnings.append(
                                    f"Failed to convert {mode_pdbqt.name} to PDB for ligand {idx}: "
                                    f"{result.stderr.strip()}"
                                )
                                yield _format_log(f"  ⚠️ Failed to convert {mode_pdbqt.name}", 'warning')
                                continue
                            yield _format_log(f"  ✅ Converted to {mode_pdb_noH.name}")

                        # Step 6b: Add hydrogens at pH 7.4 using OpenBabel
                        if not mode_pdb_h.exists():
                            yield _format_log("  Adding hydrogens at pH 7.4...")
                            cmd = [
                                "obabel",
                                "-i", "pdb", str(mode_pdb_noH),
                                "-o", "pdb",
                                "-O", str(mode_pdb_h),
                                "-p", "7.4",
                            ]
                            result = subprocess.run(cmd, capture_output=True, text=True)
                            if result.returncode != 0 or not mode_pdb_h.exists():
                                logger.warning(f"OpenBabel h_add failed for {mode_pdb_noH.name}: {result.stderr}")
                                yield _format_log("  ⚠️ Failed to add hydrogens, using noH file", 'warning')
                                # Fallback: use noH file
                                mode_pdb_h.write_text(mode_pdb_noH.read_text())
                            else:
                                yield _format_log(f"  ✅ Hydrogens added: {mode_pdb_h.name}")

                        # Step 6c: Create sanitized PDB with proper formatting for antechamber
                        if not sanitized_pdb.exists():
                            yield _format_log("  Sanitizing PDB for Antechamber compatibility...")
                            try:
                                # Get original residue info (BES, chain A, resnum 1611, etc.)
                                original_residue_info = extract_original_residue_info(str(lig_pdb))
                                # Copy the h_add output
                                sanitized_pdb.write_text(mode_pdb_h.read_text())
                                # Sanitize: fix atom names (C1, N1, H1...), residue name, chain, etc.
                                _sanitize_docked_pose_for_antechamber(sanitized_pdb, original_residue_info)
                                yield _format_log(f"  ✅ Sanitized: {sanitized_pdb.name}")
                            except Exception as e:
                                logger.warning(f"Error sanitizing {mode_pdb_h.name}: {e}")
                                yield _format_log(f"  ⚠️ Sanitization error: {e}, using fallback", 'warning')
                                # Fallback: just copy the h_add output
                                if not sanitized_pdb.exists():
                                    sanitized_pdb.write_text(mode_pdb_h.read_text())

                        energy = mode_energies.get(mode_index)
                        if energy:
                            yield _format_log(f"  Binding energy: {energy:.2f} kcal/mol")
                        pose_entries.append(
                            {
                                "mode_index": mode_index,
                                "file": str(mode_pdb_h.relative_to(OUTPUT_DIR)),
                                "sanitized_file": str(sanitized_pdb.relative_to(OUTPUT_DIR)),
                                "energy": energy,
                            }
                        )

                    yield _format_log(f"✅ Successfully processed ligand {idx} with {len(pose_entries)} pose(s)", 'success')

                    # Extract ligand name (resname) from PDB file
                    resname, chain = _get_ligand_info_from_pdb(lig_pdb)
                    ligands_summary.append(
                        {
                            "index": idx,
                            "name": resname,
                            "chain": chain,
                            "original_file": str(original_copy.relative_to(OUTPUT_DIR)),
                            "corrected_file": str(lig_pdb.relative_to(OUTPUT_DIR)),
                            "poses": pose_entries,
                        }
                    )
                except Exception as e:
                    error_msg = f"Ligand {idx} ({lig_pdb.name}): {str(e)}"
                    errors.append(error_msg)
                    yield _format_log(f"❌ Error: {error_msg}", 'error')

            # Assign displayLabel to match structure preparation and get-ligand-boxes (GOL-A-1, LIZ-A)
            resname_chain_count = defaultdict(int)
            for lig in ligands_summary:
                resname_chain_count[(lig["name"], lig["chain"])] += 1
            resname_chain_instance = defaultdict(int)
            for lig in ligands_summary:
                key = (lig["name"], lig["chain"])
                resname_chain_instance[key] += 1
                instance = resname_chain_instance[key]
                count = resname_chain_count[key]
                lig["displayLabel"] = f"{lig['name']}-{lig['chain']}-{instance}" if count > 1 else f"{lig['name']}-{lig['chain']}"

            # Validate and sanitize ligand names before returning results.
            # This ensures any numeric names are converted early.
            validate_and_sanitize_all_ligand_files()

            # Send final result
            result_data = {
                'type': 'complete',
                'success': len(errors) == 0,
                'ligands': ligands_summary,
                'warnings': warnings,
                'errors': errors,
            }
            yield f"data: {json.dumps(result_data)}\n\n"
        except Exception as e:
            logger.error(f"Error running docking: {str(e)}")
            yield _format_log(f'Internal server error: {str(e)}', 'error')
            yield f"data: {json.dumps({'type': 'complete', 'success': False, 'error': f'Internal server error: {str(e)}'})}\n\n"

    return Response(stream_with_context(generate()), mimetype='text/event-stream')
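
# _parse_vina_config is defined elsewhere in this module; a minimal sketch of the
# parsing it needs to do for the "key = value" format written by
# /api/docking/get-config (comment lines start with '#'). Illustrative only.
def _example_parse_vina_config(config_file):
    """Parse a Vina config file into a dict of ints/floats (sketch)."""
    params = {}
    for raw in Path(config_file).read_text().splitlines():
        line = raw.split('#', 1)[0].strip()  # strip comments and whitespace
        if '=' not in line:
            continue
        key, value = (part.strip() for part in line.split('=', 1))
        try:
            params[key] = int(value) if value.lstrip('-').isdigit() else float(value)
        except ValueError:
            params[key] = value
    return params
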
@app.route('/api/docking/get-structure', methods=['GET'])
def get_docking_structure():
    """
    Return PDB content for a docking structure (original or a specific pose).
    Query parameters:
      - ligand_index: 1-based index of ligand
      - type: 'original' or 'pose'
      - mode_index: integer (required when type='pose')
    """
    try:
        ligand_index = int(request.args.get("ligand_index", "0"))
        if ligand_index <= 0:
            return jsonify({"success": False, "error": "Invalid ligand_index"}), 400
        docking_dir = OUTPUT_DIR / "docking" / f"ligand_{ligand_index}"
        if not docking_dir.exists():
            return jsonify({"success": False, "error": "Docking results not found for this ligand"}), 404

        struct_type = request.args.get("type", "original")
        if struct_type == "original":
            pdb_path = docking_dir / "original_ligand.pdb"
        else:
            mode_index = int(request.args.get("mode_index", "0"))
            if mode_index <= 0:
                return jsonify({"success": False, "error": "mode_index must be positive for pose"}), 400
            pdb_path = docking_dir / f"ligand_{ligand_index}_mode{mode_index}_h.pdb"

        if not pdb_path.exists():
            return jsonify({"success": False, "error": f"PDB file not found: {pdb_path.name}"}), 404
        content = pdb_path.read_text()
        return jsonify({"success": True, "content": content})
    except Exception as e:
        logger.error(f"Error getting docking structure: {str(e)}")
        return jsonify({"success": False, "error": str(e)}), 500


@app.route('/api/docking/get-config', methods=['GET'])
def get_docking_config():
    """
    Get Vina config file for a ligand.
    Query parameters:
      - ligand_index: 1-based index of ligand
    """
    try:
        ligand_index = int(request.args.get("ligand_index", "0"))
        if ligand_index <= 0:
            return jsonify({"success": False, "error": "Invalid ligand_index"}), 400

        docking_dir = OUTPUT_DIR / "docking"
        docking_dir.mkdir(parents=True, exist_ok=True)
        lig_dir = docking_dir / f"ligand_{ligand_index}"
        lig_dir.mkdir(parents=True, exist_ok=True)
        config_file = lig_dir / "vina_config.txt"

        # If config doesn't exist, generate default
        if not config_file.exists():
            # Get ligand PDB to compute center
            ligand_files = sorted(OUTPUT_DIR.glob("4_ligands_corrected_obabel_*.pdb"))
            if not ligand_files:
                ligand_files = sorted(
                    [f for f in OUTPUT_DIR.glob("4_ligands_corrected_*.pdb") if "_obabel_" not in f.name]
                )
            if ligand_index <= len(ligand_files):
                lig_pdb = ligand_files[ligand_index - 1]
                cx, cy, cz = _compute_ligand_center(lig_pdb)
            else:
                cx, cy, cz = 0.0, 0.0, 0.0

            # Generate default config
            default_config = f"""# AutoDock Vina Configuration File
# Ligand {ligand_index}

# Search space center (Angstroms)
center_x = {cx:.2f}
center_y = {cy:.2f}
center_z = {cz:.2f}

# Search space size (Angstroms)
size_x = 18.0
size_y = 18.0
size_z = 18.0

# Exhaustiveness of the global search (default: 8)
# Higher values give better results but take longer
exhaustiveness = 8

# Number of binding modes to generate (default: 9)
num_modes = 9

# Maximum energy difference between the best binding mode and the worst one displayed (kcal/mol, default: 3)
energy_range = 3

# Optional: CPU usage (default: 0 = use all available CPUs)
cpu = 0

# Optional: Seed for random number generator (default: 0 = random)
seed = 0
"""
            config_file.write_text(default_config)

        content = config_file.read_text()
        return jsonify({"success": True, "content": content})
    except Exception as e:
        logger.error(f"Error getting docking config: {str(e)}")
        return jsonify({"success": False, "error": str(e)}), 500


@app.route('/api/docking/save-config', methods=['POST'])
def save_docking_config():
    """
    Save Vina config file for a ligand.
    Body: { "ligand_index": int, "content": str }
    """
    try:
        data = request.get_json()
        ligand_index = int(data.get("ligand_index", 0))
        content = data.get("content", "")
        if ligand_index <= 0:
            return jsonify({"success": False, "error": "Invalid ligand_index"}), 400
        if not content:
            return jsonify({"success": False, "error": "Config content is required"}), 400

        docking_dir = OUTPUT_DIR / "docking"
        docking_dir.mkdir(parents=True, exist_ok=True)
        lig_dir = docking_dir / f"ligand_{ligand_index}"
        lig_dir.mkdir(parents=True, exist_ok=True)
        config_file = lig_dir / "vina_config.txt"
        config_file.write_text(content)
        return jsonify({"success": True, "message": f"Config saved for ligand {ligand_index}"})
    except Exception as e:
        logger.error(f"Error saving docking config: {str(e)}")
        return jsonify({"success": False, "error": str(e)}), 500


@app.route('/api/docking/get-protein', methods=['GET'])
def get_docking_protein():
    """
    Return the prepared protein structure (tleap_ready.pdb) for the poses viewer.
    """
    try:
        tleap_ready = OUTPUT_DIR / "tleap_ready.pdb"
        if not tleap_ready.exists():
            return jsonify({"success": False, "error": "Prepared structure not found"}), 404
        content = tleap_ready.read_text()
        return jsonify({"success": True, "content": content})
    except Exception as e:
        logger.error(f"Error getting protein structure: {str(e)}")
        return jsonify({"success": False, "error": str(e)}), 500


def _validate_and_sanitize_ligand_name(resname: str) -> tuple[str, bool]:
    """
    Validate a ligand residue name. If it is purely numeric, convert it to a 3-letter code.
    PDB format requires 3-character residue names.
    Returns: (sanitized_name, was_changed)
    """
    if not resname:
        return "LIG", True

    # Check if resname is purely numeric
    if resname.isdigit():
        # Convert numeric name to a 3-letter code.
        # Strategy: "L" + last 2 digits (e.g., 478 -> "L78", 5 -> "L05")
        num = int(resname)
        # Use modulo 100 to get the last 2 digits, then format as a 2-digit string
        last_two = num % 100
        sanitized = f"L{last_two:02d}"  # L00, L01, ..., L05, ..., L78, ..., L99
        return sanitized, True

    # Ensure it's exactly 3 characters (pad or truncate if needed)
    resname_stripped = resname.strip()
    if len(resname_stripped) != 3:
        if len(resname_stripped) < 3:
            # Right-justify to 3 characters (PDB residue names are right-justified)
            sanitized = f"{resname_stripped:>3}"
        else:
            # Truncate to 3 characters
            sanitized = resname_stripped[:3]
        if sanitized != resname_stripped:
            return sanitized, True
    return resname_stripped, False
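
# Illustrative check of _validate_and_sanitize_ligand_name on representative
# inputs (the names here are made up for the example):
def _example_sanitize_names():
    assert _validate_and_sanitize_ligand_name("478") == ("L78", True)   # numeric -> "L" + last two digits
    assert _validate_and_sanitize_ligand_name("5") == ("L05", True)     # zero-padded
    assert _validate_and_sanitize_ligand_name("BES") == ("BES", False)  # already a valid 3-letter code
    assert _validate_and_sanitize_ligand_name("ZN") == (" ZN", True)    # right-justified to 3 characters
    assert _validate_and_sanitize_ligand_name("ABCD") == ("ABC", True)  # truncated to 3 characters
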
def _update_pdb_residue_name(pdb_path: Path, old_resname: str, new_resname: str):
    """
    Update all residue names in a PDB file from old_resname to new_resname.
    Only updates ATOM and HETATM records.
    """
    try:
        content = pdb_path.read_text()
        lines = content.split('\n')
        updated_lines = []
        updated = False
        for line in lines:
            if line.startswith(('ATOM', 'HETATM')):
                # Extract current residue name (columns 18-20, 0-indexed: 17-20)
                current_resname = line[17:20].strip()
                if current_resname == old_resname:
                    # Replace the residue name (columns 18-20, right-justified)
                    new_line = line[:17] + f"{new_resname:>3}" + line[20:]
                    updated_lines.append(new_line)
                    updated = True
                else:
                    updated_lines.append(line)
            else:
                updated_lines.append(line)
        if updated:
            pdb_path.write_text('\n'.join(updated_lines))
        return updated
    except Exception as e:
        logger.warning(f"Failed to update residue name in {pdb_path}: {e}")
        return False


def validate_and_sanitize_all_ligand_files():
    """
    Validate and sanitize all ligand PDB files in the output directory.
    This should be called early in the workflow to ensure consistency.
    Returns a list of warnings about name changes in the format:
    [(old_name, new_name, filename), ...]
    """
    warnings = []
    try:
        # Find all corrected ligand files
        ligand_files = sorted([f for f in OUTPUT_DIR.glob('4_ligands_corrected_*.pdb') if "_obabel_" not in f.name])
        if not ligand_files:
            # Check for single ligand file
            single_lig_file = OUTPUT_DIR / '4_ligands_corrected.pdb'
            if single_lig_file.exists():
                ligand_files = [single_lig_file]

        for lig_file in ligand_files:
            # Read the file first to get the original name
            original_resname = None
            with open(lig_file, 'r') as f:
                for line in f:
                    if line.startswith(('ATOM', 'HETATM')):
                        original_resname = line[17:20].strip()
                        break
            if original_resname:
                # Validate (handles both numeric names and wrong lengths)
                sanitized_name, was_changed = _validate_and_sanitize_ligand_name(original_resname)
                if was_changed and sanitized_name != original_resname:
                    _update_pdb_residue_name(lig_file, original_resname, sanitized_name)
                    warnings.append((original_resname, sanitized_name, lig_file.name))

        # Also validate tleap_ready.pdb if it exists
        tleap_ready = OUTPUT_DIR / "tleap_ready.pdb"
        if tleap_ready.exists():
            # Collect original names from tleap_ready.pdb
            original_names = {}
            with open(tleap_ready, 'r') as f:
                for line in f:
                    if line.startswith('HETATM'):
                        resname = line[17:20].strip()
                        if resname and resname not in ['HOH', 'WAT', 'TIP', 'SPC', 'NA', 'CL']:
                            if resname not in original_names:
                                original_names[resname] = True

            # Validate each unique name
            for original_resname in original_names.keys():
                if original_resname.isdigit():
                    sanitized_name, was_changed = _validate_and_sanitize_ligand_name(original_resname)
                    if was_changed:
                        _update_pdb_residue_name(tleap_ready, original_resname, sanitized_name)
                        warnings.append((original_resname, sanitized_name, tleap_ready.name))
    except Exception as e:
        logger.warning(f"Error validating ligand files: {e}")
    return warnings


def _get_ligand_info_from_pdb(pdb_path: Path, sanitize: bool = True):
    """
    Extract residue name and chain ID from a ligand PDB file.
    If sanitize=True, validates and updates numeric residue names in the file.
    """
    resname = "UNK"
    chain = "A"
    with open(pdb_path, 'r') as f:
        for line in f:
            if line.startswith(('ATOM', 'HETATM')):
                # PDB format: residue name is columns 18-20, chain is column 22
                resname = line[17:20].strip()
                chain = line[21:22].strip() or "A"
                break

    # Validate and sanitize if needed
    if sanitize:
        sanitized_name, was_changed = _validate_and_sanitize_ligand_name(resname)
        if was_changed:
            original_name = resname
            logger.warning(
                f"Ligand residue name '{original_name}' in {pdb_path.name} is pure numeric. "
                f"Changed to '{sanitized_name}' (3-letter code) to avoid errors. "
                f"The PDB file has been updated."
            )
            _update_pdb_residue_name(pdb_path, resname, sanitized_name)
            resname = sanitized_name
    return resname, chain


@app.route('/api/docking/get-ligand-boxes', methods=['GET'])
def get_ligand_boxes():
    """
    Return default ligand box suggestions (center and size) for each corrected ligand.
    Also returns the ligand name (residue name) and chain ID for display.
    Center is computed from 4_ligands_corrected_obabel_*.pdb using MDAnalysis;
    size defaults to a 10 Å cube.
    """
    try:
        # Use obabel versions for better atom naming compatibility
        ligand_files = sorted(OUTPUT_DIR.glob("4_ligands_corrected_obabel_*.pdb"))
        if not ligand_files:
            # Fallback to non-obabel files
            ligand_files = sorted(
                [f for f in OUTPUT_DIR.glob("4_ligands_corrected_*.pdb") if "_obabel_" not in f.name]
            )

        # Also get chain information from the prepared structure
        chains = []
        tleap_ready = OUTPUT_DIR / "tleap_ready.pdb"
        if tleap_ready.exists():
            seen_chains = set()
            with open(tleap_ready, 'r') as f:
                for line in f:
                    if line.startswith(('ATOM', 'HETATM')):
                        chain = line[21:22].strip() or "A"
                        if chain not in seen_chains:
                            seen_chains.add(chain)
                            chains.append(chain)

        ligands = []
        for idx, lig_pdb in enumerate(ligand_files, start=1):
            try:
                cx, cy, cz = _compute_ligand_center(lig_pdb)
                resname, chain = _get_ligand_info_from_pdb(lig_pdb)
                ligands.append(
                    {
                        "index": idx,
                        "name": resname,
                        "chain": chain,
                        "center": {"x": cx, "y": cy, "z": cz},
                        "size": {"x": 10.0, "y": 10.0, "z": 10.0},
                    }
                )
            except Exception as e:
                logger.warning(f"Failed to compute center for {lig_pdb}: {e}")
                continue

        # Assign displayLabel to match structure preparation: GOL-A-1, GOL-A-2 when
        # the same (resname, chain) appears more than once; otherwise resname-chain (e.g. LIZ-A).
        resname_chain_count = defaultdict(int)
        for lig in ligands:
            resname_chain_count[(lig["name"], lig["chain"])] += 1
        resname_chain_instance = defaultdict(int)
        for lig in ligands:
            key = (lig["name"], lig["chain"])
            resname_chain_instance[key] += 1
            instance = resname_chain_instance[key]
            count = resname_chain_count[key]
            lig["displayLabel"] = f"{lig['name']}-{lig['chain']}-{instance}" if count > 1 else f"{lig['name']}-{lig['chain']}"

        return jsonify({"success": True, "ligands": ligands, "chains": sorted(chains)})
    except Exception as e:
        logger.error(f"Error computing ligand boxes: {str(e)}")
        return jsonify({"success": False, "error": str(e)}), 500
@app.route('/api/docking/apply', methods=['POST'])
def apply_docking_poses():
    """
    Apply user-selected docked poses by replacing the corresponding
    4_ligands_corrected_*.pdb files and rebuilding tleap_ready.pdb.

    Request JSON:
    {
      "selections": [
        {"ligand_index": 1, "choice": "original"},
        {"ligand_index": 2, "choice": "mode", "mode_index": 1},
        ...
      ]
    }
    """
    try:
        data = request.get_json() or {}
        selections = data.get("selections", [])
        if not isinstance(selections, list) or not selections:
            return jsonify({"success": False, "error": "No selections provided"}), 400

        protein_capped = OUTPUT_DIR / "2_protein_with_caps.pdb"
        if not protein_capped.exists():
            return jsonify(
                {
                    "success": False,
                    "error": "2_protein_with_caps.pdb not found. Run structure preparation first.",
                }
            ), 400

        # Update ligand files according to selections
        updated_indices = []
        for sel in selections:
            try:
                lig_index = int(sel.get("ligand_index", 0))
                choice = sel.get("choice", "original")
                if lig_index <= 0:
                    continue
                corrected_path = OUTPUT_DIR / f"4_ligands_corrected_{lig_index}.pdb"
                if not corrected_path.exists():
                    continue
                if choice == "original":
                    # Nothing to change for this ligand
                    continue
                if choice == "mode":
                    mode_index = int(sel.get("mode_index", 0))
                    if mode_index <= 0:
                        continue
                    # Use the sanitized pose file (already processed with h_add and sanitized)
                    sanitized_pose = (
                        OUTPUT_DIR / "docking" / f"ligand_{lig_index}" /
                        f"ligand_{lig_index}_mode{mode_index}_sanitized.pdb"
                    )
                    # Fallback to pose with hydrogens if sanitized doesn't exist
                    if not sanitized_pose.exists():
                        sanitized_pose = (
                            OUTPUT_DIR / "docking" / f"ligand_{lig_index}" /
                            f"ligand_{lig_index}_mode{mode_index}_h.pdb"
                        )
                    if not sanitized_pose.exists():
                        logger.warning(f"Docking pose not found: {sanitized_pose}")
                        continue
                    # Copy sanitized pose over corrected ligand
                    corrected_path.write_text(sanitized_pose.read_text())
                    updated_indices.append(lig_index)
            except Exception as e:
                logger.warning(f"Error applying selection {sel}: {str(e)}")

        # Rebuild tleap_ready.pdb using updated ligand files (if any)
        tleap_ready = OUTPUT_DIR / "tleap_ready.pdb"
        ligand_groups = []
        ligand_files = sorted(
            [f for f in OUTPUT_DIR.glob("4_ligands_corrected_*.pdb") if "_obabel_" not in f.name]
        )
        for lig_pdb in ligand_files:
            lines = [
                line for line in lig_pdb.read_text().splitlines(keepends=True)
                if line.startswith(("ATOM", "HETATM"))
            ]
            if lines:
                ligand_groups.append(lines)
        if ligand_groups:
            ok = merge_protein_and_ligand(
                str(protein_capped), None, str(tleap_ready), ligand_groups=ligand_groups
            )
            if not ok:
                return jsonify(
                    {
                        "success": False,
                        "error": "Failed to merge protein and updated ligands into tleap_ready.pdb",
                    }
                ), 500

        return jsonify(
            {
                "success": True,
                "updated_ligands": updated_indices,
                "tleap_ready": str(tleap_ready.relative_to(OUTPUT_DIR)) if tleap_ready.exists() else None,
            }
        )
    except Exception as e:
        logger.error(f"Error applying docking poses: {str(e)}")
        return jsonify({"success": False, "error": str(e)}), 500


def _format_log(message, log_type='info'):
    """Helper function to format a log message as an SSE frame"""
    data = json.dumps({'type': log_type, 'message': message})
    return f"data: {data}\n\n"
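
# Illustrative check of the SSE wire format produced by _format_log above: each
# frame is a single "data: <json>" line followed by a blank line, which is what
# the streaming clients split on.
def _example_format_log_frame():
    frame = _format_log("Receptor prepared")
    assert frame == 'data: {"type": "info", "message": "Receptor prepared"}\n\n'
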
yield _format_log(f"Step 2: Running antechamber for ligand {resname} with net charge {net_charge}...") cmd2 = f"antechamber -i {ligand_pdb.name} -fi pdb -o {ligand_mol2.name} -fo mol2 -c bcc -at {force_field} -nc {net_charge}" yield _format_log(f"Running command: {cmd2}") # Stream antechamber output in real-time process = subprocess.Popen(cmd2, shell=True, cwd=str(OUTPUT_DIR), stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, universal_newlines=True) for line in iter(process.stdout.readline, ''): if line: yield _format_log(line.strip()) process.wait() return_code = process.returncode yield _format_log(f"antechamber return code: {return_code}") if return_code != 0: error_msg = f'antechamber failed for ligand {resname} with net charge {net_charge}' yield _format_log(f"ERROR: {error_msg}", 'error') errors.append(error_msg) continue # Step 3: Run parmchk2 with streaming output yield _format_log(f"Step 3: Running parmchk2 for ligand {resname}...") cmd3 = f"parmchk2 -i {ligand_mol2.name} -f mol2 -o {ligand_frcmod.name} -a Y -s {s_param}" yield _format_log(f"Running command: {cmd3}") # Stream parmchk2 output in real-time process = subprocess.Popen(cmd3, shell=True, cwd=str(OUTPUT_DIR), stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, universal_newlines=True) for line in iter(process.stdout.readline, ''): if line: yield _format_log(line.strip()) process.wait() return_code = process.returncode yield _format_log(f"parmchk2 return code: {return_code}") if return_code != 0: error_msg = f'parmchk2 failed for ligand {resname}' yield _format_log(f"ERROR: {error_msg}", 'error') errors.append(error_msg) continue # Check if files were generated successfully if ligand_mol2.exists() and ligand_frcmod.exists(): processed_ligands.append({ 'resname': resname, 'ligand_num': ligand_num, 'net_charge': net_charge, 'files': { 'pdb': str(ligand_pdb), 'mol2': str(ligand_mol2), 'frcmod': str(ligand_frcmod) }, 'duplicate_files': [str(pdb) for pdb, num in ligand_list[1:]] if len(ligand_list) > 1 else [] }) yield _format_log(f"✅ Successfully processed ligand {resname}", 'success') else: error_msg = f'Force field generation failed for ligand {resname} - output files not created' yield _format_log(f"ERROR: {error_msg}", 'error') errors.append(error_msg) if not processed_ligands: error_msg = f'Failed to process any ligands. Errors: {"; ".join(errors)}' yield _format_log(error_msg, 'error') yield f"data: {json.dumps({'type': 'complete', 'success': False, 'error': error_msg})}\n\n" return # Send final result result_data = { 'type': 'complete', 'success': True, 'message': f'Successfully processed {len(processed_ligands)} ligand(s) with force field {force_field}', 'ligands': processed_ligands, 'errors': errors if errors else None } yield f"data: {json.dumps(result_data)}\n\n" except Exception as e: logger.error(f"Error generating ligand force field: {str(e)}") yield _format_log(f'Internal server error: {str(e)}', 'error') yield f"data: {json.dumps({'type': 'complete', 'success': False, 'error': f'Internal server error: {str(e)}'})}\n\n" return Response(generate(), mimetype='text/event-stream') @app.route('/api/calculate-net-charge', methods=['POST']) def calculate_net_charge(): """Calculate net charge of the system using tleap""" try: # Check if structure is prepared tleap_ready_file = OUTPUT_DIR / "tleap_ready.pdb" if not tleap_ready_file.exists(): return jsonify({'error': 'Structure not prepared. 
Please prepare structure first.'}), 400 # Check if ligands are present - look for residue-named files first, then fallback to numbered files ligand_mol2_files = [] ligand_frcmod_files = [] ligand_resname_map = {} # Maps residue name to (mol2_file, frcmod_file) # First, try to find residue-named files (e.g., O9C.mol2, O9C.frcmod) unique_resnames = get_all_ligand_residue_names() for resname in unique_resnames: mol2_file = OUTPUT_DIR / f"{resname}.mol2" frcmod_file = OUTPUT_DIR / f"{resname}.frcmod" if mol2_file.exists() and frcmod_file.exists(): ligand_resname_map[resname] = (mol2_file, frcmod_file) ligand_mol2_files.append(mol2_file) ligand_frcmod_files.append(frcmod_file) # Fallback: check for numbered files (backward compatibility) if not ligand_mol2_files: numbered_mol2 = sorted(OUTPUT_DIR.glob("4_ligands_corrected_*.mol2")) numbered_frcmod = sorted(OUTPUT_DIR.glob("4_ligands_corrected_*.frcmod")) if numbered_mol2 and numbered_frcmod: ligand_mol2_files = numbered_mol2 ligand_frcmod_files = numbered_frcmod # Try to map to residue names resnames = get_all_ligand_residue_names() for i, (mol2_file, frcmod_file) in enumerate(zip(ligand_mol2_files, ligand_frcmod_files)): # Extract residue name from mol2 file if possible resname = get_residue_name_from_mol2(mol2_file) if mol2_file.exists() else None if not resname: # Try to get from tleap_ready.pdb if resnames and i < len(resnames): resname = resnames[i] else: resname = f"LIG{len(ligand_resname_map) + 1}" # Only add if not already in map (avoid duplicates) if resname not in ligand_resname_map: ligand_resname_map[resname] = (mol2_file, frcmod_file) # Final fallback: single ligand file (backward compatibility) if not ligand_mol2_files: single_mol2 = OUTPUT_DIR / "4_ligands_corrected.mol2" single_frcmod = OUTPUT_DIR / "4_ligands_corrected.frcmod" if single_mol2.exists() and single_frcmod.exists(): ligand_mol2_files = [single_mol2] ligand_frcmod_files = [single_frcmod] resname = get_all_ligand_residue_names() if resname: ligand_resname_map[resname[0]] = (single_mol2, single_frcmod) else: ligand_resname_map["LIG"] = (single_mol2, single_frcmod) ligand_present = len(ligand_mol2_files) > 0 and len(ligand_frcmod_files) > 0 # Create dynamic tleap input file tleap_input = OUTPUT_DIR / "calc_charge_on_system.in" # Get the selected force field from the request data = request.get_json() if request.get_json() else {} selected_force_field = data.get('force_field', 'ff14SB') with open(tleap_input, 'w') as f: f.write(f"source leaprc.protein.{selected_force_field}\n") f.write("source leaprc.gaff2\n\n") if ligand_present: # Load each unique ligand parameter and structure only once # Use sorted to ensure consistent ordering for resname in sorted(ligand_resname_map.keys()): mol2_file, frcmod_file = ligand_resname_map[resname] f.write(f"loadamberparams {frcmod_file.name}\n") f.write(f"{resname} = loadmol2 {mol2_file.name}\n") f.write("\n") f.write("x = loadpdb tleap_ready.pdb\n\n") f.write("charge x\n\n") f.write("quit\n") # Run tleap command print("Running tleap to calculate system charge...") # Find tleap executable dynamically try: # First try to find tleap in PATH which_result = subprocess.run(['which', 'tleap'], capture_output=True, text=True) if which_result.returncode == 0: tleap_path = which_result.stdout.strip() else: # Fallback: try common conda environment paths conda_env = os.environ.get('CONDA_DEFAULT_ENV', 'MD_pipeline') conda_prefix = os.environ.get('CONDA_PREFIX', '') if conda_prefix: tleap_path = os.path.join(conda_prefix, 'bin', 'tleap') else: # 
Last resort: assume it's in PATH tleap_path = 'tleap' cmd = f"{tleap_path} -f calc_charge_on_system.in" result = subprocess.run(cmd, shell=True, cwd=str(OUTPUT_DIR), capture_output=True, text=True) except Exception as e: # Fallback to simple tleap command cmd = f"tleap -f calc_charge_on_system.in" result = subprocess.run(cmd, shell=True, cwd=str(OUTPUT_DIR), capture_output=True, text=True) print(f"tleap return code: {result.returncode}") print(f"tleap stdout: {result.stdout}") print(f"tleap stderr: {result.stderr}") # Check if we got the charge information even if tleap had a non-zero exit code # (tleap often returns non-zero when run non-interactively but still calculates charge) if 'Total unperturbed charge' not in result.stdout and 'Total charge' not in result.stdout: return jsonify({'error': f'tleap failed to calculate charge. Error: {result.stderr}'}), 500 # Parse the output to find the net charge output_lines = result.stdout.split('\n') net_charge = None for line in output_lines: if 'Total unperturbed charge' in line or 'Total charge' in line: # Look for patterns like "Total charge: -3.0000" or "Total unperturbed charge: -3.0000" import re charge_match = re.search(r'charge[:\s]+(-?\d+\.?\d*)', line) if charge_match: net_charge = float(charge_match.group(1)) break if net_charge is None: return jsonify({'error': 'Could not extract net charge from tleap output'}), 500 # Suggest ion addition (plain-language message) if net_charge > 0: suggestion = "The system is positively charged. Add Cl- to neutralize." ion_type = "Cl-" ion_count = int(round(net_charge)) elif net_charge < 0: suggestion = "The system is negatively charged. Add Na+ to neutralize." ion_type = "Na+" ion_count = int(round(abs(net_charge))) else: suggestion = "The system is neutral. No ions needed." ion_type = "None" ion_count = 0 return jsonify({ 'success': True, 'net_charge': net_charge, 'suggestion': suggestion, 'ion_type': ion_type, 'ion_count': ion_count, 'ligand_present': ligand_present }) except Exception as e: logger.error(f"Error calculating net charge: {str(e)}") return jsonify({'error': f'Internal server error: {str(e)}'}), 500 @app.route('/api/generate-all-files', methods=['POST']) def generate_all_files(): """Generate all simulation input files based on UI parameters""" try: data = request.get_json() # Get simulation parameters from UI cutoff_distance = data.get('cutoff_distance', 10.0) temperature = data.get('temperature', 310.0) pressure = data.get('pressure', 1.0) # Get step parameters restrained_steps = data.get('restrained_steps', 10000) restrained_force = data.get('restrained_force', 10.0) min_steps = data.get('min_steps', 20000) npt_heating_steps = data.get('npt_heating_steps', 50000) npt_equilibration_steps = data.get('npt_equilibration_steps', 100000) production_steps = data.get('production_steps', 1000000) # Integration time step (ps) dt = data.get('timestep', 0.002) # Get force field parameters force_field = data.get('force_field', 'ff14SB') water_model = data.get('water_model', 'TIP3P') add_ions = data.get('add_ions', 'None') distance = data.get('distance', 10.0) # Validation warnings warnings = [] if restrained_steps < 5000: warnings.append("Restrained minimization steps should be at least 5000") if min_steps < 10000: warnings.append("Minimization steps should be at least 10000") # Count total residues in tleap_ready.pdb tleap_ready_file = OUTPUT_DIR / "tleap_ready.pdb" if not tleap_ready_file.exists(): return jsonify({'error': 'tleap_ready.pdb not found. 
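
# Illustrative check of the charge-extraction regex used above against a
# representative tleap output line (the line itself is an assumed example):
def _example_parse_tleap_charge():
    import re
    line = "Total unperturbed charge:  -3.000000"
    match = re.search(r'charge[:\s]+(-?\d+\.?\d*)', line)
    assert match and float(match.group(1)) == -3.0
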
@app.route('/api/generate-all-files', methods=['POST'])
def generate_all_files():
    """Generate all simulation input files based on UI parameters"""
    try:
        data = request.get_json()

        # Get simulation parameters from UI
        cutoff_distance = data.get('cutoff_distance', 10.0)
        temperature = data.get('temperature', 310.0)
        pressure = data.get('pressure', 1.0)

        # Get step parameters
        restrained_steps = data.get('restrained_steps', 10000)
        restrained_force = data.get('restrained_force', 10.0)
        min_steps = data.get('min_steps', 20000)
        npt_heating_steps = data.get('npt_heating_steps', 50000)
        npt_equilibration_steps = data.get('npt_equilibration_steps', 100000)
        production_steps = data.get('production_steps', 1000000)
        # Integration time step (ps)
        dt = data.get('timestep', 0.002)

        # Get force field parameters
        force_field = data.get('force_field', 'ff14SB')
        water_model = data.get('water_model', 'TIP3P')
        add_ions = data.get('add_ions', 'None')
        distance = data.get('distance', 10.0)

        # Validation warnings
        warnings = []
        if restrained_steps < 5000:
            warnings.append("Restrained minimization steps should be at least 5000")
        if min_steps < 10000:
            warnings.append("Minimization steps should be at least 10000")

        # Count total residues in tleap_ready.pdb
        tleap_ready_file = OUTPUT_DIR / "tleap_ready.pdb"
        if not tleap_ready_file.exists():
            return jsonify({'error': 'tleap_ready.pdb not found. Please prepare structure first.'}), 400
        total_residues = count_residues_in_pdb(str(tleap_ready_file))

        # Generate min_restrained.in
        generate_min_restrained_file(restrained_steps, restrained_force, total_residues, cutoff_distance)
        # Generate min.in
        generate_min_file(min_steps, cutoff_distance)
        # Generate HeatNPT.in
        generate_heat_npt_file(npt_heating_steps, temperature, pressure, cutoff_distance, dt)
        # Generate mdin_equi.in (NPT Equilibration)
        generate_npt_equilibration_file(npt_equilibration_steps, temperature, pressure, cutoff_distance, dt)

        # Check if plumed.dat exists in the output folder
        plumed_file = OUTPUT_DIR / 'plumed.dat'
        use_plumed = plumed_file.exists()

        # Generate mdin_prod.in (Production)
        generate_production_file(production_steps, temperature, pressure, cutoff_distance, dt, use_plumed=use_plumed)

        # Generate force field parameters
        ff_files_generated = []
        try:
            generate_ff_parameters_file(force_field, water_model, add_ions, distance)

            # Find the tleap executable
            tleap_path = None
            try:
                result = subprocess.run(['which', 'tleap'], capture_output=True, text=True)
                if result.returncode == 0:
                    tleap_path = result.stdout.strip()
            except Exception:
                pass
            if not tleap_path:
                conda_prefix = os.environ.get('CONDA_PREFIX')
                if conda_prefix:
                    tleap_path = os.path.join(conda_prefix, 'bin', 'tleap')
                else:
                    tleap_path = '/home/hn533621/.conda/envs/MD_pipeline/bin/tleap'

            # Run tleap to generate force field parameters
            cmd = f"{tleap_path} -f generate_ff_parameters.in"
            result = subprocess.run(cmd, shell=True, cwd=str(OUTPUT_DIR), capture_output=True, text=True, timeout=300)
            if result.returncode != 0:
                warnings.append(f"Force field generation failed: {result.stderr}")
            else:
                # Check if key output files were created
                ff_output_files = ['protein.prmtop', 'protein.inpcrd', 'protein_solvated.pdb']
                for ff_file in ff_output_files:
                    if (OUTPUT_DIR / ff_file).exists():
                        ff_files_generated.append(ff_file)
                if len(ff_files_generated) == 0:
                    warnings.append("Force field parameter files were not generated")
        except Exception as ff_error:
            warnings.append(f"Force field generation error: {str(ff_error)}")

        # Generate the batch submit script into output
        pbs_generated = generate_submit_pbs_file(use_plumed=use_plumed)

        all_files = [
            'min_restrained.in', 'min.in', 'HeatNPT.in', 'mdin_equi.in', 'mdin_prod.in'
        ] + ff_files_generated
        if pbs_generated:
            all_files.append('submit_job.pbs')

        return jsonify({
            'success': True,
            'message': f'All simulation files generated successfully ({len(all_files)} files)',
            'warnings': warnings,
            'files_generated': all_files
        })
    except Exception as e:
        logger.error(f"Error generating simulation files: {str(e)}")
        return jsonify({'error': f'Internal server error: {str(e)}'}), 500


def count_residues_in_pdb(pdb_file):
    """Count the total number of residues in a PDB file"""
    try:
        with open(pdb_file, 'r') as f:
            lines = f.readlines()
        residues = set()
        for line in lines:
            if line.startswith(('ATOM', 'HETATM')):
                # Key on chain ID + residue number (columns 22 and 23-26) so that
                # residues sharing a number across different chains are not collapsed
                chain_id = line[21:22]
                residue_num = line[22:26].strip()
                if residue_num:
                    residues.add((chain_id, residue_num))
        return len(residues)
    except Exception as e:
        logger.error(f"Error counting residues: {str(e)}")
        return 607  # Default fallback


def generate_min_restrained_file(steps, force_constant, total_residues, cutoff):
    """Generate min_restrained.in file"""
    content = f"""initial minimization solvent + ions
 &cntrl
  imin = 1, maxcyc = {steps}, ncyc = {steps // 2},
  ntb = 1, ntr = 1, ntxo = 1, cut = {cutoff}
 /
Restrain
{force_constant}
RES 1 {total_residues}
END
END
"""
    with open(OUTPUT_DIR / "min_restrained.in", 'w') as f:
        f.write(content)


def generate_min_file(steps, cutoff):
    """Generate min.in file"""
    content = f"""Minimization
 &cntrl
  imin=1, maxcyc={steps}, ncyc={steps // 4},
  ntb=1, cut={cutoff}, igb=0, ntr=0,
 /
"""
    with open(OUTPUT_DIR / "min.in", 'w') as f:
        f.write(content)


def generate_heat_npt_file(steps, temperature, pressure, cutoff, dt=0.002):
    """Generate HeatNPT.in file with temperature ramping"""
    # Calculate step divisions: 20%, 20%, 20%, 40%
    step1 = int(steps * 0.2)
    step2 = int(steps * 0.2)
    step3 = int(steps * 0.2)
    step4 = int(steps * 0.4)

    # Calculate temperature values: 3%, 66%, 100%
    temp1 = temperature * 0.03
    temp2 = temperature * 0.66
    temp3 = temperature
    temp4 = temperature

    content = f"""Heat
 &cntrl
  imin = 0, irest = 0, ntx = 1,
  ntb = 2, pres0 = {pressure}, ntp = 1, taup = 2.0,
  cut = {cutoff}, ntr = 0,
  ntc = 2, ntf = 2,
  tempi = 0, temp0 = {temperature},
  ntt = 3, gamma_ln = 1.0,
  nstlim = {steps}, dt = {dt},
  ntpr = 2000, ntwx = 2000, ntwr = 2000
 /
 &wt type='TEMP0', istep1=0, istep2={step1}, value1=0.0, value2={temp1} /
 &wt type='TEMP0', istep1={step1+1}, istep2={step1+step2}, value1={temp1}, value2={temp2} /
 &wt type='TEMP0', istep1={step1+step2+1}, istep2={step1+step2+step3}, value1={temp2}, value2={temp3} /
 &wt type='TEMP0', istep1={step1+step2+step3+1}, istep2={steps}, value1={temp3}, value2={temp4} /
 &wt type='END' /
"""
    with open(OUTPUT_DIR / "HeatNPT.in", 'w') as f:
        f.write(content)


def generate_npt_equilibration_file(steps, temperature, pressure, cutoff, dt=0.002):
    """Generate mdin_equi.in file for NPT equilibration"""
    content = f"""NPT Equilibration
 &cntrl
  imin=0, ntx=1, irest=0,
  pres0={pressure}, taup=1.0,
  temp0={temperature}, tempi={temperature},
  nstlim={steps}, dt={dt},
  ntf=2, ntc=2,
  ntpr=500, ntwx=500, ntwr=500,
  cut={cutoff}, ntb=2, ntp=1,
  ntt=3, gamma_ln=3.0, ig=-1,
  iwrap=1, ntr=0,
 /
"""
    with open(OUTPUT_DIR / "mdin_equi.in", 'w') as f:
        f.write(content)


def generate_production_file(steps, temperature, pressure, cutoff, dt=0.002, use_plumed=False):
    """Generate mdin_prod.in file for the production run"""
    content = f"""Production Run
 &cntrl
  imin=0, ntx=1, irest=0,
  pres0={pressure}, taup=1.0,
  temp0={temperature}, tempi={temperature},
  nstlim={steps}, dt={dt},
  ntf=2, ntc=2,
  ntpr=1000, ntwx=1000, ntwr=1000,
  cut={cutoff}, ntb=2, ntp=1,
  ntt=3, gamma_ln=3.0, ig=-1,
  iwrap=1, ntr=0,
"""
    # Add PLUMED lines if plumed.dat exists
    if use_plumed:
        content += "  plumed=1,\n"
        content += "  plumedfile='plumed.dat'\n"
    # Close the &cntrl namelist in either case
    content += "/\n\n"
    with open(OUTPUT_DIR / "mdin_prod.in", 'w') as f:
        f.write(content)
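
# Worked example of the ramp written by generate_heat_npt_file above, assuming
# the generate-all-files defaults (npt_heating_steps=50000, temperature=310 K).
# Illustrative only; not called by the pipeline.
def _example_heat_schedule(steps=50000, temperature=310.0):
    """Return the (end_step, target_temp) pairs of the four TEMP0 segments."""
    boundaries = [int(steps * 0.2), int(steps * 0.4), int(steps * 0.6), steps]
    targets = [temperature * 0.03, temperature * 0.66, temperature, temperature]
    # For the defaults: [(10000, 9.3), (20000, 204.6), (30000, 310.0), (50000, 310.0)]
    return list(zip(boundaries, targets))
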
def generate_submit_pbs_file(use_plumed=False):
    """Generate submit_job.pbs file for SLURM job submission"""
    try:
        # Get the absolute path to the output directory
        output_dir_abs = OUTPUT_DIR.resolve()

        # Build the batch script content
        content = """#!/bin/bash
#SBATCH -D {working_dir}   # Critical: Sets working dir
#SBATCH --job-name=job_name
#SBATCH --partition=defq
#SBATCH --get-user-env
#SBATCH --nodes=1
#SBATCH --tasks-per-node=1
#SBATCH --cpus-per-task=1
#SBATCH --gres=gpu:1
#SBATCH --time=168:00:00

module load amber/24
""".format(working_dir=str(output_dir_abs))

        # Add the PLUMED module if plumed.dat exists
        if use_plumed:
            content += "module load plumed/2.9.1\n"

        content += """
pmemd.cuda -O -i min_restrained.in -o min_restrained.out -p protein.prmtop -c protein.inpcrd -r min_res.ncrst -x min_res.nc -ref protein.inpcrd -inf min_res.mdinfo
pmemd.cuda -O -i min.in -o min.out -p protein.prmtop -c min_res.ncrst -r min.ncrst -x min.nc -inf min.mdinfo
pmemd.cuda -O -i HeatNPT.in -o HeatNPT.out -p protein.prmtop -c min.ncrst -r HeatNPT.ncrst -x HeatNPT.nc -inf HeatNPT.mdinfo
pmemd.cuda -O -i mdin_equi.in -o mdin_equi.out -p protein.prmtop -c HeatNPT.ncrst -r mdin_equi.ncrst -x mdin_equi.nc -inf mdin_equi.mdinfo -ref protein.inpcrd
pmemd.cuda -O -i mdin_prod.in -o mdin_prod.out -p protein.prmtop -c mdin_equi.ncrst -r mdin_prod.ncrst -x mdin_prod.nc -inf mdin_prod.mdinfo -ref protein.inpcrd
"""

        # Write the submit_job.pbs file
        with open(OUTPUT_DIR / "submit_job.pbs", 'w') as f:
            f.write(content)
        logger.info(f"Generated submit_job.pbs in {OUTPUT_DIR}")
        return True
    except Exception as e:
        logger.error(f"Error generating submit_job.pbs: {e}")
        return False


@app.route('/api/health', methods=['GET'])
def health_check():
    """Health check endpoint"""
    return jsonify({'status': 'healthy', 'message': 'MD Simulation Pipeline API is running'})


@app.route('/api/clean-output', methods=['POST'])
def clean_output():
    """Clean output folder endpoint"""
    try:
        print("DEBUG: clean-output endpoint called")
        if clean_and_create_output_folder():
            return jsonify({'success': True, 'message': 'Output folder cleaned successfully'})
        else:
            return jsonify({'success': False, 'error': 'Failed to clean output folder'}), 500
    except Exception as e:
        print(f"DEBUG: Error in clean-output: {str(e)}")
        return jsonify({'success': False, 'error': str(e)}), 500


@app.route('/api/save-pdb-file', methods=['POST'])
def save_pdb_file():
    """Save PDB file to the output directory"""
    try:
        data = request.get_json()
        pdb_content = data.get('pdb_content', '')
        filename = data.get('filename', 'input.pdb')
        if not pdb_content:
            return jsonify({'success': False, 'error': 'No PDB content provided'}), 400

        # Save to the output directory as 0_original_input.pdb
        output_file = OUTPUT_DIR / "0_original_input.pdb"
        with open(output_file, 'w') as f:
            f.write(pdb_content)
        logger.info(f"Saved PDB file to {output_file}")
        return jsonify({
            'success': True,
            'message': 'PDB file saved successfully',
            'file_path': str(output_file)
        })
    except Exception as e:
        logger.error(f"Error saving PDB file: {str(e)}")
        return jsonify({'success': False, 'error': str(e)}), 500


@app.route('/api/save-plumed-file', methods=['POST'])
def save_plumed_file():
    """Save PLUMED file to the output directory"""
    try:
        data = request.get_json()
        plumed_content = data.get('plumed_content', '')
        filename = data.get('filename', 'plumed.dat')
        if not plumed_content:
            return jsonify({'success': False, 'error': 'No PLUMED content provided'}), 400

        # Ensure the filename has a .dat extension if no extension was provided
        if not filename.endswith('.dat'):
            filename = filename if '.' in filename else f"{filename}.dat"

        # Save to the output directory
        output_file = OUTPUT_DIR / filename
        with open(output_file, 'w') as f:
            f.write(plumed_content)
        logger.info(f"Saved PLUMED file to {output_file}")
        return jsonify({
            'success': True,
            'message': f'PLUMED file saved successfully to output/{filename}',
            'file_path': str(output_file),
            'filename': filename
        })
    except Exception as e:
        logger.error(f"Error saving PLUMED file: {str(e)}")
        return jsonify({'success': False, 'error': str(e)}), 500


@app.route('/api/download-output-zip', methods=['GET'])
def download_output_zip():
    """Create a ZIP of the output folder and return it for download"""
    try:
        if not OUTPUT_DIR.exists():
            return jsonify({'error': 'Output directory not found'}), 404

        import shutil
        # Create a temporary zip file
        tmp_dir = tempfile.mkdtemp()
        zip_base = os.path.join(tmp_dir, 'output')
        zip_path = shutil.make_archive(zip_base, 'zip', root_dir=str(OUTPUT_DIR))

        # Send the file for download
        return send_file(zip_path, as_attachment=True, download_name='output.zip')
    except Exception as e:
        logger.error(f"Error creating output ZIP: {str(e)}")
        return jsonify({'error': f'Failed to create ZIP: {str(e)}'}), 500
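
# Small illustrative check of the filename guard shared by the save-file
# endpoints below (the example names are made up):
def _example_filename_guard():
    def is_safe(name):
        return not ('/' in name or '\\' in name or '..' in name)
    assert is_safe('min.in')
    assert not is_safe('../min.in')        # path traversal
    assert not is_safe('subdir/extra.in')  # no subdirectories
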
@app.route('/api/get-generated-files', methods=['GET'])
def get_generated_files():
    """Return contents of known generated input files for preview"""
    try:
        files_to_read = [
            'min_restrained.in', 'min.in', 'HeatNPT.in', 'mdin_equi.in', 'mdin_prod.in', 'submit_job.pbs'
        ]
        # Files to exclude from preview (intermediate/utility files)
        excluded_files = [
            'calc_charge_on_system.in', 'generate_ff_parameters.in', 'sqm.in'
        ]

        # Exclude ESMFold minimization intermediates: tleap_A.in, min_A.in, etc. (per-chain;
        # keep min.in and min_restrained.in, which are in files_to_read)
        def _is_esmfold_minimization_file(name):
            if name.startswith('tleap_') and name.endswith('.in'):
                return True
            # min_{chain}.in, e.g. min_A.in, min_B.in (len 8: "min_" + chain + ".in")
            if name.startswith('min_') and name.endswith('.in') and len(name) == 8:
                return True
            return False

        # Note: Force field parameter files (protein.prmtop, protein.inpcrd, protein_solvated.pdb)
        # are excluded from preview as they are binary/large files

        # Also include any user-created .in files in the output directory
        user_created_files = []
        try:
            for file_path in OUTPUT_DIR.glob("*.in"):
                filename = file_path.name
                # Exclude standard files, utility files, and ESMFold minimization intermediates
                if (filename not in files_to_read
                        and filename not in excluded_files
                        and not _is_esmfold_minimization_file(filename)):
                    user_created_files.append(filename)
        except Exception as e:
            logger.warning(f"Error scanning for user-created files: {e}")

        # Combine standard files and user-created files
        all_files = files_to_read + sorted(user_created_files)

        result = {}
        for name in all_files:
            path = OUTPUT_DIR / name
            if path.exists():
                try:
                    with open(path, 'r') as f:
                        result[name] = f.read()
                except Exception as fe:
                    result[name] = f"Error reading {name}: {fe}"
            else:
                result[name] = ""
        return jsonify({'success': True, 'files': result})
    except Exception as e:
        logger.error(f"Error reading generated files: {str(e)}")
        return jsonify({'error': f'Failed to read files: {str(e)}'}), 500


@app.route('/api/save-file', methods=['POST'])
def save_file():
    """Save edited file content back to the output directory"""
    try:
        data = request.get_json()
        filename = data.get('filename')
        content = data.get('content')
        if not filename:
            return jsonify({'success': False, 'error': 'Filename is required'}), 400
        if content is None:
            return jsonify({'success': False, 'error': 'Content is required'}), 400

        # Security: only allow saving files that are in the allowed list
        allowed_files = [
            'min_restrained.in', 'min.in', 'HeatNPT.in', 'mdin_equi.in', 'mdin_prod.in',
            'submit_job.pbs', 'plumed.dat'
        ]
        if filename not in allowed_files:
            return jsonify({'success': False, 'error': f'File "{filename}" is not allowed to be edited'}), 403

        # Prevent directory traversal attacks
        if '/' in filename or '\\' in filename or '..' in filename:
            return jsonify({'success': False, 'error': 'Invalid filename'}), 400

        # Write file
        file_path = OUTPUT_DIR / filename
        try:
            with open(file_path, 'w') as f:
                f.write(content)
            logger.info(f"File {filename} saved successfully")
            return jsonify({'success': True, 'message': f'File {filename} saved successfully'})
        except Exception as e:
            logger.error(f"Error writing file {filename}: {str(e)}")
            return jsonify({'success': False, 'error': f'Failed to write file: {str(e)}'}), 500
    except Exception as e:
        logger.error(f"Error saving file: {str(e)}")
        return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500

@app.route('/api/save-new-file', methods=['POST'])
def save_new_file():
    """Save a new simulation file created by the user"""
    try:
        data = request.get_json()
        filename = data.get('filename')
        content = data.get('content')

        if not filename:
            return jsonify({'success': False, 'error': 'Filename is required'}), 400
        if content is None:
            return jsonify({'success': False, 'error': 'Content is required'}), 400

        # Validate filename - must end with .in
        if not filename.endswith('.in'):
            return jsonify({'success': False, 'error': 'File name must end with .in extension'}), 400

        # Prevent directory traversal attacks
        if '/' in filename or '\\' in filename or '..' in filename:
            return jsonify({'success': False, 'error': 'Invalid filename'}), 400

        # Write file
        file_path = OUTPUT_DIR / filename
        try:
            with open(file_path, 'w') as f:
                f.write(content)
            logger.info(f"New file {filename} saved successfully")
            return jsonify({'success': True, 'message': f'File {filename} saved successfully'})
        except Exception as e:
            logger.error(f"Error writing new file {filename}: {str(e)}")
            return jsonify({'success': False, 'error': f'Failed to write file: {str(e)}'}), 500
    except Exception as e:
        logger.error(f"Error saving new file: {str(e)}")
        return jsonify({'success': False, 'error': f'Internal server error: {str(e)}'}), 500


def get_ligand_residue_name():
    """Extract the first ligand residue name from tleap_ready.pdb (for backward compatibility)"""
    ligand_names = get_all_ligand_residue_names()
    return ligand_names[0] if ligand_names else "LIG"


def generate_ff_parameters_file(force_field, water_model, add_ions, distance):
    """Generate the final force field parameters file with dynamic values"""
    # Debug logging
    print(f"DEBUG: force_field={force_field}, water_model={water_model}, add_ions={add_ions}, distance={distance}")

    # Check if ligands are present - look for residue-named files first,
    # then fall back to numbered files
    ligand_mol2_files = []
    ligand_frcmod_files = []
    ligand_resname_map = {}  # Maps residue name to (mol2_file, frcmod_file)

    # First, try to find residue-named files (e.g., O9C.mol2, O9C.frcmod)
    unique_resnames = get_all_ligand_residue_names()
    for resname in unique_resnames:
        mol2_file = OUTPUT_DIR / f"{resname}.mol2"
        frcmod_file = OUTPUT_DIR / f"{resname}.frcmod"
        if mol2_file.exists() and frcmod_file.exists():
            ligand_resname_map[resname] = (mol2_file, frcmod_file)
            ligand_mol2_files.append(mol2_file)
            ligand_frcmod_files.append(frcmod_file)

    # Fallback: check for numbered files (backward compatibility)
    if not ligand_mol2_files:
        numbered_mol2 = sorted(OUTPUT_DIR.glob("4_ligands_corrected_*.mol2"))
        numbered_frcmod = sorted(OUTPUT_DIR.glob("4_ligands_corrected_*.frcmod"))
        if numbered_mol2 and numbered_frcmod:
            ligand_mol2_files = numbered_mol2
            ligand_frcmod_files = numbered_frcmod
            # Try to map to residue names
            resnames = get_all_ligand_residue_names()
            for i, (mol2_file, frcmod_file) in enumerate(zip(ligand_mol2_files, ligand_frcmod_files)):
                # Extract the residue name from the mol2 file if possible
                resname = get_residue_name_from_mol2(mol2_file) if mol2_file.exists() else None
                if not resname:
                    # Try to get it from tleap_ready.pdb
                    if resnames and i < len(resnames):
                        resname = resnames[i]
                    else:
                        resname = f"LIG{len(ligand_resname_map) + 1}"
                # Only add if not already in the map (avoid duplicates)
                if resname not in ligand_resname_map:
                    ligand_resname_map[resname] = (mol2_file, frcmod_file)

    # Final fallback: single ligand file (backward compatibility)
    if not ligand_mol2_files:
        single_mol2 = OUTPUT_DIR / "4_ligands_corrected.mol2"
        single_frcmod = OUTPUT_DIR / "4_ligands_corrected.frcmod"
        if single_mol2.exists() and single_frcmod.exists():
            ligand_mol2_files = [single_mol2]
            ligand_frcmod_files = [single_frcmod]
            resnames = get_all_ligand_residue_names()
            if resnames:
                ligand_resname_map[resnames[0]] = (single_mol2, single_frcmod)
            else:
                ligand_resname_map["LIG"] = (single_mol2, single_frcmod)

    ligand_present = len(ligand_mol2_files) > 0 and len(ligand_frcmod_files) > 0

    # Build the content dynamically
    content = f"source leaprc.protein.{force_field}\n"

    # Add the water model source (compare case-insensitively in both branches)
    print(f"DEBUG: water_model={water_model}")
    if water_model.lower() == "tip3p":
        content += "source leaprc.water.tip3p\n"
    elif water_model.lower() == "spce":
        content += "source leaprc.water.spce\n"

    # Add ligand-related commands only if ligands are present
    if ligand_present:
        content += "source leaprc.gaff2\n\n"
        # Load each unique ligand parameter and structure only once;
        # sorted() ensures consistent ordering
        for resname in sorted(ligand_resname_map.keys()):
            mol2_file, frcmod_file = ligand_resname_map[resname]
            content += f"loadamberparams {frcmod_file.name}\n"
            content += f"{resname} = loadmol2 {mol2_file.name}\n"
        content += "\n"
    else:
        content += "\n"

    content += "x = loadpdb tleap_ready.pdb\n\n"
    content += "charge x\n\n"

    # Add ions based on the selection
    if add_ions == "Na+":
        content += "addions x Na+ 0.0\n\n"
    elif add_ions == "Cl-":
        content += "addions x Cl- 0.0\n\n"
    # If "None", skip adding ions

    # Add solvation with the selected water model and distance
    if water_model.lower() == "tip3p":
        content += f"solvateBox x TIP3PBOX {distance}\n\n"
    elif water_model.lower() == "spce":
        content += f"solvateBox x SPCBOX {distance}\n\n"

    content += "saveamberparm x protein.prmtop protein.inpcrd\n\n"
    content += "savepdb x protein_solvated.pdb\n\n"
    content += "quit\n"

    # Debug: print the generated content
    print("DEBUG: Generated content:")
    print(content)

    # Write the file
    with open(OUTPUT_DIR / "generate_ff_parameters.in", 'w') as f:
        f.write(content)
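
# For reference, with force_field='ff14SB', water_model='TIP3P', add_ions='Na+',
# distance=10.0 and no ligand files on disk, the function above writes a
# generate_ff_parameters.in equivalent to (blank lines condensed):
#
#   source leaprc.protein.ff14SB
#   source leaprc.water.tip3p
#   x = loadpdb tleap_ready.pdb
#   charge x
#   addions x Na+ 0.0
#   solvateBox x TIP3PBOX 10.0
#   saveamberparm x protein.prmtop protein.inpcrd
#   savepdb x protein_solvated.pdb
#   quit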
"spce": content += "source leaprc.water.spce\n" # Add ligand-related commands only if ligands are present if ligand_present: content += "source leaprc.gaff2\n\n" # Load each unique ligand parameter and structure only once # Use sorted to ensure consistent ordering for resname in sorted(ligand_resname_map.keys()): mol2_file, frcmod_file = ligand_resname_map[resname] content += f"loadamberparams {frcmod_file.name}\n" content += f"{resname} = loadmol2 {mol2_file.name}\n" content += "\n" else: content += "\n" content += "x = loadpdb tleap_ready.pdb\n\n" content += "charge x\n\n" # Add ions based on selection if add_ions == "Na+": content += "addions x Na+ 0.0\n\n" elif add_ions == "Cl-": content += "addions x Cl- 0.0\n\n" # If "None", skip adding ions # Add solvation with selected water model and distance if water_model.lower() == "tip3p": content += f"solvateBox x TIP3PBOX {distance}\n\n" elif water_model.lower() == "spce": content += f"solvateBox x SPCBOX {distance}\n\n" content += "saveamberparm x protein.prmtop protein.inpcrd\n\n" content += "savepdb x protein_solvated.pdb\n\n" content += "quit\n" # Debug: print the generated content print("DEBUG: Generated content:") print(content) # Write the file with open(OUTPUT_DIR / "generate_ff_parameters.in", 'w') as f: f.write(content) def get_residue_name_from_pdb(pdb_file, sanitize: bool = True): """ Extract residue name from a ligand PDB file. If sanitize=True, validates and updates numeric residue names in the file. """ try: residue_name = None with open(pdb_file, 'r') as f: for line in f: if line.startswith(('ATOM', 'HETATM')): # Extract residue name (columns 18-20) residue_name = line[17:20].strip() if residue_name and residue_name not in ['HOH', 'WAT', 'TIP', 'SPC', 'NA', 'CL']: break if not residue_name: return None # Validate and sanitize if needed if sanitize: sanitized_name, was_changed = _validate_and_sanitize_ligand_name(residue_name) if was_changed: original_name = residue_name logger.warning( f"Ligand residue name '{original_name}' in {Path(pdb_file).name} is pure numeric. " f"Changed to '{sanitized_name}' (3-letter code) to avoid errors. " f"The PDB file has been updated." ) _update_pdb_residue_name(Path(pdb_file), residue_name, sanitized_name) residue_name = sanitized_name return residue_name except Exception as e: logger.warning(f"Could not extract residue name from {pdb_file}: {e}") return None def get_residue_name_from_mol2(mol2_file): """Extract residue name from a mol2 file (from @MOLECULE section)""" try: with open(mol2_file, 'r') as f: lines = f.readlines() # Find @MOLECULE section in_molecule = False for i, line in enumerate(lines): if '@MOLECULE' in line: in_molecule = True # The next line is the molecule name/residue name if i + 1 < len(lines): resname = lines[i + 1].strip() # Remove any extra whitespace or comments resname = resname.split()[0] if resname.split() else resname return resname return None except Exception as e: logger.warning(f"Could not extract residue name from {mol2_file}: {e}") return None def get_all_ligand_residue_names(sanitize: bool = True): """ Extract all unique ligand residue names from tleap_ready.pdb. If sanitize=True, validates and updates numeric residue names in the file. 
""" ligand_names = [] try: tleap_ready_path = OUTPUT_DIR / "tleap_ready.pdb" if not tleap_ready_path.exists(): return [] seen_residues = set() residues_to_update = {} # Track old_name -> new_name mappings # First pass: collect all residue names and validate them with open(tleap_ready_path, 'r') as f: for line in f: if line.startswith('HETATM'): # Extract residue name (columns 18-20) residue_name = line[17:20].strip() if residue_name and residue_name not in ['HOH', 'WAT', 'TIP', 'SPC', 'NA', 'CL']: if residue_name not in seen_residues: # Validate and sanitize if needed if sanitize: sanitized_name, was_changed = _validate_and_sanitize_ligand_name(residue_name) if was_changed: residues_to_update[residue_name] = sanitized_name residue_name = sanitized_name ligand_names.append(residue_name) seen_residues.add(residue_name) # Update tleap_ready.pdb if any residue names were changed if sanitize and residues_to_update: for old_name, new_name in residues_to_update.items(): logger.warning( f"Ligand residue name '{old_name}' in tleap_ready.pdb is pure numeric. " f"Changed to '{new_name}' (3-letter code) to avoid errors. " f"The PDB file has been updated." ) _update_pdb_residue_name(tleap_ready_path, old_name, new_name) return ligand_names except Exception as e: logger.warning(f"Could not extract ligand residue names: {e}") return [] @app.route('/api/generate-ff-parameters', methods=['POST']) def generate_ff_parameters(): """Generate final force field parameters using tleap""" try: data = request.get_json() force_field = data.get('force_field', 'ff14SB') water_model = data.get('water_model', 'TIP3P') add_ions = data.get('add_ions', 'None') distance = data.get('distance', 10.0) # Generate the dynamic input file generate_ff_parameters_file(force_field, water_model, add_ions, distance) # Find tleap executable tleap_path = None try: result = subprocess.run(['which', 'tleap'], capture_output=True, text=True) if result.returncode == 0: tleap_path = result.stdout.strip() except: pass if not tleap_path: conda_prefix = os.environ.get('CONDA_PREFIX') if conda_prefix: tleap_path = os.path.join(conda_prefix, 'bin', 'tleap') else: tleap_path = '/home/hn533621/.conda/envs/MD_pipeline/bin/tleap' # Run tleap cmd = f"{tleap_path} -f generate_ff_parameters.in" result = subprocess.run(cmd, shell=True, cwd=str(OUTPUT_DIR), capture_output=True, text=True, timeout=300) if result.returncode != 0: logger.error(f"tleap failed: {result.stderr}") return jsonify({ 'success': False, 'error': f'tleap failed: {result.stderr}' }), 500 # Check if key output files were created output_files = ['protein.prmtop', 'protein.inpcrd', 'protein_solvated.pdb'] missing_files = [f for f in output_files if not (OUTPUT_DIR / f).exists()] if missing_files: return jsonify({ 'success': False, 'error': f'Missing output files: {", ".join(missing_files)}' }), 500 return jsonify({ 'success': True, 'message': 'Force field parameters generated successfully', 'files_generated': output_files }) except subprocess.TimeoutExpired: return jsonify({ 'success': False, 'error': 'tleap command timed out after 5 minutes' }), 500 except Exception as e: logger.error(f"Error generating FF parameters: {str(e)}") return jsonify({ 'success': False, 'error': f'Failed to generate force field parameters: {str(e)}' }), 500 @app.route('/api/detect-missing-residues', methods=['POST']) def detect_missing_residues_endpoint(): """Detect missing residues in the loaded PDB structure""" try: # Check if original input file exists original_pdb_path = OUTPUT_DIR / "0_original_input.pdb" 

@app.route('/api/detect-missing-residues', methods=['POST'])
def detect_missing_residues_endpoint():
    """Detect missing residues in the loaded PDB structure"""
    try:
        # Check that the original input file exists
        original_pdb_path = OUTPUT_DIR / "0_original_input.pdb"
        if not original_pdb_path.exists():
            return jsonify({
                'success': False,
                'error': 'No PDB file loaded. Please load a PDB file first.'
            }), 400

        # Get the PDB ID from the file
        try:
            pdb_id = get_pdb_id_from_pdb_file(str(original_pdb_path))
        except ValueError as e:
            return jsonify({
                'success': False,
                'error': f'Could not determine PDB ID: {str(e)}'
            }), 400

        # Detect missing residues
        missing = detect_missing_residues(pdb_id)

        # Get chain sequences
        chain_sequences = get_chain_sequences(pdb_id)

        # Find chains with missing residues that have sequences available
        chains_with_missing = {
            chain: chain_sequences[chain]
            for chain in missing
            if chain in chain_sequences
        }

        # Format missing-residue info for display
        missing_info = {}
        for chain, missing_list in missing.items():
            missing_info[chain] = {
                'count': len(missing_list),
                'residues': missing_list
            }

        # Get the first residue number of each chain from the PDB file, and
        # calculate the starting residue number for the sequence viewer
        # (accounting for missing residues before the first PDB residue)
        chain_first_residue = {}
        chain_sequence_start = {}
        try:
            import re

            with open(original_pdb_path, 'r') as f:
                pdb_lines = f.readlines()

            # First pass: find the first residue number for each chain
            for line in pdb_lines:
                if line.startswith('ATOM') or line.startswith('HETATM'):
                    chain_id = line[21:22].strip()
                    if chain_id and chain_id not in chain_first_residue:
                        # Extract the residue number (columns 23-26), ignoring
                        # any insertion code
                        residue_str = line[22:26].strip()
                        # Match just the number part (handles negative numbers)
                        match = re.match(r'(-?\d+)', residue_str)
                        if match:
                            chain_first_residue[chain_id] = int(match.group(1))

            # Second pass: calculate the sequence start for each chain.
            # We want the first residue number that should be displayed: the
            # first PDB residue minus the count of missing residues before it.
            # Example: if the PDB starts at 189 and residues 173-188 are
            # missing (16 residues), then sequence_start = 189 - 16 = 173.
            for chain_id, first_pdb_residue in chain_first_residue.items():
                # Find the minimum missing residue number before
                # first_pdb_residue; this is where the sequence display starts
                min_missing_before = None
                if chain_id in missing_info:
                    for resname, resnum in missing_info[chain_id]['residues']:
                        if resnum < first_pdb_residue:
                            if min_missing_before is None or resnum < min_missing_before:
                                min_missing_before = resnum

                if min_missing_before is not None:
                    # Start from the first missing residue before the PDB
                    # start, covering all missing residues in that gap
                    sequence_start = min_missing_before
                else:
                    # No missing residues before the first PDB residue
                    sequence_start = first_pdb_residue
                chain_sequence_start[chain_id] = sequence_start
        except Exception as e:
            logger.warning(f"Could not determine first residue numbers: {str(e)}")

        return jsonify({
            'success': True,
            'pdb_id': pdb_id,
            'missing_residues': missing_info,
            'chains_with_missing': list(chains_with_missing.keys()),
            'chain_sequences': chain_sequences,
            'chain_first_residue': chain_first_residue,
            'chain_sequence_start': chain_sequence_start
        })
    except Exception as e:
        logger.error(f"Error detecting missing residues: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Failed to detect missing residues: {str(e)}'
        }), 500
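
# Illustrative sketch (not called by the endpoint above): the sequence-start
# rule, factored into a pure function with a worked example.
def _sequence_start_example(first_pdb_residue, missing_resnums):
    """
    The displayed sequence starts at the earliest missing residue that
    precedes the first resolved PDB residue, else at the first PDB residue.

    >>> _sequence_start_example(189, list(range(173, 189)))
    173
    >>> _sequence_start_example(42, [])
    42
    """
    before = [n for n in missing_resnums if n < first_pdb_residue]
    return min(before) if before else first_pdb_residue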

@app.route('/api/trim-residues', methods=['POST'])
def trim_residues_endpoint():
    """Trim residues from the edges of chain sequences"""
    try:
        data = request.get_json()
        chain_sequences = data.get('chain_sequences', {})
        trim_specs = data.get('trim_specs', {})
        pdb_id = data.get('pdb_id')

        if not chain_sequences:
            return jsonify({
                'success': False,
                'error': 'No chain sequences provided'
            }), 400
        if not trim_specs:
            return jsonify({
                'success': False,
                'error': 'No trim specifications provided'
            }), 400

        # Apply trimming
        try:
            trimmed_sequences = trim_chains_sequences(chain_sequences, trim_specs)
        except ValueError as e:
            return jsonify({
                'success': False,
                'error': str(e)
            }), 400

        # Optionally write a trimmed FASTA file if a pdb_id was provided
        if pdb_id:
            try:
                write_fasta_for_missing_chains(
                    pdb_id, trimmed_sequences, output_dir=str(OUTPUT_DIR)
                )
                logger.info(f"Wrote trimmed FASTA file for PDB {pdb_id}")
            except Exception as e:
                logger.warning(f"Could not write trimmed FASTA file: {str(e)}")

        # Calculate trim info for the response
        trim_info = {}
        for chain, spec in trim_specs.items():
            trim_info[chain] = {
                'original_length': len(chain_sequences.get(chain, '')),
                'trimmed_length': len(trimmed_sequences.get(chain, '')),
                'n_terminal_trimmed': spec.get('n_terminal', 0),
                'c_terminal_trimmed': spec.get('c_terminal', 0)
            }

        return jsonify({
            'success': True,
            'trimmed_sequences': trimmed_sequences,
            'trim_info': trim_info,
            'message': f'Successfully trimmed residues from {len(trim_specs)} chain(s)'
        })
    except Exception as e:
        logger.error(f"Error trimming residues: {str(e)}")
        return jsonify({
            'success': False,
            'error': f'Failed to trim residues: {str(e)}'
        }), 500
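
# Example request payload for the trim endpoint above (illustrative sketch;
# sequence truncated): trim 5 N-terminal and 3 C-terminal residues of chain A.
#
#   POST /api/trim-residues
#   {"pdb_id": "1KE5",
#    "chain_sequences": {"A": "MDEKV..."},
#    "trim_specs": {"A": {"n_terminal": 5, "c_terminal": 3}}}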

@app.route('/api/build-completed-structure', methods=['POST'])
def build_completed_structure_endpoint():
    """Build the completed structure using ESMFold for selected chains, with streaming logs"""
    def generate():
        try:
            data = request.get_json()
            selected_chains = data.get('selected_chains', [])

            if not selected_chains:
                yield _format_log('❌ No chains selected for completion', 'error')
                yield f"data: {json.dumps({'type': 'complete', 'success': False, 'error': 'No chains selected for completion'})}\n\n"
                return

            yield _format_log(f"Starting ESMFold structure completion for chains: {', '.join(selected_chains)}")

            # Check that the original input file exists
            original_pdb_path = OUTPUT_DIR / "0_original_input.pdb"
            if not original_pdb_path.exists():
                yield _format_log('❌ No PDB file loaded. Please load a PDB file first.', 'error')
                yield f"data: {json.dumps({'type': 'complete', 'success': False, 'error': 'No PDB file loaded. Please load a PDB file first.'})}\n\n"
                return

            # Use the true crystal structure for alignment and rebuild:
            # 0_original_input_backup.pdb if it exists (i.e., before
            # set-use-completed overwrote 0_original_input.pdb), else
            # 0_original_input.pdb
            original_for_align = OUTPUT_DIR / "0_original_input_backup.pdb"
            original_for_align = original_for_align if original_for_align.exists() else original_pdb_path

            # Get the PDB ID
            try:
                pdb_id = get_pdb_id_from_pdb_file(str(original_pdb_path))
                yield _format_log(f"Detected PDB ID: {pdb_id}")
            except ValueError as e:
                yield _format_log(f'❌ Could not determine PDB ID: {str(e)}', 'error')
                yield f"data: {json.dumps({'type': 'complete', 'success': False, 'error': f'Could not determine PDB ID: {str(e)}'})}\n\n"
                return

            # Get chain sequences (use provided sequences if available, otherwise fetch)
            provided_sequences = data.get('chain_sequences', None)
            if provided_sequences:
                chain_sequences = provided_sequences
                yield _format_log("Using provided chain sequences (may be trimmed)")
            else:
                yield _format_log("Fetching chain sequences from PDB database...")
                chain_sequences = get_chain_sequences(pdb_id)

            # Verify that the selected chains have sequences
            chains_to_process = []
            for chain in selected_chains:
                if chain in chain_sequences:
                    chains_to_process.append(chain)
                else:
                    yield _format_log(f"⚠️ Chain {chain} not found in chain sequences", 'warning')

            if not chains_to_process:
                yield _format_log('❌ None of the selected chains have sequences available', 'error')
                yield f"data: {json.dumps({'type': 'complete', 'success': False, 'error': 'None of the selected chains have sequences available'})}\n\n"
                return

            # Map the selected chains to their sequences for FASTA writing
            chains_with_missing = {
                chain: chain_sequences[chain] for chain in chains_to_process
            }

            # Write a FASTA file for the selected chains
            try:
                write_fasta_for_missing_chains(pdb_id, chains_with_missing, output_dir=str(OUTPUT_DIR))
                yield _format_log(f"Wrote FASTA file for chains: {chains_to_process}")
            except Exception as e:
                # Don't fail the entire operation if FASTA writing fails
                yield _format_log(f"⚠️ Could not write FASTA file: {str(e)}", 'warning')

            # Run ESMFold for each selected chain
            esmfold_results = {}
            for chain in chains_to_process:
                yield _format_log(f"Running ESMFold for chain {chain}...")
                seq = chain_sequences[chain]
                try:
                    pdb_text = run_esmfold(seq)
                    esmfold_results[chain] = pdb_text
                    # Save each chain's ESMFold result
                    esm_pdb_filename = OUTPUT_DIR / f"{pdb_id}_chain_{chain}_esmfold.pdb"
                    with open(esm_pdb_filename, 'w') as f:
                        f.write(pdb_text)
                    yield _format_log(f"✅ ESMFold completed for chain {chain}: {esm_pdb_filename.name}")
                except Exception as e:
                    yield _format_log(f'❌ ESMFold failed for chain {chain}: {str(e)}', 'error')
                    yield f"data: {json.dumps({'type': 'complete', 'success': False, 'error': f'ESMFold failed for chain {chain}: {str(e)}'})}\n\n"
                    return

            # Minimization (before rebuild): minimized PDBs are superimposed during the rebuild
            minimize_chains = data.get('minimize_chains', False)
            chains_to_minimize = data.get('chains_to_minimize', [])
            minimized_chains = []

            if minimize_chains and chains_to_minimize:
                yield _format_log(f"\n{'='*60}")
                yield _format_log(f"Starting energy minimization for chains: {', '.join(chains_to_minimize)}")
                yield _format_log(f"{'='*60}")
                try:
                    for chain in chains_to_minimize:
                        yield _format_log(f"\nMinimizing chain {chain}...")
                        for log_line in _minimize_esmfold_chains_streaming(pdb_id, [chain], original_for_align=original_for_align):
                            yield log_line
                        minimized_chains.append(chain)
                        yield _format_log(f"✅ Chain {chain} minimization completed")

                    min_status_file = OUTPUT_DIR / ".chains_minimized"
                    with open(min_status_file, 'w') as f:
                        f.write(','.join(minimized_chains))
                    yield _format_log(f"\n✅ All chains minimized successfully: {', '.join(minimized_chains)}")
                except Exception as e:
                    yield _format_log(f'❌ Error during minimization: {str(e)}', 'error')
            # Rebuild the PDB using PyMOL (aligns ESMFold/minimized chains to the original, then merges)
            output_pdb = OUTPUT_DIR / "0_complete_structure.pdb"
            yield _format_log("Rebuilding structure with PyMOL (superimposing to original)...")
            try:
                import tempfile
                import os

                chains_use_min_arg = repr(minimized_chains) if minimized_chains else "None"
                script_content = f"""#!/usr/bin/env python3
import sys
import os

# Add the amberprep package to the path (Fill_missing_residues is in amberprep/)
sys.path.insert(0, r'{str(Path(__file__).parent)}')

# Change to the output directory
os.chdir(r'{str(OUTPUT_DIR)}')

# Import and run the rebuild
from Fill_missing_residues import rebuild_pdb_with_esmfold

try:
    rebuild_pdb_with_esmfold(
        r'{pdb_id}',
        {repr(chains_to_process)},
        output_pdb=r'{output_pdb.name}',
        original_pdb_path=r'{Path(original_for_align).name}',
        chains_use_minimized={chains_use_min_arg}
    )
    print("SUCCESS: Rebuild completed")
except Exception as e:
    print(f"ERROR: {{e}}")
    import traceback
    traceback.print_exc()
    sys.exit(1)
"""

                # Write the script to a temporary file
                with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as script_file:
                    script_file.write(script_content)
                    script_path = script_file.name

                try:
                    # Make the script executable
                    os.chmod(script_path, 0o755)

                    # Run the script in a subprocess
                    result = subprocess.run(
                        [sys.executable, script_path],
                        capture_output=True,
                        text=True,
                        timeout=300,
                        cwd=str(OUTPUT_DIR)
                    )

                    if result.returncode != 0:
                        error_msg = result.stderr or result.stdout
                        yield _format_log(f"❌ PyMOL rebuild failed: {error_msg}", 'error')
                        # Check whether it looks like a PyMOL initialization issue
                        if "pymol" in error_msg.lower() or "import" in error_msg.lower():
                            error_msg = f"PyMOL initialization failed. Make sure PyMOL is installed and accessible. Error: {error_msg}"
                        else:
                            error_msg = f"Rebuild failed: {error_msg}"
                        yield _format_log(f"❌ {error_msg}", 'error')
                        yield f"data: {json.dumps({'type': 'complete', 'success': False, 'error': error_msg})}\n\n"
                        return

                    if "ERROR:" in result.stdout:
                        # Split on real newlines to find the reported error line
                        error_line = [line for line in result.stdout.split('\n') if 'ERROR:' in line]
                        if error_line:
                            error_msg = error_line[0].replace('ERROR:', '').strip()
                            yield _format_log(f"❌ {error_msg}", 'error')
                            yield f"data: {json.dumps({'type': 'complete', 'success': False, 'error': error_msg})}\n\n"
                            return

                    if not output_pdb.exists():
                        error_msg = "Output file was not created"
                        yield _format_log(f"❌ {error_msg}", 'error')
                        yield f"data: {json.dumps({'type': 'complete', 'success': False, 'error': error_msg})}\n\n"
                        return

                    yield _format_log(f"✅ Completed structure saved to {output_pdb.name}")
                except subprocess.TimeoutExpired:
                    yield _format_log("❌ PyMOL rebuild timed out after 5 minutes", 'error')
                    yield f"data: {json.dumps({'type': 'complete', 'success': False, 'error': 'PyMOL rebuild timed out. The structure might be too large. Please try again.'})}\n\n"
                    return
                except Exception as e:
                    yield _format_log(f"❌ Error rebuilding PDB: {str(e)}", 'error')
                    import traceback
                    logger.error(traceback.format_exc())
                    yield f"data: {json.dumps({'type': 'complete', 'success': False, 'error': f'Failed to rebuild structure: {str(e)}'})}\n\n"
                    return
                finally:
                    # Clean up the temporary script
                    try:
                        os.unlink(script_path)
                    except OSError:
                        pass
            except Exception as e:
                yield _format_log(f"❌ Error in PyMOL rebuild: {str(e)}", 'error')
                yield f"data: {json.dumps({'type': 'complete', 'success': False, 'error': f'PyMOL rebuild failed: {str(e)}'})}\n\n"
                return

            # Read the completed structure (includes superimposed minimized
            # chains when minimization was used)
            with open(output_pdb, 'r') as f:
                completed_content = f.read()

            chains_str = ', '.join(chains_to_process)
            yield _format_log(f"\n✅ Structure completion finished for chains: {chains_str}")

            result_data = {
                'type': 'complete',
                'success': True,
                'message': f'Successfully completed structure for chains: {chains_str}',
                'completed_chains': chains_to_process,
                'completed_structure': completed_content,
                'minimized_chains': minimized_chains
            }
            yield f"data: {json.dumps(result_data)}\n\n"
        except Exception as e:
            logger.error(f"Error building completed structure: {str(e)}")
            yield _format_log(f'❌ Error: {str(e)}', 'error')
            yield f"data: {json.dumps({'type': 'complete', 'success': False, 'error': f'Failed to build completed structure: {str(e)}'})}\n\n"

    # Wrap the generator (not the view) in stream_with_context so it can keep
    # using the request while the response streams
    return Response(stream_with_context(generate()), mimetype='text/event-stream')
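
# Sketch of a Python client for the streaming endpoint above (illustrative
# only; nothing in the app calls this). The stream is a sequence of SSE
# "data: ..." frames; the final frame is a JSON object with type == 'complete'.
# Frames produced by _format_log are printed as-is if they are not JSON.
def _example_build_stream_client(url="http://localhost:5000/api/build-completed-structure"):
    with requests.post(url, json={'selected_chains': ['A']}, stream=True) as resp:
        for raw in resp.iter_lines(decode_unicode=True):
            if not raw or not raw.startswith("data: "):
                continue  # skip the blank separator lines between frames
            payload = raw[len("data: "):]
            try:
                event = json.loads(payload)
            except ValueError:
                print(payload)  # non-JSON log frame
                continue
            if event.get('type') == 'complete':
                return event
            print(event)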

@app.route('/api/set-use-completed-structure', methods=['POST'])
def set_use_completed_structure():
    """Set the user preference to use the completed (ESMFold) structure instead of the original"""
    try:
        data = request.get_json()
        use_completed = data.get('use_completed', False)

        # A flag file records that the user wants to use the completed structure
        flag_file = OUTPUT_DIR / ".use_completed_structure"

        if use_completed:
            # User wants the completed structure - create the flag file
            flag_file.touch()
            logger.info("User chose to use ESMFold-completed structure")

            # Also replace the original input with the completed structure for consistency
            completed_pdb_path = OUTPUT_DIR / "0_complete_structure.pdb"
            original_pdb_path = OUTPUT_DIR / "0_original_input.pdb"
            if completed_pdb_path.exists():
                import shutil

                # Back up the original if no backup exists yet
                backup_path = OUTPUT_DIR / "0_original_input_backup.pdb"
                if original_pdb_path.exists() and not backup_path.exists():
                    shutil.copy2(original_pdb_path, backup_path)

                # Replace the original with the completed structure
                shutil.copy2(completed_pdb_path, original_pdb_path)
                logger.info(f"Replaced {original_pdb_path} with completed structure")
        else:
            # User doesn't want the completed structure - remove the flag
            if flag_file.exists():
                flag_file.unlink()

            # Restore the original structure from the backup if it exists
            backup_path = OUTPUT_DIR / "0_original_input_backup.pdb"
            original_pdb_path = OUTPUT_DIR / "0_original_input.pdb"
            if backup_path.exists() and original_pdb_path.exists():
                import shutil

                # The current "original" may be the completed structure;
                # restore from the backup if a completed structure was generated
                completed_pdb_path = OUTPUT_DIR / "0_complete_structure.pdb"
                if completed_pdb_path.exists():
                    shutil.copy2(backup_path, original_pdb_path)
                    logger.info("Restored original structure from backup")

            logger.info("User chose to use original structure")

        return jsonify({
            'success': True,
            'use_completed': use_completed
        })
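
# Example client usage for the preference endpoint above (illustrative sketch):
#
#   import requests
#   requests.post("http://localhost:5000/api/set-use-completed-structure",
#                 json={"use_completed": True})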
{str(e)}") return jsonify({ 'success': False, 'error': str(e) }), 500 @app.route('/api/get-use-completed-structure', methods=['GET']) def get_use_completed_structure(): """Get user preference for using completed structure""" try: flag_file = OUTPUT_DIR / ".use_completed_structure" use_completed = flag_file.exists() return jsonify({ 'success': True, 'use_completed': use_completed }) except Exception as e: logger.error(f"Error getting use completed structure preference: {str(e)}") return jsonify({ 'success': False, 'error': str(e) }), 500 @app.route('/api/get-completed-structure', methods=['GET']) def get_completed_structure(): """Get the completed structure PDB file if it exists""" try: completed_pdb_path = OUTPUT_DIR / "0_complete_structure.pdb" if not completed_pdb_path.exists(): return jsonify({ 'success': False, 'exists': False, 'error': 'Completed structure not found' }), 404 with open(completed_pdb_path, 'r') as f: content = f.read() return jsonify({ 'success': True, 'exists': True, 'content': content }) except Exception as e: logger.error(f"Error reading completed structure: {str(e)}") return jsonify({ 'success': False, 'error': str(e) }), 500 @app.route('/api/get-file', methods=['GET']) def get_file(): """Get a file from the output directory""" try: filename = request.args.get('filename') if not filename: return jsonify({ 'success': False, 'error': 'Filename parameter required' }), 400 # Security: only allow files from output directory file_path = OUTPUT_DIR / filename # Prevent directory traversal if not str(file_path).startswith(str(OUTPUT_DIR)): return jsonify({ 'success': False, 'error': 'Invalid file path' }), 400 if not file_path.exists(): return jsonify({ 'success': False, 'error': f'File {filename} not found' }), 404 # Read file content with open(file_path, 'r') as f: content = f.read() return content, 200, {'Content-Type': 'text/plain'} except Exception as e: logger.error(f"Error reading file {filename}: {str(e)}") return jsonify({ 'success': False, 'error': str(e) }), 500 if __name__ == '__main__': print("🧬 MD Simulation Pipeline") print("=========================") print("🌐 Starting Flask server...") print("📡 Backend API: http://localhost:5000") print("🔗 Web Interface: http://localhost:5000") print("") print("Press Ctrl+C to stop the server") print("") # Clean and create fresh output folder on startup print("🧹 Cleaning output folder...") clean_and_create_output_folder() print("✅ Output folder ready!") print("") app.run(debug=False, host='0.0.0.0', port=5000)