gemeo-twin-stack / src /gemeo /qfeng.py
timmers's picture
GEMEO world-model — initial release (module + NeuralSurv ckpt + RareBench v49 + KG embeddings)
089d665 verified
"""Q-FENG β€” Ontological Friction quantifier for Gemeo recommendations.
Implements the core mathematics of Kaminski (2026) "Quantum-Fractal
Neurosymbolic Governance" as an operational module on top of Gemeo's
existing patient embedding and PCDT corpus.
Main exports:
- `ontological_friction(psi_N, psi_S)` → angle θ in [0, π]
- `born_rule(alpha, beta, theta)` → action probability with interference
- `circuit_breaker(theta, threshold=2.40)` → bool, True = block
- `assess_recommendation(recommendation_text=..., orpha=..., ...)` → QFengAssessment
Definition recap (from Kaminski 2026, §2.1):
    |D⟩ = α|ψ_N⟩ + β|ψ_S⟩
    P(action) = |α|² + |β|² + 2|α||β|·cos(θ)
    θ = arccos(⟨ψ_N|ψ_S⟩ / (‖ψ_N‖·‖ψ_S‖))
Constructive interference (θ ≈ 0): neural prediction aligns with
norm → action allowed. Destructive interference (θ ≈ π): neural
prediction conflicts with norm → Circuit Breaker triggers.
"""
from __future__ import annotations
import logging
import math
import os
from dataclasses import dataclass, field
from functools import lru_cache
from typing import Optional
import numpy as np
# Module-level logger shared by every helper in this file.
logger = logging.getLogger("gemeo.qfeng")
# Default Circuit Breaker threshold, in radians. Empirically chosen near
# ~140°; note the constant itself is 2.40 rad ≈ 137.5° (140° would be
# ≈2.44 rad). Genuinely orthogonal recommendations (θ = π/2 ≈ 1.57) still
# pass with a warning, and only strongly destructive ones (θ ≥ 2.40) are
# blocked.
DEFAULT_THETA_THRESHOLD = 2.40
# Yellow-zone lower bound: YELLOW_LO ≤ θ < DEFAULT_THETA_THRESHOLD → flag but
# allow; θ < YELLOW_LO is green.
YELLOW_LO = 1.0
# ─────────────────────────── Core math ───────────────────────────
def _normalize(v: np.ndarray) -> np.ndarray:
    """Scale ``v`` to unit Euclidean length.

    A vector whose norm is not strictly greater than 1e-12 is returned
    unchanged (the very same object), so a (near-)zero vector never causes a
    division by ~0.
    """
    length = float(np.linalg.norm(v))
    if length > 1e-12:
        return v / length
    return v
def ontological_friction(psi_N: np.ndarray, psi_S: np.ndarray) -> float:
    """Compute θ = arccos(⟨ψ_N|ψ_S⟩ / (‖ψ_N‖·‖ψ_S‖)) ∈ [0, π].

    Args:
        psi_N: Neural Evidence Vector (the recommendation embedding).
        psi_S: Symbolic Norm Vector (the PCDT/regulatory embedding).

    Returns:
        Friction angle θ in radians, in [0, π]. ``nan`` is returned whenever
        the angle is undefined: either input is ``None``, or either vector
        is empty or has a (near-)zero norm after alignment. Returning π/2
        for such degenerate input would be indistinguishable from a
        genuinely orthogonal pair.
    """
    if psi_N is None or psi_S is None:
        return float("nan")
    vec_n = np.asarray(psi_N, dtype=np.float64).ravel()
    vec_s = np.asarray(psi_S, dtype=np.float64).ravel()
    if vec_n.shape != vec_s.shape:
        # Mismatched sizes (e.g. 3072 vs 768): compare only the leading
        # components that both vectors have.
        # NOTE(review): truncation only yields a meaningful angle if both
        # vectors live in the same embedding space — confirm with callers.
        shared = min(vec_n.shape[0], vec_s.shape[0])
        vec_n = vec_n[:shared]
        vec_s = vec_s[:shared]
    norm_n = float(np.linalg.norm(vec_n))
    norm_s = float(np.linalg.norm(vec_s))
    # `not (x > eps)` also rejects NaN norms, which compare False to anything.
    if not (norm_n > 1e-12 and norm_s > 1e-12):
        return float("nan")
    # Clip guards against |cos| drifting past 1 through rounding, which
    # would make math.acos raise ValueError.
    cosine = float(np.clip(np.dot(vec_n / norm_n, vec_s / norm_s), -1.0, 1.0))
    return float(math.acos(cosine))
