File size: 18,818 Bytes
089d665
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
"""Q-FENG β€” Ontological Friction quantifier for Gemeo recommendations.

Implements the core mathematics of Kaminski (2026) "Quantum-Fractal
Neurosymbolic Governance" as an operational module on top of Gemeo's
existing patient embedding and PCDT corpus.

Main exports:
  - `ontological_friction(psi_N, psi_S)` → angle θ in [0, π]
  - `born_rule(alpha, beta, theta)` → action probability with interference
  - `circuit_breaker(theta, threshold=2.40)` → bool, True = block
  - `assess_recommendation(recommendation_text=..., orpha=..., ...)` → QFengAssessment

Definition recap (from Kaminski 2026, §2.1):
    |D⟩ = α|ψ_N⟩ + β|ψ_S⟩
    P(action) = |α|² + |β|² + 2|α||β|·cos(θ)
    θ = arccos(⟨ψ_N|ψ_S⟩ / (‖ψ_N‖·‖ψ_S‖))

Constructive interference (θ ≈ 0): neural prediction aligns with
norm → action allowed. Destructive interference (θ ≈ π): neural
prediction conflicts with norm → Circuit Breaker triggers.
"""
from __future__ import annotations
import logging
import math
import os
from dataclasses import dataclass, field
from functools import lru_cache
from typing import Optional

import numpy as np

logger = logging.getLogger("gemeo.qfeng")

# Default Circuit Breaker threshold in radians (2.40 rad ≈ 137.5°).
# Empirically chosen so genuinely orthogonal recommendations
# (θ = π/2 ≈ 1.57) still pass with a warning, and only strongly
# destructive ones (θ ≥ 2.40) are blocked.
DEFAULT_THETA_THRESHOLD = 2.40

# Yellow-zone threshold: 1.0 ≤ θ < 2.40 → flag but allow.
YELLOW_LO = 1.0


# ─────────────────────────── Core math ───────────────────────────

def _normalize(v: np.ndarray) -> np.ndarray:
    """Scale ``v`` to unit Euclidean length.

    A vector whose norm is at most 1e-12 is returned unchanged, so a
    (near-)zero vector never causes a division by zero.
    """
    magnitude = float(np.linalg.norm(v))
    if magnitude > 1e-12:
        return v / magnitude
    return v


def ontological_friction(psi_N: np.ndarray, psi_S: np.ndarray) -> float:
    """Compute θ = arccos(⟨ψ_N|ψ_S⟩ / (‖ψ_N‖·‖ψ_S‖)) ∈ [0, π].

    Args:
        psi_N: Neural Evidence Vector (the recommendation embedding).
        psi_S: Symbolic Norm Vector (the PCDT/regulatory embedding).

    Returns:
        Friction angle θ in radians, in [0, π]. NaN is returned when the
        angle is undefined: either input is None, or either vector has
        (near-)zero norm or is empty after dimension alignment.
    """
    if psi_N is None or psi_S is None:
        return float("nan")
    vec_n = np.asarray(psi_N, dtype=np.float64).ravel()
    vec_s = np.asarray(psi_S, dtype=np.float64).ravel()
    if vec_n.shape != vec_s.shape:
        # Mismatched dimensions (e.g., 3072 vs 768): compare only the
        # leading components both vectors share.
        d = min(vec_n.shape[0], vec_s.shape[0])
        vec_n, vec_s = vec_n[:d], vec_s[:d]
    norm_n = float(np.linalg.norm(vec_n))
    norm_s = float(np.linalg.norm(vec_s))
    # The angle to a zero vector is undefined. Reporting NaN (instead of
    # a silent π/2) lets downstream zone logic classify it as "unknown".
    if not (norm_n > 1e-12 and norm_s > 1e-12):
        return float("nan")
    # Clip guards acos against round-off pushing the cosine outside [-1, 1].
    cosine = float(np.clip(np.dot(vec_n / norm_n, vec_s / norm_s), -1.0, 1.0))
    return float(math.acos(cosine))


