# Source: syntheogenesis / dee / server.py
# Uploaded by WINTER4000
# Commit 060bb47 (verified): "Initial deploy · DEE Flask app via Docker SDK"
"""Local web UI for the Digital Evolution Engine.

Single-user Flask server intended to be launched by the desktop ``.app`` and
opened in the user's browser. Stateful by design (long-running jobs are kept
in memory, results held until a new job replaces them) — this is *not* a
multi-tenant service.

API surface:
    POST   /api/preview        parse a sequence (file upload OR JSON body) and
                               return its metadata + any CDS features.
    POST   /api/run            enqueue a pipeline job. Returns immediately with
                               a job_id; the work runs in a background thread.
    GET    /api/status/:id     poll job progress.
    GET    /api/result/:id     fetch the variant table once a job has finished.
    GET    /api/download/:id?format=csv|tsv|fasta|fasta-dna|gb|xlsx|json
                               download outputs.
    POST   /api/identify       start (or reuse) an NCBI BLAST identification of
                               the session's wild-type protein.
    GET    /api/identify/:id   poll a BLAST identification job.
    GET    /api/library        list the library CSVs stored on disk.
    GET    /api/library/:name  download a stored library CSV.
    DELETE /api/library/:name  delete a stored library CSV.
    POST   /api/shutdown       cleanly stop the server (used by the desktop app
                               when the user explicitly quits).
"""
from __future__ import annotations
import hashlib
import io
import json
import logging
import os
import re
import tempfile
import threading
import time
import traceback
import uuid
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
from flask import Flask, Response, jsonify, request, send_file, send_from_directory
from dee.core.codon import (
DEFAULT_FORBIDDEN_SITES,
variants_to_dataframe,
write_library_csv,
)
from dee.core.sequence import (
SequenceValidationError,
find_orfs_in_dna,
list_cds_features,
parse_input,
)
from dee.models.scorer import ESM2Scorer, ScorerConfig, top_percentile_pool
from dee.optimizer.search import SearchConfig, apply_variant, evolve
# Module-wide logger.
logger = logging.getLogger("dee.server")

# Front-end assets are served from the ``static`` folder next to this module.
STATIC_DIR = Path(__file__).resolve().parent.joinpath("static")

# Per-user working directories under ~/.dee, plus the Desktop export target.
STATE_DIR = Path.home().joinpath(".dee", "state")
OUTPUT_DIR = Path.home().joinpath(".dee", "output")
DESKTOP_DIR = Path.home().joinpath("Desktop")

# Create the working directories at import time (no-op when they exist).
STATE_DIR.mkdir(exist_ok=True, parents=True)
OUTPUT_DIR.mkdir(exist_ok=True, parents=True)
def _patch_ssl_for_macos() -> None:
    """Route urllib / ssl HTTPS verification through certifi's CA bundle.

    Some macOS Python builds ship without a system CA bundle, so
    ``urllib.request.urlopen`` (which Biopython's NCBIWWW.qblast uses
    internally) fails with ``CERTIFICATE_VERIFY_FAILED`` when it tries to
    reach https://blast.ncbi.nlm.nih.gov.

    The patch installs a default urllib opener whose HTTPS handler trusts
    certifi's bundled roots and replaces ``ssl``'s default HTTPS context
    factory with one that does the same. It is best-effort: a missing
    certifi or any failure while patching is logged and otherwise ignored.
    Safe to call multiple times.
    """
    import ssl
    import urllib.request

    try:
        import certifi
    except ImportError:  # pragma: no cover — certifi is a dep of HF hub
        logger.warning("certifi not installed; BLAST may fail on macOS Pythons.")
        return

    def _certifi_context():
        # Fresh SSL context that trusts certifi's CA roots.
        return ssl.create_default_context(cafile=certifi.where())

    try:
        urllib.request.install_opener(
            urllib.request.build_opener(
                urllib.request.HTTPSHandler(context=_certifi_context())
            )
        )
        # Also override the module-level default context so callers that go
        # around urllib (e.g. http.client direct) still get the certifi roots.
        ssl._create_default_https_context = _certifi_context
        logger.info("Patched urllib SSL context with certifi CA bundle.")
    except Exception as exc:  # noqa: BLE001 — best-effort patch
        logger.warning("Could not patch SSL CA bundle: %s", exc)
# Apply the CA-bundle patch once, at import time.
_patch_ssl_for_macos()
# ----------------------------------------------------------------- job state
@dataclass
class JobState:
    """In-memory record of one pipeline run.

    The background worker mutates the instance as it progresses; the HTTP
    handlers read it to answer status / result / download requests.
    """

    job_id: str
    status: str = "pending"
    progress: float = 0.0
    message: str = "Queued."
    started_at: float = field(default_factory=time.time)
    finished_at: Optional[float] = None
    wt_identifier: str = ""
    wt_protein: str = ""
    csv_path: Optional[str] = None
    desktop_path: Optional[str] = None
    variants: Optional[List[Dict[str, Any]]] = None
    error: Optional[str] = None
    # Frozen copy of the user's settings + the parameters the engine actually
    # used. Surfaces in the UI's metadata pills and the auto-generated
    # methods paragraph; also a paper-trail for reproducibility.
    settings_used: Dict[str, Any] = field(default_factory=dict)

    def elapsed(self) -> float:
        """Seconds since the job started, rounded to 0.1 s.

        Measured up to ``finished_at`` once that is set, otherwise up to now.
        """
        stop = self.finished_at or time.time()
        return round(stop - self.started_at, 1)

    def public(self) -> Dict[str, Any]:
        """Return the JSON-friendly status snapshot (no variant table)."""
        snapshot: Dict[str, Any] = {}
        snapshot["job_id"] = self.job_id
        snapshot["status"] = self.status
        snapshot["progress"] = round(self.progress, 3)
        snapshot["message"] = self.message
        snapshot["started_at"] = self.started_at
        snapshot["elapsed_seconds"] = self.elapsed()
        snapshot["wt_identifier"] = self.wt_identifier
        snapshot["wt_protein_length"] = len(self.wt_protein)
        snapshot["csv_path"] = self.csv_path
        snapshot["desktop_path"] = self.desktop_path
        snapshot["error"] = self.error
        return snapshot
# Every pipeline job started by this process, keyed by job_id.
_JOBS: Dict[str, JobState] = {}
# Guards all reads and writes of _JOBS.
_JOBS_LOCK = threading.Lock()


def _get_job(job_id: str) -> Optional[JobState]:
    """Return the job registered under ``job_id``, or ``None`` if unknown."""
    with _JOBS_LOCK:
        found = _JOBS.get(job_id)
    return found


def _put_job(job: JobState) -> None:
    """Register ``job`` under its own ``job_id``, replacing any previous entry."""
    key = job.job_id
    with _JOBS_LOCK:
        _JOBS[key] = job