def born_rule(alpha: float, beta: float, theta: float) -> float:
    """Return P(Action) = |α|² + |β|² + 2|α||β|·cos(θ) (Born rule with interference).

    Only the magnitudes of ``alpha`` and ``beta`` are used. The value is not
    clamped or renormalised: in QDT |α|² + |β|² should be ≤ 1 and the
    cross-term acts as the interference correction, but nothing here
    enforces that, so the result can exceed 1 (e.g. α = β = 1, θ = 0 gives 4).
    Use ``interference_term`` to obtain the cross-term on its own.
    """
    amp_n = abs(alpha)
    amp_s = abs(beta)
    populations = amp_n * amp_n + amp_s * amp_s
    cross = 2 * amp_n * amp_s * math.cos(theta)
    return float(populations + cross)
def interference_term(alpha: float, beta: float, theta: float) -> float:
    """Return only the cross-term 2|α||β|·cos(θ); a negative value is destructive."""
    magnitude = 2 * abs(alpha) * abs(beta)
    return float(magnitude * math.cos(theta))
def circuit_breaker(theta: float, threshold: float = DEFAULT_THETA_THRESHOLD) -> bool:
    """Decide whether an action must be blocked.

    Returns True when θ ≥ ``threshold`` (destructive interference). A
    missing (``None``) or NaN angle never blocks.
    """
    if theta is None:
        return False
    if math.isnan(theta):
        return False
    return theta >= threshold
def friction_zone(theta: float, *, yellow_lo: float = YELLOW_LO,
                  red_lo: float = DEFAULT_THETA_THRESHOLD) -> str:
    """Classify a friction angle as 'green', 'yellow', 'red' or 'unknown'.

    θ < ``yellow_lo`` is green, ``yellow_lo`` ≤ θ < ``red_lo`` is yellow and
    anything else is red. ``None`` and NaN map to 'unknown'.
    """
    if theta is None or math.isnan(theta):
        return "unknown"
    # Bands are checked in ascending order; the first upper bound that θ
    # falls below decides the label.
    for upper_bound, label in ((yellow_lo, "green"), (red_lo, "yellow")):
        if theta < upper_bound:
            return label
    return "red"
# ───────────────────────── Embedding helpers ─────────────────────────
@lru_cache(maxsize=1)
def _load_disease_emb_index():
    """Load the fused disease-embedding table and its id → row index.

    Returns ``(disease_emb, disease_id2idx)`` taken from
    ``gemeo.external_kg.load_fused_embeddings()``, or ``(None, None)`` when
    that module cannot be used or either key is absent. Because of
    ``lru_cache`` the outcome — including a failure — is computed once and
    then reused for the life of the process.

    NOTE(review): the embeddings are described elsewhere in this file as
    3072-d; that is not checked here.
    """
    unavailable = (None, None)
    try:
        from gemeo.external_kg import load_fused_embeddings

        fused = load_fused_embeddings()
        if "disease_emb" not in fused or "disease_id2idx" not in fused:
            return unavailable
        return fused["disease_emb"], fused["disease_id2idx"]
    except Exception as e:
        # Best-effort: any import/load problem downgrades to "no index".
        logger.debug(f"external_kg unavailable: {e}")
    return unavailable
@lru_cache(maxsize=1)
def _get_text_encoder():
    """Lazily pick the text encoder used for clinical text.

    Tier 1: sentence-transformers + FremyCompany/BioLORD-2023 (preferred).
    Tier 2: sentence-transformers + all-MiniLM-L6-v2 (fallback, lighter).
    Tier 3: deterministic hash-based pseudo-embedding (offline-only).

    Returns:
        ``("st", model)`` for the first model that loads, otherwise
        ``("hash", None)``. The choice is cached by ``lru_cache``, so it is
        made once per process.

    Falling through to tier 3 is logged at WARNING level: hash
    pseudo-embeddings are random directions, so every friction angle
    derived from them is semantically meaningless and callers should be
    able to see that in normal logs, not only at DEBUG.
    """
    try:
        from sentence_transformers import SentenceTransformer
    except ImportError:
        logger.debug("sentence_transformers not installed; using hash fallback")
    else:
        for model_id in ("FremyCompany/BioLORD-2023", "sentence-transformers/all-MiniLM-L6-v2"):
            try:
                m = SentenceTransformer(model_id)
            except Exception as e:
                # Model download/load problems are expected offline; try the
                # next tier instead of failing.
                logger.debug(f" failed {model_id}: {e}")
            else:
                logger.info(f"qfeng text encoder: {model_id}")
                return ("st", m)
    logger.warning(
        "qfeng: no sentence-transformers model available; using hash "
        "pseudo-embeddings (friction angles are not semantically meaningful)"
    )
    return ("hash", None)