def born_rule(alpha: float, beta: float, theta: float) -> float:
    """Return P(Action) = |α|² + |β|² + 2|α||β|·cos(θ), the Born rule with interference.

    Note: in QDT, |α|² + |β|² should be ≤ 1 (the cross-term is the
    interference correction). The value is not renormalised here; the
    cross-term is available separately so callers can interpret the
    constructive/destructive contribution.
    """
    mag_n, mag_s = abs(alpha), abs(beta)
    incoherent = mag_n * mag_n + mag_s * mag_s
    cross = 2 * mag_n * mag_s * math.cos(theta)
    return float(incoherent + cross)


def interference_term(alpha: float, beta: float, theta: float) -> float:
    """Return only the cross-term 2|α||β|·cos(θ); a negative value is destructive."""
    amplitude = 2 * abs(alpha) * abs(beta)
    return float(amplitude * math.cos(theta))


def circuit_breaker(theta: float, threshold: float = DEFAULT_THETA_THRESHOLD) -> bool:
    """Return True (block the action) when θ ≥ threshold (destructive interference).

    A missing (None) or NaN angle never blocks.
    """
    if theta is None or math.isnan(theta):
        return False
    return theta >= threshold


def friction_zone(theta: float, *, yellow_lo: float = YELLOW_LO,
                  red_lo: float = DEFAULT_THETA_THRESHOLD) -> str:
    """Classify a friction angle as 'green', 'yellow', 'red' or 'unknown'.

    'unknown' is returned for None/NaN; 'green' below ``yellow_lo``;
    'yellow' below ``red_lo``; otherwise 'red'.
    """
    if theta is None or math.isnan(theta):
        zone = "unknown"
    elif theta < yellow_lo:
        zone = "green"
    elif theta < red_lo:
        zone = "yellow"
    else:
        zone = "red"
    return zone


# ───────────────────────── Embedding helpers ─────────────────────────

@lru_cache(maxsize=1)
def _load_disease_emb_index():
    """Load the fused disease-embedding index from ``gemeo.external_kg``.

    Returns:
        ``(disease_emb, disease_id2idx)`` when both keys are present in
        the loaded mapping, otherwise ``(None, None)``. Any exception
        raised while importing or loading is logged at debug level and
        treated as "unavailable". The result is cached after the first call.
    """
    required = ("disease_emb", "disease_id2idx")
    try:
        from gemeo.external_kg import load_fused_embeddings
        fused = load_fused_embeddings()
        if all(name in fused for name in required):
            return fused["disease_emb"], fused["disease_id2idx"]
    except Exception as e:
        logger.debug(f"external_kg unavailable: {e}")
    return None, None


@lru_cache(maxsize=1)
def _get_text_encoder():
    """Lazily pick the text encoder used for clinical text (cached).

    Tier 1: sentence-transformers + FremyCompany/BioLORD-2023 (preferred).
    Tier 2: sentence-transformers + all-MiniLM-L6-v2 (fallback, lighter).
    Tier 3: deterministic hash-based pseudo-embedding (offline-only).

    Returns:
        ``("st", model)`` for the first model that loads, or
        ``("hash", None)`` when sentence-transformers is not installed or
        no model could be loaded.
    """
    hash_fallback = ("hash", None)
    try:
        from sentence_transformers import SentenceTransformer
    except ImportError:
        logger.debug("sentence_transformers not installed; using hash fallback")
        return hash_fallback

    candidates = ("FremyCompany/BioLORD-2023", "sentence-transformers/all-MiniLM-L6-v2")
    for model_id in candidates:
        try:
            m = SentenceTransformer(model_id)
            logger.info(f"qfeng text encoder: {model_id}")
            return ("st", m)
        except Exception as e:
            # Best effort: try the next, lighter model.
            logger.debug(f"  failed {model_id}: {e}")
    return hash_fallback


