"""Q-FENG — Ontological Friction quantifier for Gemeo recommendations.

Implements the core mathematics of Kaminski (2026) "Quantum-Fractal
Neurosymbolic Governance" as an operational module on top of Gemeo's
existing patient embedding and PCDT corpus.

Main exports:
- `ontological_friction(psi_N, psi_S)` → angle θ in [0, π]
- `born_rule(alpha, beta, theta)` → action probability with interference
- `circuit_breaker(theta, threshold=DEFAULT_THETA_THRESHOLD)` → bool, True = block
- `assess_recommendation(recommendation_text=..., orpha=..., ...)` → QFengAssessment

Definition recap (from Kaminski 2026, §2.1):
    |D⟩ = α|ψ_N⟩ + β|ψ_S⟩
    P(action) = |α|² + |β|² + 2|α||β|·cos(θ)
    θ = arccos(⟨ψ_N|ψ_S⟩ / (‖ψ_N‖·‖ψ_S‖))

Constructive interference (θ ≈ 0): neural prediction aligns with
norm → action allowed. Destructive interference (θ ≈ π): neural
prediction conflicts with norm → Circuit Breaker triggers.
"""
from __future__ import annotations

import hashlib
import logging
import math
import os
import re
import unicodedata
from dataclasses import dataclass, field
from functools import lru_cache
from typing import Optional

import numpy as np
|
|
logger = logging.getLogger("gemeo.qfeng")

# Friction angle (radians) at or above which circuit_breaker() blocks an
# action by default; friction_zone() uses the same value as the default lower
# bound of the "red" zone.
DEFAULT_THETA_THRESHOLD = 2.40

# Default lower bound (radians) of the "yellow" zone in friction_zone();
# angles below it are reported as "green".
YELLOW_LO = 1.0
|
|
|
|
| |
|
|
def _normalize(v: np.ndarray) -> np.ndarray:
    """Scale *v* to unit Euclidean length.

    Args:
        v: Input vector.

    Returns:
        ``v / ||v||`` when the norm exceeds ``1e-12``; otherwise *v* itself,
        unchanged, so a (near-)zero vector never triggers a division by zero.
    """
    norm = float(np.linalg.norm(v))
    if norm > 1e-12:
        return v / norm
    return v
|
|
|
|
def ontological_friction(psi_N: np.ndarray, psi_S: np.ndarray) -> float:
    """Compute the friction angle theta between two embedding vectors.

    theta = arccos(<psi_N|psi_S> / (||psi_N|| * ||psi_S||)), in [0, pi].

    Args:
        psi_N: Neural Evidence Vector (the recommendation embedding).
        psi_S: Symbolic Norm Vector (the PCDT/regulatory embedding).

    Returns:
        Friction angle theta in radians, in [0, pi]. NaN is returned when the
        angle is undefined: either input is None, or either vector has
        (near-)zero norm after flattening/truncation.
    """
    if psi_N is None or psi_S is None:
        return float("nan")
    vec_n = np.asarray(psi_N, dtype=np.float64).ravel()
    vec_s = np.asarray(psi_S, dtype=np.float64).ravel()
    if vec_n.shape != vec_s.shape:
        # Mismatched dimensionality: compare only the leading components both
        # vectors share. NOTE(review): if the two vectors come from different
        # embedding spaces the resulting angle is only a rough proxy.
        dim = min(vec_n.shape[0], vec_s.shape[0])
        vec_n, vec_s = vec_n[:dim], vec_s[:dim]
    norm_n = float(np.linalg.norm(vec_n))
    norm_s = float(np.linalg.norm(vec_s))
    # A zero (or NaN) norm has no direction; report "undefined" instead of the
    # pi/2 that a dot product of 0 would otherwise fabricate.
    if not (norm_n > 1e-12 and norm_s > 1e-12):
        return float("nan")
    # Clip guards acos() against rounding that pushes the cosine past +/-1.
    cosine = float(np.clip(np.dot(vec_n / norm_n, vec_s / norm_s), -1.0, 1.0))
    return float(math.acos(cosine))