def _hash_embed(text: str, dim: int = 768) -> np.ndarray:
    """Deterministic hash-based pseudo-embedding (offline fallback).

    The first 8 bytes of the SHA-256 digest of the encoded text seed a NumPy
    random generator; ``dim`` standard-normal samples are drawn and scaled to
    unit length. The same text always gives the same vector, but the vector
    carries no semantic information. Only useful for development; replace
    with a real encoder in production.
    """
    import hashlib

    digest = hashlib.sha256(text.encode()).digest()
    seed = int.from_bytes(digest[:8], "big")
    sample = np.random.default_rng(seed).standard_normal(dim)
    # max(...) keeps the divisor away from zero.
    scale = max(1e-12, np.linalg.norm(sample))
    return sample / scale
def _embed_text(text: str) -> Optional[np.ndarray]:
    """Encode arbitrary clinical text into a sentence vector.

    Returns ``None`` for empty or whitespace-only input. Otherwise the
    encoder chosen by ``_get_text_encoder`` is used: a sentence-transformers
    model when one loaded (asked for normalised NumPy output, converted to
    float64), else the deterministic hash fallback. If the model raises
    while encoding, a warning is logged and the hash fallback is used for
    this text.
    """
    if not (text and text.strip()):
        return None
    kind, model = _get_text_encoder()
    have_model = kind == "st" and model is not None
    if have_model:
        try:
            batch = model.encode([text], convert_to_numpy=True, normalize_embeddings=True)
            return np.asarray(batch[0], dtype=np.float64)
        except Exception as e:
            logger.warning(f"sentence-transformer encode failed: {e}; falling back")
    return _hash_embed(text)
def _embed_disease_orpha(orpha: str) -> Optional[np.ndarray]:
    """Look up the fused embedding of a disease by ORPHA code.

    Returns ``None`` when ``orpha`` is empty, when the embedding index is
    unavailable, or when the stripped string form of the code is not in the
    index; otherwise the matching row as a float64 array.

    NOTE(review): the key is used exactly as given (only stripped) — confirm
    the index keys use the same format as callers' ORPHA codes.
    """
    if not orpha:
        return None
    table, row_of = _load_disease_emb_index()
    if table is None:
        return None
    code = str(orpha).strip()
    if code in row_of:
        return np.asarray(table[row_of[code]], dtype=np.float64)
    return None
def _embed_pcdt(orpha: str, pcdt_text: Optional[str]) -> Optional[np.ndarray]:
    """Build ψ_S, the symbolic norm vector.

    The PCDT text embedding is preferred; when no text is given, or it
    cannot be embedded, the disease embedding looked up by ORPHA code is
    used instead (which may itself be ``None``).

    NOTE(review): the two sources come from different encoders — confirm the
    fallback vector is comparable with text embeddings before relying on the
    resulting angle.
    """
    text_vec = _embed_text(pcdt_text) if pcdt_text else None
    return text_vec if text_vec is not None else _embed_disease_orpha(orpha)
# ───────────────────────── Assessment dataclass ─────────────────────────
@dataclass
class QFengAssessment:
    """Per-recommendation Ontological Friction assessment.

    Produced by ``assess_recommendation`` and further annotated by
    ``fractal_audit``. When an embedding is unavailable the numeric fields
    are NaN, ``zone`` and ``interference`` are "unknown" and ``blocked`` is
    False.
    """
    theta: float  # effective friction angle in radians, [0, π]; NaN if unavailable
    zone: str  # "green" | "yellow" | "red" | "unknown"
    # "constructive" | "destructive" | "ambiguous" | "unknown" |
    # "destructive (deontological violation)"
    interference: str
    cross_term: float  # 2|α||β|·cos(θ)
    p_action: float  # Born rule value (not clamped to [0, 1])
    blocked: bool  # circuit-breaker fired?
    threshold: float = DEFAULT_THETA_THRESHOLD  # breaker threshold used, radians
    alpha: float = 0.7  # weight on neural side
    beta: float = 0.7  # weight on symbolic side
    psi_N_dim: int = 0  # length of the recommendation embedding (0 = not computed)
    psi_S_dim: int = 0  # length of the normative embedding (0 = not computed)
    notes: list = field(default_factory=list)  # human-readable strings
    # The three per-scale angles stay None until fractal_audit fills them in.
    macro_theta: Optional[float] = None  # regulatory layer (LGPD/EU AI Act)
    meso_theta: Optional[float] = None  # institutional (PCDT)
    micro_theta: Optional[float] = None  # algorithmic (model self-consistency)