def _hash_embed(text: str, dim: int = 768) -> np.ndarray:
    """Build a deterministic pseudo-embedding from a hash of ``text``.

    The first 8 bytes of the SHA-256 digest seed a NumPy generator, whose
    standard-normal sample of length ``dim`` is scaled to unit norm.
    Offline fallback only useful for development; replace with a real
    encoder in production.
    """
    import hashlib
    digest = hashlib.sha256(text.encode()).digest()
    seed = int.from_bytes(digest[:8], "big")
    sample = np.random.default_rng(seed).standard_normal(dim)
    length = np.linalg.norm(sample)
    return sample / max(1e-12, length)


def _embed_text(text: str) -> Optional[np.ndarray]:
    """Encode arbitrary clinical text into a sentence vector.

    Returns None for empty or whitespace-only input. Otherwise the
    sentence-transformers encoder chosen by ``_get_text_encoder`` is used;
    when none is available, or encoding raises, the deterministic hash
    embedding is returned instead.
    """
    if not text or not text.strip():
        return None
    kind, model = _get_text_encoder()
    has_model = kind == "st" and model is not None
    if not has_model:
        return _hash_embed(text)
    try:
        encoded = model.encode([text], convert_to_numpy=True, normalize_embeddings=True)
        return np.asarray(encoded[0], dtype=np.float64)
    except Exception as e:
        logger.warning(f"sentence-transformer encode failed: {e}; falling back")
    return _hash_embed(text)


def _embed_disease_orpha(orpha: str) -> Optional[np.ndarray]:
    """Look up the fused disease embedding for an ORPHA code.

    Returns None when the code is empty, the embedding index is
    unavailable, or the stripped code is not present in the index.
    """
    if not orpha:
        return None
    matrix, row_of = _load_disease_emb_index()
    if matrix is None:
        return None
    code = str(orpha).strip()
    if code in row_of:
        return np.asarray(matrix[row_of[code]], dtype=np.float64)
    return None


def _embed_pcdt(orpha: str, pcdt_text: Optional[str]) -> Optional[np.ndarray]:
    """Build ψ_S, the symbolic norm vector.

    The PCDT text embedding is preferred when text is supplied and can be
    encoded; otherwise the disease embedding looked up by ORPHA code is
    used as the fallback.
    """
    text_vec = _embed_text(pcdt_text) if pcdt_text else None
    if text_vec is None:
        return _embed_disease_orpha(orpha)
    return text_vec


# ───────────────────────── Assessment dataclass ─────────────────────────

@dataclass
class QFengAssessment:
    """Per-recommendation Ontological Friction assessment.

    Field order and defaults define the constructor signature, so they
    must stay as declared.
    """
    theta: float                      # friction angle in radians, [0, π]; NaN when an embedding is unavailable
    zone: str                         # "green" | "yellow" | "red" | "unknown"
    interference: str                 # "constructive" | "destructive" | "ambiguous" | "unknown"
                                      # | "destructive (deontological violation)"
    cross_term: float                 # 2|α||β|·cos(θ)
    p_action: float                   # Born rule probability
    blocked: bool                     # circuit-breaker fired?
    threshold: float = DEFAULT_THETA_THRESHOLD  # circuit-breaker threshold used, in radians
    alpha: float = 0.7                # weight on neural side
    beta: float = 0.7                 # weight on symbolic side
    psi_N_dim: int = 0                # length of the neural vector; 0 when not computed
    psi_S_dim: int = 0                # length of the symbolic vector; 0 when not computed
    notes: list = field(default_factory=list)  # human-readable diagnostic strings
    macro_theta: Optional[float] = None  # regulatory layer (LGPD/EU AI Act)
    meso_theta: Optional[float] = None   # institutional (PCDT)
    micro_theta: Optional[float] = None  # algorithmic (model self-consistency)


# ─────────────────── Explicit normative violation check ───────────────────
#
# Topical embedding similarity (cosΞΈ) cannot, by construction, distinguish
# "iniciar fΓ‘rmaco X" from "nΓ£o iniciar fΓ‘rmaco X" β€” both cluster by topic.
# Q-FENG therefore composes two signals:
#
#   ΞΈ_topic       β€” semantic similarity in fused embedding space
#   violation     β€” boolean from explicit deontological rule check
#
# When a deontological violation is detected, ΞΈ_eff is forced to Ο€
# (destructive), regardless of topical similarity. This corresponds to
# Kaminski's "destructive interference triggers Circuit Breaker" but
# upgraded with a hard rule layer for cases where the embedding manifold
# does not separate prescription from prohibition.

