Spaces:
Running
Running
| """Local web UI for the Digital Evolution Engine. | |
| Single-user Flask server intended to be launched by the desktop ``.app`` and | |
| opened in the user's browser. Stateful by design (long-running jobs are kept | |
| in memory, results held until a new job replaces them) — this is *not* a | |
| multi-tenant service. | |
| API surface: | |
| POST /api/preview parse a sequence (file upload OR JSON body) and | |
| return its metadata + any CDS features. | |
| POST /api/run enqueue a pipeline job. Returns immediately with | |
| a job_id; the work runs in a background thread. | |
| GET /api/status/:id poll job progress. | |
| GET /api/result/:id fetch the variant table once a job has finished. | |
| GET /api/download/:id?format=csv|fasta download outputs. | |
| POST /api/shutdown cleanly stop the server (used by the desktop app | |
| when the user explicitly quits). | |
| """ | |
| from __future__ import annotations | |
| import hashlib | |
| import io | |
| import json | |
| import logging | |
| import os | |
| import re | |
| import tempfile | |
| import threading | |
| import time | |
| import traceback | |
| import uuid | |
| from dataclasses import dataclass, field | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional | |
| from flask import Flask, Response, jsonify, request, send_file, send_from_directory | |
| from dee.core.codon import ( | |
| DEFAULT_FORBIDDEN_SITES, | |
| variants_to_dataframe, | |
| write_library_csv, | |
| ) | |
| from dee.core.sequence import ( | |
| SequenceValidationError, | |
| find_orfs_in_dna, | |
| list_cds_features, | |
| parse_input, | |
| ) | |
| from dee.models.scorer import ESM2Scorer, ScorerConfig, top_percentile_pool | |
| from dee.optimizer.search import SearchConfig, apply_variant, evolve | |
# Named logger used by every helper in this module.
logger = logging.getLogger("dee.server")
# Front-end assets are served from a "static" directory next to this file.
STATIC_DIR = Path(__file__).resolve().parent / "static"
# Per-user working directories under ~/.dee: STATE_DIR receives the temporary
# input files, OUTPUT_DIR receives the finished library CSVs.
STATE_DIR = Path.home() / ".dee" / "state"
OUTPUT_DIR = Path.home() / ".dee" / "output"
# Finished CSVs are additionally copied here on a best-effort basis.
DESKTOP_DIR = Path.home() / "Desktop"
# NOTE: import-time side effect — both directories are created as soon as this
# module is imported.
STATE_DIR.mkdir(parents=True, exist_ok=True)
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
def _patch_ssl_for_macos() -> None:
    """Route urllib's HTTPS traffic through certifi's CA bundle.

    Some macOS Python builds ship without a system CA bundle, so
    ``urllib.request.urlopen`` (which Biopython's NCBIWWW.qblast uses
    internally) fails with ``CERTIFICATE_VERIFY_FAILED`` when it tries to
    reach https://blast.ncbi.nlm.nih.gov.

    The patch installs a default urllib opener whose HTTPS handler trusts
    certifi's roots and also swaps ``ssl``'s module-level default HTTPS
    context factory. It is best-effort: a missing ``certifi`` or any failure
    while patching is logged as a warning and otherwise ignored.
    """
    import ssl
    import urllib.request

    try:
        import certifi
    except ImportError:  # pragma: no cover — certifi is a dep of HF hub
        logger.warning("certifi not installed; BLAST may fail on macOS Pythons.")
        return

    def _certifi_context() -> "ssl.SSLContext":
        # Fresh context on every call, always rooted at certifi's bundle.
        return ssl.create_default_context(cafile=certifi.where())

    try:
        urllib.request.install_opener(
            urllib.request.build_opener(
                urllib.request.HTTPSHandler(context=_certifi_context())
            )
        )
        # Also override the module-level default so callers that bypass
        # urllib (e.g. http.client directly) still get the certifi roots.
        ssl._create_default_https_context = _certifi_context
        logger.info("Patched urllib SSL context with certifi CA bundle.")
    except Exception as exc:  # noqa: BLE001 — best-effort patch
        logger.warning("Could not patch SSL CA bundle: %s", exc)


# Applied once at import time so every later HTTPS request is covered.
_patch_ssl_for_macos()
| # ----------------------------------------------------------------- job state | |
@dataclass
class JobState:
    """Mutable record of one pipeline run, updated in place by the worker.

    The class must be a dataclass: it relies on ``field(default_factory=...)``
    defaults and is constructed with keyword arguments
    (``JobState(job_id=...)``).
    """

    job_id: str
    # Lifecycle label written by the worker as it advances.
    status: str = "pending"
    # Fraction of the run completed, 0.0–1.0.
    progress: float = 0.0
    # Human-readable description of the current step.
    message: str = "Queued."
    started_at: float = field(default_factory=time.time)
    finished_at: Optional[float] = None
    wt_identifier: str = ""
    wt_protein: str = ""
    csv_path: Optional[str] = None
    desktop_path: Optional[str] = None
    variants: Optional[List[Dict[str, Any]]] = None
    error: Optional[str] = None
    # Frozen copy of the user's settings + the parameters the engine actually
    # used. Surfaces in the UI's metadata pills and the auto-generated
    # methods paragraph; also a paper-trail for reproducibility.
    settings_used: Dict[str, Any] = field(default_factory=dict)

    def elapsed(self) -> float:
        """Return seconds since the job started, rounded to 0.1 s.

        Measured up to ``finished_at`` once that is set, otherwise up to now.
        """
        end = self.finished_at if self.finished_at is not None else time.time()
        return round(end - self.started_at, 1)

    def public(self) -> Dict[str, Any]:
        """Return the JSON-friendly status snapshot.

        The variant table and the WT protein itself are left out; only the
        protein's length is reported.
        """
        return {
            "job_id": self.job_id,
            "status": self.status,
            "progress": round(self.progress, 3),
            "message": self.message,
            "started_at": self.started_at,
            "elapsed_seconds": self.elapsed(),
            "wt_identifier": self.wt_identifier,
            "wt_protein_length": len(self.wt_protein),
            "csv_path": self.csv_path,
            "desktop_path": self.desktop_path,
            "error": self.error,
        }
