"""Q-FENG — Ontological Friction quantifier for Gemeo recommendations. Implements the core mathematics of Kaminski (2026) "Quantum-Fractal Neurosymbolic Governance" as an operational module on top of Gemeo's existing patient embedding and PCDT corpus. Three exports: - `ontological_friction(psi_N, psi_S)` → angle θ in [0, π] - `born_rule(alpha, beta, theta)` → action probability with interference - `circuit_breaker(theta, threshold=2.5)` → bool, True = block - `assess_recommendation(recommendation, orpha, ...)` → QFengAssessment Definition recap (from Kaminski 2026, §2.1): |D⟩ = α|ψ_N⟩ + β|ψ_S⟩ P(action) = |α|² + |β|² + 2|α||β|·cos(θ) θ = arccos(⟨ψ_N|ψ_S⟩ / (‖ψ_N‖·‖ψ_S‖)) Constructive interference (θ ≈ 0): neural prediction aligns with norm → action allowed. Destructive interference (θ ≈ π): neural prediction conflicts with norm → Circuit Breaker triggers. """ from __future__ import annotations import logging import math import os from dataclasses import dataclass, field from functools import lru_cache from typing import Optional import numpy as np logger = logging.getLogger("gemeo.qfeng") # Default Circuit Breaker threshold. Empirically chosen at ~140° (≈2.44 rad) # so genuinely orthogonal recommendations (θ=π/2≈1.57) still pass with a # warning, and only strongly destructive ones (θ>2.4) are blocked. DEFAULT_THETA_THRESHOLD = 2.40 # Yellow-zone threshold: 1.0 < θ < 2.40 → flag but allow. YELLOW_LO = 1.0 # ─────────────────────────── Core math ─────────────────────────── def _normalize(v: np.ndarray) -> np.ndarray: n = float(np.linalg.norm(v)) return v / n if n > 1e-12 else v def ontological_friction(psi_N: np.ndarray, psi_S: np.ndarray) -> float: """Compute θ = arccos(⟨ψ_N|ψ_S⟩ / (‖ψ_N‖·‖ψ_S‖)) ∈ [0, π]. Args: psi_N: Neural Evidence Vector (the recommendation embedding). psi_S: Symbolic Norm Vector (the PCDT/regulatory embedding). Returns: Friction angle θ in radians, [0, π]. """ if psi_N is None or psi_S is None: return float("nan") psi_N = np.asarray(psi_N, dtype=np.float64).ravel() psi_S = np.asarray(psi_S, dtype=np.float64).ravel() if psi_N.shape != psi_S.shape: # Project onto shorter dim if mismatched (e.g., 3072 vs 768) d = min(psi_N.shape[0], psi_S.shape[0]) psi_N = psi_N[:d] psi_S = psi_S[:d] a = _normalize(psi_N) b = _normalize(psi_S) dot = float(np.clip(np.dot(a, b), -1.0, 1.0)) return float(math.acos(dot)) def born_rule(alpha: float, beta: float, theta: float) -> float: """P(Action) = |α|² + |β|² + 2|α||β|·cos(θ) — Born rule with interference. Note: in QDT, |α|² + |β|² should be ≤ 1 (the cross-term is the interference correction). For decision-making we report the cross-term explicitly so callers can interpret constructive/destructive contribution. """ a = abs(alpha); b = abs(beta) return float(a * a + b * b + 2 * a * b * math.cos(theta)) def interference_term(alpha: float, beta: float, theta: float) -> float: """Just the cross-term: 2|α||β|·cos(θ). Negative = destructive.""" return float(2 * abs(alpha) * abs(beta) * math.cos(theta)) def circuit_breaker(theta: float, threshold: float = DEFAULT_THETA_THRESHOLD) -> bool: """Return True (block action) when θ ≥ threshold (destructive interference).""" return theta is not None and not math.isnan(theta) and theta >= threshold def friction_zone(theta: float, *, yellow_lo: float = YELLOW_LO, red_lo: float = DEFAULT_THETA_THRESHOLD) -> str: """Return 'green' | 'yellow' | 'red' for a friction angle.""" if theta is None or math.isnan(theta): return "unknown" if theta < yellow_lo: return "green" if theta < red_lo: return "yellow" return "red" # ───────────────────────── Embedding helpers ───────────────────────── @lru_cache(maxsize=1) def _load_disease_emb_index(): """Load the fused 3072-d disease embeddings index (raras-app graph-ml).""" try: from gemeo.external_kg import load_fused_embeddings kg = load_fused_embeddings() if "disease_emb" in kg and "disease_id2idx" in kg: return kg["disease_emb"], kg["disease_id2idx"] except Exception as e: logger.debug(f"external_kg unavailable: {e}") return None, None @lru_cache(maxsize=1) def _get_text_encoder(): """Lazy-load a sentence-transformers BioLORD encoder for clinical text. Tier 1: sentence-transformers + FremyCompany/BioLORD-2023 (preferred). Tier 2: sentence-transformers + all-MiniLM-L6-v2 (fallback, lighter). Tier 3: deterministic hash-based pseudo-embedding (offline-only). """ try: from sentence_transformers import SentenceTransformer for model_id in ("FremyCompany/BioLORD-2023", "sentence-transformers/all-MiniLM-L6-v2"): try: m = SentenceTransformer(model_id) logger.info(f"qfeng text encoder: {model_id}") return ("st", m) except Exception as e: logger.debug(f" failed {model_id}: {e}") except ImportError: logger.debug("sentence_transformers not installed; using hash fallback") return ("hash", None) def _hash_embed(text: str, dim: int = 768) -> np.ndarray: """Deterministic hash-based pseudo-embedding (offline fallback). Only useful for development; replace with real encoder in production. """ import hashlib rng = np.random.default_rng(int.from_bytes( hashlib.sha256(text.encode()).digest()[:8], "big" )) v = rng.standard_normal(dim) return v / max(1e-12, np.linalg.norm(v)) def _embed_text(text: str) -> Optional[np.ndarray]: """Encode arbitrary clinical text into a sentence vector. Returns None for empty/whitespace input. Otherwise tries BioLORD-2023, then MiniLM, then a deterministic hash fallback. """ if not text or not text.strip(): return None kind, model = _get_text_encoder() if kind == "st" and model is not None: try: v = model.encode([text], convert_to_numpy=True, normalize_embeddings=True)[0] return np.asarray(v, dtype=np.float64) except Exception as e: logger.warning(f"sentence-transformer encode failed: {e}; falling back") return _hash_embed(text) def _embed_disease_orpha(orpha: str) -> Optional[np.ndarray]: """Look up the fused 3072-d embedding for a disease by ORPHA code.""" if not orpha: return None de, id2idx = _load_disease_emb_index() if de is None: return None key = str(orpha).strip() if key not in id2idx: return None return np.asarray(de[id2idx[key]], dtype=np.float64) def _embed_pcdt(orpha: str, pcdt_text: Optional[str]) -> Optional[np.ndarray]: """Build ψ_S from the PCDT text if available, else fall back to the disease's normative embedding (which encodes the protocol-aligned semantics learned during graph-ml training).""" if pcdt_text: v = _embed_text(pcdt_text) if v is not None: return v return _embed_disease_orpha(orpha) # ───────────────────────── Assessment dataclass ───────────────────────── @dataclass class QFengAssessment: """Per-recommendation Ontological Friction assessment.""" theta: float # friction angle [0, π] zone: str # "green" | "yellow" | "red" | "unknown" interference: str # "constructive" | "destructive" | "ambiguous" cross_term: float # 2|α||β|·cos(θ) p_action: float # Born rule probability blocked: bool # circuit-breaker fired? threshold: float = DEFAULT_THETA_THRESHOLD alpha: float = 0.7 # weight on neural side beta: float = 0.7 # weight on symbolic side psi_N_dim: int = 0 psi_S_dim: int = 0 notes: list = field(default_factory=list) macro_theta: Optional[float] = None # regulatory layer (LGPD/EU AI Act) meso_theta: Optional[float] = None # institutional (PCDT) micro_theta: Optional[float] = None # algorithmic (model self-consistency) # ─────────────────── Explicit normative violation check ─────────────────── # # Topical embedding similarity (cosθ) cannot, by construction, distinguish # "iniciar fármaco X" from "não iniciar fármaco X" — both cluster by topic. # Q-FENG therefore composes two signals: # # θ_topic — semantic similarity in fused embedding space # violation — boolean from explicit deontological rule check # # When a deontological violation is detected, θ_eff is forced to π # (destructive), regardless of topical similarity. This corresponds to # Kaminski's "destructive interference triggers Circuit Breaker" but # upgraded with a hard rule layer for cases where the embedding manifold # does not separate prescription from prohibition. # Patterns indicating a hard contraindication or explicit prohibition in # PT-BR clinical text. Used both on the PCDT side ("é contraindicado", # "não deve ser administrado") and on the recommendation side (catches # whether the recommendation matches a prohibited intervention). _PT_PROHIBITION = [ r"contraindica\w*", r"contra-indica\w*", r"n[ãa]o\s+deve(?:m)?\s+ser", r"n[ãa]o\s+(?:est[áa])?\s+indicad\w*", r"proibid\w*", r"vedad\w*", ] _PT_INDICATION = [ r"indicad\w*", r"prescri\w+", r"administra\w+", r"iniciar\b", r"manter\b", r"continuar\b", r"dispensa\w*", ] def _extract_prohibited_clauses(pcdt_text: str) -> list: """Extract the SUBJECT of each prohibition (what is being forbidden). For PT-BR clinical text the subject sits immediately before the prohibition verb: "X é contraindicado", "X não deve ser administrado". We extract the noun phrase to the LEFT of the keyword (up to 80 chars, bounded by sentence delimiters) plus a few tokens to the right for context. """ import re out = [] if not pcdt_text: return out for pat in _PT_PROHIBITION: for m in re.finditer(pat, pcdt_text, re.IGNORECASE): # Walk back to the nearest sentence delimiter lo = max(0, m.start() - 80) seg = pcdt_text[lo:m.start()] for delim in (". ", "; ", "\n"): pos = seg.rfind(delim) if pos >= 0: seg = seg[pos + len(delim):] break tail = pcdt_text[m.end():m.end() + 40] tail_end = min((len(tail), tail.find("."), tail.find(";"), tail.find("\n"))) if any(c in tail for c in ".;\n") else len(tail) tail_end = max(0, tail_end if isinstance(tail_end, int) else 0) phrase = (seg + " " + pat + " " + tail[:tail_end]).strip() if phrase: out.append(phrase) return out def _content_keywords(text: str) -> set: """Extract content-bearing keywords (4+ char alphabetic tokens, lowercased, diacritics-stripped).""" import re, unicodedata nfkd = unicodedata.normalize("NFKD", text or "") ascii_text = "".join(c for c in nfkd if not unicodedata.combining(c)).lower() tokens = re.findall(r"[a-z]{4,}", ascii_text) # Common stopwords (PT + EN) stop = {"para", "como", "esse", "essa", "nesta", "neste", "deste", "desta", "pelo", "pela", "pelos", "pelas", "deve", "devem", "esta", "este", "with", "from", "that", "this", "have", "than", "then", "into", "when", "such", "while", "after", "their", "where", "which", "ainda", "tambem", "todos", "todas", "outros", "alta", "dose", "anos", "anual"} return {t for t in tokens if t not in stop} def _check_violation(recommendation_text: str, pcdt_text: str) -> tuple[bool, list]: """Return (violation_flag, evidence_list). Two-channel deontological check: (1) sentence-level cosine similarity between the recommendation and each prohibition clause in the PCDT; (2) keyword-overlap between the recommendation and the prohibition clause (catches the case where rec mentions a substance/procedure explicitly named as forbidden). A violation is flagged when (sim ≥ 0.30 AND content_overlap ≥ 2 unique tokens) OR (sim ≥ 0.55), and the recommendation contains an indication verb. For production replace with an LLM-as-judge call. """ import re if not recommendation_text or not pcdt_text: return False, [] rec_lower = recommendation_text.lower() indicates = any(re.search(p, rec_lower) for p in _PT_INDICATION) if not indicates: return False, [] prohibitions = _extract_prohibited_clauses(pcdt_text) if not prohibitions: return False, [] rec_emb = _embed_text(recommendation_text) rec_kw = _content_keywords(recommendation_text) evidence = [] flag = False for clause in prohibitions: c_emb = _embed_text(clause) c_kw = _content_keywords(clause) sim = 0.0 if rec_emb is not None and c_emb is not None: sim = float(np.dot(_normalize(rec_emb), _normalize(c_emb))) overlap = rec_kw & c_kw is_violation = (sim >= 0.55) or (sim >= 0.30 and len(overlap) >= 2) if is_violation: evidence.append({ "clause": clause[:160], "similarity": round(sim, 3), "overlap": sorted(overlap)[:6], }) flag = True return flag, evidence def assess_recommendation( *, recommendation_text: str, orpha: str, pcdt_text: Optional[str] = None, alpha: float = 0.7, beta: float = 0.7, threshold: float = DEFAULT_THETA_THRESHOLD, ) -> QFengAssessment: """Compute the full Q-FENG assessment for a single recommendation. Args: recommendation_text: free-text description of the proposed action (e.g., "iniciar enzima alfa-galactosidase via CEAF"). orpha: ORPHA code of the disease the recommendation targets. pcdt_text: optional PCDT excerpt describing the normative constraints. If None, the fused disease embedding from raras-app graph-ml is used as a proxy. alpha, beta: weights on neural / symbolic basis vectors. threshold: Circuit Breaker threshold in radians. Returns: QFengAssessment with θ, zone, P(action), and block flag. """ notes = [] psi_N = _embed_text(recommendation_text) psi_S = _embed_pcdt(orpha, pcdt_text) if psi_N is None: notes.append("recommendation embedding unavailable") if psi_S is None: notes.append(f"normative embedding unavailable for ORPHA:{orpha}") if psi_N is None or psi_S is None: return QFengAssessment( theta=float("nan"), zone="unknown", interference="unknown", cross_term=float("nan"), p_action=float("nan"), blocked=False, threshold=threshold, alpha=alpha, beta=beta, notes=notes, ) theta_topic = ontological_friction(psi_N, psi_S) # Hard rule layer: explicit deontological violation check. violation, evidence = (False, []) if pcdt_text: violation, evidence = _check_violation(recommendation_text, pcdt_text) # θ_eff = π when explicit prohibition matched, else θ_topic. theta = math.pi if violation else theta_topic cross = interference_term(alpha, beta, theta) p = born_rule(alpha, beta, theta) z = friction_zone(theta, red_lo=threshold) if violation: interf = "destructive (deontological violation)" elif cross > 0.05: interf = "constructive" elif cross < -0.05: interf = "destructive" else: interf = "ambiguous" blocked = circuit_breaker(theta, threshold) if violation: notes.append(f"prohibition match: {len(evidence)} clause(s) above sim 0.55") for ev in evidence[:3]: notes.append(f" ↳ '{ev['clause']}' (sim={ev['similarity']:.2f})") if blocked: notes.append(f"circuit_breaker fired at θ={theta:.3f} ≥ {threshold}") return QFengAssessment( theta=theta, zone=z, interference=interf, cross_term=cross, p_action=p, blocked=blocked, threshold=threshold, alpha=alpha, beta=beta, psi_N_dim=int(psi_N.shape[0]), psi_S_dim=int(psi_S.shape[0]), notes=notes, ) # ─────────────────── Fractal VSM audit (3 scales) ─────────────────── def fractal_audit( *, recommendation_text: str, orpha: str, pcdt_text: Optional[str] = None, regulatory_text: Optional[str] = None, model_state_text: Optional[str] = None, alpha: float = 0.7, beta: float = 0.7, threshold: float = DEFAULT_THETA_THRESHOLD, ) -> QFengAssessment: """Compute θ at 3 scales of Beer's Viable System Model: - macro_theta S5 regulatory (LGPD / EU AI Act / WHO) - meso_theta S4 institutional (PCDT / CEAF / CNES) - micro_theta S1-S3 algorithmic (model self-consistency) The headline `theta` is the meso_theta (PCDT alignment); the macro and micro thetas annotate it for fractal isomorphism. """ base = assess_recommendation( recommendation_text=recommendation_text, orpha=orpha, pcdt_text=pcdt_text, alpha=alpha, beta=beta, threshold=threshold, ) if regulatory_text: psi_N = _embed_text(recommendation_text) psi_S = _embed_text(regulatory_text) if psi_N is not None and psi_S is not None: base.macro_theta = ontological_friction(psi_N, psi_S) if model_state_text: psi_N = _embed_text(recommendation_text) psi_M = _embed_text(model_state_text) if psi_N is not None and psi_M is not None: base.micro_theta = ontological_friction(psi_N, psi_M) base.meso_theta = base.theta return base __all__ = [ "ontological_friction", "born_rule", "interference_term", "circuit_breaker", "friction_zone", "assess_recommendation", "fractal_audit", "QFengAssessment", "DEFAULT_THETA_THRESHOLD", ]