# ----------------------------------------------------------------- preview cache
# Uploaded files / pasted strings are cached briefly so /api/preview and the
# subsequent /api/run don't have to re-receive the same payload. Keyed by a
# session_id we hand back to the browser.
# session_id -> cached preview payload. _new_session stamps each payload with
# "created_at"; the preview handler stores "path" and "name" plus the
# mode-specific extras returned by _summarize.
_SESSIONS: Dict[str, Dict[str, Any]] = {}
# Guards every access to _SESSIONS.
_SESSIONS_LOCK = threading.Lock()
# ----------------------------------------------------------------- BLAST jobs
@dataclass
class BlastJob:
    """One asynchronous NCBI-BLAST submission. Keyed by sequence hash so
    repeat lookups on the same WT protein return instantly from cache."""

    job_id: str
    seq_hash: str
    status: str = "pending"
    started_at: float = field(default_factory=time.time)
    finished_at: Optional[float] = None
    hits: List[Dict[str, Any]] = field(default_factory=list)
    error: Optional[str] = None

    def elapsed(self) -> float:
        """Seconds since submission, rounded to 0.1 s.

        Measured up to ``finished_at`` once that is set, otherwise up to now.
        """
        stop = self.finished_at or time.time()
        return round(stop - self.started_at, 1)

    def public(self) -> Dict[str, Any]:
        """Return the JSON-friendly view of this job sent to the browser."""
        view: Dict[str, Any] = {"job_id": self.job_id, "status": self.status}
        view["elapsed_seconds"] = self.elapsed()
        view["hits"] = self.hits
        view["error"] = self.error
        return view
# Two indexes over the same BlastJob objects: by sequence hash (to reuse a
# previous lookup) and by job_id (for status polling). _BLAST_LOCK guards both.
_BLAST_CACHE: Dict[str, BlastJob] = {} # seq_hash -> BlastJob
_BLAST_BY_ID: Dict[str, BlastJob] = {} # job_id -> BlastJob
_BLAST_LOCK = threading.Lock()
def _hash_sequence(protein: str) -> str:
    """Return a short cache key for ``protein``: the first 16 hex characters
    of the SHA-256 digest of its UTF-8 encoding."""
    digest = hashlib.sha256()
    digest.update(protein.encode("utf-8"))
    return digest.hexdigest()[:16]
def _extract_organism(hit_def: str) -> str:
    """Return the organism named in an NCBI hit description, or ``""``.

    NCBI hit descriptions look like ``'GFP [Aequorea victoria]'``: the
    organism is the bracketed group that *ends* the title. Protein names can
    contain brackets of their own (``'[NiFe] hydrogenase [Escherichia
    coli]'``), so the trailing group is preferred over the first one.

    When a hit bundles several titles joined by ``' >'``, only the first
    title is inspected so the organism matches the description that is
    displayed. If the first title has no trailing bracket group, the first
    bracketed group anywhere in ``hit_def`` is used instead.

    Args:
        hit_def: Raw hit description; ``None`` or empty is tolerated.
    """
    text = hit_def or ""
    first_title = text.split(" >", 1)[0]
    trailing = re.search(r"\[([^\[\]]+)\]\s*$", first_title)
    if trailing:
        return trailing.group(1)
    # Fallback: first bracketed group anywhere in the description.
    anywhere = re.search(r"\[([^\]]+)\]", text)
    return anywhere.group(1) if anywhere else ""
def _trim_description(hit_def: str) -> str:
    """Return ``hit_def`` without its trailing ``[organism]`` group and
    without surrounding whitespace — the organism is shown separately.
    ``None`` or empty input yields ``""``."""
    text = hit_def if hit_def else ""
    without_organism = re.sub(r"\s*\[[^\]]+\]\s*$", "", text)
    return without_organism.strip()
def _extract_uniprot(alignment) -> Optional[str]:
    """Pull a UniProt accession out of an NCBI BLAST alignment if there is one.

    NCBI hit_id strings look like:
        ``sp|P12345|HUMAN_GENE``    (SwissProt — has UniProt accession)
        ``sp|P12345.2|HUMAN_GENE``  (same, with a sequence-version suffix)
        ``ref|NP_001234.1|``        (RefSeq — no direct UniProt mapping)
        ``pdb|1ABC|A``              (PDB)
        ``gb|AAA12345.1|``          (GenBank)

    AlphaFold-DB only indexes UniProt entries, so we only return something
    when we see ``sp|`` (SwissProt) or ``tr|`` (TrEMBL) prefixes.

    Args:
        alignment: Any object with an optional ``hit_id`` attribute; the
            attribute is split on ``;`` and each token is tried in turn.

    Returns:
        The accession without any ``.N`` version suffix (an isoform suffix
        such as ``-2`` is kept), or ``None`` when no token matches.
    """
    hit_id = getattr(alignment, "hit_id", "") or ""
    for token in hit_id.split(";"):
        # The optional ``.N`` version is matched but left out of the capture
        # group, so versioned ids are accepted and returned unversioned.
        match = re.match(
            r"^(?:sp|tr)\|([A-Z0-9]+(?:-\d+)?)(?:\.\d+)?\|", token.strip()
        )
        if match:
            return match.group(1)
    return None
def _blast_hit_summary(alignment, query_length: int) -> Optional[Dict[str, Any]]:
    """Flatten one BLAST alignment into the hit dict sent to the browser.

    Returns ``None`` when the alignment carries no HSPs.
    """
    if not alignment.hsps:
        return None
    # First HSP of the alignment — presumably the best-scoring one; confirm
    # against the BLAST output ordering if that ever matters.
    hsp = alignment.hsps[0]
    identity_pct = round(100.0 * hsp.identities / max(1, hsp.align_length), 1)
    # align_length counts gap columns too, so the raw ratio can exceed the
    # query length; cap it so the UI never shows more than 100 % coverage.
    coverage_pct = round(
        min(100.0, 100.0 * hsp.align_length / max(1, query_length)), 1
    )
    uniprot = _extract_uniprot(alignment)
    return {
        "accession": getattr(alignment, "accession", "") or "",
        "hit_id": getattr(alignment, "hit_id", "") or "",
        "description": _trim_description(alignment.hit_def),
        "organism": _extract_organism(alignment.hit_def),
        "length": alignment.length,
        "identity_pct": identity_pct,
        "coverage_pct": coverage_pct,
        "evalue": float(hsp.expect),
        "bit_score": float(hsp.bits),
        # UniProt accession when available — gates the
        # AlphaFold-DB structure embed on the frontend.
        "uniprot": uniprot,
        "alphafold_url": (
            f"https://alphafold.ebi.ac.uk/files/AF-{uniprot}-F1-model_v4.pdb"
            if uniprot else None
        ),
        "alphafold_page": (
            f"https://alphafold.ebi.ac.uk/entry/{uniprot}"
            if uniprot else None
        ),
    }