|
|
|
|
def born_rule(alpha: float, beta: float, theta: float) -> float:
    """Born rule with interference.

    P(action) = |alpha|^2 + |beta|^2 + 2*|alpha|*|beta|*cos(theta)

    Note: in QDT, |alpha|^2 + |beta|^2 should be <= 1 (the cross-term is the
    interference correction). This function neither enforces that constraint
    nor clamps the result, so the value can fall outside [0, 1].

    Args:
        alpha: Weight on the neural basis vector (only its magnitude is used).
        beta: Weight on the symbolic basis vector (only its magnitude is used).
        theta: Friction angle in radians.

    Returns:
        The two squared magnitudes plus the interference cross-term.
    """
    mag_a = abs(alpha)
    mag_b = abs(beta)
    return float(mag_a * mag_a + mag_b * mag_b + 2 * mag_a * mag_b * math.cos(theta))
|
|
|
|
def interference_term(alpha: float, beta: float, theta: float) -> float:
    """Return only the Born-rule cross-term, 2*|alpha|*|beta|*cos(theta).

    A negative value indicates destructive interference, a positive value
    constructive interference.

    Args:
        alpha: Weight on the neural basis vector (only its magnitude is used).
        beta: Weight on the symbolic basis vector (only its magnitude is used).
        theta: Friction angle in radians.
    """
    return float(2 * abs(alpha) * abs(beta) * math.cos(theta))
|
|
|
|
def circuit_breaker(theta: float, threshold: float = DEFAULT_THETA_THRESHOLD) -> bool:
    """Return True (block the action) when theta >= threshold.

    Args:
        theta: Friction angle in radians; None or NaN means "unknown".
        threshold: Blocking threshold in radians.

    Returns:
        True when theta is a known angle at or above the threshold
        (destructive interference). An unknown angle never blocks.
    """
    if theta is None or math.isnan(theta):
        return False
    # bool() keeps the result a plain Python bool even for NumPy scalars.
    return bool(theta >= threshold)
|
|
|
|
def friction_zone(theta: float, *, yellow_lo: float = YELLOW_LO,
                  red_lo: float = DEFAULT_THETA_THRESHOLD) -> str:
    """Classify a friction angle into a traffic-light zone.

    Args:
        theta: Friction angle in radians; None or NaN means "unknown".
        yellow_lo: Lower bound (inclusive) of the yellow zone.
        red_lo: Lower bound (inclusive) of the red zone.

    Returns:
        'unknown' for None/NaN, 'green' when theta < yellow_lo, 'yellow' when
        yellow_lo <= theta < red_lo, otherwise 'red'.
    """
    if theta is None or math.isnan(theta):
        return "unknown"
    if theta < yellow_lo:
        return "green"
    if theta < red_lo:
        return "yellow"
    return "red"
|
|
|
|
| |
|
|
@lru_cache(maxsize=1)
def _load_disease_emb_index() -> tuple:
    """Load the fused disease-embedding index (raras-app graph-ml).

    The embeddings are described upstream as 3072-d; the dimensionality is
    not checked here.

    Returns:
        ``(disease_emb, disease_id2idx)`` taken from
        ``gemeo.external_kg.load_fused_embeddings()``, or ``(None, None)`` when
        the module cannot be imported, loading fails, or either key is absent.
        The result is memoised by ``lru_cache``, including the ``(None, None)``
        failure result, so a failed load is not retried within the process.
    """
    try:
        from gemeo.external_kg import load_fused_embeddings
        kg = load_fused_embeddings()
        if "disease_emb" in kg and "disease_id2idx" in kg:
            return kg["disease_emb"], kg["disease_id2idx"]
        logger.debug("external_kg loaded without disease_emb/disease_id2idx")
    except Exception as e:
        # Deliberately best-effort: the knowledge graph is an optional
        # dependency and any failure degrades to "no disease embedding".
        logger.debug("external_kg unavailable: %s", e)
    return None, None
|
|
|
|
@lru_cache(maxsize=1)
def _get_text_encoder() -> tuple:
    """Lazy-load a sentence-transformers encoder for clinical text.

    Tier 1: sentence-transformers + FremyCompany/BioLORD-2023 (preferred).
    Tier 2: sentence-transformers + all-MiniLM-L6-v2 (fallback, lighter).
    Tier 3: deterministic hash-based pseudo-embedding (offline-only).

    Returns:
        ``("st", model)`` for the first model that loads, else
        ``("hash", None)``. The result is memoised by ``lru_cache``.
    """
    try:
        from sentence_transformers import SentenceTransformer
    except ImportError:
        logger.debug("sentence_transformers not installed; using hash fallback")
    else:
        for model_id in ("FremyCompany/BioLORD-2023", "sentence-transformers/all-MiniLM-L6-v2"):
            try:
                model = SentenceTransformer(model_id)
            except Exception as e:
                # Download/initialisation problems: try the next tier.
                logger.debug(" failed %s: %s", model_id, e)
            else:
                logger.info("qfeng text encoder: %s", model_id)
                return ("st", model)
    # Surface the degraded mode once: with hash pseudo-embeddings the vectors
    # are pseudo-random, so angles computed from them carry no semantics.
    logger.warning("qfeng text encoder unavailable; using hash pseudo-embeddings")
    return ("hash", None)