| _JOBS: Dict[str, JobState] = {} | |
| _JOBS_LOCK = threading.Lock() | |
| def _get_job(job_id: str) -> Optional[JobState]: | |
| with _JOBS_LOCK: | |
| return _JOBS.get(job_id) | |
| def _put_job(job: JobState) -> None: | |
| with _JOBS_LOCK: | |
| _JOBS[job.job_id] = job | |
| # ----------------------------------------------------------------- preview cache | |
| # Uploaded files / pasted strings are cached briefly so /api/preview and the | |
| # subsequent /api/run don't have to re-receive the same payload. Keyed by a | |
| # session_id we hand back to the browser. | |
# session_id -> cached preview payload. Entries carry a ``created_at`` stamp
# and are pruned by ``_new_session`` once they are older than 30 minutes.
_SESSIONS: Dict[str, Dict[str, Any]] = {}
# Guards every read and write of ``_SESSIONS``.
_SESSIONS_LOCK = threading.Lock()
| # ----------------------------------------------------------------- BLAST jobs | |
@dataclass
class BlastJob:
    """One asynchronous NCBI-BLAST submission.

    Keyed by sequence hash so repeat lookups on the same WT protein return
    instantly from cache. The class must be a dataclass: it relies on
    ``field(default_factory=...)`` defaults and keyword construction.
    """

    job_id: str
    seq_hash: str
    # Lifecycle label written by the BLAST worker.
    status: str = "pending"
    started_at: float = field(default_factory=time.time)
    finished_at: Optional[float] = None
    # Parsed hit dictionaries, appended by the worker as it parses.
    hits: List[Dict[str, Any]] = field(default_factory=list)
    error: Optional[str] = None

    def elapsed(self) -> float:
        """Return seconds since submission, rounded to 0.1 s.

        Measured up to ``finished_at`` once that is set, otherwise up to now.
        """
        end = self.finished_at if self.finished_at is not None else time.time()
        return round(end - self.started_at, 1)

    def public(self) -> Dict[str, Any]:
        """Return the JSON-friendly view of the job (``seq_hash`` is omitted)."""
        return {
            "job_id": self.job_id,
            "status": self.status,
            "elapsed_seconds": self.elapsed(),
            "hits": self.hits,
            "error": self.error,
        }
# Two indexes over the same BlastJob objects: by sequence hash (to reuse a
# prior or in-flight lookup) and by job id (for status polling).
_BLAST_CACHE: Dict[str, BlastJob] = {}  # seq_hash -> BlastJob
_BLAST_BY_ID: Dict[str, BlastJob] = {}  # job_id -> BlastJob
# Guards both dictionaries above.
_BLAST_LOCK = threading.Lock()
| def _hash_sequence(protein: str) -> str: | |
| return hashlib.sha256(protein.encode("utf-8")).hexdigest()[:16] | |
| def _extract_organism(hit_def: str) -> str: | |
| """NCBI hit descriptions look like 'GFP [Aequorea victoria]'. Pull out | |
| the bracketed organism if present; fall back to empty string.""" | |
| m = re.search(r"\[([^\]]+)\]", hit_def or "") | |
| return m.group(1) if m else "" | |
| def _trim_description(hit_def: str) -> str: | |
| """Strip the trailing '[organism]' from the description for cleaner | |
| display — the organism is shown separately.""" | |
| return re.sub(r"\s*\[[^\]]+\]\s*$", "", hit_def or "").strip() | |
| def _extract_uniprot(alignment) -> Optional[str]: | |
| """Pull a UniProt accession out of an NCBI BLAST alignment if there is one. | |
| NCBI hit_id strings look like: | |
| ``sp|P12345|HUMAN_GENE`` (SwissProt — has UniProt accession) | |
| ``ref|NP_001234.1|`` (RefSeq — no direct UniProt mapping) | |
| ``pdb|1ABC|A`` (PDB) | |
| ``gb|AAA12345.1|`` (GenBank) | |
| AlphaFold-DB only indexes UniProt entries, so we only return something | |
| when we see ``sp|`` (SwissProt) or ``tr|`` (TrEMBL) prefixes. | |
| """ | |
| hit_id = getattr(alignment, "hit_id", "") or "" | |
| for token in hit_id.split(";"): | |
| token = token.strip() | |
| m = re.match(r"^(?:sp|tr)\|([A-Z0-9]+(?:-\d+)?)\|", token) | |
| if m: | |
| return m.group(1) | |
| return None | |
def _run_blast(job: BlastJob, protein: str) -> None:
    """Submit ``protein`` to NCBI BLAST and record the top hits on ``job``.

    Uses Biopython's NCBIWWW wrapper (the public web service). The hit list
    is kept small and the E-value cutoff strict to minimize bandwidth.

    Runs as a background worker: it never raises. Progress is reported via
    ``job.status`` ("submitting" → "parsing" → "done"/"error"), hits are
    appended to ``job.hits``, failures land in ``job.error``, and
    ``job.finished_at`` is always stamped.
    """
    try:
        # Biopython is heavy; import lazily so the rest of the server boots
        # without a hard Bio dependency at startup.
        from Bio.Blast import NCBIWWW, NCBIXML

        job.status = "submitting"
        result_handle = NCBIWWW.qblast(
            program="blastp",
            database="nr",
            sequence=protein,
            hitlist_size=5,
            expect=1e-5,
        )
        try:
            job.status = "parsing"
            # Exactly one query was submitted, so only the first record matters.
            record = next(iter(NCBIXML.parse(result_handle)), None)
            alignments = record.alignments[:5] if record is not None else []
            for alignment in alignments:
                if not alignment.hsps:
                    continue
                # First HSP of each alignment — presumably the best-scoring
                # one as returned by NCBI; TODO confirm the ordering.
                hsp = alignment.hsps[0]
                identity_pct = round(
                    100.0 * hsp.identities / max(1, hsp.align_length), 1
                )
                coverage_pct = round(
                    100.0 * hsp.align_length / max(1, len(protein)), 1
                )
                uniprot = _extract_uniprot(alignment)
                job.hits.append(
                    {
                        "accession": getattr(alignment, "accession", "") or "",
                        "hit_id": getattr(alignment, "hit_id", "") or "",
                        "description": _trim_description(alignment.hit_def),
                        "organism": _extract_organism(alignment.hit_def),
                        "length": alignment.length,
                        "identity_pct": identity_pct,
                        "coverage_pct": coverage_pct,
                        "evalue": float(hsp.expect),
                        "bit_score": float(hsp.bits),
                        # UniProt accession when available — gates the
                        # AlphaFold-DB structure embed on the frontend.
                        "uniprot": uniprot,
                        "alphafold_url": (
                            f"https://alphafold.ebi.ac.uk/files/AF-{uniprot}-F1-model_v4.pdb"
                            if uniprot else None
                        ),
                        "alphafold_page": (
                            f"https://alphafold.ebi.ac.uk/entry/{uniprot}"
                            if uniprot else None
                        ),
                    }
                )
        finally:
            # Release the response buffer whether or not parsing succeeded.
            result_handle.close()
        job.status = "done"
    except Exception as exc:  # noqa: BLE001 — surface anything as an error
        logger.exception("BLAST failed.")
        job.status = "error"
        job.error = f"{type(exc).__name__}: {exc}"
    finally:
        job.finished_at = time.time()
def _new_session(payload: Dict[str, Any]) -> str:
    """Cache ``payload`` under a fresh session id and return that id.

    A ``created_at`` timestamp is stored with the cached copy; the caller's
    dict is left untouched. Every call also evicts sessions older than
    30 minutes.
    """
    now = time.time()
    sid = uuid.uuid4().hex[:12]
    # Store a copy so the caller's dict is not mutated behind its back.
    entry = {**payload, "created_at": now}
    cutoff = now - 30 * 60
    with _SESSIONS_LOCK:
        _SESSIONS[sid] = entry
        # Garbage-collect anything older than 30 min.
        expired = [
            key
            for key, cached in _SESSIONS.items()
            if cached.get("created_at", 0) < cutoff
        ]
        for key in expired:
            del _SESSIONS[key]
    return sid
