code-education-rag / src /synthesis.py
FabIndy's picture
hotfix: restore synthesis module
f4380aa
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
synthesis.py — Mode SYNTHÈSE extractive (déterministe, sans reformulation)
But :
- Produire une synthèse lisible ET juridiquement sûre (zéro hallucination)
- On ne "rédige" rien : on sélectionne des extraits exacts, non tronqués
- On affiche les extraits dans l'ordre du texte
Stratégie :
1) Découper l'article en "unités juridiques" (items I/II/1°/a)/-, sinon clauses ;, sinon phrases)
2) Scorer les unités sur des marqueurs normatifs (obligation, interdiction, conditions, exceptions, structure)
3) Sélectionner 3-5 unités max avec diversité + couverture
4) Budget global : on supprime des unités si nécessaire (jamais de troncature)
API publique :
- extractive_summary(article_id, article_text, cfg=None) -> str
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import List, Tuple, Dict, Set
import re
# ==================== CONFIG ====================
@dataclass(frozen=True)
class SynthesisConfig:
# Option A : 5 puces max, aucun tronquage
max_bullets: int = 5
# Budget global (caractères) : on retire des unités si on dépasse
max_total_chars: int = 1500
# Filtres
min_unit_len: int = 35
max_unit_len_soft: int = 900 # si une unité est trop longue, on essaie de la re-split (sans tronquer)
# Diversité : seuil de similarité (0 = jamais similaire, 1 = identique)
max_jaccard_similarity: float = 0.55
# Couverture : essayer de prendre au moins 1 unité de chaque catégorie si possible
enforce_coverage: bool = True
# ==================== TEXT NORMALIZATION ====================
_SPACE_RE = re.compile(r"\s+")
def _norm(s: str) -> str:
return _SPACE_RE.sub(" ", s.strip())
# ==================== DETECT STRUCTURE / ITEMS ====================
_ITEM_RE = re.compile(
r"""^(
(?:[IVX]{1,6}\.) | # I. II. III. IV. ...
(?:\d+°) | # 1° 2° 3° ...
(?:[a-z]\)) | # a) b) c) ...
(?:-\s) # - ...
)""",
re.VERBOSE
)
def _is_structured_item(line: str) -> bool:
return bool(_ITEM_RE.match(line))
# ==================== SPLITTING INTO LEGAL UNITS ====================
_SENTENCE_SPLIT_RE = re.compile(r"(?<=[.!?])\s+")
# Split on semicolons, keeping semantic clauses. We'll keep ";" in the left clause for readability.
_SEMI_SPLIT_RE = re.compile(r"\s*;\s*")
def _merge_wrapped_lines(lines: List[str]) -> List[str]:
"""
Heuristique simple :
- si une ligne n'est pas un item structuré et semble être la continuation de la précédente,
on la concatène.
"""
merged: List[str] = []
for ln in lines:
ln = _norm(ln)
if not ln:
continue
if not merged:
merged.append(ln)
continue
prev = merged[-1]
# Nouveau bloc si item structuré ou si ça ressemble à un titre
if _is_structured_item(ln):
merged.append(ln)
continue
# Si la ligne précédente se termine par ":" ou "—" ou si la nouvelle commence par une minuscule,
# on considère souvent que c'est une continuation.
if prev.endswith((":","—","-")) or (ln and ln[0].islower()):
merged[-1] = _norm(prev + " " + ln)
else:
# Sinon, on garde séparé
merged.append(ln)
return merged
def _split_long_unit(unit: str, cfg: SynthesisConfig) -> List[str]:
"""
Si l'unité est très longue, on essaie de la découper SANS tronquer :
- priorité au ';' (souvent structure juridique)
- sinon, phrases complètes
"""
unit = _norm(unit)
if len(unit) <= cfg.max_unit_len_soft:
return [unit]
# 1) Essai sur ';'
parts = [p.strip() for p in _SEMI_SPLIT_RE.split(unit) if p.strip()]
if len(parts) >= 2:
out: List[str] = []
for i, p in enumerate(parts):
# On remet le ';' entre clauses sauf la dernière
if i < len(parts) - 1 and not p.endswith(";"):
p = p + ";"
out.append(_norm(p))
# Filtre : on ne veut pas de mini-clause vide
out2 = [p for p in out if len(p) >= cfg.min_unit_len]
if out2:
return out2
# 2) Fallback phrases
sents = [s.strip() for s in _SENTENCE_SPLIT_RE.split(unit) if s.strip()]
if len(sents) >= 2:
sents2 = [ _norm(s) for s in sents if len(s) >= cfg.min_unit_len ]
if sents2:
return sents2
# 3) Dernier recours : on garde l'unité entière (même longue). On préfèrera en éliminer au budget.
return [unit]
def _split_into_units(article_text: str, cfg: SynthesisConfig) -> List[str]:
if not article_text:
return []
raw_lines = [ln for ln in article_text.splitlines()]
# enlève les lignes vides
raw_lines = [ln for ln in raw_lines if _norm(ln)]
lines = _merge_wrapped_lines(raw_lines)
units: List[str] = []
for ln in lines:
ln = _norm(ln)
if not ln:
continue
# Si c'est un item structuré, on le garde en tant qu'unité,
# mais on peut le re-split s'il est gigantesque.
if _is_structured_item(ln):
units.extend(_split_long_unit(ln, cfg))
continue
# Sinon, on essaye d'abord de couper par ';' si c'est long
if len(ln) > cfg.max_unit_len_soft:
units.extend(_split_long_unit(ln, cfg))
else:
units.append(ln)
# Filtrage final
cleaned = []
for u in units:
u = _norm(u)
if len(u) < cfg.min_unit_len:
continue
cleaned.append(u)
return cleaned
# ==================== SCORING (NORMATIVE / LEGAL) ====================
# Patterns "durs" (évite le bruit : on NE met PAS "est/sont")
_PATTERNS: Dict[str, List[re.Pattern]] = {
"obligation": [
re.compile(r"\bdoit\b", re.I),
re.compile(r"\bdoivent\b", re.I),
re.compile(r"\best\s+tenu\b", re.I),
re.compile(r"\bsont\s+tenus\b", re.I),
re.compile(r"\best\s+tenu\s+de\b", re.I),
re.compile(r"\bobligatoire\b", re.I),
re.compile(r"\bobligation\b", re.I),
],
"prohibition": [
re.compile(r"\binterdit\b", re.I),
re.compile(r"\best\s+interdit\b", re.I),
re.compile(r"\bil\s+est\s+interdit\b", re.I),
re.compile(r"\bne\s+peut\b", re.I),
re.compile(r"\bne\s+peuvent\b", re.I),
],
"condition": [
re.compile(r"\bsi\b", re.I),
re.compile(r"\blorsque\b", re.I),
re.compile(r"\bà\s+condition\b", re.I),
re.compile(r"\ba\s+condition\b", re.I),
re.compile(r"\ben\s+cas\b", re.I),
re.compile(r"\bdans\s+le\s+cas\b", re.I),
],
"exception": [
re.compile(r"\bsauf\b", re.I),
re.compile(r"\btoutefois\b", re.I),
re.compile(r"\bsous\s+réserve\b", re.I),
re.compile(r"\bnonobstant\b", re.I),
],
"permission": [
re.compile(r"\bpeut\b", re.I),
re.compile(r"\bpeuvent\b", re.I),
],
"structure": [
re.compile(r"^(?:[IVX]{1,6}\.|(?:\d+°)|(?:[a-z]\))|(?:-\s))", re.I),
]
}
_REF_RE = re.compile(r"\b(décret|arrêté|loi|code)\b", re.I)
def _score_unit(u: str) -> Tuple[int, Set[str]]:
"""
Retourne (score, tags)
tags = catégories rencontrées (obligation, prohibition, condition, exception, permission, structure)
"""
score = 0
tags: Set[str] = set()
for tag, pats in _PATTERNS.items():
for p in pats:
if p.search(u):
tags.add(tag)
# pondération simple et justifiable
if tag in ("obligation", "prohibition"):
score += 4
elif tag in ("condition", "exception"):
score += 3
elif tag == "structure":
score += 2
elif tag == "permission":
score += 1
break
if _REF_RE.search(u):
score += 1
# pénalité légère si c'est extrêmement long (sans tronquer, juste pour favoriser concision)
if len(u) > 700:
score -= 1
return score, tags
# ==================== DIVERSITY (ANTI-REDUNDANCY) ====================
_WORD_RE = re.compile(r"[a-zA-ZÀ-ÖØ-öø-ÿ0-9]+")
_STOPWORDS = {
# très minimaliste : juste assez pour éviter le bruit
"le","la","les","un","une","des","du","de","d","à","au","aux","et","ou",
"en","dans","sur","pour","par","avec","sans","ce","cet","cette","ces",
"qui","que","quoi","dont","où","il","elle","ils","elles","on",
"est","sont","été","être","a","ont","avait","avaient","sera","seront",
"ne","pas","plus","moins","se","sa","son","ses","leur","leurs",
}
def _keywords(u: str) -> Set[str]:
toks = [t.lower() for t in _WORD_RE.findall(u)]
return {t for t in toks if len(t) >= 3 and t not in _STOPWORDS}
def _jaccard(a: Set[str], b: Set[str]) -> float:
if not a and not b:
return 0.0
inter = len(a & b)
union = len(a | b)
return inter / union if union else 0.0
# ==================== SELECTION ====================
def _pick_with_coverage(
scored_units: List[Tuple[int, int, str, Set[str], Set[str]]],
cfg: SynthesisConfig
) -> List[Tuple[int, str]]:
"""
scored_units elements: (score, idx, unit_text, tags, keywords)
returns list of (idx, unit_text) selected
"""
# candidates sorted by score desc
candidates = sorted(scored_units, key=lambda x: x[0], reverse=True)
selected: List[Tuple[int, str, Set[str]]] = [] # (idx, text, keywords)
def can_add(kw: Set[str]) -> bool:
for (_, _, kw2) in selected:
if _jaccard(kw, kw2) >= cfg.max_jaccard_similarity:
return False
return True
def add_candidate(cand: Tuple[int, int, str, Set[str], Set[str]]) -> bool:
score, idx, text, tags, kw = cand
if score <= 0:
return False
if not can_add(kw):
return False
selected.append((idx, text, kw))
return True
# 1) Coverage pass (si activé)
if cfg.enforce_coverage:
# catégories prioritaires
targets = [
("obligation",),
("prohibition",),
("exception", "condition"),
("structure",),
]
for group in targets:
if len(selected) >= cfg.max_bullets:
break
for cand in candidates:
score, idx, text, tags, kw = cand
if score <= 0:
continue
if any(t in tags for t in group):
if add_candidate(cand):
break
# 2) Fill remaining by best score
for cand in candidates:
if len(selected) >= cfg.max_bullets:
break
add_candidate(cand)
# sort by original order (idx)
selected_sorted = sorted(((idx, text) for (idx, text, _) in selected), key=lambda x: x[0])
return selected_sorted
def _apply_budget(selected: List[Tuple[int, str]], cfg: SynthesisConfig) -> List[str]:
"""
Applique le budget max_total_chars en supprimant des unités (jamais tronquer).
On supprime en priorité les dernières (car on affiche dans l'ordre).
"""
texts = [t for (_, t) in selected]
# +2 per bullet for "- " and newline approx (rough, OK)
def total_len(ts: List[str]) -> int:
return sum(len(s) for s in ts) + 3 * len(ts)
while texts and total_len(texts) > cfg.max_total_chars:
texts.pop() # remove last
return texts
# ==================== PUBLIC API ====================
def extractive_summary(article_id: str, article_text: str, cfg: SynthesisConfig | None = None) -> str:
cfg = cfg or SynthesisConfig()
units = _split_into_units(article_text, cfg)
if not units:
return f"Synthèse impossible : texte vide ou non exploitable.\n\nArticles cités : {article_id}"
scored_units: List[Tuple[int, int, str, Set[str], Set[str]]] = []
for idx, u in enumerate(units):
score, tags = _score_unit(u)
kw = _keywords(u)
scored_units.append((score, idx, u, tags, kw))
selected = _pick_with_coverage(scored_units, cfg)
# si rien (ex: aucun score > 0), fallback : premières unités (mais toujours non tronquées + ordre)
if not selected:
selected = [(i, units[i]) for i in range(min(cfg.max_bullets, len(units)))]
texts = _apply_budget(selected, cfg)
if not texts:
# Si le budget est trop strict et qu'une seule unité est énorme, on affiche quand même la 1ère
# (option produit : mieux vaut 1 extrait entier que rien)
texts = [selected[0][1]]
out_lines = ["Synthèse (extraits du texte, sans reformulation) :"]
for t in texts:
out_lines.append(f"- {t}")
return "\n".join(out_lines) + f"\n\nArticles cités : {article_id}"