|
|
|
|
def _hash_embed(text: str, dim: int = 768) -> np.ndarray:
    """Deterministic hash-based pseudo-embedding (offline fallback).

    Only useful for development; replace with a real encoder in production.

    Args:
        text: Text to embed.
        dim: Length of the returned vector.

    Returns:
        A unit-norm vector of standard-normal draws from a generator seeded
        with the first 8 bytes of the SHA-256 digest of *text*. The same text
        always yields the same vector.
    """
    digest = hashlib.sha256(text.encode("utf-8")).digest()
    seed = int.from_bytes(digest[:8], "big")
    vec = np.random.default_rng(seed).standard_normal(dim)
    # max() guards the division if the drawn vector is (numerically) zero.
    return vec / max(1e-12, np.linalg.norm(vec))
|
|
|
|
def _embed_text(text: str) -> Optional[np.ndarray]:
    """Encode arbitrary clinical text into a sentence vector.

    Uses whichever encoder ``_get_text_encoder()`` selected. If that is a
    sentence-transformers model and encoding succeeds, its (normalised)
    embedding is returned; in every other case the deterministic hash
    pseudo-embedding from ``_hash_embed()`` is used.

    Args:
        text: Text to encode.

    Returns:
        A float64 vector, or None for empty/whitespace-only input. The hash
        fallback may differ in dimensionality from the model embedding.
    """
    if not text or not text.strip():
        return None
    kind, model = _get_text_encoder()
    if kind == "st" and model is not None:
        try:
            vec = model.encode([text], convert_to_numpy=True, normalize_embeddings=True)[0]
            return np.asarray(vec, dtype=np.float64)
        except Exception as e:
            # Best-effort: a failing encoder degrades to the hash fallback.
            logger.warning("sentence-transformer encode failed: %s; falling back", e)
    return _hash_embed(text)
|
|
|
|
def _embed_disease_orpha(orpha: str) -> Optional[np.ndarray]:
    """Look up the fused embedding for a disease by ORPHA code.

    Args:
        orpha: ORPHA code; it is stringified and stripped before lookup.
            NOTE(review): the key format expected by the index (e.g. with or
            without an "ORPHA:" prefix) is not visible here; confirm upstream.

    Returns:
        The embedding row as a float64 array, or None when the code is empty,
        the index is unavailable, or the code is not in the index.
    """
    if not orpha:
        return None
    disease_emb, id2idx = _load_disease_emb_index()
    if disease_emb is None or id2idx is None:
        return None
    key = str(orpha).strip()
    if key not in id2idx:
        return None
    return np.asarray(disease_emb[id2idx[key]], dtype=np.float64)
|
|
|
|
def _embed_pcdt(orpha: str, pcdt_text: Optional[str]) -> Optional[np.ndarray]:
    """Build the symbolic norm vector psi_S.

    Prefers the embedding of the PCDT text; when no text is given (or it
    embeds to None, e.g. whitespace only) falls back to the disease's fused
    embedding looked up by ORPHA code.

    Args:
        orpha: ORPHA code of the disease.
        pcdt_text: Optional PCDT excerpt.

    Returns:
        The psi_S vector, or None when neither source yields an embedding.
    """
    if pcdt_text:
        vec = _embed_text(pcdt_text)
        if vec is not None:
            return vec
    return _embed_disease_orpha(orpha)
