import os import time import io import logging import pdbfixer import openmm from openmm import app as openmm_app from openmm import unit ENERGY = unit.kilocalories_per_mole LENGTH = unit.angstroms from diffab.tools.relax.base import RelaxTask def current_milli_time(): return round(time.time() * 1000) def _is_in_the_range(ch_rs_ic, flexible_residue_first, flexible_residue_last): if ch_rs_ic[0] != flexible_residue_first[0]: return False r_first, r_last = tuple(flexible_residue_first[1:]), tuple(flexible_residue_last[1:]) rs_ic = ch_rs_ic[1:] return r_first <= rs_ic <= r_last class ForceFieldMinimizer(object): def __init__(self, stiffness=10.0, max_iterations=0, tolerance=2.39*unit.kilocalories_per_mole, platform='CUDA'): super().__init__() self.stiffness = stiffness self.max_iterations = max_iterations self.tolerance = tolerance assert platform in ('CUDA', 'CPU') self.platform = platform def _fix(self, pdb_str): fixer = pdbfixer.PDBFixer(pdbfile=io.StringIO(pdb_str)) fixer.findNonstandardResidues() fixer.replaceNonstandardResidues() fixer.findMissingResidues() fixer.findMissingAtoms() fixer.addMissingAtoms(seed=0) fixer.addMissingHydrogens() out_handle = io.StringIO() openmm_app.PDBFile.writeFile(fixer.topology, fixer.positions, out_handle, keepIds=True) return out_handle.getvalue() def _get_pdb_string(self, topology, positions): with io.StringIO() as f: openmm_app.PDBFile.writeFile(topology, positions, f, keepIds=True) return f.getvalue() def _minimize(self, pdb_str, flexible_residue_first=None, flexible_residue_last=None): pdb = openmm_app.PDBFile(io.StringIO(pdb_str)) force_field = openmm_app.ForceField("amber99sb.xml") constraints = openmm_app.HBonds system = force_field.createSystem(pdb.topology, constraints=constraints) # Add constraints to non-generated regions force = openmm.CustomExternalForce("0.5 * k * ((x-x0)^2 + (y-y0)^2 + (z-z0)^2)") force.addGlobalParameter("k", self.stiffness) for p in ["x0", "y0", "z0"]: force.addPerParticleParameter(p) if flexible_residue_first is not None and flexible_residue_last is not None: for i, a in enumerate(pdb.topology.atoms()): ch_rs_ic = (a.residue.chain.id, int(a.residue.id), a.residue.insertionCode) if not _is_in_the_range(ch_rs_ic, flexible_residue_first, flexible_residue_last) and a.element.name != "hydrogen": force.addParticle(i, pdb.positions[i]) system.addForce(force) # Set up the integrator and simulation integrator = openmm.LangevinIntegrator(0, 0.01, 0.0) platform = openmm.Platform.getPlatformByName("CUDA") simulation = openmm_app.Simulation(pdb.topology, system, integrator, platform) simulation.context.setPositions(pdb.positions) # Perform minimization ret = {} state = simulation.context.getState(getEnergy=True, getPositions=True) ret["einit"] = state.getPotentialEnergy().value_in_unit(ENERGY) ret["posinit"] = state.getPositions(asNumpy=True).value_in_unit(LENGTH) simulation.minimizeEnergy(maxIterations=self.max_iterations, tolerance=self.tolerance) state = simulation.context.getState(getEnergy=True, getPositions=True) ret["efinal"] = state.getPotentialEnergy().value_in_unit(ENERGY) ret["pos"] = state.getPositions(asNumpy=True).value_in_unit(LENGTH) ret["min_pdb"] = self._get_pdb_string(simulation.topology, state.getPositions()) return ret['min_pdb'], ret def _add_energy_remarks(self, pdb_str, ret): pdb_lines = pdb_str.splitlines() pdb_lines.insert(1, "REMARK 1 FINAL ENERGY: {:.3f} KCAL/MOL".format(ret['efinal'])) pdb_lines.insert(1, "REMARK 1 INITIAL ENERGY: {:.3f} KCAL/MOL".format(ret['einit'])) return "\n".join(pdb_lines) def __call__(self, pdb_str, flexible_residue_first=None, flexible_residue_last=None, return_info=True): if '\n' not in pdb_str and pdb_str.lower().endswith(".pdb"): with open(pdb_str) as f: pdb_str = f.read() pdb_fixed = self._fix(pdb_str) pdb_min, ret = self._minimize(pdb_fixed, flexible_residue_first, flexible_residue_last) pdb_min = self._add_energy_remarks(pdb_min, ret) if return_info: return pdb_min, ret else: return pdb_min def run_openmm(task: RelaxTask): if not task.can_proceed() : return task if task.update_if_finished('openmm'): return task try: minimizer = ForceFieldMinimizer() with open(task.current_path, 'r') as f: pdb_str = f.read() pdb_min = minimizer( pdb_str = pdb_str, flexible_residue_first = task.flexible_residue_first, flexible_residue_last = task.flexible_residue_last, return_info = False, ) out_path = task.set_current_path_tag('openmm') with open(out_path, 'w') as f: f.write(pdb_min) task.mark_success() except ValueError as e: logging.warning( f'{e.__class__.__name__}: {str(e)} ({task.current_path})' ) task.mark_failure() return task