| """ |
| AgrΓ©gation de mindmaps - Version OPTIMISΓE VITESSE |
| """ |
|
|
| import os |
| import json |
| import numpy as np |
| import logging |
| import re |
| from rapidfuzz import fuzz |
| from sentence_transformers import util |
| import torch |
|
|
| |
# Configure root logging once at import time; every helper below logs
# through the root logger with this plain "LEVEL: message" format.
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')


# Lazily-initialized SentenceTransformer handle (see get_semantic_model()).
# Module-level so the model is loaded at most once per process.
_semantic_model = None
|
|
def get_semantic_model():
    """Return the shared sentence-embedding model, loading it on first use.

    The SentenceTransformer import and model construction are deferred to
    the first call so that importing this module stays cheap; subsequent
    calls reuse the cached instance.
    """
    global _semantic_model
    if _semantic_model is not None:
        return _semantic_model

    from sentence_transformers import SentenceTransformer
    model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
    model.eval()  # inference mode only
    _semantic_model = model
    return _semantic_model
|
|
|
|
def smart_normalize(label):
    """Fast label normalization + OOP/OOPS/"... Systems" handling.

    Pipeline, in order:
      1. lowercase, strip diacritics (NFD + drop combining marks)
      2. map separators (-_/.) to spaces, drop remaining punctuation
      3. canonicalize the acronym 'oops' -> 'oop'
      4. drop a trailing 'system(s)' from 'object oriented programming ...'
      5. naive plural stripping (word longer than 3 chars ending in 's',
         but not 'ss')
      6. word-level French -> English vocabulary mapping

    Args:
        label: raw node label (any language / casing).

    Returns:
        Normalized label as a lowercase, single-spaced string.
    """
    import unicodedata

    original = label

    # 1. Lowercase and strip accents.
    normalized = label.lower().strip()
    normalized = ''.join(
        c for c in unicodedata.normalize('NFD', normalized)
        if unicodedata.category(c) != 'Mn'
    )

    # 2. Unify separators, drop punctuation, collapse whitespace.
    normalized = re.sub(r'[-_/.]', ' ', normalized)
    normalized = re.sub(r'[^\w\s]', '', normalized)
    normalized = re.sub(r'\s+', ' ', normalized).strip()

    # 3. Canonical acronym: OOPS == OOP.
    if normalized == 'oops':
        normalized = 'oop'
        logging.info(f"   π― '{original}' β 'oop' (OOPS dΓ©tectΓ©)")

    # 4. "object oriented programming systems" -> drop the trailing "systems".
    if 'object' in normalized and 'oriented' in normalized and 'programming' in normalized:
        normalized = re.sub(r'\s+systems?\s*$', '', normalized)
        if 'system' in original.lower():
            logging.info(f"   π― '{original}' β '{normalized}' (Systems enlevΓ©)")

    # 5. Naive singularization (skip short words and '-ss' endings).
    words = []
    for word in normalized.split():
        if len(word) > 3 and word.endswith('s') and not word.endswith('ss'):
            words.append(word[:-1])
        else:
            words.append(word)
    normalized = ' '.join(words)

    # 6. French -> English vocabulary, applied per whole word.
    # BUGFIX: the previous chained str.replace() re-matched 'oriente' inside
    # the freshly produced 'oriented', yielding 'orientedd' for English input,
    # while French 'orienté' normalized to 'oriented' — so equivalent labels
    # normalized differently and failed to pre-group. A word-level lookup
    # cannot cascade replacements.
    fr_to_en = {
        'orientee': 'oriented',
        'oriente': 'oriented',
        'programmation': 'programming',
        'objet': 'object',
        'systeme': 'system',
    }
    normalized = ' '.join(fr_to_en.get(w, w) for w in normalized.split())

    # Log only genuine transformations ('oop' is already logged above).
    if normalized != original.lower().strip() and normalized != 'oop':
        logging.info(f"   π '{original}' β '{normalized}'")

    return normalized
|
|
|
|
def fast_acronym_check(short, long):
    """Ultra-fast acronym check (with OOPS special-casing).

    Returns True when `short` plausibly abbreviates `long`: OOP/OOPS versus
    the spelled-out 'object oriented programming', a substring of the
    de-spaced long form, or a match against the initials of `long`'s words.
    Both arguments are expected to be already-normalized strings.
    """
    compact = short.replace(' ', '')
    tokens = long.split()

    # OOP / OOPS against the fully spelled-out phrase.
    if compact in ('oops', 'oop'):
        if all(word in long for word in ['object', 'oriented', 'programming']):
            return True

    # Contraction check: short contained in the de-spaced long form.
    if compact in long.replace(' ', ''):
        return True

    # Longer than 6 chars is not a plausible acronym.
    if len(compact) > 6:
        return False

    # Compare against the word initials, truncated to short's length.
    initials = ''.join(t[0] for t in tokens if t)[:len(compact)]
    if compact == initials:
        return True

    # OOPS may match an OOP initialism (trailing plural 's').
    if compact == 'oops' and (initials == 'oop' or initials + 's' == 'oops'):
        return True

    return compact == initials