|
|
|
|
| |
|
|
@dataclass
class QFengAssessment:
    """Per-recommendation Ontological Friction assessment."""
    # Headline friction angle in radians (NaN when it could not be computed).
    theta: float
    # Zone label as produced by friction_zone(): green/yellow/red/unknown.
    zone: str
    # Human-readable interference label (e.g. "constructive", "destructive").
    interference: str
    # Born-rule cross-term 2|alpha||beta|cos(theta).
    cross_term: float
    # Born-rule value including the cross-term.
    p_action: float
    # True when the circuit breaker fired for this recommendation.
    blocked: bool
    # Circuit-breaker threshold (radians) used for this assessment.
    threshold: float = DEFAULT_THETA_THRESHOLD
    # Weights on the neural / symbolic basis vectors.
    alpha: float = 0.7
    beta: float = 0.7
    # Dimensionality of the neural / symbolic embeddings that were compared.
    psi_N_dim: int = 0
    psi_S_dim: int = 0
    # Free-text diagnostics accumulated while assessing.
    notes: list[str] = field(default_factory=list)
    # Optional per-scale angles; left as None unless a caller fills them in.
    macro_theta: Optional[float] = None
    meso_theta: Optional[float] = None
    micro_theta: Optional[float] = None
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
# PT-BR prohibition cues. Accented letters are written as \uXXXX escapes
# (\u00e3 = a-tilde, \u00e1 = a-acute, \u00e9 = e-acute), which the `re`
# module resolves itself, so matching does not depend on the file encoding.
_PT_PROHIBITION = [
    r"contraindica\w*", r"contra-indica\w*",
    r"n[\u00e3a]o\s+deve(?:m)?\s+ser",
    # The optional copula carries its own trailing whitespace, so a plain
    # "nao indicado" (single space) matches as well.
    r"n[\u00e3a]o\s+(?:(?:est[\u00e1a]|[\u00e9e])\s+)?indicad\w*",
    r"proibid\w*", r"vedad\w*",
]
# PT-BR indication cues. The lookbehinds stop "indicad" from matching inside
# "contraindicado" / "contra-indicado", which state the opposite.
_PT_INDICATION = [
    r"(?<!contra)(?<!contra-)indicad\w*", r"prescri\w+", r"administra\w+",
    r"iniciar\b", r"manter\b", r"continuar\b", r"dispensa\w*",
]


def _extract_prohibited_clauses(pcdt_text: str) -> list:
    """Extract one short clause around each prohibition cue in a PCDT text.

    For PT-BR clinical text the subject sits immediately before the
    prohibition verb ("X e contraindicado", "X nao deve ser administrado"),
    so each clause is built from:
      * the text to the LEFT of the cue, up to 80 characters, cut after the
        rightmost ". ", "; " or newline inside that window;
      * the matched cue text itself;
      * the text to the RIGHT of the cue, up to 40 characters, cut before the
        first ".", ";" or newline.

    Args:
        pcdt_text: PCDT excerpt to scan; may be empty or None.

    Returns:
        List of clause strings, grouped by pattern in ``_PT_PROHIBITION``
        order and by position within each pattern. Empty when nothing matches.
    """
    clauses = []
    if not pcdt_text:
        return clauses
    for pat in _PT_PROHIBITION:
        for m in re.finditer(pat, pcdt_text, re.IGNORECASE):
            # Left context: keep only what follows the rightmost delimiter,
            # whichever delimiter type that is.
            left = pcdt_text[max(0, m.start() - 80):m.start()]
            cut = 0
            for delim in (". ", "; ", "\n"):
                pos = left.rfind(delim)
                if pos >= 0:
                    cut = max(cut, pos + len(delim))
            left = left[cut:]
            # Right context: stop at the first delimiter actually present
            # (str.find() returns -1 for absent ones; those are ignored).
            right = pcdt_text[m.end():m.end() + 40]
            stops = [i for i in (right.find(ch) for ch in ".;\n") if i >= 0]
            if stops:
                right = right[:min(stops)]
            # Use the matched text, not the regex source, in the clause.
            parts = (left.strip(), m.group(0).strip(), right.strip())
            phrase = " ".join(p for p in parts if p)
            if phrase:
                clauses.append(phrase)
    return clauses
|
|
|
|
# Function words (PT/EN) and generic clinical terms ignored as keywords.
_KEYWORD_STOPWORDS = frozenset({
    "para", "como", "esse", "essa", "nesta", "neste", "deste", "desta",
    "pelo", "pela", "pelos", "pelas", "deve", "devem", "esta", "este",
    "with", "from", "that", "this", "have", "than", "then", "into",
    "when", "such", "while", "after", "their", "where", "which",
    "ainda", "tambem", "todos", "todas", "outros", "alta", "dose",
    "anos", "anual",
})