def _run_blast(job: BlastJob, protein: str) -> None:
    """Submit ``protein`` to NCBI BLAST and store the top hits on ``job``.

    Uses Biopython's NCBIWWW wrapper (the public web service). Polite delays
    and rate limits are imposed by NCBI server-side; we keep the hit list
    small and the E-value threshold strict to minimize bandwidth.

    Runs in a background thread and never raises: ``job.status`` moves through
    ``submitting`` → ``parsing`` → ``done``, or ends at ``error`` with
    ``job.error`` set. ``job.finished_at`` is always stamped on exit.
    """
    try:
        # Biopython is heavy; import lazily so the rest of the server boots
        # without a hard Bio dependency at startup.
        from Bio.Blast import NCBIWWW, NCBIXML

        job.status = "submitting"
        result_handle = NCBIWWW.qblast(
            program="blastp",
            database="nr",
            sequence=protein,
            hitlist_size=5,
            expect=1e-5,
        )
        try:
            job.status = "parsing"
            for record in NCBIXML.parse(result_handle):
                for alignment in record.alignments[:5]:
                    hit = _blast_hit_summary(alignment, len(protein))
                    if hit is not None:
                        job.hits.append(hit)
                break  # only one query record
        finally:
            # Release the response buffer whether or not parsing succeeded.
            result_handle.close()
        job.status = "done"
    except Exception as exc:  # noqa: BLE001 — surface anything as an error
        logger.exception("BLAST failed.")
        job.status = "error"
        job.error = f"{type(exc).__name__}: {exc}"
    finally:
        job.finished_at = time.time()
def _new_session(payload: Dict[str, Any]) -> str:
    """Cache ``payload`` under a fresh 12-character session id and return the id.

    ``payload`` is stamped in place with a ``created_at`` timestamp. Each call
    also evicts every cached session older than 30 minutes.
    """
    session_id = uuid.uuid4().hex[:12]
    payload["created_at"] = time.time()
    with _SESSIONS_LOCK:
        _SESSIONS[session_id] = payload
        # Garbage-collect anything older than 30 min.
        oldest_allowed = time.time() - 30 * 60
        expired = [
            key
            for key in _SESSIONS
            if _SESSIONS[key].get("created_at", 0) < oldest_allowed
        ]
        for key in expired:
            _SESSIONS.pop(key, None)
    return session_id
def _get_session(sid: str) -> Optional[Dict[str, Any]]:
    """Return the cached payload for ``sid``, or ``None`` if there is none."""
    with _SESSIONS_LOCK:
        cached = _SESSIONS.get(sid)
    return cached