def _get_session(sid: str) -> Optional[Dict[str, Any]]:
    """Return the cached payload for ``sid``, or ``None`` if it is unknown."""
    with _SESSIONS_LOCK:
        entry = _SESSIONS.get(sid)
    return entry
| # ----------------------------------------------------------------- Flask app | |
| def create_app() -> Flask: | |
| app = Flask(__name__, static_folder=None) | |
def index() -> Response:
    """Serve ``index.html`` out of ``STATIC_DIR``."""
    # NOTE(review): no route registration for this view is visible in this
    # file — confirm how it is attached to ``app``.
    entry_page = "index.html"
    return send_from_directory(STATIC_DIR, entry_page)
def static_file(filename: str) -> Response:
    """Serve the asset named ``filename`` out of ``STATIC_DIR``."""
    # NOTE(review): no route registration for this view is visible in this
    # file — confirm how it is attached to ``app``.
    asset_root = STATIC_DIR
    return send_from_directory(asset_root, filename)
@app.route("/api/preview", methods=["POST"])
def preview() -> Response:
    """Accept a sequence and report what we found (no scoring yet).

    Accepts EITHER:
      * multipart/form-data with ``file`` field, or
      * application/json with ``{"text": "..."}``.

    If the input parses cleanly as a single CDS or protein, returns its
    metadata. If it's a structured plasmid file or a raw DNA blob with
    multiple ORFs, returns a list of choices in ``cds_options`` so the
    UI can show a picker before kicking off a run.

    On success the response also carries a ``session_id`` that a later run
    request uses to refer back to the stored input. Validation problems
    return 400 with ``kind="validation"``; anything else returns 500 with
    ``kind="internal"``. The temporary input file is removed whenever no
    session ends up being returned.
    """
    tmp_path: Optional[Path] = None

    def _discard_input() -> None:
        # Best-effort cleanup; never mask the error being reported.
        if tmp_path is None:
            return
        try:
            tmp_path.unlink(missing_ok=True)
        except OSError as cleanup_exc:
            logger.warning("Could not remove %s: %s", tmp_path, cleanup_exc)

    try:
        tmp_path, original_name = _materialize_input()
        payload, session_extras = _summarize(tmp_path, original_name)
        sid = _new_session(
            {"path": str(tmp_path), "name": original_name, **session_extras}
        )
        payload["session_id"] = sid
        return jsonify(payload)
    except SequenceValidationError as exc:
        _discard_input()
        return jsonify(
            {
                "error": str(exc),
                "kind": "validation",
                "nt_position": exc.nt_position,
                "code": exc.code,
            }
        ), 400
    except Exception as exc:  # noqa: BLE001 — surface anything else as 500.
        _discard_input()
        logger.exception("Preview failed.")
        return jsonify({"error": str(exc), "kind": "internal"}), 500
@app.route("/api/run", methods=["POST"])
def start_run() -> Response:
    """Enqueue a pipeline job for a previously previewed sequence.

    Expects a JSON body with ``session_id`` and optionally ``settings`` and
    ``cds_feature``. Returns ``{"job_id": ...}`` immediately; the work runs
    in a daemon thread. Responds 400 when ``session_id`` is missing and 410
    when the session is no longer cached.
    """
    body = request.get_json(force=True, silent=True) or {}
    sid = body.get("session_id")
    if not sid:
        return jsonify({"error": "missing session_id"}), 400
    session = _get_session(sid)
    if not session:
        return jsonify({"error": "session expired; re-upload the sequence"}), 410
    settings = body.get("settings") or {}
    cds_feature = body.get("cds_feature")

    # If the session has an ORF map and the user picked an ORF, materialize
    # that ORF's DNA into a fresh temp file and use it as the pipeline input.
    run_path = Path(session["path"])
    run_name = session["name"]
    orf_map = session.get("orfs") or {}
    if orf_map and cds_feature and cds_feature in orf_map:
        with tempfile.NamedTemporaryFile(
            delete=False, suffix=".fasta", mode="w", encoding="utf-8",
            dir=str(STATE_DIR),
        ) as tmp:
            tmp.write(f">{cds_feature}\n{orf_map[cds_feature]}\n")
        run_path = Path(tmp.name)
        run_name = f"{Path(run_name).stem}_{cds_feature}"
        # Already extracted a clean ORF — don't re-pass the label downstream.
        cds_feature = None

    # Register the job only once its input is ready, so a failure above
    # cannot leave a job that stays "pending" forever.
    job = JobState(job_id=uuid.uuid4().hex[:12])
    _put_job(job)
    worker = threading.Thread(
        target=_run_pipeline,
        args=(job, run_path, run_name, cds_feature, settings),
        daemon=True,
    )
    worker.start()
    return jsonify({"job_id": job.job_id})
@app.route("/api/status/<job_id>", methods=["GET"])
def status(job_id: str) -> Response:
    """Report a job's progress snapshot, or 404 for an unknown id."""
    job = _get_job(job_id)
    if job is None:
        return jsonify({"error": "unknown job"}), 404
    return jsonify(job.public())
@app.route("/api/result/<job_id>", methods=["GET"])
def result(job_id: str) -> Response:
    """Return the variant table of a finished job.

    Responds 404 for an unknown id and 409 while the job's status is
    anything other than ``"done"``.
    """
    job = _get_job(job_id)
    if job is None:
        return jsonify({"error": "unknown job"}), 404
    if job.status != "done":
        return jsonify({"error": f"job not done (status={job.status})"}), 409
    body = {
        "wt_identifier": job.wt_identifier,
        "wt_protein": job.wt_protein,
        "variants": job.variants or [],
        "csv_path": job.csv_path,
        "desktop_path": job.desktop_path,
        "settings_used": job.settings_used,
        "started_at": job.started_at,
        "elapsed_seconds": job.elapsed(),
    }
    return jsonify(body)
@app.route("/api/download/<job_id>", methods=["GET"])
def download(job_id: str) -> Response:
    """Send a job's output as an attachment in the requested ``format``.

    ``format`` comes from the query string (default ``csv``, lower-cased).
    Responds 404 when the job is unknown or has no CSV yet, and 400 when the
    renderer rejects the format with ``ValueError``.
    """
    job = _get_job(job_id)
    if not job or not job.csv_path:
        return jsonify({"error": "no result available"}), 404
    fmt = request.args.get("format", "csv").lower()
    try:
        data, mimetype, ext = _render_download(job, fmt)
    except ValueError as exc:
        return jsonify({"error": str(exc)}), 400
    # The attachment is named after the stored CSV, with the new extension.
    stem = Path(job.csv_path).stem
    # NOTE(review): non-bytes data is handed to send_file unchanged, which
    # would treat a ``str`` as a filesystem path — confirm what
    # _render_download returns for text formats.
    payload = io.BytesIO(data) if isinstance(data, bytes) else data
    return send_file(
        payload,
        mimetype=mimetype,
        as_attachment=True,
        download_name=f"{stem}.{ext}",
    )