# Patterns indicating a hard contraindication or explicit prohibition in
# PT-BR clinical text. Used both on the PCDT side ("é contraindicado",
# "não deve ser administrado") and on the recommendation side (catches
# whether the recommendation matches a prohibited intervention).
# The character classes accept both the accented and the unaccented
# spelling ("não"/"nao", "está"/"esta").
_PT_PROHIBITION = [
    r"contraindica\w*", r"contra-indica\w*",
    r"n[ãa]o\s+deve(?:m)?\s+ser",
    # The whitespace after "está" belongs to the optional group, so both
    # "não indicado" and "não está indicado" match.
    r"n[ãa]o\s+(?:est[áa]\s+)?indicad\w*",
    r"proibid\w*", r"vedad\w*",
]
# Verbs/nouns signalling that a recommendation proposes an intervention.
_PT_INDICATION = [
    r"indicad\w*", r"prescri\w+", r"administra\w+", r"iniciar\b",
    r"manter\b", r"continuar\b", r"dispensa\w*",
]


def _extract_prohibited_clauses(pcdt_text: str, patterns: Optional[list] = None) -> list:
    """Extract the SUBJECT of each prohibition (what is being forbidden).

    For PT-BR clinical text the subject sits immediately before the
    prohibition verb: "X é contraindicado", "X não deve ser administrado".
    For every pattern match the returned phrase is built from:

      * the text to the LEFT of the match (up to 80 chars, cut at the
        nearest preceding sentence delimiter ". ", "; " or newline),
      * the matched prohibition wording itself,
      * the text to the RIGHT (up to 40 chars, cut at the first ".",
        ";" or newline).

    Args:
        pcdt_text: PCDT excerpt to scan; empty/None yields an empty list.
        patterns: regex sources to search for, case-insensitively.
            Defaults to the module-level ``_PT_PROHIBITION`` list.

    Returns:
        List of whitespace-normalised clause strings, grouped by pattern
        and in match order within each pattern.
    """
    import re
    if not pcdt_text:
        return []
    if patterns is None:
        patterns = _PT_PROHIBITION
    clauses = []
    for pat in patterns:
        for m in re.finditer(pat, pcdt_text, re.IGNORECASE):
            head = pcdt_text[max(0, m.start() - 80):m.start()]
            # Cut at the delimiter closest to the match, whichever kind
            # it is; stopping at the first kind found can keep unrelated
            # earlier sentences.
            cut = 0
            for delim in (". ", "; ", "\n"):
                pos = head.rfind(delim)
                if pos >= 0:
                    cut = max(cut, pos + len(delim))
            head = head[cut:]
            tail = pcdt_text[m.end():m.end() + 40]
            # Only delimiters that are actually present may bound the tail:
            # str.find returns -1 for a missing one, which must not win min().
            ends = [i for i in (tail.find(d) for d in (".", ";", "\n")) if i >= 0]
            if ends:
                tail = tail[:min(ends)]
            # Use the matched text, not the regex source, in the clause.
            phrase = " ".join(f"{head} {m.group(0)} {tail}".split())
            if phrase:
                clauses.append(phrase)
    return clauses


def _content_keywords(text: str) -> set:
    """Return the set of content-bearing keywords in ``text``.

    The text is NFKD-decomposed, combining marks are dropped (stripping
    diacritics), and the result is lowercased. Keywords are runs of four
    or more a-z letters that are not in a small PT + EN stopword list.
    None is treated as empty text.
    """
    import re
    import unicodedata

    # Common stopwords (PT + EN)
    stopwords = {
        "para", "como", "esse", "essa", "nesta", "neste", "deste", "desta",
        "pelo", "pela", "pelos", "pelas", "deve", "devem", "esta", "este",
        "with", "from", "that", "this", "have", "than", "then", "into",
        "when", "such", "while", "after", "their", "where", "which",
        "ainda", "tambem", "todos", "todas", "outros", "alta", "dose",
        "anos", "anual",
    }
    decomposed = unicodedata.normalize("NFKD", text or "")
    folded = "".join(
        ch for ch in decomposed if not unicodedata.combining(ch)
    ).lower()
    return set(re.findall(r"[a-z]{4,}", folded)) - stopwords