def _content_keywords(text: str) -> set:
    """Extract content-bearing keywords from *text*.

    The text is NFKD-normalised, combining marks (diacritics) are dropped,
    and the result is lowercased. Keywords are the runs of 4 or more ASCII
    letters that are not in the stopword list.

    Args:
        text: Input text; None or empty yields an empty set.

    Returns:
        Set of lowercase, diacritics-free keyword strings.
    """
    decomposed = unicodedata.normalize("NFKD", text or "")
    folded = "".join(ch for ch in decomposed if not unicodedata.combining(ch)).lower()
    return {tok for tok in re.findall(r"[a-z]{4,}", folded)
            if tok not in _KEYWORD_STOPWORDS}
|
|
|
|
def _check_violation(recommendation_text: str, pcdt_text: str) -> tuple[bool, list]:
    """Return (violation_flag, evidence_list).

    Two-channel deontological check, applied only when the recommendation
    contains an indication verb (``_PT_INDICATION``):
      (1) cosine similarity between the recommendation and each prohibition
          clause extracted from the PCDT;
      (2) keyword overlap between the recommendation and the clause (catches
          a substance/procedure explicitly named as forbidden).
    A clause counts as violated when sim >= 0.55, or when sim >= 0.30 and at
    least 2 keywords are shared. For production replace with an
    LLM-as-judge call.

    Args:
        recommendation_text: Free-text recommendation.
        pcdt_text: PCDT excerpt containing the normative constraints.

    Returns:
        ``(flag, evidence)`` where *evidence* has one dict per violated clause
        with keys ``clause`` (first 160 chars), ``similarity`` (rounded to 3
        decimals) and ``overlap`` (up to 6 shared keywords, sorted); *flag* is
        True exactly when *evidence* is non-empty.
    """
    if not recommendation_text or not pcdt_text:
        return False, []
    rec_lower = recommendation_text.lower()
    if not any(re.search(pat, rec_lower) for pat in _PT_INDICATION):
        return False, []
    prohibitions = _extract_prohibited_clauses(pcdt_text)
    if not prohibitions:
        return False, []

    rec_emb = _embed_text(recommendation_text)
    rec_kw = _content_keywords(recommendation_text)
    evidence = []
    for clause in prohibitions:
        clause_emb = _embed_text(clause)
        sim = 0.0
        # Only compare vectors of equal shape: _embed_text() may return a
        # differently-sized fallback vector, and np.dot would raise on that.
        if (rec_emb is not None and clause_emb is not None
                and rec_emb.shape == clause_emb.shape):
            sim = float(np.dot(_normalize(rec_emb), _normalize(clause_emb)))
        overlap = rec_kw & _content_keywords(clause)
        if sim >= 0.55 or (sim >= 0.30 and len(overlap) >= 2):
            evidence.append({
                "clause": clause[:160],
                "similarity": round(sim, 3),
                "overlap": sorted(overlap)[:6],
            })
    return bool(evidence), evidence