def identify_start() -> Response:
    """Kick off an NCBI-BLAST identification for the session's WT protein.

    Returns ``{job_id, cached, hits?}``. If we already ran BLAST on this
    exact sequence we return the prior result inline with cached=true; a
    lookup that is still in flight is returned with cached=false. A prior
    lookup that ended in error is retried.

    Responds 400 when ``session_id`` is missing, the input fails validation,
    or no protein can be derived; 410 when the session is no longer cached.
    """
    # NOTE(review): no route registration for this view is visible in this
    # file — confirm how it is attached to ``app``.
    body = request.get_json(force=True, silent=True) or {}
    sid = body.get("session_id")
    if not sid:
        return jsonify({"error": "missing session_id"}), 400
    session = _get_session(sid)
    if not session:
        return jsonify({"error": "session expired"}), 410
    cds_feature = body.get("cds_feature")

    # Re-derive the protein to BLAST. For sessions with discovered ORFs
    # the user may have picked a specific one — use that if provided.
    orf_map = session.get("orfs") or {}
    try:
        if orf_map and cds_feature and cds_feature in orf_map:
            tmp_path = STATE_DIR / f"blast_input_{uuid.uuid4().hex[:8]}.fa"
            try:
                tmp_path.write_text(
                    f">{cds_feature}\n{orf_map[cds_feature]}\n", encoding="utf-8"
                )
                record = parse_input(tmp_path, require_start=False, require_stop=False)
            finally:
                # Remove the scratch file even when parsing fails.
                tmp_path.unlink(missing_ok=True)
        else:
            record = parse_input(
                Path(session["path"]),
                require_start=False,
                require_stop=False,
                cds_feature=cds_feature,
            )
    except SequenceValidationError as exc:
        # Report bad input as a client error rather than an unhandled 500.
        return jsonify({"error": str(exc), "kind": "validation"}), 400

    protein = record.protein
    if not protein:
        return jsonify({"error": "no protein available to identify"}), 400

    seq_hash = _hash_sequence(protein)
    with _BLAST_LOCK:
        cached = _BLAST_CACHE.get(seq_hash)
        if cached and cached.status in {"done", "running", "submitting", "parsing", "pending"}:
            return jsonify(
                {
                    "job_id": cached.job_id,
                    "cached": cached.status == "done",
                    **cached.public(),
                }
            )
        job = BlastJob(job_id=uuid.uuid4().hex[:12], seq_hash=seq_hash)
        _BLAST_CACHE[seq_hash] = job
        _BLAST_BY_ID[job.job_id] = job
        threading.Thread(
            target=_run_blast, args=(job, protein), daemon=True
        ).start()
    return jsonify({"job_id": job.job_id, "cached": False, **job.public()})
def identify_status(job_id: str) -> Response:
    """Report a BLAST job's public state, or 404 for an unknown id."""
    # NOTE(review): no route registration for this view is visible in this
    # file — confirm how it is attached to ``app``.
    with _BLAST_LOCK:
        blast_job = _BLAST_BY_ID.get(job_id)
    if blast_job:
        return jsonify(blast_job.public())
    return jsonify({"error": "unknown job"}), 404
def library() -> Response:
    """List every variant library CSV under ~/.dee/output/ for the
    Library + History pages.

    Each entry includes filename, on-disk size, timestamp parsed from the
    filename (file mtime as fallback), the WT identifier embedded in the
    filename, variant count, and the fitness + mutations of the first data
    row — enough to populate a list view. ``top_fitness`` is ``None`` when
    the first row has no parseable score. Files that cannot be read are
    still listed, with whatever was gathered before the failure.
    """
    # NOTE(review): no route registration for this view is visible in this
    # file — confirm how it is attached to ``app``.
    import csv as csv_mod

    # Filename convention from _run_pipeline:
    #   {stem}__{wt_id}__{YYYYMMDD_HHMMSS}_dee_library.csv
    name_pattern = re.compile(
        r"^(?P<stem>.+?)__(?P<wt>[^_]+(?:_[^_]+)*?)__"
        r"(?P<ts>\d{8}_\d{6})_dee_library\.csv$"
    )
    iso_format = "%Y-%m-%dT%H:%M:%S"

    items: List[Dict[str, Any]] = []
    for path in sorted(OUTPUT_DIR.glob("*_dee_library.csv"), reverse=True):
        try:
            stat = path.stat()
        except OSError:
            continue  # e.g. the file disappeared after the glob

        match = name_pattern.match(path.name)
        wt_id = match.group("wt") if match else path.stem
        iso_ts = None
        if match:
            try:
                parsed = time.strptime(match.group("ts"), "%Y%m%d_%H%M%S")
                iso_ts = time.strftime(iso_format, parsed)
            except ValueError:
                iso_ts = None  # digits that do not form a real date

        # Quick scan: variant count + top fitness from first data row.
        variant_count = 0
        top_fitness: Optional[float] = None
        mutations_preview = ""
        try:
            # newline="" lets the csv module handle line endings itself, so
            # quoted fields containing newlines are parsed correctly.
            with path.open("r", encoding="utf-8", newline="") as handle:
                for row in csv_mod.DictReader(handle):
                    variant_count += 1
                    if variant_count > 1:
                        continue
                    raw_fitness = row.get("Predicted_Fitness_Score")
                    try:
                        # A blank/missing score is "unknown", not 0.0.
                        top_fitness = float(raw_fitness) if raw_fitness else None
                    except ValueError:
                        top_fitness = None
                    mutations_preview = row.get("Mutations_AA") or ""
        except Exception as exc:  # noqa: BLE001
            logger.warning("Could not read %s: %s", path, exc)

        items.append(
            {
                "filename": path.name,
                "size_bytes": stat.st_size,
                "modified_iso": iso_ts
                or time.strftime(iso_format, time.localtime(stat.st_mtime)),
                "wt_identifier": wt_id,
                "variant_count": variant_count,
                "top_fitness": top_fitness,
                "top_mutations": mutations_preview,
            }
        )
    return jsonify({"items": items, "output_dir": str(OUTPUT_DIR)})
def library_download(filename: str) -> Response:
    """Stream a stored library CSV by filename.

    The resolved target must be a regular file located inside OUTPUT_DIR;
    anything else (including path-traversal attempts) gets a 404.
    """
    # NOTE(review): no route registration for this view is visible in this
    # file — confirm how it is attached to ``app``.
    root = OUTPUT_DIR.resolve()
    target = (root / filename).resolve()
    # Compare path components, not string prefixes: a prefix test would also
    # accept sibling directories such as "<root>_other/…".
    if root not in target.parents or not target.is_file():
        return jsonify({"error": "not found"}), 404
    return send_file(
        str(target),
        mimetype="text/csv",
        as_attachment=True,
        download_name=target.name,
    )
def library_delete(filename: str) -> Response:
    """Delete a stored library CSV by filename.

    The resolved target must be a regular file located inside OUTPUT_DIR;
    anything else (including path-traversal attempts) gets a 404. A failed
    deletion is reported as a 500 carrying the OS error text.
    """
    # NOTE(review): no route registration for this view is visible in this
    # file — confirm how it is attached to ``app``.
    root = OUTPUT_DIR.resolve()
    target = (root / filename).resolve()
    # Compare path components, not string prefixes: a prefix test would also
    # accept sibling directories such as "<root>_other/…".
    if root not in target.parents or not target.is_file():
        return jsonify({"error": "not found"}), 404
    try:
        target.unlink()
    except OSError as exc:
        return jsonify({"error": str(exc)}), 500
    return jsonify({"ok": True})