# ─────────────────── Explicit normative violation check ───────────────────
#
# Topical embedding similarity (cos θ) cannot, by construction, distinguish
# "iniciar fármaco X" from "não iniciar fármaco X" — both cluster by topic.
# Q-FENG therefore composes two signals:
#
#   θ_topic   — semantic similarity in fused embedding space
#   violation — boolean from explicit deontological rule check
#
# When a deontological violation is detected, θ_eff is forced to π
# (destructive), regardless of topical similarity. This corresponds to
# Kaminski's "destructive interference triggers Circuit Breaker" but
# upgraded with a hard rule layer for cases where the embedding manifold
# does not separate prescription from prohibition.
# Patterns indicating a hard contraindication or explicit prohibition in
# PT-BR clinical text ("é contraindicado", "não deve ser administrado",
# "não [está] indicado"). They are searched case-insensitively in the PCDT
# text to locate prohibition clauses. Both accented and unaccented spellings
# ("não"/"nao", "está"/"esta") are accepted.
_PT_PROHIBITION = [
    r"contraindica\w*", r"contra-indica\w*",
    r"n[ãa]o\s+deve(?:m)?\s+ser",
    # The whitespace after "está" lives inside the optional group so that
    # plain "não indicado" (a single space) matches as well.
    r"n[ãa]o\s+(?:est[áa]\s+)?indicad\w*",
    r"proibid\w*", r"vedad\w*",
]
# Verbs signalling that a recommendation prescribes or maintains an
# intervention; searched in the lowercased recommendation text.
_PT_INDICATION = [
    # The lookbehind stops "indicad…" from matching inside "contraindicado"
    # or "contra-indicado", which express the opposite.
    r"(?<![\w-])indicad\w*", r"prescri\w+", r"administra\w+", r"iniciar\b",
    r"manter\b", r"continuar\b", r"dispensa\w*",
]
def _extract_prohibited_clauses(pcdt_text: str) -> list:
    """Extract the SUBJECT of each prohibition (what is being forbidden).

    For PT-BR clinical text the subject sits immediately before the
    prohibition verb: "X é contraindicado", "X não deve ser administrado".
    For every match of a ``_PT_PROHIBITION`` pattern the clause is built from:

    * the text to the LEFT of the match (at most 80 chars), cut at the
      nearest preceding sentence delimiter (". ", "; " or a newline);
    * the matched prohibition text itself;
    * the text to the RIGHT (at most 40 chars), cut at the first ".", ";"
      or newline.

    Args:
        pcdt_text: PCDT excerpt; empty or ``None`` yields an empty list.

    Returns:
        List of clause strings with whitespace runs collapsed to single
        spaces, in pattern order and then match order.
    """
    import re

    clauses = []
    if not pcdt_text:
        return clauses
    for pat in _PT_PROHIBITION:
        for m in re.finditer(pat, pcdt_text, re.IGNORECASE):
            # Left context: keep only what follows the NEAREST delimiter,
            # whichever delimiter type that happens to be.
            window = pcdt_text[max(0, m.start() - 80):m.start()]
            cut = max(
                (window.rfind(d) + len(d) for d in (". ", "; ", "\n") if d in window),
                default=0,
            )
            subject = window[cut:]
            # Right context: stop at the first delimiter that is actually
            # present (str.find returns -1 for absent ones, which must not
            # take part in the minimum).
            tail = pcdt_text[m.end():m.end() + 40]
            stops = [pos for pos in (tail.find(ch) for ch in ".;\n") if pos >= 0]
            if stops:
                tail = tail[:min(stops)]
            # Use the matched text, not the regex source, so the clause is
            # natural language that can be embedded and tokenised.
            phrase = " ".join(f"{subject} {m.group(0)} {tail}".split())
            if phrase:
                clauses.append(phrase)
    return clauses
