"""Feedback loop — capture user corrections for retraining.
Each piece of feedback is appended to a JSONL ledger:
./gemeo/artifacts/feedback.jsonl
Schema (one JSON per line):
{
"ts": "...",
"twin_id": "gemeo_...",
"case_id": "...",
"kind": "diagnosis|trajectory|drug|trial|next_question|cohort|subgraph",
"target": {...}, # what the model said
"user_correction": {...}, # what the user said is right
"user_id": "...",
"comment": "..."
}
The training pipelines in `gemeo/train/` consume this ledger to:
- HGT: re-rank patient embeddings via supervised contrastive (positive=
confirmed-similar, negative=user-rejected)
- TxGNN: hard-negative mining for drug recs the user marked wrong
- TGNN: outcome supervision when user provides actual trajectory
This is the closed-loop piece — the feature that turns Gemeo from a
static SOTA model into a *learning* digital twin.
"""
from __future__ import annotations
import os
import json
import logging
from datetime import datetime, timezone
# Logger shared by every function in this module.
logger = logging.getLogger("gemeo.feedback")

# Ledger location: the GEMEO_FEEDBACK_LEDGER environment variable wins when
# it is set; otherwise fall back to artifacts/feedback.jsonl beside this module.
LEDGER_PATH = os.getenv(
    "GEMEO_FEEDBACK_LEDGER",
    os.path.join(os.path.dirname(__file__), "artifacts", "feedback.jsonl"),
)
def _ensure_dir() -> None:
    """Create the ledger's parent directory if it does not exist yet.

    When ``LEDGER_PATH`` is a bare filename (no directory component) the
    ledger lives in the current working directory, so there is nothing to
    create. That case must be skipped explicitly because
    ``os.makedirs("")`` raises ``FileNotFoundError``.

    Raises:
        OSError: if the directory cannot be created.
    """
    parent = os.path.dirname(LEDGER_PATH)
    if parent:
        os.makedirs(parent, exist_ok=True)
def record(
    *,
    twin_id: str,
    kind: str,
    target: dict,
    user_correction: dict,
    case_id: str | None = None,
    user_id: str | None = None,
    comment: str | None = None,
) -> dict:
    """Append one feedback record to the JSONL ledger and return it.

    Persistence is best-effort: any failure while creating the ledger
    directory or writing the line is logged and swallowed, and the record
    dict is still returned to the caller.

    Args:
        twin_id: Identifier of the twin the feedback refers to.
        kind: One of "diagnosis", "trajectory", "drug", "trial",
            "next_question", "cohort", "subgraph". Not validated here.
        target: What the model said.
        user_correction: What the user said is right.
        case_id: Optional case identifier.
        user_id: Optional identifier of the user giving the feedback.
        comment: Optional free-text comment.

    Returns:
        The record dict, including a timezone-aware UTC ISO-8601 ``ts``.
    """
    rec = {
        "ts": datetime.now(timezone.utc).isoformat(),
        "twin_id": twin_id,
        "case_id": case_id,
        "kind": kind,
        "target": target,
        "user_correction": user_correction,
        "user_id": user_id,
        "comment": comment,
    }
    try:
        # Directory creation belongs to the same best-effort write: a
        # failure here is logged like a failed file write, not raised.
        _ensure_dir()
        with open(LEDGER_PATH, "a", encoding="utf-8") as f:
            # default=str stringifies values json cannot encode natively.
            f.write(json.dumps(rec, default=str) + "\n")
    except Exception as e:
        logger.error("failed to write feedback: %s", e)
    return rec
def stats() -> dict:
    """Summarise the ledger: total record count plus a per-kind tally.

    Intended for /api/gemeo/health. Lines that cannot be parsed or counted
    are skipped. A missing ledger yields zero counts. A read failure is
    logged and whatever was tallied up to that point is returned.
    """
    by_kind = {}
    total = 0
    if os.path.exists(LEDGER_PATH):
        try:
            with open(LEDGER_PATH) as ledger:
                for raw in ledger:
                    try:
                        label = json.loads(raw).get("kind", "unknown")
                        by_kind[label] = by_kind.get(label, 0) + 1
                    except Exception:
                        # Malformed or non-object line: leave it uncounted.
                        continue
                    total += 1
        except Exception as e:
            logger.error(f"failed to read ledger: {e}")
    return {"total": total, "by_kind": by_kind, "ledger": LEDGER_PATH}
def iter_records(kind: str | None = None):
    """Yield feedback records from the ledger, optionally filtered by kind.

    Used by the training pipelines. Yields nothing when the ledger file
    does not exist. Lines that are not valid JSON, or that parse to
    something other than a JSON object, are skipped.

    Args:
        kind: When truthy, only records whose "kind" equals this value are
            yielded; ``None`` or an empty string yields every record.

    Yields:
        One dict per well-formed ledger line, in file order.
    """
    if not os.path.exists(LEDGER_PATH):
        return
    with open(LEDGER_PATH) as f:
        for line in f:
            try:
                rec = json.loads(line)
            except Exception:
                continue
            # A line such as `[1, 2]` or `42` parses fine but is not a
            # record; skipping it keeps rec.get() below from raising.
            if not isinstance(rec, dict):
                continue
            if kind and rec.get("kind") != kind:
                continue
            yield rec
|