|
|
def cluster_labels_fast(labels, similarity_threshold=75):
    """
    β‘ Fast hybrid clustering of node labels.

    Optimizations:
      1. Pre-grouping by smart_normalize() (shrinks N before any model call)
      2. One batched computation of the full semantic similarity matrix
      3. Fuzzy/acronym bonuses computed only for semantically-close pairs

    Args:
        labels: list of raw label strings (duplicates allowed).
        similarity_threshold: 0-100 hybrid score at or above which two
            representatives are merged into the same cluster.

    Returns:
        dict mapping canonical label -> list of original labels in its
        cluster; every input label appears in exactly one cluster.
    """
    if not labels:
        return {}

    import time
    start_total = time.time()

    logging.info(f"β‘ Clustering rapide de {len(labels)} labels...")

    # --- 1. Pre-group labels that normalize identically -------------------
    normalized_groups = {}
    for label in labels:
        normalized_groups.setdefault(smart_normalize(label), []).append(label)

    logging.info(f"   π¦ PrΓ©-groupement: {len(labels)} β {len(normalized_groups)}")

    # Keys and values of a dict iterate in matching order, so reuse the keys
    # instead of re-running smart_normalize on every representative.
    representative_labels = [members[0] for members in normalized_groups.values()]
    normalized_reps = list(normalized_groups.keys())

    # --- 2. Full semantic similarity matrix in one batched pass ------------
    start_embed = time.time()

    with torch.no_grad():
        all_embeddings = get_semantic_model().encode(
            normalized_reps,
            convert_to_tensor=True,
            batch_size=64,
            show_progress_bar=False
        )

    # Cosine similarity scaled to 0-100 to match fuzz.ratio's scale.
    semantic_matrix = util.cos_sim(all_embeddings, all_embeddings).cpu().numpy() * 100

    embed_time = time.time() - start_embed
    logging.info(f"   β‘ Matrice sΓ©mantique calculΓ©e en {embed_time:.2f}s")

    # --- 3. Fuzzy + acronym bonuses, only for close candidates -------------
    start_fuzzy = time.time()

    similarity_matrix = semantic_matrix.copy()
    candidates = np.argwhere(semantic_matrix > 60)

    for i, j in candidates:
        if i >= j:  # upper triangle only; the mirror cell is set below
            continue

        norm_i = normalized_reps[i]
        norm_j = normalized_reps[j]

        bonus = 30 if (fast_acronym_check(norm_i, norm_j)
                       or fast_acronym_check(norm_j, norm_i)) else 0

        if similarity_matrix[i, j] < 100:
            fuzzy_score = fuzz.ratio(norm_i, norm_j)
            # Hybrid score: 60% semantic + 40% fuzzy (+ acronym bonus), capped.
            hybrid = min(100, (0.6 * semantic_matrix[i, j]) + (0.4 * fuzzy_score) + bonus)
            similarity_matrix[i, j] = hybrid
            similarity_matrix[j, i] = hybrid

    fuzzy_time = time.time() - start_fuzzy
    logging.info(f"   β‘ Bonifications calculΓ©es en {fuzzy_time:.2f}s ({len(candidates)} paires)")

    # --- 4. Greedy clustering over representatives --------------------------
    clusters = {}
    assigned = set()

    for i, label in enumerate(representative_labels):
        if label in assigned:
            continue

        similar_indices = np.where(similarity_matrix[i] >= similarity_threshold)[0]
        cluster_members = []

        for idx in similar_indices:
            rep_label = representative_labels[idx]
            # BUGFIX: skip representatives already absorbed by an earlier
            # cluster — previously their whole group was re-added here,
            # producing overlapping clusters and double-assigned labels.
            if rep_label in assigned:
                continue
            # PERF: normalized_reps[idx] IS smart_normalize(rep_label);
            # no need to recompute it (regex work + log side effects).
            cluster_members.extend(normalized_groups[normalized_reps[idx]])

        canonical = label
        clusters[canonical] = cluster_members
        assigned.update(cluster_members)

        if len(cluster_members) > 1:
            logging.info(f"   π '{canonical}' β {len(cluster_members)} labels")

    total_time = time.time() - start_total
    logging.info(f"β Clustering terminΓ© en {total_time:.2f}s ({len(labels)} β {len(clusters)})")

    return clusters