def _check_violation(recommendation_text: str, pcdt_text: str) -> tuple[bool, list]:
    """Return ``(violation_flag, evidence_list)``.

    Two-channel deontological check against every prohibition clause
    extracted from the PCDT text:
      (1) sentence-level cosine similarity between the recommendation and
          the clause;
      (2) keyword overlap between the recommendation and the clause
          (catches a substance/procedure explicitly named as forbidden).
    A clause counts as violated when (sim ≥ 0.30 AND overlap ≥ 2 unique
    tokens) OR (sim ≥ 0.55). Nothing is flagged unless the recommendation
    contains an indication verb. For production replace with an
    LLM-as-judge call.

    Each evidence entry is a dict with the keys "clause" (first 160
    chars), "similarity" (rounded to 3 decimals) and "overlap" (up to 6
    sorted tokens).
    """
    import re
    if not (recommendation_text and pcdt_text):
        return False, []
    lowered = recommendation_text.lower()
    if not any(re.search(pattern, lowered) for pattern in _PT_INDICATION):
        return False, []
    clauses = _extract_prohibited_clauses(pcdt_text)
    if not clauses:
        return False, []

    rec_vec = _embed_text(recommendation_text)
    rec_tokens = _content_keywords(recommendation_text)
    findings = []
    for clause in clauses:
        clause_vec = _embed_text(clause)
        shared = rec_tokens & _content_keywords(clause)
        if rec_vec is None or clause_vec is None:
            score = 0.0
        else:
            score = float(np.dot(_normalize(rec_vec), _normalize(clause_vec)))
        strong_match = score >= 0.55
        supported_match = score >= 0.30 and len(shared) >= 2
        if strong_match or supported_match:
            findings.append({
                "clause": clause[:160],
                "similarity": round(score, 3),
                "overlap": sorted(shared)[:6],
            })
    return bool(findings), findings


def assess_recommendation(
    *,
    recommendation_text: str,
    orpha: str,
    pcdt_text: Optional[str] = None,
    alpha: float = 0.7,
    beta: float = 0.7,
    threshold: float = DEFAULT_THETA_THRESHOLD,
) -> QFengAssessment:
    """Compute the full Q-FENG assessment for a single recommendation.

    Args:
        recommendation_text: free-text description of the proposed action
            (e.g., "iniciar enzima alfa-galactosidase via CEAF").
        orpha: ORPHA code of the disease the recommendation targets.
        pcdt_text: optional PCDT excerpt describing the normative
            constraints. If None, the fused disease embedding looked up
            by ORPHA code is used as a proxy.
        alpha, beta: weights on neural / symbolic basis vectors.
        threshold: Circuit Breaker threshold in radians.

    Returns:
        QFengAssessment with θ, zone, P(action), and block flag. When
        either embedding is unavailable, the assessment carries NaN
        values, zone "unknown" and ``blocked=False``, with the reason
        recorded in ``notes``.
    """
    notes = []
    psi_N = _embed_text(recommendation_text)
    psi_S = _embed_pcdt(orpha, pcdt_text)

    if psi_N is None:
        notes.append("recommendation embedding unavailable")
    if psi_S is None:
        notes.append(f"normative embedding unavailable for ORPHA:{orpha}")

    if psi_N is None or psi_S is None:
        return QFengAssessment(
            theta=float("nan"), zone="unknown",
            interference="unknown", cross_term=float("nan"),
            p_action=float("nan"), blocked=False, threshold=threshold,
            alpha=alpha, beta=beta, notes=notes,
        )

    theta_topic = ontological_friction(psi_N, psi_S)

    # Hard rule layer: explicit deontological violation check.
    violation, evidence = (False, [])
    if pcdt_text:
        violation, evidence = _check_violation(recommendation_text, pcdt_text)

    # θ_eff = π when an explicit prohibition matched, else θ_topic.
    theta = math.pi if violation else theta_topic

    cross = interference_term(alpha, beta, theta)
    p = born_rule(alpha, beta, theta)
    z = friction_zone(theta, red_lo=threshold)
    if violation:
        interf = "destructive (deontological violation)"
    elif cross > 0.05:
        interf = "constructive"
    elif cross < -0.05:
        interf = "destructive"
    else:
        # Covers |cross| ≤ 0.05 as well as a NaN cross-term.
        interf = "ambiguous"
    blocked = circuit_breaker(theta, threshold)
    if violation:
        # A clause can be flagged either by high similarity alone or by
        # moderate similarity plus keyword overlap, so no single
        # similarity cut-off is quoted here.
        notes.append(
            f"prohibition match: {len(evidence)} clause(s) flagged by the deontological check"
        )
        for ev in evidence[:3]:
            notes.append(f"   ↳ '{ev['clause']}' (sim={ev['similarity']:.2f})")
    if blocked:
        notes.append(f"circuit_breaker fired at θ={theta:.3f} ≥ {threshold}")
    return QFengAssessment(
        theta=theta, zone=z, interference=interf,
        cross_term=cross, p_action=p, blocked=blocked, threshold=threshold,
        alpha=alpha, beta=beta,
        psi_N_dim=int(psi_N.shape[0]),
        psi_S_dim=int(psi_S.shape[0]),
        notes=notes,
    )