# ----------------------------------------------------------------- Flask app
def create_app() -> Flask:
    """Build the Flask application and register every HTTP route.

    The route handlers are closures defined below; they share the module-level
    job, session and BLAST registries.

    Returns:
        The configured ``Flask`` instance (not yet running).
    """
    app = Flask(__name__, static_folder=None)

    def _json_body() -> Dict[str, Any]:
        """Return the request's JSON object, or ``{}`` when the body is
        missing, malformed, or not a JSON object."""
        body = request.get_json(force=True, silent=True)
        return body if isinstance(body, dict) else {}

    def _discard(path: Optional[Path]) -> None:
        """Best-effort removal of a temp file nothing refers to any more."""
        if path is None:
            return
        try:
            path.unlink()
        except OSError as exc:
            logger.debug("Could not remove temp file %s: %s", path, exc)

    def _library_file(filename: str) -> Optional[Path]:
        """Resolve ``filename`` inside OUTPUT_DIR.

        Returns ``None`` when the resolved path is not a regular file located
        under OUTPUT_DIR. The containment test compares path components, not
        string prefixes, so a sibling directory whose name merely starts with
        the output directory's name is rejected.
        """
        root = OUTPUT_DIR.resolve()
        target = (OUTPUT_DIR / filename).resolve()
        if root not in target.parents or not target.is_file():
            return None
        return target

    @app.get("/")
    def index() -> Response:
        """Serve the single-page UI."""
        return send_from_directory(STATIC_DIR, "index.html")

    @app.get("/static/<path:filename>")
    def static_file(filename: str) -> Response:
        """Serve a front-end asset from STATIC_DIR."""
        return send_from_directory(STATIC_DIR, filename)

    @app.post("/api/preview")
    def preview() -> Response:
        """Accept a sequence and report what we found (no scoring yet).

        Accepts EITHER:
          * multipart/form-data with ``file`` field, or
          * application/json with ``{"text": "..."}``.

        If the input parses cleanly as a single CDS or protein, returns its
        metadata. If it's a structured plasmid file or a raw DNA blob with
        multiple ORFs, returns a list of choices in ``cds_options`` so the
        UI can show a picker before kicking off a run.

        Validation problems answer 400, anything else 500. The temp copy of
        the input is removed whenever no session ends up owning it.
        """
        tmp_path: Optional[Path] = None
        try:
            tmp_path, original_name = _materialize_input()
            payload, session_extras = _summarize(tmp_path, original_name)
            sid = _new_session(
                {"path": str(tmp_path), "name": original_name, **session_extras}
            )
            tmp_path = None  # the session owns the file from here on
            payload["session_id"] = sid
            return jsonify(payload)
        except SequenceValidationError as exc:
            _discard(tmp_path)
            return jsonify(
                {
                    "error": str(exc),
                    "kind": "validation",
                    "nt_position": exc.nt_position,
                    "code": exc.code,
                }
            ), 400
        except Exception as exc:  # noqa: BLE001 — surface anything else as 500.
            logger.exception("Preview failed.")
            _discard(tmp_path)
            return jsonify({"error": str(exc), "kind": "internal"}), 500

    @app.post("/api/run")
    def start_run() -> Response:
        """Start a pipeline job for a previewed session; returns its job_id."""
        body = _json_body()
        sid = body.get("session_id")
        if not sid:
            return jsonify({"error": "missing session_id"}), 400
        session = _get_session(sid)
        if not session:
            return jsonify({"error": "session expired; re-upload the sequence"}), 410
        settings = body.get("settings") or {}
        cds_feature = body.get("cds_feature")
        job = JobState(job_id=uuid.uuid4().hex[:12])
        _put_job(job)
        # If the session has an ORF map and the user picked an ORF, materialize
        # that ORF's DNA into a fresh temp file and use it as the pipeline input.
        run_path = Path(session["path"])
        run_name = session["name"]
        orf_map = session.get("orfs") or {}
        if orf_map and cds_feature and cds_feature in orf_map:
            with tempfile.NamedTemporaryFile(
                delete=False, suffix=".fasta", mode="w", encoding="utf-8",
                dir=str(STATE_DIR),
            ) as tmp:
                tmp.write(f">{cds_feature}\n{orf_map[cds_feature]}\n")
            run_path = Path(tmp.name)
            run_name = f"{Path(run_name).stem}_{cds_feature}"
            # Already extracted a clean ORF — don't re-pass the label downstream.
            cds_feature = None
        threading.Thread(
            target=_run_pipeline,
            args=(job, run_path, run_name, cds_feature, settings),
            daemon=True,
        ).start()
        return jsonify({"job_id": job.job_id})

    @app.get("/api/status/<job_id>")
    def status(job_id: str) -> Response:
        """Report a job's progress snapshot."""
        job = _get_job(job_id)
        if not job:
            return jsonify({"error": "unknown job"}), 404
        return jsonify(job.public())

    @app.get("/api/result/<job_id>")
    def result(job_id: str) -> Response:
        """Return the variant table of a finished job (409 until it is done)."""
        job = _get_job(job_id)
        if not job:
            return jsonify({"error": "unknown job"}), 404
        if job.status != "done":
            return jsonify({"error": f"job not done (status={job.status})"}), 409
        return jsonify(
            {
                "wt_identifier": job.wt_identifier,
                "wt_protein": job.wt_protein,
                "variants": job.variants or [],
                "csv_path": job.csv_path,
                "desktop_path": job.desktop_path,
                "settings_used": job.settings_used,
                "started_at": job.started_at,
                "elapsed_seconds": job.elapsed(),
            }
        )

    @app.get("/api/download/<job_id>")
    def download(job_id: str) -> Response:
        """Stream a job's output in the format named by ``?format=``."""
        job = _get_job(job_id)
        if not job or not job.csv_path:
            return jsonify({"error": "no result available"}), 404
        fmt = request.args.get("format", "csv").lower()
        stem = Path(job.csv_path).stem
        try:
            data, mimetype, ext = _render_download(job, fmt)
        except ValueError as exc:
            return jsonify({"error": str(exc)}), 400
        except FileNotFoundError:
            # The CSV can disappear from disk (e.g. deleted via the library).
            return jsonify({"error": "no result available"}), 404
        return send_file(
            io.BytesIO(data) if isinstance(data, bytes) else data,
            mimetype=mimetype,
            as_attachment=True,
            download_name=f"{stem}.{ext}",
        )

    @app.post("/api/identify")
    def identify_start() -> Response:
        """Kick off an NCBI-BLAST identification for the session's WT protein.

        Returns ``{job_id, cached, hits?}``. If we already ran BLAST on this
        exact sequence we return the prior result inline with cached=true.
        A sequence that fails to parse answers 400.
        """
        body = _json_body()
        sid = body.get("session_id")
        if not sid:
            return jsonify({"error": "missing session_id"}), 400
        session = _get_session(sid)
        if not session:
            return jsonify({"error": "session expired"}), 410
        cds_feature = body.get("cds_feature")
        # Re-derive the protein to BLAST. For sessions with discovered ORFs
        # the user may have picked a specific one — use that if provided.
        orf_map = session.get("orfs") or {}
        try:
            if orf_map and cds_feature and cds_feature in orf_map:
                tmp_path = STATE_DIR / f"blast_input_{uuid.uuid4().hex[:8]}.fa"
                try:
                    tmp_path.write_text(
                        f">{cds_feature}\n{orf_map[cds_feature]}\n",
                        encoding="utf-8",
                    )
                    record = parse_input(
                        tmp_path, require_start=False, require_stop=False
                    )
                finally:
                    # Remove the scratch file even when parsing fails.
                    tmp_path.unlink(missing_ok=True)
            else:
                record = parse_input(
                    Path(session["path"]),
                    require_start=False,
                    require_stop=False,
                    cds_feature=cds_feature,
                )
        except SequenceValidationError as exc:
            return jsonify({"error": str(exc), "kind": "validation"}), 400
        protein = record.protein
        if not protein:
            return jsonify({"error": "no protein available to identify"}), 400
        seq_hash = _hash_sequence(protein)
        with _BLAST_LOCK:
            cached = _BLAST_CACHE.get(seq_hash)
            # Reuse finished and in-flight lookups; a job that ended in
            # "error" falls through and is retried.
            if cached and cached.status in {"done", "running", "submitting", "parsing", "pending"}:
                return jsonify(
                    {
                        "job_id": cached.job_id,
                        "cached": cached.status == "done",
                        **cached.public(),
                    }
                )
            job = BlastJob(job_id=uuid.uuid4().hex[:12], seq_hash=seq_hash)
            _BLAST_CACHE[seq_hash] = job
            _BLAST_BY_ID[job.job_id] = job
        threading.Thread(
            target=_run_blast, args=(job, protein), daemon=True
        ).start()
        return jsonify({"job_id": job.job_id, "cached": False, **job.public()})

    @app.get("/api/identify/<job_id>")
    def identify_status(job_id: str) -> Response:
        """Report the state (and any hits) of a BLAST identification job."""
        with _BLAST_LOCK:
            job = _BLAST_BY_ID.get(job_id)
        if not job:
            return jsonify({"error": "unknown job"}), 404
        return jsonify(job.public())

    @app.get("/api/library")
    def library() -> Response:
        """List every variant library CSV under ~/.dee/output/ for the
        Library + History pages. Each entry includes filename, on-disk
        size, timestamp parsed from the filename, the WT identifier embedded
        in the filename, variant count, and the top-fitness number — enough
        to populate a list view without loading the CSVs in the browser.
        """
        import csv as csv_mod

        items: List[Dict[str, Any]] = []
        for path in sorted(OUTPUT_DIR.glob("*_dee_library.csv"), reverse=True):
            try:
                stat = path.stat()
            except OSError:
                continue  # vanished between the glob and the stat
            # Filename convention from _run_pipeline:
            # {stem}__{wt_id}__{YYYYMMDD_HHMMSS}_dee_library.csv
            m = re.match(
                r"^(?P<stem>.+?)__(?P<wt>[^_]+(?:_[^_]+)*?)__"
                r"(?P<ts>\d{8}_\d{6})_dee_library\.csv$",
                path.name,
            )
            wt_id = m.group("wt") if m else path.stem
            ts_raw = m.group("ts") if m else None
            iso_ts = None
            if ts_raw:
                try:
                    dt = time.strptime(ts_raw, "%Y%m%d_%H%M%S")
                    iso_ts = time.strftime("%Y-%m-%dT%H:%M:%S", dt)
                except ValueError:
                    pass  # fall back to the file's mtime below
            # Quick scan: variant count + top fitness from first data row.
            variant_count = 0
            top_fitness: Optional[float] = None
            mutations_preview = ""
            try:
                with path.open("r", encoding="utf-8") as f:
                    for row in csv_mod.DictReader(f):
                        variant_count += 1
                        if variant_count == 1:
                            try:
                                top_fitness = float(row.get("Predicted_Fitness_Score") or 0.0)
                            except ValueError:
                                top_fitness = None
                            mutations_preview = row.get("Mutations_AA", "")
            except Exception as exc:  # noqa: BLE001
                logger.warning("Could not read %s: %s", path, exc)
            items.append(
                {
                    "filename": path.name,
                    "size_bytes": stat.st_size,
                    "modified_iso": iso_ts or time.strftime(
                        "%Y-%m-%dT%H:%M:%S", time.localtime(stat.st_mtime)
                    ),
                    "wt_identifier": wt_id,
                    "variant_count": variant_count,
                    "top_fitness": top_fitness,
                    "top_mutations": mutations_preview,
                }
            )
        return jsonify({"items": items, "output_dir": str(OUTPUT_DIR)})

    @app.get("/api/library/<path:filename>")
    def library_download(filename: str) -> Response:
        """Stream a stored library CSV by filename. Anchored to OUTPUT_DIR to
        prevent path traversal."""
        target = _library_file(filename)
        if target is None:
            return jsonify({"error": "not found"}), 404
        return send_file(
            str(target),
            mimetype="text/csv",
            as_attachment=True,
            download_name=target.name,
        )

    @app.delete("/api/library/<path:filename>")
    def library_delete(filename: str) -> Response:
        """Delete a stored library CSV. Same path-traversal guard."""
        target = _library_file(filename)
        if target is None:
            return jsonify({"error": "not found"}), 404
        try:
            target.unlink()
        except OSError as exc:
            return jsonify({"error": str(exc)}), 500
        return jsonify({"ok": True})

    @app.post("/api/shutdown")
    def shutdown() -> Response:
        """Acknowledge the request, then terminate the process shortly after."""
        # Werkzeug's old shutdown hook is gone in Flask 3 — we exit the
        # process directly. The browser tab can stay open; the user clicked
        # quit so they're done.
        def _bye() -> None:
            time.sleep(0.2)  # give the response a moment to be sent
            os._exit(0)

        threading.Thread(target=_bye, daemon=True).start()
        return jsonify({"ok": True})

    return app