def _content_keywords(text: str) -> set:
    """Return the content-bearing keywords of ``text``.

    The text is NFKD-decomposed, combining marks are dropped (so diacritics
    disappear), everything is lowercased, and runs of 4+ ASCII letters are
    kept unless they are in a small PT/EN stopword list. ``None`` is treated
    as empty text.
    """
    import re
    import unicodedata

    # Common stopwords (PT + EN)
    stopwords = {
        "para", "como", "esse", "essa", "nesta", "neste", "deste", "desta",
        "pelo", "pela", "pelos", "pelas", "deve", "devem", "esta", "este",
        "with", "from", "that", "this", "have", "than", "then", "into",
        "when", "such", "while", "after", "their", "where", "which",
        "ainda", "tambem", "todos", "todas", "outros", "alta", "dose",
        "anos", "anual",
    }
    decomposed = unicodedata.normalize("NFKD", text or "")
    folded = "".join(ch for ch in decomposed if not unicodedata.combining(ch)).lower()
    return set(re.findall(r"[a-z]{4,}", folded)) - stopwords
def _check_violation(recommendation_text: str, pcdt_text: str) -> tuple[bool, list]:
    """Return ``(violation_flag, evidence_list)``.

    Two-channel deontological check, run only when the lowercased
    recommendation contains an indication verb (``_PT_INDICATION``) and the
    PCDT yields at least one prohibition clause:

      (1) cosine similarity between the recommendation embedding and the
          embedding of each prohibition clause;
      (2) keyword overlap between the recommendation and the clause
          (catches a substance/procedure explicitly named as forbidden).

    A clause counts as violated when sim ≥ 0.55, or when sim ≥ 0.30 and at
    least 2 distinct keywords are shared. Each violated clause adds a dict
    with keys ``clause`` (first 160 chars), ``similarity`` (3 decimals) and
    ``overlap`` (up to 6 sorted keywords). If either embedding is missing
    the similarity is taken as 0.0. For production replace with an
    LLM-as-judge call.
    """
    import re

    if not (recommendation_text and pcdt_text):
        return False, []
    lowered = recommendation_text.lower()
    if not any(re.search(pattern, lowered) for pattern in _PT_INDICATION):
        return False, []
    clauses = _extract_prohibited_clauses(pcdt_text)
    if not clauses:
        return False, []

    rec_vec = _embed_text(recommendation_text)
    rec_tokens = _content_keywords(recommendation_text)
    evidence = []
    for clause in clauses:
        clause_vec = _embed_text(clause)
        clause_tokens = _content_keywords(clause)
        if rec_vec is None or clause_vec is None:
            sim = 0.0
        else:
            sim = float(np.dot(_normalize(rec_vec), _normalize(clause_vec)))
        shared = rec_tokens & clause_tokens
        if sim >= 0.55 or (sim >= 0.30 and len(shared) >= 2):
            evidence.append({
                "clause": clause[:160],
                "similarity": round(sim, 3),
                "overlap": sorted(shared)[:6],
            })
    # The flag is set exactly when at least one clause produced evidence.
    return bool(evidence), evidence
