| """Bridge to raras-app graph-ml artifacts. |
| |
| raras-app trains PhenoGNet monthly and writes: |
| /Users/dimas/raras-app/data/graph-ml/biolord_embeddings.npz (768-dim, init) |
| /Users/dimas/raras-app/data/graph-ml/graph_embeddings.npz (64-dim, contrastive GNN) |
| /Users/dimas/raras-app/data/graph-ml/fused_embeddings.npz (3072-dim, final, Neo4j-indexed) |
| /Users/dimas/raras-app/data/graph-ml/node_ids.json (index → ORPHA/HPO/HGNC) |
| /Users/dimas/raras-app/data/graph-ml/hetero_graph.json (edge adjacency) |
| |
| Gemeo loads these read-only. We never retrain inside swarm-py — retrain |
| runs in raras-app's `retrain-scheduled.sh` cron. Phase-2 GNN training |
| lives in `gemeo/train/` and writes its own artifacts under `gemeo/artifacts/`. |
| """ |
| from __future__ import annotations |
| import os |
| import json |
| import logging |
| from functools import lru_cache |
| from typing import Optional |
|
|
logger = logging.getLogger("gemeo.bridge")

# Directory that holds the graph-ml artifacts. Resolution order:
#   1. the RARAS_APP_GRAPH_ML environment variable, when it is set;
#   2. a `data/` directory sitting next to this module, when it exists;
#   3. the hard-coded raras-app export path.
_REPO_DATA = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data")
RARAS_APP_GRAPH_ML = os.environ.get("RARAS_APP_GRAPH_ML")
if RARAS_APP_GRAPH_ML is None:
    if os.path.exists(_REPO_DATA):
        RARAS_APP_GRAPH_ML = _REPO_DATA
    else:
        RARAS_APP_GRAPH_ML = "/Users/dimas/raras-app/data/graph-ml"

# File name of the fused-embedding archive: the fp16 variant wins when it is
# present in the data directory, otherwise `fused_embeddings.npz` is used.
# Decided once, at import time.
FUSED_FNAME = "fused_embeddings.npz"
if os.path.exists(os.path.join(RARAS_APP_GRAPH_ML, "fused_embeddings_fp16.npz")):
    FUSED_FNAME = "fused_embeddings_fp16.npz"
logger.info(f"gemeo.bridge: data dir={RARAS_APP_GRAPH_ML} fused={FUSED_FNAME}")

# Process-wide cache of loaded .npz archives, keyed by "fused" / "graph".
_ARTIFACT_CACHE: dict = {}
|
|
|
|
def _path(name: str) -> str:
    """Return the path of artifact file `name` inside `RARAS_APP_GRAPH_ML`."""
    artifact_dir = RARAS_APP_GRAPH_ML
    return os.path.join(artifact_dir, name)
|
|
|
|
def has_raras_artifacts() -> bool:
    """Tell whether the required artifacts exist in the data directory.

    Returns:
        True only when both the fused-embedding archive (`FUSED_FNAME`) and
        `node_ids.json` exist; False as soon as one of them is missing.
    """
    for required in (FUSED_FNAME, "node_ids.json"):
        if not os.path.exists(_path(required)):
            return False
    return True
|
|
|
|
@lru_cache(maxsize=1)
def load_node_ids() -> dict:
    """Load the node-id lists from `node_ids.json`.

    Returns:
        The parsed JSON document, expected to look like
        {'disease': [orpha,...], 'phenotype': [hpo,...], 'gene': [symbol,...]}.
        When the file does not exist, a warning is logged and a mapping with
        those three keys and empty lists is returned instead.

    Raises:
        OSError: the file exists but cannot be opened or read.
        ValueError: the content is not valid UTF-8 or not valid JSON
            (`UnicodeDecodeError` / `json.JSONDecodeError`).

    The return value is memoized by `lru_cache`, and that includes the empty
    fallback: a file that appears later is not picked up until
    `load_node_ids.cache_clear()` is called. Exceptions are not cached, so a
    failed read is retried on the next call. Every caller receives the same
    dict object and should treat it as read-only.
    """
    p = _path("node_ids.json")
    if not os.path.exists(p):
        logger.warning(f"node_ids.json not found at {p}")
        return {"disease": [], "phenotype": [], "gene": []}
    # JSON text is UTF-8; decode it explicitly instead of relying on the
    # platform's locale-dependent default encoding.
    with open(p, encoding="utf-8") as f:
        return json.load(f)