@app.route("/api/shutdown", methods=["POST"])
def shutdown() -> Response:
    """Acknowledge the request, then terminate the whole process.

    Werkzeug's old shutdown hook is gone in Flask 3, so the process exits
    directly via ``os._exit``. The exit runs on a separate thread after a
    short delay so this response can still be sent. The browser tab can stay
    open; the user clicked quit so they're done.
    """

    def _exit_soon() -> None:
        time.sleep(0.2)
        os._exit(0)

    threading.Thread(target=_exit_soon, daemon=True).start()
    return jsonify({"ok": True})
| return app | |
| # ----------------------------------------------------------------- helpers | |
def _materialize_input() -> tuple[Path, str]:
    """Pull the request's sequence into a temp file we can hand to parse_input.

    Reads either the uploaded ``file`` part or the ``text`` field of a JSON
    body from the current Flask request.

    Returns:
        ``(path, display_name)`` — the temp file created under ``STATE_DIR``
        and the upload's filename / the JSON ``name`` field (``"uploaded"`` /
        ``"pasted"`` when absent).

    Raises:
        SequenceValidationError: no file was uploaded and ``text`` is
            missing, blank, or not a string.
    """
    if "file" in request.files:
        upload = request.files["file"]
        # Keep the original extension: downstream parsing keys off the suffix.
        suffix = Path(upload.filename or "").suffix or ".txt"
        with tempfile.NamedTemporaryFile(
            delete=False, suffix=suffix, dir=str(STATE_DIR)
        ) as tmp:
            # Write through the already-open handle instead of reopening
            # the same path a second time.
            upload.save(tmp)
        return Path(tmp.name), upload.filename or "uploaded"

    body = request.get_json(force=True, silent=True)
    if not isinstance(body, dict):
        body = {}
    raw_text = body.get("text")
    # JSON may carry null or a non-string here; treat that as "nothing sent".
    text = raw_text.strip() if isinstance(raw_text, str) else ""
    if not text:
        raise SequenceValidationError("No sequence provided.")
    with tempfile.NamedTemporaryFile(
        delete=False, suffix=".txt", mode="w", encoding="utf-8", dir=str(STATE_DIR)
    ) as tmp:
        tmp.write(text)
    name = body.get("name")
    return Path(tmp.name), name if isinstance(name, str) and name else "pasted"
def _summarize(path: Path, original_name: str) -> tuple[Dict[str, Any], Dict[str, Any]]:
    """Describe an input file for the preview response.

    Returns a pair: the JSON-friendly summary, and the extra payload to cache
    with the session.

    Three flow modes:
      1. Structured file (.dna/.gb/...) → list annotated CDS features.
      2. Raw input parses cleanly as a single CDS or protein → return preview.
      3. Raw input fails CDS validation (e.g. user pasted a whole plasmid) →
         fall back to an ORF scan and return ORF choices.

    Raises:
        SequenceValidationError: the input neither parses cleanly nor yields
            any ORF (the original parse error is re-raised).
    """

    def _preview(protein: str) -> str:
        # First 80 residues, with an ellipsis when something was cut off.
        return protein[:80] + ("…" if len(protein) > 80 else "")

    # Mode 1 — annotated structured file.
    structured_suffixes = {".dna", ".gb", ".gbk", ".genbank", ".embl"}
    if path.suffix.lower() in structured_suffixes:
        cds_options = [
            {"label": label, "length_nt": length}
            for label, length in list_cds_features(path)
        ]
        record = parse_input(path, require_start=False, require_stop=False)
        summary = {
            "original_name": original_name,
            "detected_kind": "plasmid",
            "identifier": record.identifier,
            "protein_length": len(record.protein),
            "protein_preview": _preview(record.protein),
            "cds_options": cds_options,
        }
        return summary, {"mode": "structured_file"}

    # Mode 2 — try clean single-CDS / protein parse.
    try:
        record = parse_input(path, require_start=False, require_stop=False)
        summary = {
            "original_name": original_name,
            "detected_kind": "protein" if not record.dna else "dna",
            "identifier": record.identifier,
            "protein_length": len(record.protein),
            "protein_preview": _preview(record.protein),
            "cds_options": None,
        }
        return summary, {"mode": "single"}
    except SequenceValidationError:
        # Mode 3 — fall back to ORF discovery for raw DNA.
        orfs = find_orfs_in_dna(path.read_text(errors="ignore"))
        if not orfs:
            # Not recoverable; bubble the original parse error up.
            raise
        summary = {
            "original_name": original_name,
            "detected_kind": "multi_orf",
            "identifier": f"{Path(original_name).stem or 'sequence'} (no annotation)",
            "protein_length": 0,
            "protein_preview": (
                f"{len(orfs)} ORFs detected across 6 reading frames "
                "(no annotated CDS features). Pick one below."
            ),
            "cds_options": [
                {
                    "label": orf.label,
                    "length_nt": len(orf.dna),
                    "frame": orf.frame,
                    "protein_length": len(orf.protein),
                }
                for orf in orfs
            ],
        }
        return summary, {"mode": "orf", "orfs": {orf.label: orf.dna for orf in orfs}}
