""" Agrégation de mindmaps - Version OPTIMISÉE VITESSE """ import os import json import numpy as np import logging import re from rapidfuzz import fuzz from sentence_transformers import util import torch # Configuration logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s') # Modèle sémantique (lazy loading) _semantic_model = None def get_semantic_model(): global _semantic_model if _semantic_model is None: from sentence_transformers import SentenceTransformer _semantic_model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2') _semantic_model.eval() return _semantic_model def smart_normalize(label): """ Normalisation rapide + gestion OOP/OOPS/Systems""" import unicodedata original = label # Minuscules + accents normalized = label.lower().strip() normalized = ''.join( c for c in unicodedata.normalize('NFD', normalized) if unicodedata.category(c) != 'Mn' ) # Ponctuation → espaces normalized = re.sub(r'[-_/.]', ' ', normalized) normalized = re.sub(r'[^\w\s]', '', normalized) normalized = re.sub(r'\s+', ' ', normalized).strip() # ═══════════════════════════════════════════════════════════════ # ✅ NOUVEAU : Traitement spécial OOP # ═══════════════════════════════════════════════════════════════ # 1. OOPS → OOP (avant le traitement des pluriels) if normalized == 'oops': normalized = 'oop' logging.info(f" 🎯 '{original}' → 'oop' (OOPS détecté)") # 2. "Object Oriented Programming System(s)" → "Object Oriented Programming" # Enlever "system" et "systems" après "programming" if 'object' in normalized and 'oriented' in normalized and 'programming' in normalized: # Enlever "system" ou "systems" à la fin normalized = re.sub(r'\s+systems?\s*$', '', normalized) if 'system' in original.lower(): logging.info(f" 🎯 '{original}' → '{normalized}' (Systems enlevé)") # ═══════════════════════════════════════════════════════════════ # Pluriels simples (APRÈS le traitement OOP) # ═══════════════════════════════════════════════════════════════ words = [] for word in normalized.split(): if len(word) > 3 and word.endswith('s') and not word.endswith('ss'): words.append(word[:-1]) else: words.append(word) normalized = ' '.join(words) # ═══════════════════════════════════════════════════════════════ # Transformations FR→EN # ═══════════════════════════════════════════════════════════════ normalized = (normalized .replace('orientee', 'oriented') .replace('oriente', 'oriented') .replace('programmation', 'programming') .replace('objet', 'object') .replace('systeme', 'system')) # ✅ LOG si transformation significative if normalized != original.lower().strip() and normalized != 'oop': logging.info(f" 🔄 '{original}' → '{normalized}'") return normalized def fast_acronym_check(short, long): """✅ Vérification ultra-rapide d'acronyme + OOPS""" short_clean = short.replace(' ', '') long_words = long.split() # ═══════════════════════════════════════════════════════════════ # ✅ CAS SPÉCIAL : OOPS = OOP + System # ═══════════════════════════════════════════════════════════════ if short_clean == 'oops' or short_clean == 'oop': # Vérifier si c'est une forme de "object oriented programming" if all(word in long for word in ['object', 'oriented', 'programming']): return True # Si court contenu dans long if short_clean in long.replace(' ', ''): return True # Si trop long pour être un acronyme if len(short_clean) > 6: return False # Acronyme des premières lettres acronym = ''.join(w[0] for w in long_words if w)[:len(short_clean)] # Match exact if short_clean == acronym: return True # ✅ NOUVEAU : OOPS = OOP + dernière lettre de "system" if short_clean == 'oops': # Vérifier si l'acronyme + 's' match if acronym == 'oop' or acronym + 's' == 'oops': return True return short_clean == acronym def cluster_labels_fast(labels, similarity_threshold=75): """ ⚡ VERSION ULTRA-RAPIDE OPTIMISATIONS : 1. Pré-groupement par normalisation (réduit N) 2. Calcul batch de TOUTE la matrice sémantique EN UNE FOIS 3. Fuzzy/acronyme UNIQUEMENT sur les candidats proches """ if not labels: return {} import time start_total = time.time() logging.info(f"⚡ Clustering rapide de {len(labels)} labels...") # ═══════════════════════════════════════════════════════════════ # ÉTAPE 1 : Pré-groupement (réduit drastiquement N) # ═══════════════════════════════════════════════════════════════ normalized_groups = {} for label in labels: norm = smart_normalize(label) if norm not in normalized_groups: normalized_groups[norm] = [] normalized_groups[norm].append(label) logging.info(f" 📦 Pré-groupement: {len(labels)} → {len(normalized_groups)}") representative_labels = [members[0] for members in normalized_groups.values()] normalized_reps = [smart_normalize(lbl) for lbl in representative_labels] n = len(representative_labels) # ═══════════════════════════════════════════════════════════════ # ÉTAPE 2 : Calcul BATCH de toute la matrice sémantique # ═══════════════════════════════════════════════════════════════ start_embed = time.time() with torch.no_grad(): all_embeddings = get_semantic_model().encode( normalized_reps, convert_to_tensor=True, batch_size=64, # ← Batch plus grand show_progress_bar=False ) # ⚡ CLEF : Calcul de TOUTE la matrice en une seule opération semantic_matrix = util.cos_sim(all_embeddings, all_embeddings).cpu().numpy() * 100 embed_time = time.time() - start_embed logging.info(f" ⚡ Matrice sémantique calculée en {embed_time:.2f}s") # ═══════════════════════════════════════════════════════════════ # ÉTAPE 3 : Bonification avec fuzzy/acronyme UNIQUEMENT si proche # ═══════════════════════════════════════════════════════════════ start_fuzzy = time.time() similarity_matrix = semantic_matrix.copy() # Seulement pour les paires avec score sémantique > 60% candidates = np.argwhere(semantic_matrix > 60) for i, j in candidates: if i >= j: # Éviter doublons continue norm_i = normalized_reps[i] norm_j = normalized_reps[j] # Bonus acronyme bonus = 0 if fast_acronym_check(norm_i, norm_j) or fast_acronym_check(norm_j, norm_i): bonus = 30 # Fuzzy (seulement si pas déjà 100%) if similarity_matrix[i, j] < 100: fuzzy_score = fuzz.ratio(norm_i, norm_j) # Combinaison hybrid = (0.6 * semantic_matrix[i, j]) + (0.4 * fuzzy_score) + bonus hybrid = min(100, hybrid) similarity_matrix[i, j] = hybrid similarity_matrix[j, i] = hybrid fuzzy_time = time.time() - start_fuzzy logging.info(f" ⚡ Bonifications calculées en {fuzzy_time:.2f}s ({len(candidates)} paires)") # ═══════════════════════════════════════════════════════════════ # ÉTAPE 4 : Clustering par seuil # ═══════════════════════════════════════════════════════════════ clusters = {} assigned = set() for i, label in enumerate(representative_labels): if label in assigned: continue similar_indices = np.where(similarity_matrix[i] >= similarity_threshold)[0] cluster_members = [] for idx in similar_indices: rep_label = representative_labels[idx] norm = smart_normalize(rep_label) cluster_members.extend(normalized_groups[norm]) canonical = label clusters[canonical] = cluster_members assigned.update(cluster_members) if len(cluster_members) > 1: logging.info(f" 🔗 '{canonical}' ← {len(cluster_members)} labels") total_time = time.time() - start_total logging.info(f"✅ Clustering terminé en {total_time:.2f}s ({len(labels)} → {len(clusters)})") return clusters def aggregate_mindmaps(json_paths, output_json_path, threshold=0.5, fuzzy_threshold=70, semantic_threshold=70): """⚡ VERSION RAPIDE""" import time start = time.time() logging.info("=" * 70) logging.info(f"⚡ AGRÉGATION RAPIDE") logging.info(f"📁 Fichiers: {len(json_paths)}") logging.info("=" * 70) hybrid_threshold = (0.6 * semantic_threshold) + (0.4 * fuzzy_threshold) raw_graphs = [] all_labels = [] # Charger for i, path in enumerate(json_paths, 1): try: with open(path, 'r', encoding='utf-8') as f: g = json.load(f) raw_graphs.append(g) for n in g["nodes"]: all_labels.append(n["label"]) logging.info(f"✅ [{i}/{len(json_paths)}] {os.path.basename(path)}") except Exception as e: logging.error(f"❌ {path}: {e}") continue if not raw_graphs: return {"nodes": [], "edges": []} N = len(raw_graphs) # ⚡ Clustering rapide clusters = cluster_labels_fast(all_labels, hybrid_threshold) label_to_canonical = {} for canonical, members in clusters.items(): for member in members: label_to_canonical[member] = canonical # Compter canonical_labels = sorted(set(label_to_canonical.values())) idx = {lbl: i for i, lbl in enumerate(canonical_labels)} node_counts = np.zeros(len(canonical_labels), dtype=int) edge_counts = np.zeros((len(canonical_labels), len(canonical_labels)), dtype=int) for g in raw_graphs: seen = set() for n in g["nodes"]: canonical = label_to_canonical.get(n["label"], n["label"]) seen.add(canonical) for canonical in seen: if canonical in idx: node_counts[idx[canonical]] += 1 for e in g["edges"]: try: src_label = next(n["label"] for n in g["nodes"] if n["id"] == e["source"]) tgt_label = next(n["label"] for n in g["nodes"] if n["id"] == e["target"]) src_canonical = label_to_canonical.get(src_label, src_label) tgt_canonical = label_to_canonical.get(tgt_label, tgt_label) if src_canonical in idx and tgt_canonical in idx: i, j = idx[src_canonical], idx[tgt_canonical] edge_counts[i, j] += 1 except StopIteration: continue # JSON node_freq = node_counts / N edge_freq = edge_counts / N agg_nodes = [] for i, canonical in enumerate(canonical_labels): freq = round(float(node_freq[i]) * 100, 2) agg_nodes.append({ "id": canonical, "label": f"{canonical} ({freq}%)", "freq": freq }) agg_edges = [] for i, src in enumerate(canonical_labels): for j, tgt in enumerate(canonical_labels): f = float(edge_freq[i, j]) if f > 0: agg_edges.append({ "source": src, "target": tgt, "freq": round(f * 100, 2), "label": f"{round(f * 100, 2)}%" }) aggregated = { "metadata": { "doc_count": N, "threshold": int(threshold * 100), "min_freq": round(float(node_freq.min()) * 100, 2), "max_freq": round(float(node_freq.max()) * 100, 2) }, "nodes": agg_nodes, "edges": agg_edges } os.makedirs(os.path.dirname(output_json_path) or ".", exist_ok=True) with open(output_json_path, "w", encoding="utf-8") as f: json.dump(aggregated, f, indent=2, ensure_ascii=False) total_time = time.time() - start logging.info("=" * 70) logging.info(f"✅ TERMINÉ en {total_time:.2f}s") logging.info(f"📊 {len(all_labels)} → {len(canonical_labels)} labels") logging.info(f"💾 {output_json_path}") logging.info("=" * 70) return aggregated