"""Local web UI for the Digital Evolution Engine. Single-user Flask server intended to be launched by the desktop ``.app`` and opened in the user's browser. Stateful by design (long-running jobs are kept in memory, results held until a new job replaces them) — this is *not* a multi-tenant service. API surface: POST /api/preview parse a sequence (file upload OR JSON body) and return its metadata + any CDS features. POST /api/run enqueue a pipeline job. Returns immediately with a job_id; the work runs in a background thread. GET /api/status/:id poll job progress. GET /api/result/:id fetch the variant table once a job has finished. GET /api/download/:id?format=csv|fasta download outputs. POST /api/shutdown cleanly stop the server (used by the desktop app when the user explicitly quits). """ from __future__ import annotations import hashlib import io import json import logging import os import re import tempfile import threading import time import traceback import uuid from dataclasses import dataclass, field from pathlib import Path from typing import Any, Dict, List, Optional from flask import Flask, Response, jsonify, request, send_file, send_from_directory from dee.core.codon import ( DEFAULT_FORBIDDEN_SITES, pcr_metrics, reverse_translate, scrub_restriction_sites, variants_to_dataframe, write_library_csv, ) from dee.core.sequence import ( SequenceValidationError, find_orfs_in_dna, list_cds_features, parse_input, ) from dee.models.scorer import ESM2Scorer, ScorerConfig, top_percentile_pool from dee.optimizer.search import SearchConfig, apply_variant, evolve logger = logging.getLogger("dee.server") STATIC_DIR = Path(__file__).resolve().parent / "static" STATE_DIR = Path.home() / ".dee" / "state" OUTPUT_DIR = Path.home() / ".dee" / "output" DESKTOP_DIR = Path.home() / "Desktop" STATE_DIR.mkdir(parents=True, exist_ok=True) OUTPUT_DIR.mkdir(parents=True, exist_ok=True) def _patch_ssl_for_macos() -> None: """Some macOS Python builds ship without a system CA bundle, so ``urllib.request.urlopen`` (which Biopython's NCBIWWW.qblast uses internally) blows up with ``CERTIFICATE_VERIFY_FAILED: self-signed certificate in certificate chain`` when it tries to reach https://blast.ncbi.nlm.nih.gov. Fix it by installing a default urllib opener whose HTTPS handler uses certifi's bundled CA roots (certifi is already a transitive dep of huggingface-hub). Idempotent and safe to call multiple times. """ import ssl import urllib.request try: import certifi except ImportError: # pragma: no cover — certifi is a dep of HF hub logger.warning("certifi not installed; BLAST may fail on macOS Pythons.") return try: ctx = ssl.create_default_context(cafile=certifi.where()) https_handler = urllib.request.HTTPSHandler(context=ctx) opener = urllib.request.build_opener(https_handler) urllib.request.install_opener(opener) # Also override the module-level default context so callers that go # around urllib (e.g. http.client direct) still get the certifi roots. ssl._create_default_https_context = lambda: ssl.create_default_context( cafile=certifi.where() ) logger.info("Patched urllib SSL context with certifi CA bundle.") except Exception as exc: # noqa: BLE001 — best-effort patch logger.warning("Could not patch SSL CA bundle: %s", exc) _patch_ssl_for_macos() # ----------------------------------------------------------------- job state @dataclass class JobState: job_id: str status: str = "pending" progress: float = 0.0 message: str = "Queued." started_at: float = field(default_factory=time.time) finished_at: Optional[float] = None wt_identifier: str = "" wt_protein: str = "" csv_path: Optional[str] = None desktop_path: Optional[str] = None variants: Optional[List[Dict[str, Any]]] = None error: Optional[str] = None # Frozen copy of the user's settings + the parameters the engine actually # used. Surfaces in the UI's metadata pills and the auto-generated # methods paragraph; also a paper-trail for reproducibility. settings_used: Dict[str, Any] = field(default_factory=dict) def elapsed(self) -> float: end = self.finished_at if self.finished_at else time.time() return round(end - self.started_at, 1) def public(self) -> Dict[str, Any]: return { "job_id": self.job_id, "status": self.status, "progress": round(self.progress, 3), "message": self.message, "started_at": self.started_at, "elapsed_seconds": self.elapsed(), "wt_identifier": self.wt_identifier, "wt_protein_length": len(self.wt_protein), "csv_path": self.csv_path, "desktop_path": self.desktop_path, "error": self.error, } _JOBS: Dict[str, JobState] = {} _JOBS_LOCK = threading.Lock() def _get_job(job_id: str) -> Optional[JobState]: with _JOBS_LOCK: return _JOBS.get(job_id) def _put_job(job: JobState) -> None: with _JOBS_LOCK: _JOBS[job.job_id] = job # ----------------------------------------------------------------- preview cache # Uploaded files / pasted strings are cached briefly so /api/preview and the # subsequent /api/run don't have to re-receive the same payload. Keyed by a # session_id we hand back to the browser. _SESSIONS: Dict[str, Dict[str, Any]] = {} _SESSIONS_LOCK = threading.Lock() # ----------------------------------------------------------------- BLAST jobs @dataclass class BlastJob: """One asynchronous NCBI-BLAST submission. Keyed by sequence hash so repeat lookups on the same WT protein return instantly from cache.""" job_id: str seq_hash: str status: str = "pending" started_at: float = field(default_factory=time.time) finished_at: Optional[float] = None hits: List[Dict[str, Any]] = field(default_factory=list) error: Optional[str] = None def elapsed(self) -> float: end = self.finished_at if self.finished_at else time.time() return round(end - self.started_at, 1) def public(self) -> Dict[str, Any]: return { "job_id": self.job_id, "status": self.status, "elapsed_seconds": self.elapsed(), "hits": self.hits, "error": self.error, } _BLAST_CACHE: Dict[str, BlastJob] = {} # seq_hash -> BlastJob _BLAST_BY_ID: Dict[str, BlastJob] = {} # job_id -> BlastJob _BLAST_LOCK = threading.Lock() def _hash_sequence(protein: str) -> str: return hashlib.sha256(protein.encode("utf-8")).hexdigest()[:16] def _extract_organism(hit_def: str) -> str: """NCBI hit descriptions look like 'GFP [Aequorea victoria]'. Pull out the bracketed organism if present; fall back to empty string.""" m = re.search(r"\[([^\]]+)\]", hit_def or "") return m.group(1) if m else "" def _trim_description(hit_def: str) -> str: """Strip the trailing '[organism]' from the description for cleaner display — the organism is shown separately.""" return re.sub(r"\s*\[[^\]]+\]\s*$", "", hit_def or "").strip() def _extract_uniprot(alignment) -> Optional[str]: """Pull a UniProt accession out of an NCBI BLAST alignment if there is one. NCBI hit_id strings look like: ``sp|P12345|HUMAN_GENE`` (SwissProt — has UniProt accession) ``ref|NP_001234.1|`` (RefSeq — no direct UniProt mapping) ``pdb|1ABC|A`` (PDB) ``gb|AAA12345.1|`` (GenBank) AlphaFold-DB only indexes UniProt entries, so we only return something when we see ``sp|`` (SwissProt) or ``tr|`` (TrEMBL) prefixes. """ hit_id = getattr(alignment, "hit_id", "") or "" for token in hit_id.split(";"): token = token.strip() m = re.match(r"^(?:sp|tr)\|([A-Z0-9]+(?:-\d+)?)\|", token) if m: return m.group(1) return None def _run_blast(job: BlastJob, protein: str) -> None: """Submit to NCBI BLAST and parse the top hits. Uses Biopython's NCBIWWW wrapper (the public web service). Polite delays and rate limits are imposed by NCBI server-side; we keep hitlist small and expect strict to minimize bandwidth. """ try: # Biopython is heavy; import lazily so the rest of the server boots # without a hard Bio dependency at startup. from Bio.Blast import NCBIWWW, NCBIXML job.status = "submitting" result_handle = NCBIWWW.qblast( program="blastp", database="nr", sequence=protein, hitlist_size=5, expect=1e-5, ) job.status = "parsing" for record in NCBIXML.parse(result_handle): for alignment in record.alignments[:5]: if not alignment.hsps: continue # Use the best (lowest-evalue) HSP from each alignment. hsp = alignment.hsps[0] identity_pct = round( 100.0 * hsp.identities / max(1, hsp.align_length), 1 ) coverage_pct = round( 100.0 * hsp.align_length / max(1, len(protein)), 1 ) uniprot = _extract_uniprot(alignment) job.hits.append( { "accession": getattr(alignment, "accession", "") or "", "hit_id": getattr(alignment, "hit_id", "") or "", "description": _trim_description(alignment.hit_def), "organism": _extract_organism(alignment.hit_def), "length": alignment.length, "identity_pct": identity_pct, "coverage_pct": coverage_pct, "evalue": float(hsp.expect), "bit_score": float(hsp.bits), # UniProt accession when available — gates the # AlphaFold-DB structure embed on the frontend. "uniprot": uniprot, "alphafold_url": ( f"https://alphafold.ebi.ac.uk/files/AF-{uniprot}-F1-model_v4.pdb" if uniprot else None ), "alphafold_page": ( f"https://alphafold.ebi.ac.uk/entry/{uniprot}" if uniprot else None ), } ) break # only one query record job.status = "done" except Exception as exc: # noqa: BLE001 — surface anything as an error logger.exception("BLAST failed.") job.status = "error" job.error = f"{type(exc).__name__}: {exc}" finally: job.finished_at = time.time() def _new_session(payload: Dict[str, Any]) -> str: sid = uuid.uuid4().hex[:12] payload["created_at"] = time.time() with _SESSIONS_LOCK: _SESSIONS[sid] = payload # Garbage-collect anything older than 30 min. cutoff = time.time() - 30 * 60 stale = [k for k, v in _SESSIONS.items() if v.get("created_at", 0) < cutoff] for k in stale: _SESSIONS.pop(k, None) return sid def _get_session(sid: str) -> Optional[Dict[str, Any]]: with _SESSIONS_LOCK: return _SESSIONS.get(sid) # ----------------------------------------------------------------- Flask app def create_app() -> Flask: app = Flask(__name__, static_folder=None) @app.get("/") def index() -> Response: return send_from_directory(STATIC_DIR, "index.html") @app.get("/static/") def static_file(filename: str) -> Response: return send_from_directory(STATIC_DIR, filename) @app.post("/api/preview") def preview() -> Response: """Accept a sequence and report what we found (no scoring yet). Accepts EITHER: * multipart/form-data with ``file`` field, or * application/json with ``{"text": "..."}``. If the input parses cleanly as a single CDS or protein, returns its metadata. If it's a structured plasmid file or a raw DNA blob with multiple ORFs, returns a list of choices in ``cds_options`` so the UI can show a picker before kicking off a run. """ try: tmp_path, original_name = _materialize_input() payload, session_extras = _summarize(tmp_path, original_name) sid = _new_session( {"path": str(tmp_path), "name": original_name, **session_extras} ) payload["session_id"] = sid return jsonify(payload) except SequenceValidationError as exc: return jsonify( { "error": str(exc), "kind": "validation", "nt_position": exc.nt_position, "code": exc.code, } ), 400 except Exception as exc: # noqa: BLE001 — surface anything else as 500. logger.exception("Preview failed.") return jsonify({"error": str(exc), "kind": "internal"}), 500 @app.post("/api/run") def start_run() -> Response: body = request.get_json(force=True, silent=True) or {} sid = body.get("session_id") if not sid: return jsonify({"error": "missing session_id"}), 400 session = _get_session(sid) if not session: return jsonify({"error": "session expired; re-upload the sequence"}), 410 settings = body.get("settings") or {} cds_feature = body.get("cds_feature") job = JobState(job_id=uuid.uuid4().hex[:12]) _put_job(job) # If the session has an ORF map and the user picked an ORF, materialize # that ORF's DNA into a fresh temp file and use it as the pipeline input. run_path = Path(session["path"]) run_name = session["name"] orf_map = session.get("orfs") or {} if orf_map and cds_feature and cds_feature in orf_map: tmp = tempfile.NamedTemporaryFile( delete=False, suffix=".fasta", mode="w", encoding="utf-8", dir=str(STATE_DIR), ) tmp.write(f">{cds_feature}\n{orf_map[cds_feature]}\n") tmp.close() run_path = Path(tmp.name) run_name = f"{Path(run_name).stem}_{cds_feature}" # Already extracted a clean ORF — don't re-pass the label downstream. cds_feature = None thread = threading.Thread( target=_run_pipeline, args=(job, run_path, run_name, cds_feature, settings), daemon=True, ) thread.start() return jsonify({"job_id": job.job_id}) @app.get("/api/status/") def status(job_id: str) -> Response: job = _get_job(job_id) if not job: return jsonify({"error": "unknown job"}), 404 return jsonify(job.public()) @app.get("/api/result/") def result(job_id: str) -> Response: job = _get_job(job_id) if not job: return jsonify({"error": "unknown job"}), 404 if job.status != "done": return jsonify({"error": f"job not done (status={job.status})"}), 409 return jsonify( { "wt_identifier": job.wt_identifier, "wt_protein": job.wt_protein, "variants": job.variants or [], "csv_path": job.csv_path, "desktop_path": job.desktop_path, "settings_used": job.settings_used, "started_at": job.started_at, "elapsed_seconds": job.elapsed(), } ) @app.get("/api/download/") def download(job_id: str) -> Response: job = _get_job(job_id) if not job or not job.csv_path: return jsonify({"error": "no result available"}), 404 fmt = request.args.get("format", "csv").lower() stem = Path(job.csv_path).stem try: data, mimetype, ext = _render_download(job, fmt) except ValueError as exc: return jsonify({"error": str(exc)}), 400 return send_file( io.BytesIO(data) if isinstance(data, bytes) else data, mimetype=mimetype, as_attachment=True, download_name=f"{stem}.{ext}", ) @app.post("/api/identify") def identify_start() -> Response: """Kick off an NCBI-BLAST identification for the session's WT protein. Returns ``{job_id, cached, hits?}``. If we already ran BLAST on this exact sequence we return the prior result inline with cached=true. """ body = request.get_json(force=True, silent=True) or {} sid = body.get("session_id") if not sid: return jsonify({"error": "missing session_id"}), 400 session = _get_session(sid) if not session: return jsonify({"error": "session expired"}), 410 cds_feature = body.get("cds_feature") # Re-derive the protein to BLAST. For sessions with discovered ORFs # the user may have picked a specific one — use that if provided. orf_map = session.get("orfs") or {} if orf_map and cds_feature and cds_feature in orf_map: tmp_path = Path(STATE_DIR / f"blast_input_{uuid.uuid4().hex[:8]}.fa") tmp_path.write_text(f">{cds_feature}\n{orf_map[cds_feature]}\n") record = parse_input(tmp_path, require_start=False, require_stop=False) tmp_path.unlink(missing_ok=True) else: record = parse_input( Path(session["path"]), require_start=False, require_stop=False, cds_feature=cds_feature, ) protein = record.protein if not protein: return jsonify({"error": "no protein available to identify"}), 400 seq_hash = _hash_sequence(protein) with _BLAST_LOCK: cached = _BLAST_CACHE.get(seq_hash) if cached and cached.status in {"done", "running", "submitting", "parsing", "pending"}: return jsonify( { "job_id": cached.job_id, "cached": cached.status == "done", **cached.public(), } ) job = BlastJob(job_id=uuid.uuid4().hex[:12], seq_hash=seq_hash) _BLAST_CACHE[seq_hash] = job _BLAST_BY_ID[job.job_id] = job threading.Thread( target=_run_blast, args=(job, protein), daemon=True ).start() return jsonify({"job_id": job.job_id, "cached": False, **job.public()}) @app.get("/api/identify/") def identify_status(job_id: str) -> Response: with _BLAST_LOCK: job = _BLAST_BY_ID.get(job_id) if not job: return jsonify({"error": "unknown job"}), 404 return jsonify(job.public()) @app.get("/api/library") def library() -> Response: """List every variant library CSV under ~/.dee/output/ for the Library + History pages. Each entry includes filename, on-disk size, timestamp parsed from the filename, the WT identifier embedded in the filename, variant count, and the top-fitness number — enough to populate a list view without re-parsing whole CSVs. """ import csv as csv_mod items: List[Dict[str, Any]] = [] for path in sorted(OUTPUT_DIR.glob("*_dee_library.csv"), reverse=True): try: stat = path.stat() except OSError: continue # Filename convention from _run_pipeline: # {stem}__{wt_id}__{YYYYMMDD_HHMMSS}_dee_library.csv m = re.match( r"^(?P.+?)__(?P[^_]+(?:_[^_]+)*?)__" r"(?P\d{8}_\d{6})_dee_library\.csv$", path.name, ) wt_id = m.group("wt") if m else path.stem ts_raw = m.group("ts") if m else None iso_ts = None if ts_raw: try: dt = time.strptime(ts_raw, "%Y%m%d_%H%M%S") iso_ts = time.strftime("%Y-%m-%dT%H:%M:%S", dt) except ValueError: pass # Quick scan: variant count + top fitness from first data row. variant_count = 0 top_fitness: Optional[float] = None mutations_preview = "" try: with path.open("r", encoding="utf-8") as f: reader = csv_mod.DictReader(f) for row in reader: variant_count += 1 if variant_count == 1: try: top_fitness = float(row.get("Predicted_Fitness_Score") or 0.0) except ValueError: top_fitness = None mutations_preview = row.get("Mutations_AA", "") except Exception as exc: # noqa: BLE001 logger.warning("Could not read %s: %s", path, exc) items.append( { "filename": path.name, "size_bytes": stat.st_size, "modified_iso": iso_ts or time.strftime( "%Y-%m-%dT%H:%M:%S", time.localtime(stat.st_mtime) ), "wt_identifier": wt_id, "variant_count": variant_count, "top_fitness": top_fitness, "top_mutations": mutations_preview, } ) return jsonify({"items": items, "output_dir": str(OUTPUT_DIR)}) @app.get("/api/library/") def library_download(filename: str) -> Response: """Stream a stored library CSV by filename. Anchored to OUTPUT_DIR to prevent path traversal.""" target = (OUTPUT_DIR / filename).resolve() if not str(target).startswith(str(OUTPUT_DIR.resolve())) or not target.is_file(): return jsonify({"error": "not found"}), 404 return send_file( str(target), mimetype="text/csv", as_attachment=True, download_name=target.name, ) @app.delete("/api/library/") def library_delete(filename: str) -> Response: """Delete a stored library CSV. Same path-traversal guard.""" target = (OUTPUT_DIR / filename).resolve() if not str(target).startswith(str(OUTPUT_DIR.resolve())) or not target.is_file(): return jsonify({"error": "not found"}), 404 try: target.unlink() except OSError as exc: return jsonify({"error": str(exc)}), 500 return jsonify({"ok": True}) @app.post("/api/shutdown") def shutdown() -> Response: # Werkzeug's old shutdown hook is gone in Flask 3 — we exit the # process directly. The browser tab can stay open; the user clicked # quit so they're done. def _bye() -> None: time.sleep(0.2) os._exit(0) threading.Thread(target=_bye, daemon=True).start() return jsonify({"ok": True}) return app # ----------------------------------------------------------------- helpers def _materialize_input() -> tuple[Path, str]: """Pull the request's sequence into a temp file we can hand to parse_input.""" if "file" in request.files: file = request.files["file"] suffix = Path(file.filename or "").suffix or ".txt" tmp = tempfile.NamedTemporaryFile( delete=False, suffix=suffix, dir=str(STATE_DIR) ) file.save(tmp.name) tmp.close() return Path(tmp.name), file.filename or "uploaded" body = request.get_json(force=True, silent=True) or {} text = body.get("text", "").strip() if not text: raise SequenceValidationError("No sequence provided.") tmp = tempfile.NamedTemporaryFile( delete=False, suffix=".txt", mode="w", encoding="utf-8", dir=str(STATE_DIR) ) tmp.write(text) tmp.close() return Path(tmp.name), body.get("name") or "pasted" def _summarize(path: Path, original_name: str) -> tuple[Dict[str, Any], Dict[str, Any]]: """Return (JSON-friendly summary, extra session payload to cache). Three flow modes: 1. Structured file (.dna/.gb/...) → list annotated CDS features. 2. Raw input parses cleanly as a single CDS or protein → return preview. 3. Raw input fails CDS validation (e.g. user pasted a whole plasmid) → fall back to a 6-frame ORF scan and return ORF choices. """ suffix = path.suffix.lower() # Mode 1 — annotated structured file. if suffix in {".dna", ".gb", ".gbk", ".genbank", ".embl"}: features = list_cds_features(path) cds_options = [ {"label": label, "length_nt": length} for label, length in features ] record = parse_input(path, require_start=False, require_stop=False) return ( { "original_name": original_name, "detected_kind": "plasmid", "identifier": record.identifier, "protein_length": len(record.protein), "protein_preview": record.protein[:80] + ("…" if len(record.protein) > 80 else ""), "cds_options": cds_options, }, {"mode": "structured_file"}, ) # Mode 2 — try clean single-CDS / protein parse. try: record = parse_input(path, require_start=False, require_stop=False) return ( { "original_name": original_name, "detected_kind": "protein" if not record.dna else "dna", "identifier": record.identifier, "protein_length": len(record.protein), "protein_preview": record.protein[:80] + ("…" if len(record.protein) > 80 else ""), "cds_options": None, }, {"mode": "single"}, ) except SequenceValidationError as clean_exc: # Mode 3 — fall back to 6-frame ORF discovery for raw DNA. text = path.read_text(errors="ignore") orfs = find_orfs_in_dna(text) if not orfs: # Not recoverable; bubble the original parse error up. raise clean_exc cds_options = [ { "label": o.label, "length_nt": len(o.dna), "frame": o.frame, "protein_length": len(o.protein), } for o in orfs ] return ( { "original_name": original_name, "detected_kind": "multi_orf", "identifier": f"{Path(original_name).stem or 'sequence'} (no annotation)", "protein_length": 0, "protein_preview": ( f"{len(orfs)} ORFs detected across 6 reading frames " "(no annotated CDS features). Pick one below." ), "cds_options": cds_options, }, {"mode": "orf", "orfs": {o.label: o.dna for o in orfs}}, ) def _run_pipeline( job: JobState, input_path: Path, original_name: str, cds_feature: Optional[str], settings: Dict[str, Any], ) -> None: """Background worker. Updates ``job`` in place as it makes progress.""" try: job.status = "parsing" job.message = "Reading sequence…" job.progress = 0.05 record = parse_input( input_path, require_start=False, require_stop=False, cds_feature=cds_feature, ) job.wt_identifier = record.identifier job.wt_protein = record.protein job.settings_used = { "model": settings.get("model", "small"), "percentile": float(settings.get("percentile", 85.0)), "k": int(settings.get("k", 30)), "min_mutations": int(settings.get("min_mutations", 2)), "max_mutations": int(settings.get("max_mutations", 5)), "restarts": int(settings.get("restarts", 8)), "steps_per_restart": int(settings.get("steps", 1200)), "host": settings.get("host", "e_coli"), "seed": settings.get("seed"), "device": settings.get("device") or "cpu", "cds_feature": cds_feature, } job.status = "scoring" job.message = ( f"Scoring {len(record.protein)} residues × 19 substitutions with ESM-2…" ) job.progress = 0.15 scorer = ESM2Scorer( ScorerConfig( model_name=settings.get("model", "small"), device=settings.get("device"), quantization=settings.get("quantization"), ) ) scores_df = scorer.score_all_substitutions(record.protein) job.progress = 0.55 pool = top_percentile_pool(scores_df, percentile=float(settings.get("percentile", 85.0))) job.message = f"Filtered to {len(pool)} high-impact single-site mutations." job.progress = 0.60 job.status = "searching" job.message = ( f"Optimizing multi-mutants (K={int(settings.get('k', 30))}, " f"max {int(settings.get('max_mutations', 5))} subs)…" ) variants = evolve( pool, SearchConfig( k=int(settings.get("k", 30)), max_mutations=int(settings.get("max_mutations", 5)), min_mutations=int(settings.get("min_mutations", 2)), n_restarts=int(settings.get("restarts", 8)), steps_per_restart=int(settings.get("steps", 1200)), seed=settings.get("seed"), ), ) job.progress = 0.85 job.status = "encoding" job.message = f"Reverse-translating and scrubbing restriction sites for {len(variants)} variants…" df = variants_to_dataframe( record.protein, variants, host=settings.get("host", "e_coli"), forbidden_sites=DEFAULT_FORBIDDEN_SITES, ) # --------------------------------------------------------------- # Build a WT pseudo-row with the SAME codon-optimization and # restriction-site scrub applied to variants, so the user can # order the unmutated wild-type alongside the evolved variants. # Variant_ID 'WT' is special-cased on the client. # --------------------------------------------------------------- try: wt_host = settings.get("host", "e_coli") wt_raw_dna = reverse_translate(record.protein, host=wt_host, append_stop=True) wt_clean_dna, wt_report = scrub_restriction_sites( wt_raw_dna, record.protein + "*", host=wt_host, forbidden_sites=DEFAULT_FORBIDDEN_SITES, ) wt_pcr = pcr_metrics(wt_clean_dna) wt_row = { "Variant_ID": "WT", "Mutations_AA": "", "Mutant_AA_Seq": record.protein, "Optimized_DNA_Seq": wt_clean_dna, "Predicted_Fitness_Score": 0.0, "Length_bp": wt_pcr.get("length_bp"), "GC_Percent": wt_pcr.get("gc_percent"), "Primer_Fwd": wt_pcr.get("primer_fwd"), "Primer_Fwd_Tm_C": wt_pcr.get("primer_fwd_tm_c"), "Primer_Fwd_GC_Percent": wt_pcr.get("primer_fwd_gc"), "Primer_Rev": wt_pcr.get("primer_rev"), "Primer_Rev_Tm_C": wt_pcr.get("primer_rev_tm_c"), "Primer_Rev_GC_Percent": wt_pcr.get("primer_rev_gc"), "Annealing_Temp_C": wt_pcr.get("annealing_temp_c"), "Restriction_Sites_Found": sum(wt_report.sites_found.values()), "Restriction_Sites_Unresolved": len(wt_report.unresolved), } except Exception as wt_exc: # noqa: BLE001 — never fail the whole run for the WT row logger.warning("Couldn't build WT pseudo-row: %s", wt_exc) wt_row = None job.progress = 0.95 stem = Path(original_name).stem or "library" safe_id = "".join(c if c.isalnum() else "_" for c in record.identifier) or "wt" ts = time.strftime("%Y%m%d_%H%M%S") filename = f"{stem}__{safe_id}__{ts}_dee_library.csv" primary = OUTPUT_DIR / filename write_library_csv(df, str(primary)) job.csv_path = str(primary) # Best-effort copy to Desktop. Requires the user to have granted the # .app Files & Folders → Desktop access (NSDesktopFolderUsageDescription # in Info.plist triggers the system prompt). desktop_dest = DESKTOP_DIR / filename try: desktop_dest.write_bytes(primary.read_bytes()) job.desktop_path = str(desktop_dest) except (PermissionError, OSError) as exc: logger.warning("Could not copy to Desktop (%s); CSV stays in ~/.dee/output/.", exc) job.desktop_path = None variant_rows = df.to_dict(orient="records") # WT row first — UI sorts by Variant_ID so 'WT' sorts naturally last by # default; the client explicitly pins it to row 0. job.variants = ([wt_row] + variant_rows) if wt_row else variant_rows job.status = "done" job.progress = 1.0 job.message = f"Done. {len(variants)} variants ready." job.finished_at = time.time() except SequenceValidationError as exc: job.status = "error" job.error = f"Couldn't parse input: {exc}" job.message = job.error job.finished_at = time.time() except Exception as exc: # noqa: BLE001 logger.exception("Pipeline crashed.") job.status = "error" job.error = f"{type(exc).__name__}: {exc}" job.message = job.error job.finished_at = time.time() def _fasta_from_job(job: JobState, *, of: str = "protein") -> str: """Serialize a job's variants as FASTA. ``of`` selects which sequence to emit per record — protein or DNA. Protein is the more common ask (mutants aligned against WT); DNA is included for completeness.""" if not job.variants: return "" lines: List[str] = [] seq_key = "Mutant_AA_Seq" if of == "protein" else "Optimized_DNA_Seq" for v in job.variants: header = ( f">{v.get('Variant_ID')} {v.get('Mutations_AA')} " f"fitness={v.get('Predicted_Fitness_Score')}" ) seq = v.get(seq_key, "") lines.append(header) for i in range(0, len(seq), 60): lines.append(seq[i : i + 60]) return "\n".join(lines) + "\n" def _genbank_from_job(job: JobState) -> str: """Build a multi-record GenBank file with one record per variant. Each record gets a CDS feature whose ``/translation`` is the mutant protein, plus a misc_feature per substitution so the mutation positions are visible when opened in SnapGene / Benchling. """ from Bio.Seq import Seq from Bio.SeqFeature import FeatureLocation, SeqFeature from Bio.SeqRecord import SeqRecord from Bio import SeqIO records = [] for v in job.variants or []: dna = v.get("Optimized_DNA_Seq", "") protein = v.get("Mutant_AA_Seq", "") muts = v.get("Mutations_AA", "") record = SeqRecord( Seq(dna), id=v.get("Variant_ID", "variant"), name=v.get("Variant_ID", "variant"), description=f"DEE designed variant; mutations={muts}; " f"fitness={v.get('Predicted_Fitness_Score')}", annotations={"molecule_type": "DNA", "organism": "synthetic construct"}, ) # Whole-CDS feature with the translation in /translation. cds_qualifiers = { "gene": [job.wt_identifier or "designed_variant"], "product": ["designed variant"], "note": [f"mutations={muts}"], } if protein: cds_qualifiers["translation"] = [protein] record.features.append( SeqFeature( FeatureLocation(0, len(dna)), type="CDS", qualifiers=cds_qualifiers, ) ) # One misc_feature per substitution at the nucleotide it touches. for mut in muts.split(","): mut = mut.strip() if not mut or len(mut) < 3: continue try: pos_aa = int("".join(c for c in mut[1:-1] if c.isdigit())) except ValueError: continue nt_start = (pos_aa - 1) * 3 nt_end = nt_start + 3 if nt_end > len(dna): continue record.features.append( SeqFeature( FeatureLocation(nt_start, nt_end), type="misc_feature", qualifiers={"label": [mut], "note": [f"Substitution {mut}"]}, ) ) records.append(record) buf = io.StringIO() SeqIO.write(records, buf, "genbank") return buf.getvalue() def _xlsx_from_job(job: JobState) -> bytes: """Write the variant table to a production-quality .xlsx workbook. Two sheets: * "Variants" — the table itself, with bold-colored header row, frozen top row, auto-filter, column widths sized per content type, number formatting per column (3 decimals for fitness, 1 for GC/Tm, integer for bp), and horizontally-clamped sequence columns so the long Mutant_AA_Seq / Optimized_DNA_Seq cells don't visually drown out everything else. * "Run info" — key-value pairs of every parameter the engine used, plus the auto-generated methods paragraph for copy-pasting into a Materials & Methods section. Requires ``openpyxl``. Returns raw bytes for the Flask handler to stream. """ import openpyxl from openpyxl.styles import Alignment, Border, Font, PatternFill, Side from openpyxl.utils import get_column_letter wb = openpyxl.Workbook() ws = wb.active ws.title = "Variants" variants = job.variants or [] if not variants: wb.remove(ws) ws = wb.create_sheet("Variants") ws["A1"] = "No variants in this run." buf = io.BytesIO() wb.save(buf) buf.seek(0) return buf.getvalue() headers = list(variants[0].keys()) # Header row — bold white on deep brand background, all caps, slightly # taller so it reads as a real header band. header_fill = PatternFill("solid", fgColor="0A5F77") header_font = Font(name="Inter", size=11, bold=True, color="FFFFFF") header_align = Alignment(horizontal="left", vertical="center", wrap_text=False) body_font = Font(name="Inter", size=10) body_align = Alignment(horizontal="left", vertical="top", wrap_text=False) num_align = Alignment(horizontal="right", vertical="top") mono_font = Font(name="JetBrains Mono", size=9) band_fill = PatternFill("solid", fgColor="F4F6F9") # alternating row color thin = Side(style="thin", color="E0E5EC") border = Border(left=thin, right=thin, top=thin, bottom=thin) # Column geometry — width sized to typical content per column type. # Number formats keyed to specific column names (rest = text). COL_WIDTH = { "Variant_ID": 11, "Mutations_AA": 28, "Mutant_AA_Seq": 52, "Optimized_DNA_Seq": 70, "Predicted_Fitness_Score": 14, "Length_bp": 9, "GC_Percent": 10, "Primer_Fwd": 32, "Primer_Fwd_Tm_C": 12, "Primer_Fwd_GC_Percent": 14, "Primer_Rev": 32, "Primer_Rev_Tm_C": 12, "Primer_Rev_GC_Percent": 14, "Annealing_Temp_C": 14, "Restriction_Sites_Found": 14, "Restriction_Sites_Unresolved": 16, } NUM_FORMAT = { "Predicted_Fitness_Score": "0.000", "GC_Percent": "0.0", "Primer_Fwd_Tm_C": "0.0", "Primer_Fwd_GC_Percent": "0.0", "Primer_Rev_Tm_C": "0.0", "Primer_Rev_GC_Percent": "0.0", "Annealing_Temp_C": "0.0", "Length_bp": "0", "Restriction_Sites_Found": "0", "Restriction_Sites_Unresolved": "0", } MONO_COLS = {"Mutations_AA", "Mutant_AA_Seq", "Optimized_DNA_Seq", "Primer_Fwd", "Primer_Rev", "Variant_ID"} NUM_COLS = set(NUM_FORMAT.keys()) # Write header row. for col_idx, col in enumerate(headers, start=1): cell = ws.cell(row=1, column=col_idx, value=col) cell.fill = header_fill cell.font = header_font cell.alignment = header_align cell.border = border ws.row_dimensions[1].height = 22 # Write data rows with alternating band fill + column-specific formatting. for row_idx, variant in enumerate(variants, start=2): is_band = (row_idx % 2 == 0) for col_idx, col in enumerate(headers, start=1): val = variant.get(col) cell = ws.cell(row=row_idx, column=col_idx, value=val) cell.border = border if is_band: cell.fill = band_fill if col in NUM_COLS: cell.alignment = num_align cell.number_format = NUM_FORMAT[col] cell.font = body_font elif col in MONO_COLS: cell.alignment = body_align cell.font = mono_font else: cell.alignment = body_align cell.font = body_font # Column widths. for col_idx, col in enumerate(headers, start=1): width = COL_WIDTH.get(col, 14) ws.column_dimensions[get_column_letter(col_idx)].width = width # Freeze the header row + first two columns so Variant_ID + Mutations stay # in view while horizontally scrolling through primer + sequence columns. ws.freeze_panes = "C2" # Auto-filter the whole used range so reviewers can sort/filter natively. ws.auto_filter.ref = ws.dimensions # ---- Sheet 2: Run info ------------------------------------------------ info = wb.create_sheet("Run info") label_font = Font(name="Inter", size=11, bold=True, color="0A5F77") value_font = Font(name="Inter", size=11) methods_font = Font(name="Inter", size=10) s = job.settings_used or {} model_label = { "small": "ESM-2 35M (facebook/esm2_t12_35M_UR50D)", "medium": "ESM-2 650M (facebook/esm2_t33_650M_UR50D)", "large": "ESM-2 3B (facebook/esm2_t36_3B_UR50D)", }.get(s.get("model"), s.get("model", "")) host_label = { "e_coli": "Escherichia coli", "yeast": "Saccharomyces cerevisiae", "human": "Homo sapiens", }.get(s.get("host"), s.get("host", "")) rows = [ ("Wild-type identifier", job.wt_identifier), ("Wild-type length", f"{len(job.wt_protein)} aa"), ("Model", model_label), ("Percentile cutoff", f"≥ p{s.get('percentile')} (top {100 - float(s.get('percentile', 85)):.0f}%)"), ("K variants returned", s.get("k")), ("Min mutations / variant", s.get("min_mutations")), ("Max mutations / variant", s.get("max_mutations")), ("SA restarts", s.get("restarts")), ("SA steps / restart", s.get("steps_per_restart")), ("Expression host", host_label), ("Compute device", s.get("device")), ("Random seed", s.get("seed") if s.get("seed") is not None else "(random)"), ("CDS feature picked", s.get("cds_feature") or "(none / single CDS)"), ("Run started (epoch s)", job.started_at), ("Wall time (s)", job.elapsed()), ] for i, (k, v) in enumerate(rows, start=1): info.cell(row=i, column=1, value=k).font = label_font info.cell(row=i, column=2, value=v).font = value_font # Methods paragraph info.cell(row=len(rows) + 2, column=1, value="Materials & methods (copy-paste)").font = label_font methods = ( f"Variants of {job.wt_identifier} ({len(job.wt_protein)} aa) were designed " f"in silico with {model_label} (Lin et al., Science 2022) using the " f"wild-type marginal log-likelihood scoring scheme of Meier et al. " f"(Adv. Neural Inf. Process. Syst. 2021). Single-point substitutions " f"in the top {100 - float(s.get('percentile', 85)):.0f}% by ΔLL were " f"retained as the combinatorial search space. {s.get('k')} multi-mutant " f"variants with {s.get('min_mutations')}–{s.get('max_mutations')} " f"simultaneous substitutions were generated by simulated annealing " f"({s.get('restarts')} restarts × {s.get('steps_per_restart')} steps, " f"geometric cooling) maximizing cumulative ΔLL with stop-codon and " f"duplicate-position penalties. Optimized DNA was reverse-translated " f"using {host_label} codon-usage frequencies and synonymously cleaned " f"of BsaI, BsmBI, and NotI recognition sites for Golden Gate " f"compatibility." ) mcell = info.cell(row=len(rows) + 3, column=1, value=methods) mcell.font = methods_font mcell.alignment = Alignment(wrap_text=True, vertical="top") info.merge_cells(start_row=len(rows) + 3, start_column=1, end_row=len(rows) + 3, end_column=4) info.row_dimensions[len(rows) + 3].height = 140 info.column_dimensions["A"].width = 32 info.column_dimensions["B"].width = 48 info.column_dimensions["C"].width = 16 info.column_dimensions["D"].width = 16 buf = io.BytesIO() wb.save(buf) buf.seek(0) return buf.getvalue() def _json_from_job(job: JobState) -> bytes: """Compact, programmer-friendly JSON dump of the run.""" payload = { "wt_identifier": job.wt_identifier, "wt_protein": job.wt_protein, "wt_protein_length": len(job.wt_protein), "variants": job.variants or [], } return json.dumps(payload, indent=2).encode("utf-8") def _tsv_from_job(job: JobState) -> bytes: """Tab-separated equivalent of the CSV — friendlier for some analysis pipelines (R, awk, Perl) that hate quoted commas.""" import pandas as pd df = pd.DataFrame(job.variants or []) return df.to_csv(sep="\t", index=False).encode("utf-8") # Format registry — keyed by the value of ``?format=``. Each entry produces # (payload, mimetype, file-extension). New formats are a one-line addition. def _render_download(job: JobState, fmt: str): if fmt == "csv": # Excel in many European locales (incl. the user's Georgian Excel) # treats CSV with ',' delimiter as a single column unless told # otherwise. We prepend Excel's "sep=," hint line and a UTF-8 BOM so # double-clicking the file opens it with proper columns everywhere. with open(job.csv_path, "rb") as f: raw = f.read() decoded = raw.decode("utf-8", errors="replace") bom = b"\xef\xbb\xbf" body = (b"sep=,\r\n" + decoded.encode("utf-8")) return bom + body, "text/csv", "csv" if fmt == "tsv": return _tsv_from_job(job), "text/tab-separated-values", "tsv" if fmt == "fasta": return _fasta_from_job(job, of="protein").encode("utf-8"), "text/x-fasta", "fasta" if fmt == "fasta-dna": return _fasta_from_job(job, of="dna").encode("utf-8"), "text/x-fasta", "fna" if fmt in ("gb", "genbank"): return _genbank_from_job(job).encode("utf-8"), "text/x-genbank", "gb" if fmt in ("xlsx", "excel"): return ( _xlsx_from_job(job), "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", "xlsx", ) if fmt == "json": return _json_from_job(job), "application/json", "json" raise ValueError( "format must be one of: csv, tsv, fasta, fasta-dna, gb, xlsx, json" ) # ----------------------------------------------------------------- CLI entry def serve(host: Optional[str] = None, port: Optional[int] = None) -> None: """Run the Flask server. Honors ``HOST`` and ``PORT`` env vars first (so the same image runs under Hugging Face Spaces' fixed ``PORT=7860`` contract, Fly.io's ``8080``, Render's ``10000``, etc.) before falling back to local-dev defaults (``127.0.0.1:4789``). """ env_host = os.environ.get("HOST") env_port = os.environ.get("PORT") host = host or env_host or "127.0.0.1" port = port or (int(env_port) if env_port else 4789) logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s", ) app = create_app() logger.info("Digital Evolution Engine listening on http://%s:%s", host, port) print(f"DEE_READY http://{host}:{port}", flush=True) app.run(host=host, port=port, debug=False, use_reloader=False, threaded=True) if __name__ == "__main__": serve()