def _run_pipeline(
    job: JobState,
    input_path: Path,
    original_name: str,
    cds_feature: Optional[str],
    settings: Dict[str, Any],
) -> None:
    """Background worker. Updates ``job`` in place as it makes progress.

    Stages: parse the input → score single substitutions → keep the top
    percentile → search multi-mutants → build the variant table → write the
    CSV to ``OUTPUT_DIR`` (plus a best-effort copy on the Desktop).

    Never raises: any failure is recorded on ``job`` (``status="error"``,
    ``error``, ``message``). ``status`` is flipped to its final value last,
    so a poller that sees "done"/"error" also sees the finished fields.
    """

    def _fail(reason: str) -> None:
        job.error = reason
        job.message = reason
        job.finished_at = time.time()
        job.status = "error"

    try:
        job.status = "parsing"
        job.message = "Reading sequence…"
        job.progress = 0.05
        record = parse_input(
            input_path,
            require_start=False,
            require_stop=False,
            cds_feature=cds_feature,
        )
        job.wt_identifier = record.identifier
        job.wt_protein = record.protein

        # Parse every setting exactly once so the values recorded for the UI
        # are the very values handed to the engine below.
        model = settings.get("model", "small")
        percentile = float(settings.get("percentile", 85.0))
        k = int(settings.get("k", 30))
        min_mutations = int(settings.get("min_mutations", 2))
        max_mutations = int(settings.get("max_mutations", 5))
        restarts = int(settings.get("restarts", 8))
        steps = int(settings.get("steps", 1200))
        host = settings.get("host", "e_coli")
        seed = settings.get("seed")
        job.settings_used = {
            "model": model,
            "percentile": percentile,
            "k": k,
            "min_mutations": min_mutations,
            "max_mutations": max_mutations,
            "restarts": restarts,
            "steps_per_restart": steps,
            "host": host,
            "seed": seed,
            # NOTE(review): recorded as "cpu" when unset, while the scorer
            # receives the raw (possibly None) value — confirm the scorer's
            # own default matches.
            "device": settings.get("device") or "cpu",
            "cds_feature": cds_feature,
        }

        job.status = "scoring"
        job.message = (
            f"Scoring {len(record.protein)} residues × 19 substitutions with ESM-2…"
        )
        job.progress = 0.15
        scorer = ESM2Scorer(
            ScorerConfig(
                model_name=model,
                device=settings.get("device"),
                quantization=settings.get("quantization"),
            )
        )
        scores_df = scorer.score_all_substitutions(record.protein)
        job.progress = 0.55

        pool = top_percentile_pool(scores_df, percentile=percentile)
        job.message = f"Filtered to {len(pool)} high-impact single-site mutations."
        job.progress = 0.60

        job.status = "searching"
        job.message = (
            f"Optimizing multi-mutants (K={k}, "
            f"max {max_mutations} subs)…"
        )
        variants = evolve(
            pool,
            SearchConfig(
                k=k,
                max_mutations=max_mutations,
                min_mutations=min_mutations,
                n_restarts=restarts,
                steps_per_restart=steps,
                seed=seed,
            ),
        )
        job.progress = 0.85

        job.status = "encoding"
        job.message = f"Reverse-translating and scrubbing restriction sites for {len(variants)} variants…"
        df = variants_to_dataframe(
            record.protein,
            variants,
            host=host,
            forbidden_sites=DEFAULT_FORBIDDEN_SITES,
        )
        job.progress = 0.95

        # {stem}__{wt_id}__{timestamp}_dee_library.csv — the library listing
        # parses this pattern back apart.
        stem = Path(original_name).stem or "library"
        safe_id = "".join(c if c.isalnum() else "_" for c in record.identifier) or "wt"
        ts = time.strftime("%Y%m%d_%H%M%S")
        filename = f"{stem}__{safe_id}__{ts}_dee_library.csv"
        primary = OUTPUT_DIR / filename
        write_library_csv(df, str(primary))
        job.csv_path = str(primary)

        # Best-effort copy to Desktop. Requires the user to have granted the
        # .app Files & Folders → Desktop access (NSDesktopFolderUsageDescription
        # in Info.plist triggers the system prompt).
        desktop_dest = DESKTOP_DIR / filename
        try:
            desktop_dest.write_bytes(primary.read_bytes())
            job.desktop_path = str(desktop_dest)
        except OSError as exc:  # PermissionError is a subclass of OSError
            logger.warning("Could not copy to Desktop (%s); CSV stays in ~/.dee/output/.", exc)
            job.desktop_path = None

        job.variants = df.to_dict(orient="records")
        job.progress = 1.0
        job.message = f"Done. {len(variants)} variants ready."
        job.finished_at = time.time()
        job.status = "done"
    except SequenceValidationError as exc:
        _fail(f"Couldn't parse input: {exc}")
    except Exception as exc:  # noqa: BLE001
        logger.exception("Pipeline crashed.")
        _fail(f"{type(exc).__name__}: {exc}")
| def _fasta_from_job(job: JobState, *, of: str = "protein") -> str: | |
| """Serialize a job's variants as FASTA. ``of`` selects which sequence to | |
| emit per record — protein or DNA. Protein is the more common ask (mutants | |
| aligned against WT); DNA is included for completeness.""" | |
| if not job.variants: | |
| return "" | |
| lines: List[str] = [] | |
| seq_key = "Mutant_AA_Seq" if of == "protein" else "Optimized_DNA_Seq" | |
| for v in job.variants: | |
| header = ( | |
| f">{v.get('Variant_ID')} {v.get('Mutations_AA')} " | |
| f"fitness={v.get('Predicted_Fitness_Score')}" | |
| ) | |
| seq = v.get(seq_key, "") | |
| lines.append(header) | |
| for i in range(0, len(seq), 60): | |
| lines.append(seq[i : i + 60]) | |
| return "\n".join(lines) + "\n" | |
def _genbank_from_job(job: JobState) -> str:
    """Build a multi-record GenBank file with one record per variant.

    Each record gets a CDS feature spanning the whole optimized DNA whose
    ``/translation`` is the mutant protein, plus a misc_feature per
    substitution so the mutation positions are visible when opened in
    SnapGene / Benchling.

    Substitutions whose position cannot be parsed, is below 1, or falls
    outside the DNA are skipped. Missing (``None``) sequence, mutation or ID
    values are treated as empty / ``"variant"``. Requires Biopython.
    """
    from Bio.Seq import Seq
    from Bio.SeqFeature import FeatureLocation, SeqFeature
    from Bio.SeqRecord import SeqRecord
    from Bio import SeqIO

    def _codon_span(mut: str, dna_len: int) -> Optional[tuple]:
        """Map a label like ``A12V`` to its 0-based codon span, or ``None``."""
        if len(mut) < 3:
            return None
        digits = "".join(c for c in mut[1:-1] if c.isdigit())
        if not digits:
            return None
        pos_aa = int(digits)
        if pos_aa < 1:
            # Position 0 would give a negative start coordinate.
            return None
        nt_start = (pos_aa - 1) * 3
        nt_end = nt_start + 3
        return (nt_start, nt_end) if nt_end <= dna_len else None

    records = []
    for v in job.variants or []:
        dna = v.get("Optimized_DNA_Seq") or ""
        protein = v.get("Mutant_AA_Seq") or ""
        muts = v.get("Mutations_AA") or ""
        variant_id = str(v.get("Variant_ID") or "variant")
        record = SeqRecord(
            Seq(dna),
            id=variant_id,
            name=variant_id,
            description=f"DEE designed variant; mutations={muts}; "
            f"fitness={v.get('Predicted_Fitness_Score')}",
            annotations={"molecule_type": "DNA", "organism": "synthetic construct"},
        )

        # Whole-CDS feature with the translation in /translation.
        cds_qualifiers = {
            "gene": [job.wt_identifier or "designed_variant"],
            "product": ["designed variant"],
            "note": [f"mutations={muts}"],
        }
        if protein:
            cds_qualifiers["translation"] = [protein]
        record.features.append(
            SeqFeature(
                FeatureLocation(0, len(dna)),
                type="CDS",
                qualifiers=cds_qualifiers,
            )
        )

        # One misc_feature per substitution at the codon it touches.
        for mut in (part.strip() for part in muts.split(",")):
            span = _codon_span(mut, len(dna))
            if span is None:
                continue
            record.features.append(
                SeqFeature(
                    FeatureLocation(*span),
                    type="misc_feature",
                    qualifiers={"label": [mut], "note": [f"Substitution {mut}"]},
                )
            )
        records.append(record)

    buf = io.StringIO()
    SeqIO.write(records, buf, "genbank")
    return buf.getvalue()
