FabIndy commited on
Commit
f4380aa
·
1 Parent(s): 4a495b8

hotfix: restore synthesis module

Browse files
Files changed (1) hide show
  1. src/synthesis.py +400 -0
src/synthesis.py ADDED
@@ -0,0 +1,400 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/usr/bin/env python3
2
+ # -*- coding: utf-8 -*-
3
+
4
+ """
5
+ synthesis.py — Mode SYNTHÈSE extractive (déterministe, sans reformulation)
6
+
7
+ But :
8
+ - Produire une synthèse lisible ET juridiquement sûre (zéro hallucination)
9
+ - On ne "rédige" rien : on sélectionne des extraits exacts, non tronqués
10
+ - On affiche les extraits dans l'ordre du texte
11
+
12
+ Stratégie :
13
+ 1) Découper l'article en "unités juridiques" (items I/II/1°/a)/-, sinon clauses ;, sinon phrases)
14
+ 2) Scorer les unités sur des marqueurs normatifs (obligation, interdiction, conditions, exceptions, structure)
15
+ 3) Sélectionner 3-5 unités max avec diversité + couverture
16
+ 4) Budget global : on supprime des unités si nécessaire (jamais de troncature)
17
+
18
+ API publique :
19
+ - extractive_summary(article_id, article_text, cfg=None) -> str
20
+ """
21
+
22
+ from __future__ import annotations
23
+
24
+ from dataclasses import dataclass
25
+ from typing import List, Tuple, Dict, Set
26
+ import re
27
+
28
+
29
+ # ==================== CONFIG ====================
30
+
31
+ @dataclass(frozen=True)
32
+ class SynthesisConfig:
33
+ # Option A : 5 puces max, aucun tronquage
34
+ max_bullets: int = 5
35
+
36
+ # Budget global (caractères) : on retire des unités si on dépasse
37
+ max_total_chars: int = 1500
38
+
39
+ # Filtres
40
+ min_unit_len: int = 35
41
+ max_unit_len_soft: int = 900 # si une unité est trop longue, on essaie de la re-split (sans tronquer)
42
+
43
+ # Diversité : seuil de similarité (0 = jamais similaire, 1 = identique)
44
+ max_jaccard_similarity: float = 0.55
45
+
46
+ # Couverture : essayer de prendre au moins 1 unité de chaque catégorie si possible
47
+ enforce_coverage: bool = True
48
+
49
+
50
+ # ==================== TEXT NORMALIZATION ====================
51
+
52
+ _SPACE_RE = re.compile(r"\s+")
53
+
54
+ def _norm(s: str) -> str:
55
+ return _SPACE_RE.sub(" ", s.strip())
56
+
57
+
58
+ # ==================== DETECT STRUCTURE / ITEMS ====================
59
+
60
+ _ITEM_RE = re.compile(
61
+ r"""^(
62
+ (?:[IVX]{1,6}\.) | # I. II. III. IV. ...
63
+ (?:\d+°) | # 1° 2° 3° ...
64
+ (?:[a-z]\)) | # a) b) c) ...
65
+ (?:-\s) # - ...
66
+ )""",
67
+ re.VERBOSE
68
+ )
69
+
70
+ def _is_structured_item(line: str) -> bool:
71
+ return bool(_ITEM_RE.match(line))
72
+
73
+
74
+ # ==================== SPLITTING INTO LEGAL UNITS ====================
75
+
76
+ _SENTENCE_SPLIT_RE = re.compile(r"(?<=[.!?])\s+")
77
+ # Split on semicolons, keeping semantic clauses. We'll keep ";" in the left clause for readability.
78
+ _SEMI_SPLIT_RE = re.compile(r"\s*;\s*")
79
+
80
+ def _merge_wrapped_lines(lines: List[str]) -> List[str]:
81
+ """
82
+ Heuristique simple :
83
+ - si une ligne n'est pas un item structuré et semble être la continuation de la précédente,
84
+ on la concatène.
85
+ """
86
+ merged: List[str] = []
87
+ for ln in lines:
88
+ ln = _norm(ln)
89
+ if not ln:
90
+ continue
91
+ if not merged:
92
+ merged.append(ln)
93
+ continue
94
+
95
+ prev = merged[-1]
96
+
97
+ # Nouveau bloc si item structuré ou si ça ressemble à un titre
98
+ if _is_structured_item(ln):
99
+ merged.append(ln)
100
+ continue
101
+
102
+ # Si la ligne précédente se termine par ":" ou "—" ou si la nouvelle commence par une minuscule,
103
+ # on considère souvent que c'est une continuation.
104
+ if prev.endswith((":","—","-")) or (ln and ln[0].islower()):
105
+ merged[-1] = _norm(prev + " " + ln)
106
+ else:
107
+ # Sinon, on garde séparé
108
+ merged.append(ln)
109
+
110
+ return merged
111
+
112
+
113
+ def _split_long_unit(unit: str, cfg: SynthesisConfig) -> List[str]:
114
+ """
115
+ Si l'unité est très longue, on essaie de la découper SANS tronquer :
116
+ - priorité au ';' (souvent structure juridique)
117
+ - sinon, phrases complètes
118
+ """
119
+ unit = _norm(unit)
120
+ if len(unit) <= cfg.max_unit_len_soft:
121
+ return [unit]
122
+
123
+ # 1) Essai sur ';'
124
+ parts = [p.strip() for p in _SEMI_SPLIT_RE.split(unit) if p.strip()]
125
+ if len(parts) >= 2:
126
+ out: List[str] = []
127
+ for i, p in enumerate(parts):
128
+ # On remet le ';' entre clauses sauf la dernière
129
+ if i < len(parts) - 1 and not p.endswith(";"):
130
+ p = p + ";"
131
+ out.append(_norm(p))
132
+ # Filtre : on ne veut pas de mini-clause vide
133
+ out2 = [p for p in out if len(p) >= cfg.min_unit_len]
134
+ if out2:
135
+ return out2
136
+
137
+ # 2) Fallback phrases
138
+ sents = [s.strip() for s in _SENTENCE_SPLIT_RE.split(unit) if s.strip()]
139
+ if len(sents) >= 2:
140
+ sents2 = [ _norm(s) for s in sents if len(s) >= cfg.min_unit_len ]
141
+ if sents2:
142
+ return sents2
143
+
144
+ # 3) Dernier recours : on garde l'unité entière (même longue). On préfèrera en éliminer au budget.
145
+ return [unit]
146
+
147
+
148
+ def _split_into_units(article_text: str, cfg: SynthesisConfig) -> List[str]:
149
+ if not article_text:
150
+ return []
151
+
152
+ raw_lines = [ln for ln in article_text.splitlines()]
153
+ # enlève les lignes vides
154
+ raw_lines = [ln for ln in raw_lines if _norm(ln)]
155
+
156
+ lines = _merge_wrapped_lines(raw_lines)
157
+
158
+ units: List[str] = []
159
+
160
+ for ln in lines:
161
+ ln = _norm(ln)
162
+ if not ln:
163
+ continue
164
+
165
+ # Si c'est un item structuré, on le garde en tant qu'unité,
166
+ # mais on peut le re-split s'il est gigantesque.
167
+ if _is_structured_item(ln):
168
+ units.extend(_split_long_unit(ln, cfg))
169
+ continue
170
+
171
+ # Sinon, on essaye d'abord de couper par ';' si c'est long
172
+ if len(ln) > cfg.max_unit_len_soft:
173
+ units.extend(_split_long_unit(ln, cfg))
174
+ else:
175
+ units.append(ln)
176
+
177
+ # Filtrage final
178
+ cleaned = []
179
+ for u in units:
180
+ u = _norm(u)
181
+ if len(u) < cfg.min_unit_len:
182
+ continue
183
+ cleaned.append(u)
184
+
185
+ return cleaned
186
+
187
+
188
+ # ==================== SCORING (NORMATIVE / LEGAL) ====================
189
+
190
+ # Patterns "durs" (évite le bruit : on NE met PAS "est/sont")
191
+ _PATTERNS: Dict[str, List[re.Pattern]] = {
192
+ "obligation": [
193
+ re.compile(r"\bdoit\b", re.I),
194
+ re.compile(r"\bdoivent\b", re.I),
195
+ re.compile(r"\best\s+tenu\b", re.I),
196
+ re.compile(r"\bsont\s+tenus\b", re.I),
197
+ re.compile(r"\best\s+tenu\s+de\b", re.I),
198
+ re.compile(r"\bobligatoire\b", re.I),
199
+ re.compile(r"\bobligation\b", re.I),
200
+ ],
201
+ "prohibition": [
202
+ re.compile(r"\binterdit\b", re.I),
203
+ re.compile(r"\best\s+interdit\b", re.I),
204
+ re.compile(r"\bil\s+est\s+interdit\b", re.I),
205
+ re.compile(r"\bne\s+peut\b", re.I),
206
+ re.compile(r"\bne\s+peuvent\b", re.I),
207
+ ],
208
+ "condition": [
209
+ re.compile(r"\bsi\b", re.I),
210
+ re.compile(r"\blorsque\b", re.I),
211
+ re.compile(r"\bà\s+condition\b", re.I),
212
+ re.compile(r"\ba\s+condition\b", re.I),
213
+ re.compile(r"\ben\s+cas\b", re.I),
214
+ re.compile(r"\bdans\s+le\s+cas\b", re.I),
215
+ ],
216
+ "exception": [
217
+ re.compile(r"\bsauf\b", re.I),
218
+ re.compile(r"\btoutefois\b", re.I),
219
+ re.compile(r"\bsous\s+réserve\b", re.I),
220
+ re.compile(r"\bnonobstant\b", re.I),
221
+ ],
222
+ "permission": [
223
+ re.compile(r"\bpeut\b", re.I),
224
+ re.compile(r"\bpeuvent\b", re.I),
225
+ ],
226
+ "structure": [
227
+ re.compile(r"^(?:[IVX]{1,6}\.|(?:\d+°)|(?:[a-z]\))|(?:-\s))", re.I),
228
+ ]
229
+ }
230
+
231
+ _REF_RE = re.compile(r"\b(décret|arrêté|loi|code)\b", re.I)
232
+
233
+ def _score_unit(u: str) -> Tuple[int, Set[str]]:
234
+ """
235
+ Retourne (score, tags)
236
+ tags = catégories rencontrées (obligation, prohibition, condition, exception, permission, structure)
237
+ """
238
+ score = 0
239
+ tags: Set[str] = set()
240
+
241
+ for tag, pats in _PATTERNS.items():
242
+ for p in pats:
243
+ if p.search(u):
244
+ tags.add(tag)
245
+ # pondération simple et justifiable
246
+ if tag in ("obligation", "prohibition"):
247
+ score += 4
248
+ elif tag in ("condition", "exception"):
249
+ score += 3
250
+ elif tag == "structure":
251
+ score += 2
252
+ elif tag == "permission":
253
+ score += 1
254
+ break
255
+
256
+ if _REF_RE.search(u):
257
+ score += 1
258
+
259
+ # pénalité légère si c'est extrêmement long (sans tronquer, juste pour favoriser concision)
260
+ if len(u) > 700:
261
+ score -= 1
262
+
263
+ return score, tags
264
+
265
+
266
+ # ==================== DIVERSITY (ANTI-REDUNDANCY) ====================
267
+
268
+ _WORD_RE = re.compile(r"[a-zA-ZÀ-ÖØ-öø-ÿ0-9]+")
269
+
270
+ _STOPWORDS = {
271
+ # très minimaliste : juste assez pour éviter le bruit
272
+ "le","la","les","un","une","des","du","de","d","à","au","aux","et","ou",
273
+ "en","dans","sur","pour","par","avec","sans","ce","cet","cette","ces",
274
+ "qui","que","quoi","dont","où","il","elle","ils","elles","on",
275
+ "est","sont","été","être","a","ont","avait","avaient","sera","seront",
276
+ "ne","pas","plus","moins","se","sa","son","ses","leur","leurs",
277
+ }
278
+
279
+ def _keywords(u: str) -> Set[str]:
280
+ toks = [t.lower() for t in _WORD_RE.findall(u)]
281
+ return {t for t in toks if len(t) >= 3 and t not in _STOPWORDS}
282
+
283
+ def _jaccard(a: Set[str], b: Set[str]) -> float:
284
+ if not a and not b:
285
+ return 0.0
286
+ inter = len(a & b)
287
+ union = len(a | b)
288
+ return inter / union if union else 0.0
289
+
290
+
291
+ # ==================== SELECTION ====================
292
+
293
+ def _pick_with_coverage(
294
+ scored_units: List[Tuple[int, int, str, Set[str], Set[str]]],
295
+ cfg: SynthesisConfig
296
+ ) -> List[Tuple[int, str]]:
297
+ """
298
+ scored_units elements: (score, idx, unit_text, tags, keywords)
299
+ returns list of (idx, unit_text) selected
300
+ """
301
+
302
+ # candidates sorted by score desc
303
+ candidates = sorted(scored_units, key=lambda x: x[0], reverse=True)
304
+
305
+ selected: List[Tuple[int, str, Set[str]]] = [] # (idx, text, keywords)
306
+
307
+ def can_add(kw: Set[str]) -> bool:
308
+ for (_, _, kw2) in selected:
309
+ if _jaccard(kw, kw2) >= cfg.max_jaccard_similarity:
310
+ return False
311
+ return True
312
+
313
+ def add_candidate(cand: Tuple[int, int, str, Set[str], Set[str]]) -> bool:
314
+ score, idx, text, tags, kw = cand
315
+ if score <= 0:
316
+ return False
317
+ if not can_add(kw):
318
+ return False
319
+ selected.append((idx, text, kw))
320
+ return True
321
+
322
+ # 1) Coverage pass (si activé)
323
+ if cfg.enforce_coverage:
324
+ # catégories prioritaires
325
+ targets = [
326
+ ("obligation",),
327
+ ("prohibition",),
328
+ ("exception", "condition"),
329
+ ("structure",),
330
+ ]
331
+ for group in targets:
332
+ if len(selected) >= cfg.max_bullets:
333
+ break
334
+ for cand in candidates:
335
+ score, idx, text, tags, kw = cand
336
+ if score <= 0:
337
+ continue
338
+ if any(t in tags for t in group):
339
+ if add_candidate(cand):
340
+ break
341
+
342
+ # 2) Fill remaining by best score
343
+ for cand in candidates:
344
+ if len(selected) >= cfg.max_bullets:
345
+ break
346
+ add_candidate(cand)
347
+
348
+ # sort by original order (idx)
349
+ selected_sorted = sorted(((idx, text) for (idx, text, _) in selected), key=lambda x: x[0])
350
+ return selected_sorted
351
+
352
+
353
+ def _apply_budget(selected: List[Tuple[int, str]], cfg: SynthesisConfig) -> List[str]:
354
+ """
355
+ Applique le budget max_total_chars en supprimant des unités (jamais tronquer).
356
+ On supprime en priorité les dernières (car on affiche dans l'ordre).
357
+ """
358
+ texts = [t for (_, t) in selected]
359
+ # +2 per bullet for "- " and newline approx (rough, OK)
360
+ def total_len(ts: List[str]) -> int:
361
+ return sum(len(s) for s in ts) + 3 * len(ts)
362
+
363
+ while texts and total_len(texts) > cfg.max_total_chars:
364
+ texts.pop() # remove last
365
+ return texts
366
+
367
+
368
+ # ==================== PUBLIC API ====================
369
+
370
+ def extractive_summary(article_id: str, article_text: str, cfg: SynthesisConfig | None = None) -> str:
371
+ cfg = cfg or SynthesisConfig()
372
+
373
+ units = _split_into_units(article_text, cfg)
374
+
375
+ if not units:
376
+ return f"Synthèse impossible : texte vide ou non exploitable.\n\nArticles cités : {article_id}"
377
+
378
+ scored_units: List[Tuple[int, int, str, Set[str], Set[str]]] = []
379
+ for idx, u in enumerate(units):
380
+ score, tags = _score_unit(u)
381
+ kw = _keywords(u)
382
+ scored_units.append((score, idx, u, tags, kw))
383
+
384
+ selected = _pick_with_coverage(scored_units, cfg)
385
+
386
+ # si rien (ex: aucun score > 0), fallback : premières unités (mais toujours non tronquées + ordre)
387
+ if not selected:
388
+ selected = [(i, units[i]) for i in range(min(cfg.max_bullets, len(units)))]
389
+
390
+ texts = _apply_budget(selected, cfg)
391
+ if not texts:
392
+ # Si le budget est trop strict et qu'une seule unité est énorme, on affiche quand même la 1ère
393
+ # (option produit : mieux vaut 1 extrait entier que rien)
394
+ texts = [selected[0][1]]
395
+
396
+ out_lines = ["Synthèse (extraits du texte, sans reformulation) :"]
397
+ for t in texts:
398
+ out_lines.append(f"- {t}")
399
+
400
+ return "\n".join(out_lines) + f"\n\nArticles cités : {article_id}"