# ----------------------------------------------------------------- helpers
def _materialize_input() -> tuple[Path, str]:
    """Pull the request's sequence into a temp file we can hand to parse_input.

    Two request shapes are accepted:
      * multipart upload with a ``file`` field — saved as-is, keeping the
        upload's extension (``.txt`` when it has none);
      * JSON body ``{"text": "...", "name": "..."}`` — ``text`` is stripped
        and written out as UTF-8.

    Returns:
        ``(temp_path, display_name)``. The temp file lives in STATE_DIR and is
        not deleted automatically; the display name falls back to
        ``"uploaded"`` / ``"pasted"``.

    Raises:
        SequenceValidationError: when no upload is present and the JSON body
            is missing, is not an object, or has an empty / non-string
            ``text``.
    """
    if "file" in request.files:
        upload = request.files["file"]
        suffix = Path(upload.filename or "").suffix or ".txt"
        # Reserve a unique name, then let the upload write to it by path.
        with tempfile.NamedTemporaryFile(
            delete=False, suffix=suffix, dir=str(STATE_DIR)
        ) as tmp:
            tmp_name = tmp.name
        upload.save(tmp_name)
        return Path(tmp_name), upload.filename or "uploaded"

    body = request.get_json(force=True, silent=True)
    if not isinstance(body, dict):
        body = {}
    raw_text = body.get("text")
    # A null or non-string "text" counts as "nothing provided" rather than
    # crashing on .strip().
    text = raw_text.strip() if isinstance(raw_text, str) else ""
    if not text:
        raise SequenceValidationError("No sequence provided.")
    with tempfile.NamedTemporaryFile(
        delete=False, suffix=".txt", mode="w", encoding="utf-8", dir=str(STATE_DIR)
    ) as tmp:
        tmp.write(text)
    name = body.get("name")
    display_name = name if isinstance(name, str) and name else "pasted"
    return Path(tmp.name), display_name
def _summarize(path: Path, original_name: str) -> tuple[Dict[str, Any], Dict[str, Any]]:
    """Return (JSON-friendly summary, extra session payload to cache).

    Three flow modes:
      1. Structured file (.dna/.gb/...) → list annotated CDS features.
      2. Raw input parses cleanly as a single CDS or protein → return preview.
      3. Raw input fails CDS validation (e.g. user pasted a whole plasmid) →
         fall back to an ORF scan of the raw text and return ORF choices.

    Raises:
        SequenceValidationError: from a structured file that fails to parse,
            or the original parse error when the ORF fallback finds nothing.
    """

    def _clip(protein: str) -> str:
        # First 80 residues, with an ellipsis when something was cut off.
        return protein[:80] + ("…" if len(protein) > 80 else "")

    # Mode 1 — annotated structured file.
    structured_suffixes = {".dna", ".gb", ".gbk", ".genbank", ".embl"}
    if path.suffix.lower() in structured_suffixes:
        cds_options = [
            {"label": label, "length_nt": length}
            for label, length in list_cds_features(path)
        ]
        record = parse_input(path, require_start=False, require_stop=False)
        summary = {
            "original_name": original_name,
            "detected_kind": "plasmid",
            "identifier": record.identifier,
            "protein_length": len(record.protein),
            "protein_preview": _clip(record.protein),
            "cds_options": cds_options,
        }
        return summary, {"mode": "structured_file"}

    # Mode 2 — try clean single-CDS / protein parse.
    try:
        record = parse_input(path, require_start=False, require_stop=False)
        summary = {
            "original_name": original_name,
            "detected_kind": "dna" if record.dna else "protein",
            "identifier": record.identifier,
            "protein_length": len(record.protein),
            "protein_preview": _clip(record.protein),
            "cds_options": None,
        }
        return summary, {"mode": "single"}
    except SequenceValidationError as clean_exc:
        # Mode 3 — fall back to ORF discovery on the raw text.
        orfs = find_orfs_in_dna(path.read_text(errors="ignore"))
        if not orfs:
            # Not recoverable; bubble the original parse error up.
            raise clean_exc
        orf_choices = []
        orf_dna_by_label = {}
        for orf in orfs:
            orf_choices.append(
                {
                    "label": orf.label,
                    "length_nt": len(orf.dna),
                    "frame": orf.frame,
                    "protein_length": len(orf.protein),
                }
            )
            orf_dna_by_label[orf.label] = orf.dna
        summary = {
            "original_name": original_name,
            "detected_kind": "multi_orf",
            "identifier": f"{Path(original_name).stem or 'sequence'} (no annotation)",
            "protein_length": 0,
            "protein_preview": (
                f"{len(orfs)} ORFs detected across 6 reading frames "
                "(no annotated CDS features). Pick one below."
            ),
            "cds_options": orf_choices,
        }
        return summary, {"mode": "orf", "orfs": orf_dna_by_label}