|
|
|
|
@lru_cache(maxsize=1)
def load_node_index() -> dict:
    """Build the id -> position lookup for every node kind.

    Returns:
        {'disease': {orpha: idx}, 'phenotype': {hpo: idx}, 'gene': {symbol: idx}},
        i.e. one dict per key of `load_node_ids()`, mapping each id to its
        position in that list. If an id is repeated, its last position wins.

    The result is memoized by `lru_cache`.
    """
    index = {}
    for kind, members in load_node_ids().items():
        positions = {}
        for pos, node_id in enumerate(members):
            positions[node_id] = pos
        index[kind] = positions
    return index
|
|
|
|
def load_fused_embeddings():
    """Returns dict: {'disease': np.ndarray, 'phenotype': np.ndarray, 'gene': np.ndarray} (3072-dim).

    Reads the archive named by `FUSED_FNAME`, which is chosen at import time:
    the fp16-quantized variant (`fused_embeddings_fp16.npz`, ~41 MB) when
    present, else the original fp64 (`fused_embeddings.npz`, ~649 MB).
    The fp16 version is what ships in `gemeo/data/`; fp64 only exists
    in the dev workstation export from raras-app.

    Returns:
        A dict with one entry per array stored in the archive, or None when
        the file is missing (a warning is logged) or loading fails for any
        reason, including numpy not being importable (an error is logged).
        A successful load is kept in `_ARTIFACT_CACHE["fused"]` and reused;
        failures are not cached, so the next call tries again.
    """
    if "fused" in _ARTIFACT_CACHE:
        return _ARTIFACT_CACHE["fused"]
    p = _path(FUSED_FNAME)
    if not os.path.exists(p):
        logger.warning(f"{FUSED_FNAME} not found at {p}")
        return None
    try:
        import numpy as np
        # np.load returns an NpzFile that keeps the archive open. Read every
        # array inside the `with` block so the handle is closed afterwards
        # instead of lingering until garbage collection.
        with np.load(p) as npz:
            out = {k: npz[k] for k in npz.files}
        _ARTIFACT_CACHE["fused"] = out
        return out
    except Exception as e:
        # Deliberately broad: this loader is best-effort and signals failure
        # with None rather than raising.
        logger.error(f"Failed to load fused embeddings: {e}")
        return None
|
|
|
|
def load_graph_embeddings():
    """64-dim PhenoGNet embeddings (lighter, for fast in-memory ops).

    Returns:
        A dict with one entry per array stored in `graph_embeddings.npz`, or
        None when the file is missing (nothing is logged in that case) or
        loading fails for any reason, including numpy not being importable
        (an error is logged). A successful load is kept in
        `_ARTIFACT_CACHE["graph"]` and reused; failures are not cached.
    """
    if "graph" in _ARTIFACT_CACHE:
        return _ARTIFACT_CACHE["graph"]
    p = _path("graph_embeddings.npz")
    if not os.path.exists(p):
        return None
    try:
        import numpy as np
        # np.load returns an NpzFile that keeps the archive open. Read every
        # array inside the `with` block so the handle is closed afterwards
        # instead of lingering until garbage collection.
        with np.load(p) as npz:
            out = {k: npz[k] for k in npz.files}
        _ARTIFACT_CACHE["graph"] = out
        return out
    except Exception as e:
        # Deliberately broad: this loader is best-effort and signals failure
        # with None rather than raising.
        logger.error(f"Failed to load graph embeddings: {e}")
        return None