def _xlsx_fill_variants_sheet(ws, variants) -> None:
    """Write the variant table onto *ws* and apply its presentation styling.

    Column order follows the keys of the first variant dict. The sheet gets
    a styled header row, alternating row fill, per-column number formats and
    fonts, fixed column widths, frozen panes and an auto-filter.
    """
    from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
    from openpyxl.utils import get_column_letter

    headers = list(variants[0].keys())

    # Header row: bold white text on the deep brand background.
    header_fill = PatternFill("solid", fgColor="0A5F77")
    header_font = Font(name="Inter", size=11, bold=True, color="FFFFFF")
    header_align = Alignment(horizontal="left", vertical="center", wrap_text=False)
    body_font = Font(name="Inter", size=10)
    body_align = Alignment(horizontal="left", vertical="top", wrap_text=False)
    num_align = Alignment(horizontal="right", vertical="top")
    mono_font = Font(name="JetBrains Mono", size=9)
    band_fill = PatternFill("solid", fgColor="F4F6F9")  # alternating row color
    thin = Side(style="thin", color="E0E5EC")
    border = Border(left=thin, right=thin, top=thin, bottom=thin)

    # Column geometry: width per known column name (anything else gets 14).
    col_width = {
        "Variant_ID": 11,
        "Mutations_AA": 28,
        "Mutant_AA_Seq": 52,
        "Optimized_DNA_Seq": 70,
        "Predicted_Fitness_Score": 14,
        "Length_bp": 9,
        "GC_Percent": 10,
        "Primer_Fwd": 32,
        "Primer_Fwd_Tm_C": 12,
        "Primer_Fwd_GC_Percent": 14,
        "Primer_Rev": 32,
        "Primer_Rev_Tm_C": 12,
        "Primer_Rev_GC_Percent": 14,
        "Annealing_Temp_C": 14,
        "Restriction_Sites_Found": 14,
        "Restriction_Sites_Unresolved": 16,
    }
    # Number formats keyed by column name; columns not listed are text.
    num_format = {
        "Predicted_Fitness_Score": "0.000",
        "GC_Percent": "0.0",
        "Primer_Fwd_Tm_C": "0.0",
        "Primer_Fwd_GC_Percent": "0.0",
        "Primer_Rev_Tm_C": "0.0",
        "Primer_Rev_GC_Percent": "0.0",
        "Annealing_Temp_C": "0.0",
        "Length_bp": "0",
        "Restriction_Sites_Found": "0",
        "Restriction_Sites_Unresolved": "0",
    }
    mono_cols = {
        "Mutations_AA",
        "Mutant_AA_Seq",
        "Optimized_DNA_Seq",
        "Primer_Fwd",
        "Primer_Rev",
        "Variant_ID",
    }

    # Write header row.
    for col_idx, col in enumerate(headers, start=1):
        cell = ws.cell(row=1, column=col_idx, value=col)
        cell.fill = header_fill
        cell.font = header_font
        cell.alignment = header_align
        cell.border = border
    ws.row_dimensions[1].height = 22

    # Write data rows with alternating band fill + column-specific formatting.
    for row_idx, variant in enumerate(variants, start=2):
        is_band = row_idx % 2 == 0
        for col_idx, col in enumerate(headers, start=1):
            cell = ws.cell(row=row_idx, column=col_idx, value=variant.get(col))
            cell.border = border
            if is_band:
                cell.fill = band_fill
            if col in num_format:
                cell.alignment = num_align
                cell.number_format = num_format[col]
                cell.font = body_font
            elif col in mono_cols:
                cell.alignment = body_align
                cell.font = mono_font
            else:
                cell.alignment = body_align
                cell.font = body_font

    # Column widths.
    for col_idx, col in enumerate(headers, start=1):
        ws.column_dimensions[get_column_letter(col_idx)].width = col_width.get(col, 14)

    # Freeze the header row + first two columns so they stay in view while
    # horizontally scrolling through the primer + sequence columns.
    ws.freeze_panes = "C2"
    # Auto-filter the whole used range so reviewers can sort/filter natively.
    ws.auto_filter.ref = ws.dimensions


def _xlsx_fill_run_info_sheet(info, job) -> None:
    """Write the run parameters and the methods paragraph onto *info*.

    Reads ``job.settings_used`` (treated as ``{}`` when falsy),
    ``job.wt_identifier``, ``job.wt_protein``, ``job.started_at`` and
    ``job.elapsed()``.
    """
    from openpyxl.styles import Alignment, Font

    label_font = Font(name="Inter", size=11, bold=True, color="0A5F77")
    value_font = Font(name="Inter", size=11)
    methods_font = Font(name="Inter", size=10)

    s = job.settings_used or {}
    model_label = {
        "small": "ESM-2 35M (facebook/esm2_t12_35M_UR50D)",
        "medium": "ESM-2 650M (facebook/esm2_t33_650M_UR50D)",
        "large": "ESM-2 3B (facebook/esm2_t36_3B_UR50D)",
    }.get(s.get("model"), s.get("model", ""))
    host_label = {
        "e_coli": "Escherichia coli",
        "yeast": "Saccharomyces cerevisiae",
        "human": "Homo sapiens",
    }.get(s.get("host"), s.get("host", ""))

    # Resolve the percentile once so the "p.." label and the "top ..%" share
    # always agree, and a missing / None setting falls back to 85 instead of
    # printing "pNone" or raising on float(None).
    percentile = s.get("percentile")
    if percentile is None:
        percentile = 85
    top_share = f"{100 - float(percentile):.0f}"

    rows = [
        ("Wild-type identifier", job.wt_identifier),
        ("Wild-type length", f"{len(job.wt_protein)} aa"),
        ("Model", model_label),
        ("Percentile cutoff", f"≥ p{percentile} (top {top_share}%)"),
        ("K variants returned", s.get("k")),
        ("Min mutations / variant", s.get("min_mutations")),
        ("Max mutations / variant", s.get("max_mutations")),
        ("SA restarts", s.get("restarts")),
        ("SA steps / restart", s.get("steps_per_restart")),
        ("Expression host", host_label),
        ("Compute device", s.get("device")),
        ("Random seed", s.get("seed") if s.get("seed") is not None else "(random)"),
        ("CDS feature picked", s.get("cds_feature") or "(none / single CDS)"),
        ("Run started (epoch s)", job.started_at),
        ("Wall time (s)", job.elapsed()),
    ]
    for i, (k, v) in enumerate(rows, start=1):
        info.cell(row=i, column=1, value=k).font = label_font
        info.cell(row=i, column=2, value=v).font = value_font

    # Methods paragraph: heading one blank row below the table, text under it.
    heading_row = len(rows) + 2
    methods_row = len(rows) + 3
    info.cell(
        row=heading_row, column=1, value="Materials & methods (copy-paste)"
    ).font = label_font
    methods = (
        f"Variants of {job.wt_identifier} ({len(job.wt_protein)} aa) were designed "
        f"in silico with {model_label} (Lin et al., Science 2022) using the "
        f"wild-type marginal log-likelihood scoring scheme of Meier et al. "
        f"(Adv. Neural Inf. Process. Syst. 2021). Single-point substitutions "
        f"in the top {top_share}% by ΔLL were "
        f"retained as the combinatorial search space. {s.get('k')} multi-mutant "
        f"variants with {s.get('min_mutations')}–{s.get('max_mutations')} "
        f"simultaneous substitutions were generated by simulated annealing "
        f"({s.get('restarts')} restarts × {s.get('steps_per_restart')} steps, "
        f"geometric cooling) maximizing cumulative ΔLL with stop-codon and "
        f"duplicate-position penalties. Optimized DNA was reverse-translated "
        f"using {host_label} codon-usage frequencies and synonymously cleaned "
        f"of BsaI, BsmBI, and NotI recognition sites for Golden Gate "
        f"compatibility."
    )
    mcell = info.cell(row=methods_row, column=1, value=methods)
    mcell.font = methods_font
    mcell.alignment = Alignment(wrap_text=True, vertical="top")
    # Merge A..D so the wrapped paragraph spans the full width of the sheet.
    info.merge_cells(
        start_row=methods_row, start_column=1, end_row=methods_row, end_column=4
    )
    info.row_dimensions[methods_row].height = 140
    info.column_dimensions["A"].width = 32
    info.column_dimensions["B"].width = 48
    info.column_dimensions["C"].width = 16
    info.column_dimensions["D"].width = 16