def _run_pipeline(
    job: JobState,
    input_path: Path,
    original_name: str,
    cds_feature: Optional[str],
    settings: Dict[str, Any],
) -> None:
    """Background worker. Updates ``job`` in place as it makes progress.

    Stages: parse the input → score every single substitution with ESM-2 →
    keep the top-percentile pool → search for multi-mutants → reverse-translate
    into a DataFrame → write the library CSV to OUTPUT_DIR and (best effort)
    copy it to the Desktop.

    The user's settings are parsed once into ``job.settings_used`` and every
    stage reads its parameters from that record, so the recorded values are
    the values that ran. The function never raises: failures end with
    ``job.status == "error"`` and the reason in ``job.error``. In both
    outcomes ``finished_at`` is stamped *before* the terminal status is
    published, so a poller that sees "done"/"error" also sees the end time.

    Args:
        job: Job record to update.
        input_path: File handed to ``parse_input``.
        original_name: User-facing name; its stem prefixes the output filename.
        cds_feature: Optional CDS label forwarded to ``parse_input``.
        settings: Raw settings from the request; missing keys use defaults.
    """
    try:
        job.status = "parsing"
        job.message = "Reading sequence…"
        job.progress = 0.05
        record = parse_input(
            input_path,
            require_start=False,
            require_stop=False,
            cds_feature=cds_feature,
        )
        job.wt_identifier = record.identifier
        job.wt_protein = record.protein

        used: Dict[str, Any] = {
            "model": settings.get("model", "small"),
            "percentile": float(settings.get("percentile", 85.0)),
            "k": int(settings.get("k", 30)),
            "min_mutations": int(settings.get("min_mutations", 2)),
            "max_mutations": int(settings.get("max_mutations", 5)),
            "restarts": int(settings.get("restarts", 8)),
            "steps_per_restart": int(settings.get("steps", 1200)),
            "host": settings.get("host", "e_coli"),
            "seed": settings.get("seed"),
            "device": settings.get("device") or "cpu",
            "cds_feature": cds_feature,
        }
        job.settings_used = used

        job.status = "scoring"
        job.message = (
            f"Scoring {len(record.protein)} residues × 19 substitutions with ESM-2…"
        )
        job.progress = 0.15
        scorer = ESM2Scorer(
            ScorerConfig(
                model_name=used["model"],
                # Pass the raw request value (possibly None) to the scorer;
                # settings_used records "cpu" when none was requested.
                device=settings.get("device"),
                quantization=settings.get("quantization"),
            )
        )
        scores_df = scorer.score_all_substitutions(record.protein)
        job.progress = 0.55

        pool = top_percentile_pool(scores_df, percentile=used["percentile"])
        job.message = f"Filtered to {len(pool)} high-impact single-site mutations."
        job.progress = 0.60

        job.status = "searching"
        job.message = (
            f"Optimizing multi-mutants (K={used['k']}, "
            f"max {used['max_mutations']} subs)…"
        )
        variants = evolve(
            pool,
            SearchConfig(
                k=used["k"],
                max_mutations=used["max_mutations"],
                min_mutations=used["min_mutations"],
                n_restarts=used["restarts"],
                steps_per_restart=used["steps_per_restart"],
                seed=used["seed"],
            ),
        )
        job.progress = 0.85

        job.status = "encoding"
        job.message = f"Reverse-translating and scrubbing restriction sites for {len(variants)} variants…"
        df = variants_to_dataframe(
            record.protein,
            variants,
            host=used["host"],
            forbidden_sites=DEFAULT_FORBIDDEN_SITES,
        )
        job.progress = 0.95

        stem = Path(original_name).stem or "library"
        # Keep only alphanumerics in the identifier so it is filename-safe.
        safe_id = "".join(c if c.isalnum() else "_" for c in record.identifier) or "wt"
        ts = time.strftime("%Y%m%d_%H%M%S")
        filename = f"{stem}__{safe_id}__{ts}_dee_library.csv"
        primary = OUTPUT_DIR / filename
        write_library_csv(df, str(primary))
        job.csv_path = str(primary)

        # Best-effort copy to Desktop. Requires the user to have granted the
        # .app Files & Folders → Desktop access (NSDesktopFolderUsageDescription
        # in Info.plist triggers the system prompt).
        desktop_dest = DESKTOP_DIR / filename
        try:
            desktop_dest.write_bytes(primary.read_bytes())
            job.desktop_path = str(desktop_dest)
        except OSError as exc:  # includes PermissionError
            logger.warning("Could not copy to Desktop (%s); CSV stays in ~/.dee/output/.", exc)
            job.desktop_path = None

        job.variants = df.to_dict(orient="records")
        job.progress = 1.0
        job.message = f"Done. {len(variants)} variants ready."
        job.finished_at = time.time()
        job.status = "done"
    except SequenceValidationError as exc:
        job.error = f"Couldn't parse input: {exc}"
        job.message = job.error
        job.finished_at = time.time()
        job.status = "error"
    except Exception as exc:  # noqa: BLE001
        logger.exception("Pipeline crashed.")
        job.error = f"{type(exc).__name__}: {exc}"
        job.message = job.error
        job.finished_at = time.time()
        job.status = "error"
def _fasta_from_job(job: JobState, *, of: str = "protein") -> str:
    """Serialize a job's variants as FASTA text.

    ``of="protein"`` emits each variant's ``Mutant_AA_Seq``; any other value
    emits ``Optimized_DNA_Seq``. Each record gets a header of the form
    ``>ID MUTATIONS fitness=SCORE`` followed by the sequence wrapped at 60
    characters. Returns ``""`` when the job has no variants.
    """
    if not job.variants:
        return ""
    sequence_field = "Optimized_DNA_Seq" if of != "protein" else "Mutant_AA_Seq"
    out: List[str] = []
    for variant in job.variants:
        out.append(
            f">{variant.get('Variant_ID')} {variant.get('Mutations_AA')} "
            f"fitness={variant.get('Predicted_Fitness_Score')}"
        )
        residues = variant.get(sequence_field, "")
        out.extend(
            residues[start:start + 60] for start in range(0, len(residues), 60)
        )
    return "\n".join(out) + "\n"
def _genbank_from_job(job: JobState) -> str:
    """Build a multi-record GenBank file with one record per variant.

    Each record gets a CDS feature spanning the whole optimized DNA whose
    ``/translation`` is the mutant protein (when present), plus a
    misc_feature per substitution so the mutation positions are visible when
    opened in SnapGene / Benchling. Substitutions that cannot be parsed, or
    whose codon would fall past the end of the DNA, are skipped.
    """
    from Bio import SeqIO
    from Bio.Seq import Seq
    from Bio.SeqFeature import FeatureLocation, SeqFeature
    from Bio.SeqRecord import SeqRecord

    gb_records = []
    for variant in job.variants or []:
        dna = variant.get("Optimized_DNA_Seq", "")
        protein = variant.get("Mutant_AA_Seq", "")
        muts = variant.get("Mutations_AA", "")
        gb_record = SeqRecord(
            Seq(dna),
            id=variant.get("Variant_ID", "variant"),
            name=variant.get("Variant_ID", "variant"),
            description=f"DEE designed variant; mutations={muts}; "
            f"fitness={variant.get('Predicted_Fitness_Score')}",
            annotations={"molecule_type": "DNA", "organism": "synthetic construct"},
        )

        # Whole-CDS feature with the translation in /translation.
        qualifiers = {
            "gene": [job.wt_identifier or "designed_variant"],
            "product": ["designed variant"],
            "note": [f"mutations={muts}"],
        }
        if protein:
            qualifiers["translation"] = [protein]
        whole_cds = SeqFeature(
            FeatureLocation(0, len(dna)), type="CDS", qualifiers=qualifiers
        )
        gb_record.features.append(whole_cds)

        # One misc_feature per substitution at the codon it touches.
        for raw_mut in muts.split(","):
            mut = raw_mut.strip()
            if len(mut) < 3:  # also covers the empty string
                continue
            digits = "".join(filter(str.isdigit, mut[1:-1]))
            try:
                residue_number = int(digits)
            except ValueError:
                continue
            codon_start = 3 * (residue_number - 1)
            codon_end = codon_start + 3
            if codon_end > len(dna):
                continue
            marker = SeqFeature(
                FeatureLocation(codon_start, codon_end),
                type="misc_feature",
                qualifiers={"label": [mut], "note": [f"Substitution {mut}"]},
            )
            gb_record.features.append(marker)

        gb_records.append(gb_record)

    out = io.StringIO()
    SeqIO.write(gb_records, out, "genbank")
    return out.getvalue()