|
|
|
|
@lru_cache(maxsize=1)
def load_hetero_graph() -> Optional[dict]:
    """The exported heterogeneous graph — node counts and edges by relation.

    Returns:
        The parsed content of `hetero_graph.json`, or None when the file is
        missing or cannot be read or parsed (an error is logged in the latter
        case).

    The result is memoized by `lru_cache`, and that includes a None result:
    call `load_hetero_graph.cache_clear()` to force a re-read.
    """
    p = _path("hetero_graph.json")
    if not os.path.exists(p):
        return None
    try:
        # JSON text is UTF-8; decode it explicitly instead of relying on the
        # platform's locale-dependent default encoding.
        with open(p, encoding="utf-8") as f:
            return json.load(f)
    except Exception as e:
        # Deliberately broad: this loader is best-effort and signals failure
        # with None rather than raising.
        logger.error(f"Failed to load hetero_graph.json: {e}")
        return None
|
|
|
|
def lookup_disease_embedding(orpha: str, kind: str = "fused"):
    """Fetch the embedding stored for one disease id.

    Args:
        orpha: Disease id, looked up in the 'disease' section of the node index.
        kind: "fused" selects the fused embeddings; any other value selects
            the graph embeddings.

    Returns:
        The entry of the 'disease' array at the id's index, or None when the
        embeddings could not be loaded or the id is not in the index.
    """
    if kind == "fused":
        vectors = load_fused_embeddings()
    else:
        vectors = load_graph_embeddings()
    if vectors is None:
        return None
    row = load_node_index().get("disease", {}).get(orpha)
    return None if row is None else vectors["disease"][row]
|
|
|
|
def lookup_phenotype_embedding(hpo: str, kind: str = "fused"):
    """Fetch the embedding stored for one phenotype id.

    Args:
        hpo: Phenotype id, looked up in the 'phenotype' section of the node index.
        kind: "fused" selects the fused embeddings; any other value selects
            the graph embeddings.

    Returns:
        The entry of the 'phenotype' array at the id's index, or None when the
        embeddings could not be loaded or the id is not in the index.
    """
    if kind == "fused":
        vectors = load_fused_embeddings()
    else:
        vectors = load_graph_embeddings()
    if vectors is None:
        return None
    row = load_node_index().get("phenotype", {}).get(hpo)
    return None if row is None else vectors["phenotype"][row]
|
|
|
|
def lookup_gene_embedding(symbol: str, kind: str = "fused"):
    """Fetch the embedding stored for one gene symbol.

    Args:
        symbol: Gene symbol, looked up in the 'gene' section of the node index.
        kind: "fused" selects the fused embeddings; any other value selects
            the graph embeddings.

    Returns:
        The entry of the 'gene' array at the symbol's index, or None when the
        embeddings could not be loaded or the symbol is not in the index.
    """
    if kind == "fused":
        vectors = load_fused_embeddings()
    else:
        vectors = load_graph_embeddings()
    if vectors is None:
        return None
    row = load_node_index().get("gene", {}).get(symbol)
    return None if row is None else vectors["gene"][row]
|
|
|
|
def stats() -> dict:
    """Diagnostic info for /api/gemeo/health.

    Returns:
        A dict with the data directory, whether the required artifacts exist,
        whether the fused / graph archives are already cached, the number of
        disease / phenotype / gene ids, and, when the fused embeddings load,
        `fused_dim` (second dimension of the 'disease' array, or 0 when the
        archive has no 'disease' entry).

    Note: `fused_loaded` is sampled before this function calls
    `load_fused_embeddings()`, which itself loads and caches the archive
    when it is not cached yet.
    """
    report = {
        "graph_ml_dir": RARAS_APP_GRAPH_ML,
        "available": has_raras_artifacts(),
        "fused_loaded": "fused" in _ARTIFACT_CACHE,
        "graph_loaded": "graph" in _ARTIFACT_CACHE,
    }

    node_ids = load_node_ids()
    counters = (
        ("n_diseases", "disease"),
        ("n_phenotypes", "phenotype"),
        ("n_genes", "gene"),
    )
    for field, node_kind in counters:
        report[field] = len(node_ids.get(node_kind, []))

    fused = load_fused_embeddings()
    if fused is None:
        return report
    if "disease" in fused:
        report["fused_dim"] = int(fused["disease"].shape[1])
    else:
        report["fused_dim"] = 0
    return report
|
|