def _xlsx_from_job(job: JobState) -> bytes:
    """Write the variant table to a styled .xlsx workbook.

    When the job has variants the workbook holds two sheets:

    * "Variants" — the table itself, with a bold colored header row, frozen
      header row and first two columns, auto-filter, fixed column widths per
      column name, number formatting per column (3 decimals for fitness,
      1 for GC/Tm, integer for bp and site counts), and text wrapping off
      so the long Mutant_AA_Seq / Optimized_DNA_Seq cells stay on one line.
    * "Run info" — key-value pairs of the run parameters, plus an
      auto-generated methods paragraph for copy-pasting into a
      Materials & Methods section.

    When the job has no variants the workbook holds only a "Variants" sheet
    with a short notice in A1.

    Requires ``openpyxl``. Returns raw bytes for the Flask handler to stream.
    """
    import openpyxl

    wb = openpyxl.Workbook()
    ws = wb.active
    ws.title = "Variants"

    variants = job.variants or []
    if variants:
        _xlsx_fill_variants_sheet(ws, variants)
        _xlsx_fill_run_info_sheet(wb.create_sheet("Run info"), job)
    else:
        ws["A1"] = "No variants in this run."

    buf = io.BytesIO()
    wb.save(buf)
    return buf.getvalue()
def _json_from_job(job: JobState) -> bytes:
    """Serialize the run as UTF-8 encoded, 2-space indented JSON.

    The document carries the wild-type identifier, the wild-type protein
    and its length, and the list of variant rows (an empty list when the
    job has none).
    """
    document = dict(
        wt_identifier=job.wt_identifier,
        wt_protein=job.wt_protein,
        wt_protein_length=len(job.wt_protein),
        variants=job.variants if job.variants else [],
    )
    text = json.dumps(document, indent=2)
    return text.encode("utf-8")
def _tsv_from_job(job: JobState) -> bytes:
    """Render the variant rows as UTF-8 encoded tab-separated text.

    Same table as the CSV export but tab-delimited, which suits analysis
    pipelines (R, awk, Perl) that cope badly with quoted commas. The index
    column is not written.
    """
    from pandas import DataFrame

    rows = job.variants if job.variants else []
    table = DataFrame(rows)
    text = table.to_csv(sep="\t", index=False)
    return text.encode("utf-8")
# Format dispatch — keyed by the value of ``?format=``. Each branch returns
# (payload, mimetype, file-extension). A new format is one more branch.
def _render_download(job: JobState, fmt: str):
    """Render *job* in the requested download format.

    Accepted ``fmt`` values: ``csv``, ``tsv``, ``fasta``, ``fasta-dna``,
    ``gb`` / ``genbank``, ``xlsx`` / ``excel`` and ``json``.

    Returns a ``(payload_bytes, mimetype, file_extension)`` tuple.

    Raises:
        ValueError: if ``fmt`` is not one of the accepted values.
    """
    if fmt == "csv":
        # Excel in many European locales (incl. the user's Georgian Excel)
        # treats CSV with ',' delimiter as a single column unless told
        # otherwise. We prepend Excel's "sep=," hint line and a UTF-8 BOM so
        # double-clicking the file opens it with proper columns everywhere.
        with open(job.csv_path, "rb") as f:
            raw = f.read()
        # "utf-8-sig" drops a BOM the file may already start with; plain
        # "utf-8" would keep it, leaving a stray U+FEFF glued to the first
        # header name after the "sep=," line.
        decoded = raw.decode("utf-8-sig", errors="replace")
        bom = b"\xef\xbb\xbf"
        body = b"sep=,\r\n" + decoded.encode("utf-8")
        return bom + body, "text/csv", "csv"
    if fmt == "tsv":
        return _tsv_from_job(job), "text/tab-separated-values", "tsv"
    if fmt == "fasta":
        return _fasta_from_job(job, of="protein").encode("utf-8"), "text/x-fasta", "fasta"
    if fmt == "fasta-dna":
        return _fasta_from_job(job, of="dna").encode("utf-8"), "text/x-fasta", "fna"
    if fmt in ("gb", "genbank"):
        return _genbank_from_job(job).encode("utf-8"), "text/x-genbank", "gb"
    if fmt in ("xlsx", "excel"):
        return (
            _xlsx_from_job(job),
            "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            "xlsx",
        )
    if fmt == "json":
        return _json_from_job(job), "application/json", "json"
    raise ValueError(
        "format must be one of: csv, tsv, fasta, fasta-dna, gb, xlsx, json"
    )
| # ----------------------------------------------------------------- CLI entry | |
def serve(host: Optional[str] = None, port: Optional[int] = None) -> None:
    """Start the Flask server and block until it stops.

    Bind address resolution, in order of precedence: an explicit (truthy)
    ``host`` / ``port`` argument, then the ``HOST`` / ``PORT`` environment
    variables (so the same image runs under Hugging Face Spaces' fixed
    ``PORT=7860`` contract, Fly.io's ``8080``, Render's ``10000``, etc.),
    then the local-dev defaults ``127.0.0.1:4789``.

    A ``DEE_READY <url>`` line is printed (and flushed) to stdout just before
    the server loop is entered — presumably the marker the desktop launcher
    waits for; confirm against the launcher.

    Raises:
        ValueError: if ``PORT`` is consulted and is not an integer string.
    """
    bind_host = host or os.environ.get("HOST") or "127.0.0.1"
    if port:
        bind_port = port
    else:
        raw_port = os.environ.get("PORT")
        bind_port = int(raw_port) if raw_port else 4789

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
    application = create_app()
    logger.info(
        "Digital Evolution Engine listening on http://%s:%s", bind_host, bind_port
    )
    print(f"DEE_READY http://{bind_host}:{bind_port}", flush=True)
    application.run(
        host=bind_host,
        port=bind_port,
        debug=False,
        use_reloader=False,
        threaded=True,
    )
# Script entry point: running this module directly starts the server with
# the host/port resolution implemented in ``serve``.
if __name__ == "__main__":
    serve()