# Column geometry — width sized to typical content per column type.
_XLSX_COL_WIDTH = {
    "Variant_ID": 11,
    "Mutations_AA": 28,
    "Mutant_AA_Seq": 52,
    "Optimized_DNA_Seq": 70,
    "Predicted_Fitness_Score": 14,
    "Length_bp": 9,
    "GC_Percent": 10,
    "Primer_Fwd": 32,
    "Primer_Fwd_Tm_C": 12,
    "Primer_Fwd_GC_Percent": 14,
    "Primer_Rev": 32,
    "Primer_Rev_Tm_C": 12,
    "Primer_Rev_GC_Percent": 14,
    "Annealing_Temp_C": 14,
    "Restriction_Sites_Found": 14,
    "Restriction_Sites_Unresolved": 16,
}
# Number formats keyed to specific column names (rest = text).
_XLSX_NUM_FORMAT = {
    "Predicted_Fitness_Score": "0.000",
    "GC_Percent": "0.0",
    "Primer_Fwd_Tm_C": "0.0",
    "Primer_Fwd_GC_Percent": "0.0",
    "Primer_Rev_Tm_C": "0.0",
    "Primer_Rev_GC_Percent": "0.0",
    "Annealing_Temp_C": "0.0",
    "Length_bp": "0",
    "Restriction_Sites_Found": "0",
    "Restriction_Sites_Unresolved": "0",
}
# Columns rendered in the monospace font.
_XLSX_MONO_COLS = {
    "Mutations_AA", "Mutant_AA_Seq", "Optimized_DNA_Seq",
    "Primer_Fwd", "Primer_Rev", "Variant_ID",
}


def _xlsx_fill_variants_sheet(ws, variants: List[Dict[str, Any]]) -> None:
    """Write the styled variant table (header + one row per variant) to ``ws``."""
    from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
    from openpyxl.utils import get_column_letter

    # Column order follows the first variant's keys.
    headers = list(variants[0].keys())

    # Header row — bold white on deep brand background, slightly taller so it
    # reads as a real header band.
    header_fill = PatternFill("solid", fgColor="0A5F77")
    header_font = Font(name="Inter", size=11, bold=True, color="FFFFFF")
    header_align = Alignment(horizontal="left", vertical="center", wrap_text=False)
    body_font = Font(name="Inter", size=10)
    body_align = Alignment(horizontal="left", vertical="top", wrap_text=False)
    num_align = Alignment(horizontal="right", vertical="top")
    mono_font = Font(name="JetBrains Mono", size=9)
    band_fill = PatternFill("solid", fgColor="F4F6F9")  # alternating row color
    thin = Side(style="thin", color="E0E5EC")
    border = Border(left=thin, right=thin, top=thin, bottom=thin)

    # Write header row.
    for col_idx, col in enumerate(headers, start=1):
        cell = ws.cell(row=1, column=col_idx, value=col)
        cell.fill = header_fill
        cell.font = header_font
        cell.alignment = header_align
        cell.border = border
    ws.row_dimensions[1].height = 22

    # Write data rows with alternating band fill + column-specific formatting.
    for row_idx, variant in enumerate(variants, start=2):
        is_band = row_idx % 2 == 0
        for col_idx, col in enumerate(headers, start=1):
            cell = ws.cell(row=row_idx, column=col_idx, value=variant.get(col))
            cell.border = border
            if is_band:
                cell.fill = band_fill
            if col in _XLSX_NUM_FORMAT:
                cell.alignment = num_align
                cell.number_format = _XLSX_NUM_FORMAT[col]
                cell.font = body_font
            elif col in _XLSX_MONO_COLS:
                cell.alignment = body_align
                cell.font = mono_font
            else:
                cell.alignment = body_align
                cell.font = body_font

    # Column widths.
    for col_idx, col in enumerate(headers, start=1):
        ws.column_dimensions[get_column_letter(col_idx)].width = _XLSX_COL_WIDTH.get(col, 14)

    # Freeze the header row + first two columns so Variant_ID + Mutations stay
    # in view while horizontally scrolling through primer + sequence columns.
    ws.freeze_panes = "C2"
    # Auto-filter the whole used range so reviewers can sort/filter natively.
    ws.auto_filter.ref = ws.dimensions


def _xlsx_fill_run_info_sheet(info, job: JobState) -> None:
    """Write the run parameters and the methods paragraph to sheet ``info``."""
    from openpyxl.styles import Alignment, Font

    label_font = Font(name="Inter", size=11, bold=True, color="0A5F77")
    value_font = Font(name="Inter", size=11)
    methods_font = Font(name="Inter", size=10)

    s = job.settings_used or {}
    model_label = {
        "small": "ESM-2 35M (facebook/esm2_t12_35M_UR50D)",
        "medium": "ESM-2 650M (facebook/esm2_t33_650M_UR50D)",
        "large": "ESM-2 3B (facebook/esm2_t36_3B_UR50D)",
    }.get(s.get("model"), s.get("model", ""))
    host_label = {
        "e_coli": "Escherichia coli",
        "yeast": "Saccharomyces cerevisiae",
        "human": "Homo sapiens",
    }.get(s.get("host"), s.get("host", ""))
    # Share of single-site substitutions kept by the percentile cutoff.
    top_share = 100 - float(s.get("percentile", 85))

    rows = [
        ("Wild-type identifier", job.wt_identifier),
        ("Wild-type length", f"{len(job.wt_protein)} aa"),
        ("Model", model_label),
        ("Percentile cutoff",
         f"≥ p{s.get('percentile')} (top {top_share:.0f}%)"),
        ("K variants returned", s.get("k")),
        ("Min mutations / variant", s.get("min_mutations")),
        ("Max mutations / variant", s.get("max_mutations")),
        ("SA restarts", s.get("restarts")),
        ("SA steps / restart", s.get("steps_per_restart")),
        ("Expression host", host_label),
        ("Compute device", s.get("device")),
        ("Random seed", s.get("seed") if s.get("seed") is not None else "(random)"),
        ("CDS feature picked", s.get("cds_feature") or "(none / single CDS)"),
        ("Run started (epoch s)", job.started_at),
        ("Wall time (s)", job.elapsed()),
    ]
    for i, (k, v) in enumerate(rows, start=1):
        info.cell(row=i, column=1, value=k).font = label_font
        info.cell(row=i, column=2, value=v).font = value_font

    # Methods paragraph: a title one blank row below the table, text under it.
    title_row = len(rows) + 2
    methods_row = len(rows) + 3
    info.cell(row=title_row, column=1, value="Materials & methods (copy-paste)").font = label_font
    methods = (
        f"Variants of {job.wt_identifier} ({len(job.wt_protein)} aa) were designed "
        f"in silico with {model_label} (Lin et al., Science 2022) using the "
        f"wild-type marginal log-likelihood scoring scheme of Meier et al. "
        f"(Adv. Neural Inf. Process. Syst. 2021). Single-point substitutions "
        f"in the top {top_share:.0f}% by ΔLL were "
        f"retained as the combinatorial search space. {s.get('k')} multi-mutant "
        # The min and max are a range and need a separator between them.
        f"variants with {s.get('min_mutations')}–{s.get('max_mutations')} "
        f"simultaneous substitutions were generated by simulated annealing "
        f"({s.get('restarts')} restarts × {s.get('steps_per_restart')} steps, "
        f"geometric cooling) maximizing cumulative ΔLL with stop-codon and "
        f"duplicate-position penalties. Optimized DNA was reverse-translated "
        f"using {host_label} codon-usage frequencies and synonymously cleaned "
        f"of BsaI, BsmBI, and NotI recognition sites for Golden Gate "
        f"compatibility."
    )
    mcell = info.cell(row=methods_row, column=1, value=methods)
    mcell.font = methods_font
    mcell.alignment = Alignment(wrap_text=True, vertical="top")
    info.merge_cells(
        start_row=methods_row, start_column=1, end_row=methods_row, end_column=4
    )
    info.row_dimensions[methods_row].height = 140
    info.column_dimensions["A"].width = 32
    info.column_dimensions["B"].width = 48
    info.column_dimensions["C"].width = 16
    info.column_dimensions["D"].width = 16