# ─────────────────── Fractal VSM audit (3 scales) ───────────────────

def fractal_audit(
    *,
    recommendation_text: str,
    orpha: str,
    pcdt_text: Optional[str] = None,
    regulatory_text: Optional[str] = None,
    model_state_text: Optional[str] = None,
    alpha: float = 0.7,
    beta: float = 0.7,
    threshold: float = DEFAULT_THETA_THRESHOLD,
) -> QFengAssessment:
    """Compute θ at 3 scales of Beer's Viable System Model:

      - macro_theta  S5 regulatory   (LGPD / EU AI Act / WHO)
      - meso_theta   S4 institutional (PCDT / CEAF / CNES)
      - micro_theta  S1-S3 algorithmic (model self-consistency)

    The headline `theta` is the meso_theta (PCDT alignment); the macro
    and micro thetas annotate it for fractal isomorphism.

    Args:
        recommendation_text, orpha, pcdt_text, alpha, beta, threshold:
            forwarded unchanged to ``assess_recommendation``.
        regulatory_text: optional regulatory excerpt; when given and
            encodable, ``macro_theta`` is set.
        model_state_text: optional description of the model state; when
            given and encodable, ``micro_theta`` is set.

    Returns:
        The QFengAssessment from ``assess_recommendation`` with
        ``meso_theta`` set to its ``theta`` and the optional macro/micro
        angles filled in.
    """
    base = assess_recommendation(
        recommendation_text=recommendation_text,
        orpha=orpha, pcdt_text=pcdt_text,
        alpha=alpha, beta=beta, threshold=threshold,
    )
    # Encode the recommendation once and reuse it for both extra scales;
    # skip the encoding entirely when neither scale was requested.
    psi_N = None
    if regulatory_text or model_state_text:
        psi_N = _embed_text(recommendation_text)
    if psi_N is not None:
        if regulatory_text:
            psi_R = _embed_text(regulatory_text)
            if psi_R is not None:
                base.macro_theta = ontological_friction(psi_N, psi_R)
        if model_state_text:
            psi_M = _embed_text(model_state_text)
            if psi_M is not None:
                base.micro_theta = ontological_friction(psi_N, psi_M)
    base.meso_theta = base.theta
    return base


# Public API of this module: the names exported by a star-import.
# Underscore-prefixed helpers are not listed.
__all__ = [
    "ontological_friction",
    "born_rule",
    "interference_term",
    "circuit_breaker",
    "friction_zone",
    "assess_recommendation",
    "fractal_audit",
    "QFengAssessment",
    "DEFAULT_THETA_THRESHOLD",
]