def assess_recommendation(
    *,
    recommendation_text: str,
    orpha: str,
    pcdt_text: Optional[str] = None,
    alpha: float = 0.7,
    beta: float = 0.7,
    threshold: float = DEFAULT_THETA_THRESHOLD,
) -> QFengAssessment:
    """Compute the full Q-FENG assessment for a single recommendation.

    Args:
        recommendation_text: free-text description of the proposed action
            (e.g., "iniciar enzima alfa-galactosidase via CEAF").
        orpha: ORPHA code of the disease the recommendation targets.
        pcdt_text: optional PCDT excerpt describing the normative
            constraints. If None (or not embeddable), the disease embedding
            looked up by ORPHA code is used as a proxy.
        alpha, beta: weights on neural / symbolic basis vectors.
        threshold: Circuit Breaker threshold in radians.

    Returns:
        QFengAssessment with θ, zone, P(action), and block flag. When either
        embedding is unavailable, or θ is undefined (NaN), the zone and
        interference are "unknown" and the action is not blocked; ``notes``
        says why.
    """
    notes = []
    psi_N = _embed_text(recommendation_text)
    psi_S = _embed_pcdt(orpha, pcdt_text)
    if psi_N is None:
        notes.append("recommendation embedding unavailable")
    if psi_S is None:
        notes.append(f"normative embedding unavailable for ORPHA:{orpha}")
    if psi_N is None or psi_S is None:
        return QFengAssessment(
            theta=float("nan"), zone="unknown",
            interference="unknown", cross_term=float("nan"),
            p_action=float("nan"), blocked=False, threshold=threshold,
            alpha=alpha, beta=beta, notes=notes,
        )

    dim_N = int(psi_N.shape[0])
    dim_S = int(psi_S.shape[0])
    if dim_N != dim_S:
        # ontological_friction truncates to the shorter vector; surface that
        # so the resulting angle is not mistaken for a like-for-like score.
        notes.append(
            f"embedding dimension mismatch (psi_N={dim_N}, psi_S={dim_S}); "
            "θ computed on truncated vectors and may not be meaningful"
        )
    theta_topic = ontological_friction(psi_N, psi_S)

    # Hard rule layer: explicit deontological violation check.
    violation, evidence = (False, [])
    if pcdt_text:
        violation, evidence = _check_violation(recommendation_text, pcdt_text)

    # θ_eff = π when an explicit prohibition matched, else θ_topic.
    theta = math.pi if violation else theta_topic
    cross = interference_term(alpha, beta, theta)
    p = born_rule(alpha, beta, theta)
    z = friction_zone(theta, red_lo=threshold)
    if violation:
        interf = "destructive (deontological violation)"
    elif math.isnan(theta):
        # Keep the label consistent with zone == "unknown"; NaN compares
        # False to both cut-offs and would otherwise read as "ambiguous".
        interf = "unknown"
        notes.append("friction angle undefined (degenerate embedding)")
    elif cross > 0.05:
        interf = "constructive"
    elif cross < -0.05:
        interf = "destructive"
    else:
        interf = "ambiguous"
    blocked = circuit_breaker(theta, threshold)

    if violation:
        # Clauses are flagged by either rule of _check_violation, not only
        # by the 0.55 similarity cut-off.
        notes.append(
            f"prohibition match: {len(evidence)} clause(s) flagged "
            "(sim ≥ 0.55, or sim ≥ 0.30 with ≥ 2 shared keywords)"
        )
        for ev in evidence[:3]:
            notes.append(f" ↳ '{ev['clause']}' (sim={ev['similarity']:.2f})")
    if blocked:
        notes.append(f"circuit_breaker fired at θ={theta:.3f} ≥ {threshold}")
    return QFengAssessment(
        theta=theta, zone=z, interference=interf,
        cross_term=cross, p_action=p, blocked=blocked, threshold=threshold,
        alpha=alpha, beta=beta,
        psi_N_dim=dim_N,
        psi_S_dim=dim_S,
        notes=notes,
    )
# ─────────────────── Fractal VSM audit (3 scales) ───────────────────
def fractal_audit(
    *,
    recommendation_text: str,
    orpha: str,
    pcdt_text: Optional[str] = None,
    regulatory_text: Optional[str] = None,
    model_state_text: Optional[str] = None,
    alpha: float = 0.7,
    beta: float = 0.7,
    threshold: float = DEFAULT_THETA_THRESHOLD,
) -> QFengAssessment:
    """Compute θ at 3 scales of Beer's Viable System Model.

    - macro_theta  S5     regulatory (LGPD / EU AI Act / WHO)
    - meso_theta   S4     institutional (PCDT / CEAF / CNES)
    - micro_theta  S1-S3  algorithmic (model self-consistency)

    The base assessment comes from ``assess_recommendation``; its headline
    ``theta`` is copied into ``meso_theta``. ``macro_theta`` and
    ``micro_theta`` are the friction angles between the recommendation and
    ``regulatory_text`` / ``model_state_text`` respectively, and stay
    ``None`` when that text is missing or cannot be embedded. They only
    annotate the result; zone and block flag are not changed.
    """
    assessment = assess_recommendation(
        recommendation_text=recommendation_text,
        orpha=orpha, pcdt_text=pcdt_text,
        alpha=alpha, beta=beta, threshold=threshold,
    )
    extra_scales = (
        (regulatory_text, "macro_theta"),
        (model_state_text, "micro_theta"),
    )
    for scale_text, field_name in extra_scales:
        if not scale_text:
            continue
        psi_N = _embed_text(recommendation_text)
        psi_ref = _embed_text(scale_text)
        if psi_N is not None and psi_ref is not None:
            setattr(assessment, field_name, ontological_friction(psi_N, psi_ref))
    assessment.meso_theta = assessment.theta
    return assessment
# Names exported by a star-import of this module. The underscore-prefixed
# helpers (embedding loaders, clause extraction, violation check) are not
# listed.
__all__ = [
    "ontological_friction",
    "born_rule",
    "interference_term",
    "circuit_breaker",
    "friction_zone",
    "assess_recommendation",
    "fractal_audit",
    "QFengAssessment",
    "DEFAULT_THETA_THRESHOLD",
]