def _xlsx_from_job(job: JobState) -> bytes:
    """Write the variant table to a styled .xlsx workbook.

    Two sheets:
      * "Variants" — the table itself, with a bold colored header row, frozen
        header row and first two columns, auto-filter, per-column widths and
        number formats (3 decimals for fitness, 1 for GC/Tm, integer for bp).
      * "Run info" — key-value pairs of every parameter the engine used,
        plus the auto-generated methods paragraph for copy-pasting into a
        Materials & Methods section.

    A job without variants yields a workbook holding only a "Variants" sheet
    with a single explanatory cell.

    Requires ``openpyxl``. Returns raw bytes for the Flask handler to stream.
    """
    import openpyxl

    wb = openpyxl.Workbook()
    ws = wb.active
    ws.title = "Variants"
    variants = job.variants or []
    if not variants:
        wb.remove(ws)
        ws = wb.create_sheet("Variants")
        ws["A1"] = "No variants in this run."
    else:
        _xlsx_fill_variants_sheet(ws, variants)
        _xlsx_fill_run_info_sheet(wb.create_sheet("Run info"), job)
    buf = io.BytesIO()
    wb.save(buf)
    return buf.getvalue()
def _json_from_job(job: JobState) -> bytes:
    """Serialize the run held by *job* as an indented JSON document.

    The document carries the wild-type identifier, the wild-type protein
    string together with its length, and the list of variants (an empty
    list when the job has none). The result is UTF-8 encoded bytes.
    """
    variant_rows = job.variants
    if not variant_rows:
        # A missing/empty variant collection is always exported as [].
        variant_rows = []

    # Keys are inserted in the order they should appear in the output.
    document = {}
    document["wt_identifier"] = job.wt_identifier
    document["wt_protein"] = job.wt_protein
    document["wt_protein_length"] = len(job.wt_protein)
    document["variants"] = variant_rows

    text = json.dumps(document, indent=2)
    return text.encode("utf-8")
def _tsv_from_job(job: JobState) -> bytes:
    """Render the job's variants as tab-separated text, UTF-8 encoded.

    This is the tab-delimited counterpart of the CSV export, for analysis
    tools (R, awk, Perl) that cope poorly with quoted commas. No index
    column is written.
    """
    import pandas as pd

    records = job.variants
    if not records:
        records = []

    table = pd.DataFrame(records)
    tsv_text = table.to_csv(sep="\t", index=False)
    return tsv_text.encode("utf-8")
# Download-format dispatch — selected by the value of ``?format=``. Every
# supported format yields a (payload, mimetype, file-extension) tuple. To add
# a format, add one more case below and list it in the error message.
def _render_download(job: JobState, fmt: str):
    """Produce the download for *job* in the format named by *fmt*.

    Returns a ``(payload, mimetype, file_extension)`` tuple where the
    payload is bytes. Accepted names are ``csv``, ``tsv``, ``fasta``,
    ``fasta-dna``, ``gb``/``genbank``, ``xlsx``/``excel`` and ``json``.

    Raises:
        ValueError: if *fmt* is not one of the accepted names.
    """

    def _csv():
        # Excel in many European locales (incl. the user's Georgian Excel)
        # shows a comma-delimited CSV as one single column unless it is told
        # the delimiter. Prefixing Excel's "sep=," hint line plus a UTF-8 BOM
        # makes a double-click open the file with proper columns everywhere.
        with open(job.csv_path, "rb") as handle:
            file_bytes = handle.read()
        # Round-trip through str so undecodable bytes become U+FFFD.
        text = file_bytes.decode("utf-8", errors="replace")
        hinted = b"sep=,\r\n" + text.encode("utf-8")
        return b"\xef\xbb\xbf" + hinted, "text/csv", "csv"

    def _tsv():
        return _tsv_from_job(job), "text/tab-separated-values", "tsv"

    def _fasta_protein():
        data = _fasta_from_job(job, of="protein").encode("utf-8")
        return data, "text/x-fasta", "fasta"

    def _fasta_dna():
        data = _fasta_from_job(job, of="dna").encode("utf-8")
        return data, "text/x-fasta", "fna"

    def _genbank():
        return _genbank_from_job(job).encode("utf-8"), "text/x-genbank", "gb"

    def _xlsx():
        mimetype = (
            "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
        )
        return _xlsx_from_job(job), mimetype, "xlsx"

    def _json():
        return _json_from_job(job), "application/json", "json"

    # Nothing is rendered until the selected entry is called, so only the
    # requested format's work is performed.
    renderers = {
        "csv": _csv,
        "tsv": _tsv,
        "fasta": _fasta_protein,
        "fasta-dna": _fasta_dna,
        "gb": _genbank,
        "genbank": _genbank,
        "xlsx": _xlsx,
        "excel": _xlsx,
        "json": _json,
    }

    render = renderers.get(fmt)
    if render is None:
        raise ValueError(
            "format must be one of: csv, tsv, fasta, fasta-dna, gb, xlsx, json"
        )
    return render()
# ----------------------------------------------------------------- CLI entry
def serve(host: Optional[str] = None, port: Optional[int] = None) -> None:
    """Start the Flask server and block while it runs.

    Address resolution, in order of precedence: an explicit (truthy)
    argument, then the ``HOST`` / ``PORT`` environment variables, then the
    local-dev defaults ``127.0.0.1:4789``. Reading the environment lets the
    same image run under Hugging Face Spaces' fixed ``PORT=7860`` contract,
    Fly.io's ``8080``, Render's ``10000``, etc.

    Once the address is known, a ``DEE_READY <url>`` line is printed to
    stdout and flushed (presumably a readiness signal for the launcher —
    confirm against the desktop app).
    """
    if not host:
        host = os.environ.get("HOST") or "127.0.0.1"
    if not port:
        port_text = os.environ.get("PORT")
        port = int(port_text) if port_text else 4789

    logging.basicConfig(
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
        level=logging.INFO,
    )

    application = create_app()
    logger.info("Digital Evolution Engine listening on http://%s:%s", host, port)
    print(f"DEE_READY http://{host}:{port}", flush=True)
    application.run(
        host=host,
        port=port,
        debug=False,
        use_reloader=False,
        threaded=True,
    )
# Script entry point: when this module is executed directly (rather than
# imported), start the server with no explicit host/port so that serve()
# resolves them from the environment or its built-in defaults.
if __name__ == "__main__":
    serve()