|
|
|
|
def aggregate_mindmaps(json_paths, output_json_path, threshold=0.5,
                       fuzzy_threshold=70, semantic_threshold=70):
    """β‘ Fast aggregation of several mindmap JSON files into one graph.

    Each input file must hold {"nodes": [{"id", "label"}, ...],
    "edges": [{"source", "target"}, ...]}. Labels are clustered
    (semantic + fuzzy + acronym) across all documents, then node and edge
    frequencies are computed over the N successfully-loaded documents.

    Args:
        json_paths: paths of the per-document mindmap JSON files.
        output_json_path: destination of the aggregated JSON file.
        threshold: recorded in the output metadata (as a percentage).
        fuzzy_threshold / semantic_threshold: 0-100 components of the
            hybrid clustering threshold (40% fuzzy / 60% semantic).

    Returns:
        The aggregated graph dict (also written to output_json_path), or
        {"nodes": [], "edges": []} when no input file could be loaded.
    """
    import time
    start = time.time()

    logging.info("=" * 70)
    logging.info(f"β‘ AGRΓGATION RAPIDE")
    logging.info(f"π Fichiers: {len(json_paths)}")
    logging.info("=" * 70)

    # Same weighting as the hybrid score inside cluster_labels_fast().
    hybrid_threshold = (0.6 * semantic_threshold) + (0.4 * fuzzy_threshold)

    raw_graphs = []
    all_labels = []

    # --- Load every input graph (skip unreadable/malformed files) ----------
    for i, path in enumerate(json_paths, 1):
        try:
            with open(path, 'r', encoding='utf-8') as f:
                g = json.load(f)
            raw_graphs.append(g)

            for node in g["nodes"]:
                all_labels.append(node["label"])

            logging.info(f"β [{i}/{len(json_paths)}] {os.path.basename(path)}")
        except Exception as e:
            # Best effort: a broken file must not abort the aggregation.
            logging.error(f"β {path}: {e}")
            continue

    if not raw_graphs:
        return {"nodes": [], "edges": []}

    N = len(raw_graphs)

    # --- Cluster labels and map each label to its canonical form ------------
    clusters = cluster_labels_fast(all_labels, hybrid_threshold)

    label_to_canonical = {}
    for canonical, members in clusters.items():
        for member in members:
            label_to_canonical[member] = canonical

    canonical_labels = sorted(set(label_to_canonical.values()))
    idx = {lbl: i for i, lbl in enumerate(canonical_labels)}

    node_counts = np.zeros(len(canonical_labels), dtype=int)
    edge_counts = np.zeros((len(canonical_labels), len(canonical_labels)), dtype=int)

    for g in raw_graphs:
        # PERF: build the id -> label map once per graph instead of the
        # previous O(V) next(...) scan per edge endpoint (O(E*V) overall).
        id_to_label = {node["id"]: node["label"] for node in g["nodes"]}

        # Count each canonical node at most once per document.
        seen = {label_to_canonical.get(node["label"], node["label"])
                for node in g["nodes"]}
        for canonical in seen:
            if canonical in idx:
                node_counts[idx[canonical]] += 1

        for e in g["edges"]:
            src_label = id_to_label.get(e["source"])
            tgt_label = id_to_label.get(e["target"])
            if src_label is None or tgt_label is None:
                continue  # dangling edge endpoint

            src_canonical = label_to_canonical.get(src_label, src_label)
            tgt_canonical = label_to_canonical.get(tgt_label, tgt_label)

            if src_canonical in idx and tgt_canonical in idx:
                i, j = idx[src_canonical], idx[tgt_canonical]
                edge_counts[i, j] += 1

    # --- Frequencies as fractions of the document count ----------------------
    node_freq = node_counts / N
    edge_freq = edge_counts / N

    agg_nodes = []
    for i, canonical in enumerate(canonical_labels):
        freq = round(float(node_freq[i]) * 100, 2)
        agg_nodes.append({
            "id": canonical,
            "label": f"{canonical} ({freq}%)",
            "freq": freq
        })

    agg_edges = []
    for i, src in enumerate(canonical_labels):
        for j, tgt in enumerate(canonical_labels):
            f = float(edge_freq[i, j])
            if f > 0:
                agg_edges.append({
                    "source": src,
                    "target": tgt,
                    "freq": round(f * 100, 2),
                    "label": f"{round(f * 100, 2)}%"
                })

    aggregated = {
        "metadata": {
            "doc_count": N,
            "threshold": int(threshold * 100),
            # BUGFIX: .min()/.max() raise ValueError on an empty array when
            # every loaded graph has zero nodes — fall back to 0.0.
            "min_freq": round(float(node_freq.min()) * 100, 2) if len(node_freq) else 0.0,
            "max_freq": round(float(node_freq.max()) * 100, 2) if len(node_freq) else 0.0
        },
        "nodes": agg_nodes,
        "edges": agg_edges
    }

    os.makedirs(os.path.dirname(output_json_path) or ".", exist_ok=True)
    with open(output_json_path, "w", encoding="utf-8") as f:
        json.dump(aggregated, f, indent=2, ensure_ascii=False)

    total_time = time.time() - start
    logging.info("=" * 70)
    logging.info(f"β TERMINΓ en {total_time:.2f}s")
    logging.info(f"π {len(all_labels)} β {len(canonical_labels)} labels")
    logging.info(f"πΎ {output_json_path}")
    logging.info("=" * 70)

    return aggregated