File size: 3,268 Bytes
089d665
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
"""Feedback loop — capture user corrections to retrain.

Each piece of feedback is appended to a JSONL ledger, by default
  <directory of this module>/artifacts/feedback.jsonl
The GEMEO_FEEDBACK_LEDGER environment variable overrides this path.

Schema (one JSON per line):
  {
    "ts": "...",
    "twin_id": "gemeo_...",
    "case_id": "...",
    "kind": "diagnosis|trajectory|drug|trial|next_question|cohort|subgraph",
    "target": {...},          # what the model said
    "user_correction": {...}, # what the user said is right
    "user_id": "...",
    "comment": "..."
  }

The training pipelines in `gemeo/train/` consume this ledger to:
  - HGT: re-rank patient embeddings via supervised contrastive (positive=
    confirmed-similar, negative=user-rejected)
  - TxGNN: hard-negative mining for drug recs the user marked wrong
  - TGNN: outcome supervision when user provides actual trajectory

This is the closed-loop piece — the feature that turns Gemeo from a
static SOTA model into a *learning* digital twin.
"""
from __future__ import annotations
import os
import json
import logging
from datetime import datetime, timezone

logger = logging.getLogger("gemeo.feedback")

# Built-in ledger location: an ``artifacts/`` directory beside this module.
_DEFAULT_LEDGER_PATH = os.path.join(
    os.path.dirname(__file__), "artifacts", "feedback.jsonl"
)
# The GEMEO_FEEDBACK_LEDGER environment variable, when set, takes precedence.
LEDGER_PATH = os.environ.get("GEMEO_FEEDBACK_LEDGER", _DEFAULT_LEDGER_PATH)


def _ensure_dir():
    os.makedirs(os.path.dirname(LEDGER_PATH), exist_ok=True)


def record(
    *,
    twin_id: str,
    kind: str,
    target: dict,
    user_correction: dict,
    case_id: str | None = None,
    user_id: str | None = None,
    comment: str | None = None,
) -> dict:
    """Append a feedback record to the ledger and return it.

    Writing is best-effort. Any failure to create the ledger directory, to
    serialize the record, or to append to the file is logged with a
    traceback and swallowed, so capturing feedback never breaks the caller.

    Args:
        twin_id: Identifier of the twin the feedback refers to.
        kind: Which model output is being corrected; expected to be one of
            "diagnosis", "trajectory", "drug", "trial", "next_question",
            "cohort", "subgraph" (not validated here).
        target: What the model produced.
        user_correction: What the user says is right.
        case_id: Optional case identifier.
        user_id: Optional identifier of the user giving the feedback.
        comment: Optional free-text comment.

    Returns:
        The record dict, with a UTC ISO-8601 ``ts`` timestamp. It is returned
        even when the write failed.
    """
    rec = {
        "ts": datetime.now(timezone.utc).isoformat(),
        "twin_id": twin_id,
        "case_id": case_id,
        "kind": kind,
        "target": target,
        "user_correction": user_correction,
        "user_id": user_id,
        "comment": comment,
    }
    try:
        # Inside the try so an unwritable artifacts directory is handled like
        # any other write failure instead of propagating to the caller.
        _ensure_dir()
        # default=str: values json cannot encode natively are stored via str().
        line = json.dumps(rec, default=str) + "\n"
        with open(LEDGER_PATH, "a", encoding="utf-8") as f:
            f.write(line)
    except Exception as e:
        logger.exception("failed to write feedback: %s", e)
    return rec


def stats() -> dict:
    """Summarize the ledger: number of usable records and a tally per kind.

    Meant for /api/gemeo/health. A line is skipped when it cannot be parsed
    or its ``kind`` cannot be tallied. Records without a ``kind`` are counted
    under "unknown". A read error is logged, and the tally gathered so far is
    still returned.
    """
    tally = {}
    total = 0
    if os.path.exists(LEDGER_PATH):
        try:
            with open(LEDGER_PATH) as ledger:
                for raw in ledger:
                    try:
                        kind = json.loads(raw).get("kind", "unknown")
                        tally[kind] = tally.get(kind, 0) + 1
                    except Exception:
                        # Bad JSON, a non-object line, or an unhashable kind.
                        continue
                    total += 1
        except Exception as e:
            logger.error(f"failed to read ledger: {e}")
    return {"total": total, "by_kind": tally, "ledger": LEDGER_PATH}


def iter_records(kind: str = None):
    """Iterator over feedback records — used by training pipelines."""
    if not os.path.exists(LEDGER_PATH):
        return
    with open(LEDGER_PATH) as f:
        for line in f:
            try:
                rec = json.loads(line)
            except Exception:
                continue
            if kind and rec.get("kind") != kind:
                continue
            yield rec