|
|
|
|
def assess_recommendation(
    *,
    recommendation_text: str,
    orpha: str,
    pcdt_text: Optional[str] = None,
    alpha: float = 0.7,
    beta: float = 0.7,
    threshold: float = DEFAULT_THETA_THRESHOLD,
) -> QFengAssessment:
    """Compute the full Q-FENG assessment for a single recommendation.

    Args:
        recommendation_text: free-text description of the proposed action
            (e.g., "iniciar enzima alfa-galactosidase via CEAF").
        orpha: ORPHA code of the disease the recommendation targets.
        pcdt_text: optional PCDT excerpt describing the normative
            constraints. If None, the fused disease embedding from
            raras-app graph-ml is used as a proxy.
        alpha, beta: weights on neural / symbolic basis vectors.
        threshold: Circuit Breaker threshold in radians.

    Returns:
        QFengAssessment with theta, zone, P(action), and block flag. When
        either embedding is unavailable the assessment carries NaN values,
        zone "unknown" and ``blocked=False``.
    """
    notes = []
    psi_N = _embed_text(recommendation_text)
    psi_S = _embed_pcdt(orpha, pcdt_text)

    if psi_N is None:
        notes.append("recommendation embedding unavailable")
    if psi_S is None:
        notes.append(f"normative embedding unavailable for ORPHA:{orpha}")
    if psi_N is None or psi_S is None:
        return QFengAssessment(
            theta=float("nan"), zone="unknown",
            interference="unknown", cross_term=float("nan"),
            p_action=float("nan"), blocked=False, threshold=threshold,
            alpha=alpha, beta=beta, notes=notes,
        )

    if psi_N.shape != psi_S.shape:
        # ontological_friction() truncates to the shared leading components;
        # record that so the angle is not over-interpreted.
        notes.append(
            f"embedding dimension mismatch (psi_N={psi_N.shape}, "
            f"psi_S={psi_S.shape}); theta computed on truncated vectors"
        )

    # Topical angle between the recommendation and the norm embedding.
    theta_topic = ontological_friction(psi_N, psi_S)

    # Deontological check: only possible when an explicit PCDT text is given.
    violation, evidence = (False, [])
    if pcdt_text:
        violation, evidence = _check_violation(recommendation_text, pcdt_text)

    # A detected violation overrides the topical angle with maximal friction.
    theta = math.pi if violation else theta_topic

    cross = interference_term(alpha, beta, theta)
    p_action = born_rule(alpha, beta, theta)
    zone = friction_zone(theta, red_lo=threshold)
    if violation:
        interference = "destructive (deontological violation)"
    elif cross > 0.05:
        interference = "constructive"
    elif cross < -0.05:
        interference = "destructive"
    else:
        # Includes a NaN cross-term, for which both comparisons are False.
        interference = "ambiguous"
    blocked = circuit_breaker(theta, threshold)

    if violation:
        notes.append(f"prohibition match: {len(evidence)} clause(s) flagged")
        for ev in evidence[:3]:
            # \u21b3 is the "downwards arrow with tip rightwards" bullet.
            notes.append(f"  \u21b3 '{ev['clause']}' (sim={ev['similarity']:.2f})")
    if blocked:
        # \u03b8 = theta, \u2265 = greater-than-or-equal sign.
        notes.append(f"circuit_breaker fired at \u03b8={theta:.3f} \u2265 {threshold}")
    return QFengAssessment(
        theta=theta, zone=zone, interference=interference,
        cross_term=cross, p_action=p_action, blocked=blocked, threshold=threshold,
        alpha=alpha, beta=beta,
        psi_N_dim=int(psi_N.shape[0]),
        psi_S_dim=int(psi_S.shape[0]),
        notes=notes,
    )
|
|
|
|
| |
|
|
def fractal_audit(
    *,
    recommendation_text: str,
    orpha: str,
    pcdt_text: Optional[str] = None,
    regulatory_text: Optional[str] = None,
    model_state_text: Optional[str] = None,
    alpha: float = 0.7,
    beta: float = 0.7,
    threshold: float = DEFAULT_THETA_THRESHOLD,
) -> QFengAssessment:
    """Compute theta at 3 scales of Beer's Viable System Model.

    - macro_theta  S5 regulatory (LGPD / EU AI Act / WHO)
    - meso_theta   S4 institutional (PCDT / CEAF / CNES)
    - micro_theta  S1-S3 algorithmic (model self-consistency)

    The headline `theta` is the meso_theta (PCDT alignment); the macro
    and micro thetas annotate it for fractal isomorphism.

    Args:
        recommendation_text: Free-text recommendation being audited.
        orpha: ORPHA code of the targeted disease.
        pcdt_text: Optional PCDT excerpt (meso scale).
        regulatory_text: Optional regulatory text (macro scale).
        model_state_text: Optional model-state description (micro scale).
        alpha, beta: weights on neural / symbolic basis vectors.
        threshold: Circuit Breaker threshold in radians.

    Returns:
        The assessment from ``assess_recommendation`` with ``meso_theta`` set
        to its ``theta`` and ``macro_theta`` / ``micro_theta`` filled in when
        the corresponding text is given and both embeddings are available.
    """
    base = assess_recommendation(
        recommendation_text=recommendation_text,
        orpha=orpha, pcdt_text=pcdt_text,
        alpha=alpha, beta=beta, threshold=threshold,
    )
    # Embed the recommendation once and reuse it for both extra scales.
    psi_N = None
    if regulatory_text or model_state_text:
        psi_N = _embed_text(recommendation_text)
    if psi_N is not None:
        if regulatory_text:
            psi_R = _embed_text(regulatory_text)
            if psi_R is not None:
                base.macro_theta = ontological_friction(psi_N, psi_R)
        if model_state_text:
            psi_M = _embed_text(model_state_text)
            if psi_M is not None:
                base.micro_theta = ontological_friction(psi_N, psi_M)
    base.meso_theta = base.theta
    return base
|
|
|
|
# Public API of the module.
__all__ = [
    "ontological_friction",
    "born_rule",
    "interference_term",
    "circuit_breaker",
    "friction_zone",
    "assess_recommendation",
    "fractal_audit",
    "QFengAssessment",
    "DEFAULT_THETA_THRESHOLD",
]
|
|