jdmagent / src /jdm_agent /enrich /validators.py
expAge
ui+ux: header epure / file preserve sur error / submit Drops / pending sobre / flush right
8e780f6
"""Validation et consolidation des candidats proposés.
Deux étapes distinctes :
* `validate_candidate` — validation STRUCTURELLE déterministe, en CONTENANCE
pure (`verify_claim` à effort 0). Statuts : `unknown_term`, `duplicate`,
`inconsistent` (triplet déjà nié explicitement dans JDM), `ok`.
* `consolidate_candidate` — consolidation SÉMANTIQUE par INFÉRENCE. Cherche
si le réseau JDM permet de déduire (ou de réfuter) le triplet. Statuts :
`consolidated` (déduit → point d'entrée vers la soumission),
`rejected` (réfuté), `not_consolidated` (silence — pas forcément faux).
Registry d'exclusion (option A — anti-doublons) :
ContextVar `_EXCLUSION_REGISTRY` qui stocke, par (term, relation), la
liste normalisée des cibles déjà connues dans JDM (renseignée par
`list_existing_for_enrichment` au moment du pré-fetch). Quand le LLM
appelle ensuite `validate_candidate` sur un candidat dont la cible est
dans cette liste, on court-circuite SANS appeler verify_claim (pas
d'HTTP, pas d'inférence) et on retourne immédiatement
validation_status="duplicate" avec un message qui rappelle au LLM
qu'il avait l'info.
Le registry vit pendant une invocation agent — encadrer le streaming
par `with exclusion_context(): ...`. Hors contexte, registry=None et
toutes les fonctions sont des no-ops (compat 100% avec l'existant).
"""
from __future__ import annotations
import contextvars
import unicodedata
from contextlib import contextmanager
from typing import Optional
from jdm_agent.client import JDMClient
from jdm_agent.enrich.models import Candidate
from jdm_agent.factcheck import Claim
from jdm_agent.factcheck.models import Status
from jdm_agent.factcheck.verifier import verify_claim
# ---------- Registries partagés thread-safe ----------
#
# IMPORTANT : on N'UTILISE PAS ContextVar pour ces registries. Raison :
# LangChain exécute les tools dans des threads worker (ThreadPoolExecutor)
# qui ne préservent PAS le contexte ContextVar du parent. Résultat :
# register_consolidation() depuis le tool validate_candidate écrivait
# dans un dict isolé du thread, get_consolidation() depuis le tool
# write_submission_file lisait None → tous les triplets skippés
# silencieusement → fichier .enrich vide.
#
# À la place : dict global module-level + Lock. Activé/désactivé par
# `exclusion_context()` via un compteur (pour supporter le nesting).
# Trade-off accepté : pas d'isolation per-user en cas de concurrent
# heavy multi-user — mais les keys sont (term, relation, target) qui
# sont stables, et chaque write_submission_file ne consulte que les
# triplets que SON LLM lui passe → pas de fuite de contenu entre users.
import threading
_REGISTRY_LOCK = threading.RLock()
_EXCLUSION_REGISTRY: Optional[dict] = None
_CONSOLIDATION_REGISTRY: Optional[dict] = None
_CONTEXT_DEPTH = 0 # compteur de nesting d'exclusion_context()
# Path d'append automatique pour les consolidations. Quand non-None,
# register_consolidation() écrit chaque triplet consolidé en mode
# append dans ce fichier IMMÉDIATEMENT après l'avoir ajouté au
# registry. Permet à l'UI Gradio (et au mcp client) de voir le
# fichier grossir en temps réel, sans dépendre du LLM appelant
# write_submission_file (qui écraserait à chaque appel).
_CONSOLIDATION_OUTPUT_PATH: Optional[str] = None
_CONSOLIDATION_OUTPUT_HEADER_WRITTEN: bool = False
def _norm_target(s: str) -> str:
"""Normalisation cohérente avec `jdm_tools._norm` utilisé dans
list_existing_for_enrichment : NFKD + suppression diacritiques +
lowercase + strip. Doit matcher EXACTEMENT la normalisation stockée."""
s = unicodedata.normalize("NFKD", s or "")
return "".join(ch for ch in s if not unicodedata.combining(ch)).lower().strip()
def _norm_key(term: str, relation: str) -> tuple[str, str]:
return (_norm_target(term), (relation or "").strip().lower())
@contextmanager
def exclusion_context():
"""Active les registries partagés (exclusion + consolidation) pour la
durée d'une invocation agent.
Sans ce contexte, les helpers sont des no-ops — le comportement
précédent (verify_claim a posteriori, explanation custom du LLM)
est préservé.
Implémentation : dict module-level + Lock + compteur de nesting.
Anciennement basé sur ContextVar mais LangChain exécute les tools
dans des threads worker qui ne préservent pas le ContextVar du
parent → registry None dans le tool → register/get no-ops →
fichier .enrich vide. Le dict global avec Lock est cross-thread,
le compteur supporte le nesting (plusieurs invocations imbriquées
partagent le même dict, seule la SORTIE la plus externe le vide).
"""
global _EXCLUSION_REGISTRY, _CONSOLIDATION_REGISTRY, _CONTEXT_DEPTH
with _REGISTRY_LOCK:
if _CONTEXT_DEPTH == 0:
_EXCLUSION_REGISTRY = {}
_CONSOLIDATION_REGISTRY = {}
_CONTEXT_DEPTH += 1
try:
yield
finally:
with _REGISTRY_LOCK:
_CONTEXT_DEPTH = max(0, _CONTEXT_DEPTH - 1)
if _CONTEXT_DEPTH == 0:
_EXCLUSION_REGISTRY = None
_CONSOLIDATION_REGISTRY = None
# ---------- Registry de consolidation ----------
def _norm_consolidation_key(term: str, relation: str, target: str) -> tuple[str, str, str]:
return (
_norm_target(term),
(relation or "").strip().lower(),
_norm_target(target),
)
def set_consolidation_output_path(path: Optional[str]) -> None:
"""Active l'écriture automatique en mode APPEND : chaque appel à
`register_consolidation` écrit la ligne du triplet consolidé dans
`path` immédiatement après l'avoir ajouté au registry.
Permet à l'UI (Gradio gr.File) de voir le fichier grossir en temps
réel sans dépendre du LLM appelant `write_submission_file`. Le
fichier reçoit un header de soumission JeuxDeMots au PREMIER append.
Passer `None` désactive l'auto-append (reset du flag header).
"""
global _CONSOLIDATION_OUTPUT_PATH, _CONSOLIDATION_OUTPUT_HEADER_WRITTEN
with _REGISTRY_LOCK:
_CONSOLIDATION_OUTPUT_PATH = path
_CONSOLIDATION_OUTPUT_HEADER_WRITTEN = False
def get_consolidation_output_path() -> Optional[str]:
"""Retourne le path d'auto-append courant, ou None si désactivé."""
with _REGISTRY_LOCK:
return _CONSOLIDATION_OUTPUT_PATH
def _append_consolidation_to_file(term: str, relation: str, target: str,
explanation: str) -> None:
"""Écrit une ligne `term | relation | target | < explanation >`
en APPEND dans le path configuré par `set_consolidation_output_path`.
No-op si aucun path n'est défini. Crée le dossier parent si absent.
Écrit un header de soumission lors du PREMIER append.
Appelé sous `_REGISTRY_LOCK` par `register_consolidation`.
L'écriture file est rapide (~ms) ; le lock reste bref.
Pas de décodage des raffinements ici (raw `term>id` reste raw) —
le décodage propre est fait par la fusion finale via
`pipeline.write_submission`. Trade-off : visibilité temps réel
prime sur la cosmétique des noms.
"""
global _CONSOLIDATION_OUTPUT_HEADER_WRITTEN
if _CONSOLIDATION_OUTPUT_PATH is None:
return
try:
from pathlib import Path as _Path
p = _Path(_CONSOLIDATION_OUTPUT_PATH)
p.parent.mkdir(parents=True, exist_ok=True)
write_header = not _CONSOLIDATION_OUTPUT_HEADER_WRITTEN
expl = " ".join((explanation or "").split())
with p.open("a", encoding="utf-8") as f:
if write_header:
f.write(
"# Soumission JeuxDeMots — fichier d'enrichissement.\n"
"# Format : terme | relation | cible | annotation < explication >\n\n"
)
_CONSOLIDATION_OUTPUT_HEADER_WRITTEN = True
f.write(f"{term} | {relation} | {target} | < {expl} >\n")
except Exception:
pass # silent fail : le registry continue à fonctionner
def register_consolidation(term: str, relation: str, target: str,
explanation: str, schema: Optional[str] = None) -> None:
"""Stocke l'explication d'inférence produite par `infer()` pour ce
triplet. Appelé par `consolidate_candidate` quand le triplet est
confirmé. No-op si aucun `exclusion_context()` actif.
Si `set_consolidation_output_path` a été appelé, écrit également
la ligne en APPEND dans ce fichier (auto-append temps réel).
Déduplication : si le triplet est déjà dans le registry, on
ne l'écrit PAS une 2e fois (évite les doublons dans le fichier
append-only).
"""
with _REGISTRY_LOCK:
if _CONSOLIDATION_REGISTRY is None:
return
key = _norm_consolidation_key(term, relation, target)
is_new = key not in _CONSOLIDATION_REGISTRY
_CONSOLIDATION_REGISTRY[key] = {
"explanation": (explanation or "").strip(),
"schema": (schema or "").strip(),
}
if is_new:
_append_consolidation_to_file(term, relation, target, explanation)
def get_consolidation(term: str, relation: str, target: str) -> Optional[dict]:
"""Récupère l'explication d'inférence stockée pour ce triplet, si
elle existe. None si pas trouvée. Utilisé par `write_submission_file`
pour OVERRIDER une éventuelle explanation custom du LLM."""
with _REGISTRY_LOCK:
if _CONSOLIDATION_REGISTRY is None:
return None
key = _norm_consolidation_key(term, relation, target)
return _CONSOLIDATION_REGISTRY.get(key)
def count_consolidations() -> int:
"""Renvoie le NOMBRE CUMULATIF de triplets consolidés depuis l'entrée
dans `exclusion_context()` — incluant TOUTES les relances persistance.
Source de vérité pour le compteur de progression du flow d'enrichissement.
À PRÉFÉRER à `count_consolidated_in_messages(accumulated_messages)` qui
ne voit que les ToolMessages du tour COURANT (l'accumulated_messages
étant reset à chaque relance via `build_relance_summary`).
Renvoie 0 hors `exclusion_context()` (registry None)."""
with _REGISTRY_LOCK:
if _CONSOLIDATION_REGISTRY is None:
return 0
return len(_CONSOLIDATION_REGISTRY)
def list_consolidations() -> list[dict]:
"""Renvoie la LISTE des triplets consolidés depuis l'entrée dans
`exclusion_context()` (incluant toutes les relances). Format :
[{term, relation, target, explanation, schema}, ...]. Liste vide
hors contexte. Utile pour une fusion finale (option B) ou un dump
complet en fin de flow."""
with _REGISTRY_LOCK:
if _CONSOLIDATION_REGISTRY is None:
return []
return [
{
"term": k[0], "relation": k[1], "target": k[2],
"explanation": v.get("explanation", ""),
"schema": v.get("schema", ""),
}
for k, v in _CONSOLIDATION_REGISTRY.items()
]
def register_exclusion(term: str, relation: str, exclusion_set) -> None:
"""Stocke la liste de cibles déjà présentes pour (term, relation).
Appelé par `list_existing_for_enrichment` après son fetch.
No-op si aucun `exclusion_context()` n'est actif."""
with _REGISTRY_LOCK:
if _EXCLUSION_REGISTRY is None:
return
_EXCLUSION_REGISTRY[_norm_key(term, relation)] = set(exclusion_set or [])
def is_excluded(term: str, relation: str, target: str) -> Optional[str]:
"""Retourne None si la cible n'est pas dans l'exclusion enregistrée,
sinon un message court qui rappelle au LLM qu'il avait l'info.
No-op (None) si aucun `exclusion_context()` n'est actif ou si pas
de pré-fetch enregistré pour ce (term, relation)."""
with _REGISTRY_LOCK:
if _EXCLUSION_REGISTRY is None:
return None
excl = _EXCLUSION_REGISTRY.get(_norm_key(term, relation))
if not excl:
return None
if _norm_target(target) in excl:
return (
f"Déjà vu lors du pré-fetch `list_existing_for_enrichment("
f"term='{term}', relation_name='{relation}')`. Tu avais "
f"la cible « {target} » dans l'exclusion_set — propose autre chose."
)
return None
# ---------- Validation et consolidation ----------
def validate_candidate(client: JDMClient, candidate: Candidate) -> Candidate:
"""Annote le candidat avec validation_status / validation_note.
Validation STRUCTURELLE en contenance pure (pas d'inférence) : on regarde
uniquement ce que JDM contient littéralement.
"""
# 1. La cible existe-t-elle dans JDM ?
try:
client.node_by_name(candidate.target)
except Exception:
candidate.validation_status = "unknown_term"
candidate.validation_note = f"Le terme {candidate.target!r} n'existe pas dans JDM."
return candidate
# 1.5 FAST-PATH option A : si le pré-fetch a été fait pour ce
# (term, relation) et que la cible y figure, on court-circuite sans
# appeler verify_claim — message éducatif pour faire reculer le LLM.
excl_msg = is_excluded(candidate.term, candidate.relation, candidate.target)
if excl_msg:
candidate.validation_status = "duplicate"
candidate.validation_note = excl_msg
return candidate
# 2. Le triplet existe-t-il déjà ? (= déjà couvert, rien à ajouter)
# effort=0 : un doublon = littéralement présent — contenance stricte.
# Cas où on arrive ici : pas de pré-fetch enregistré pour ce couple
# (LLM a sauté l'étape, ou pre-fetch sur autre relation, etc.) — on
# paie un appel HTTP de plus à titre de filet de sécurité.
claim = Claim(
text=f"{candidate.term} | {candidate.relation} | {candidate.target}",
subject=candidate.term, relation=candidate.relation, object=candidate.target,
)
verdict = verify_claim(client, claim, effort=0)
if verdict.status == Status.SUPPORTED and verdict.evidence_for:
ev = verdict.evidence_for[0]
if ev.target.lower().strip() == candidate.target.lower().strip():
candidate.validation_status = "duplicate"
candidate.validation_note = (
f"Déjà présent : {ev.source} | {ev.relation} | {ev.target} (w={ev.w:.0f})."
)
return candidate
# 3. Incohérence directe ? (triplet explicitement nié dans JDM, w<0)
if verdict.status == Status.CONTRADICTED:
candidate.validation_status = "inconsistent"
candidate.validation_note = f"Contradiction JDM directe : {verdict.explanation}"
candidate.confidence = min(candidate.confidence, 0.1)
return candidate
# 4. OK structurellement
candidate.validation_status = "ok"
candidate.validation_note = (
"Validé structurellement — non-dupliqué, cible connue de JDM, "
"aucune négation directe."
)
return candidate
def consolidate_candidate(client: JDMClient, candidate: Candidate, *,
effort: int = 1,
budget: Optional[int] = None) -> Candidate:
"""Consolide un candidat par INFÉRENCE dans le réseau JDM.
Tente de déduire le triplet à partir du graphe :
* déduit → `consolidation_status = "consolidated"` (prêt pour soumission)
* réfuté → `"rejected"` (+ confidence abaissée)
* silence → `"not_consolidated"` (« pas forcément faux » — simplement
non démontrable par les schémas actuels)
La chaîne d'inférence devient `consolidation_explanation` (justification
« oui parce que … » / « non parce que … »).
"""
from jdm_agent.inference import infer
res = infer(client, candidate.term, candidate.relation, candidate.target,
effort=effort, budget=budget)
if res.is_true:
candidate.consolidation_status = "consolidated"
candidate.consolidation_schema = res.fired_schema.value
candidate.consolidation_explanation = res.explanation
# Enregistre dans le registry partagé pour que write_submission_file
# puisse OVERRIDER une éventuelle explanation custom du LLM par
# cette explication formelle issue du moteur d'inférence.
register_consolidation(
candidate.term, candidate.relation, candidate.target,
res.explanation, res.fired_schema.value,
)
elif res.is_false:
candidate.consolidation_status = "rejected"
candidate.consolidation_schema = res.fired_schema.value
candidate.consolidation_explanation = res.explanation
candidate.confidence = min(candidate.confidence, 0.1)
else:
candidate.consolidation_status = "not_consolidated"
candidate.consolidation_schema = None
candidate.consolidation_explanation = (
"Inférence silencieuse — non démontré dans JDM (pas forcément faux)."
)
return candidate