# app.py — DAT (Divergent Association Task) Gradio Space
| import gradio as gr | |
| import numpy as np | |
| import pandas as pd | |
| import csv | |
| import os | |
| import threading | |
| from itertools import combinations | |
| from datetime import datetime | |
| import fasttext | |
| import fasttext.util | |
| import enchant | |
| from huggingface_hub import HfApi, hf_hub_download | |
| from huggingface_hub.utils import EntryNotFoundError | |
# ── Config ───────────────────────────────────────────────────────────────────
SCORES_FILE = "scores.csv"            # local working copy of the score log
MODEL_PATH = "cc.fr.300.bin"          # FastText French vectors (300-dim)
N_WORDS = 10                          # words the participant must enter
SCORE_WORDS = 7                       # words actually used for the DAT score
DATASET_REPO = "MendoToto/dat-score"  # HF dataset used as persistent storage
HF_TOKEN = os.environ.get("HF_TOKEN", "")
hf_api = HfApi(token=HF_TOKEN)
# Serializes CSV appends (and the follow-up upload) across Gradio requests.
csv_lock = threading.Lock()
print("Téléchargement du modèle FastText français...")
# Downloads cc.fr.300.bin next to the script; skipped when already present.
fasttext.util.download_model('fr', if_exists='ignore')
print("Chargement du modèle...")
model = fasttext.load_model(MODEL_PATH)
print(f"Modèle chargé. Dimension : {model.get_dimension()}")
# French dictionary used to reject words that are not valid French.
dic = enchant.Dict("fr_FR")
def is_valid_word(word: str) -> bool:
    """Return True when the fr_FR enchant dictionary recognizes *word*."""
    return dic.check(word)
# ── Persistence via HuggingFace Dataset ──────────────────────────────────────
def init_scores_file():
    """Download scores.csv from the HF dataset, or create an empty local file.

    Three outcomes:
      * download succeeds  -> copy it to SCORES_FILE;
      * file absent in repo (EntryNotFoundError) -> create CSV with full header;
      * any other failure (network, auth, ...) -> log and, if no local file
        exists yet, create CSV with the full header.
    """
    # Single source of truth for the header: score, timestamp, mot1..mot10.
    # Must match the 12-column rows appended by save_score().
    header = ["score", "timestamp"] + [f"mot{i}" for i in range(1, N_WORDS + 1)]
    try:
        path = hf_hub_download(
            repo_id=DATASET_REPO,
            filename="scores.csv",
            repo_type="dataset",
            token=HF_TOKEN
        )
        import shutil
        shutil.copy(path, SCORES_FILE)
        print("CSV chargΓ© depuis le dataset HF.")
    except EntryNotFoundError:
        with open(SCORES_FILE, "w", newline="") as f:
            csv.writer(f).writerow(header)
        print("Nouveau CSV créé.")
    except Exception as e:
        print(f"Impossible de charger le CSV depuis HF : {e}")
        if not os.path.exists(SCORES_FILE):
            with open(SCORES_FILE, "w", newline="") as f:
                # BUG FIX: this fallback previously wrote only
                # ["score", "timestamp"], leaving the header inconsistent with
                # the 12-column rows later appended by save_score().
                csv.writer(f).writerow(header)
def push_scores_to_hf():
    """Best-effort upload of the local CSV to the HF dataset repo.

    Failures are logged and swallowed so a transient HF outage never breaks
    score submission.
    """
    try:
        hf_api.upload_file(
            repo_id=DATASET_REPO,
            repo_type="dataset",
            path_or_fileobj=SCORES_FILE,
            path_in_repo="scores.csv",
            token=HF_TOKEN
        )
    except Exception as err:
        print(f"Erreur upload HF : {err}")
def save_score(score: float, words: list):
    """Append one result row to the local CSV and sync it to the HF dataset.

    The whole append + upload sequence runs under csv_lock so concurrent
    Gradio requests cannot interleave writes or uploads.
    """
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M")
    row = [round(score, 1), stamp, *words]
    with csv_lock:
        with open(SCORES_FILE, "a", newline="") as f:
            csv.writer(f).writerow(row)
        push_scores_to_hf()
def load_stats() -> str:
    """Render group statistics (count / mean / min / max) as a markdown line."""
    empty_msg = "Aucun score enregistrΓ© pour l'instant."
    if not os.path.exists(SCORES_FILE):
        return empty_msg
    df = pd.read_csv(SCORES_FILE)
    if df.empty:
        return empty_msg
    scores = df["score"]
    return (
        f"**{len(df)} participant(s)** β "
        f"Moyenne : **{scores.mean():.1f}** β "
        f"Min : {scores.min():.1f} β "
        f"Max : {scores.max():.1f}"
    )
# ── Scoring ──────────────────────────────────────────────────────────────────
def normalize(v: np.ndarray) -> np.ndarray:
    """Scale *v* to unit length; a zero vector is returned unchanged."""
    length = np.linalg.norm(v)
    if length > 0:
        return v / length
    return v
def cosine_dist(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine distance 1 - a·b; inputs are assumed already unit-normalized."""
    similarity = np.dot(a, b)
    return float(1 - similarity)
def compute_dat_score(emb: np.ndarray) -> float:
    """DAT score: mean pairwise cosine distance of the first SCORE_WORDS
    embeddings, scaled by 100 (theoretical range 0..200)."""
    subset = emb[:SCORE_WORDS]
    dists = [
        float(1 - np.dot(subset[i], subset[j]))
        for i, j in combinations(range(len(subset)), 2)
    ]
    return float(np.mean(dists) * 100)
def color_for(val: float, vmin: float, vmax: float) -> str:
    """Map a distance onto a CSS color: red (close) -> yellow -> green (far).

    *val* is linearly rescaled to [0, 1] over [vmin, vmax]; a degenerate
    range (vmax <= vmin) maps everything to the midpoint (yellow).
    """
    span = vmax - vmin
    t = (val - vmin) / span if span > 0 else 0.5
    t = min(1.0, max(0.0, t))
    if t < 0.5:
        # First half: red -> yellow (green channel ramps up).
        rgb = (220, int(t * 2 * 200), 60)
    else:
        # Second half: yellow -> green (red channel ramps down).
        rgb = (int((1 - (t - 0.5) * 2) * 200), 200, 60)
    return "rgb({},{},{})".format(*rgb)
def build_html_matrix(words: list, distances: np.ndarray) -> str:
    """Render the pairwise distance matrix as a color-coded HTML table.

    Cells run red (close) -> yellow -> green (far) via color_for(), scaled
    to the min/max of the off-diagonal distances; a gradient legend follows.
    """
    n = len(words)
    # Ignore the diagonal (self-distances) when computing the color scale.
    off_diag = [distances[i][j] for i in range(n) for j in range(n) if i != j]
    vmin, vmax = min(off_diag), max(off_diag)
    rows = []
    # Header row: one empty corner cell, then one column heading per word.
    header_cells = ["<th style='background:#f0f0f0;font-size:12px;padding:6px 10px;border:1px solid #ddd;'></th>"]
    for w in words:
        header_cells.append(
            f"<th style='background:#f0f0f0;font-size:12px;padding:6px 10px;"
            f"border:1px solid #ddd;text-align:center;'>{w}</th>"
        )
    rows.append("<tr>" + "".join(header_cells) + "</tr>")
    for i, wi in enumerate(words):
        # Row label cell (the word itself), styled like the header.
        cells = [f"<td style='background:#f0f0f0;font-weight:600;font-size:12px;"
                 f"padding:6px 10px;border:1px solid #ddd;white-space:nowrap;'>{wi}</td>"]
        for j, wj in enumerate(words):
            if i == j:
                # Diagonal: greyed-out placeholder cell.
                cells.append(
                    "<td style='background:#e8e8e8;color:#aaa;text-align:center;"
                    "font-size:12px;padding:6px 10px;border:1px solid #ddd;'>β</td>"
                )
            else:
                val = distances[i][j]
                col = color_for(val, vmin, vmax)
                # Distances shown x100 to match the DAT score scale.
                cells.append(
                    f"<td style='background:{col};text-align:center;font-size:12px;"
                    f"padding:6px 10px;border:1px solid #ddd;color:#222;"
                    f"font-weight:500;'>{val*100:.1f}</td>"
                )
        rows.append("<tr>" + "".join(cells) + "</tr>")
    # Legend bar mirroring the red -> yellow -> green cell gradient.
    legend = (
        "<div style='margin-top:10px;font-size:12px;color:#666;display:flex;"
        "align-items:center;gap:8px;'>"
        "<span>Proche</span>"
        "<div style='width:120px;height:12px;border-radius:4px;"
        "background:linear-gradient(to right,rgb(220,60,60),rgb(220,200,60),rgb(60,200,60));'></div>"
        "<span>ΓloignΓ©</span></div>"
    )
    return (
        "<div style='overflow-x:auto;'>"
        "<table style='border-collapse:collapse;font-family:monospace;'>"
        + "".join(rows)
        + "</table>"
        + legend
        + "</div>"
    )
def score_comment(score: float, mean: float) -> str:
    """One-line feedback comparing *score* to the current group *mean*.

    When *mean* is None (no prior participants) a fixed greeting is returned;
    otherwise the message is picked from the gap score - mean.
    """
    if mean is None:
        return "π― Premier participant β revenez aprΓ¨s que d'autres aient jouΓ© pour comparer !"
    gap = score - mean
    if gap < -10:
        template = "π **En dessous de la moyenne du groupe** ({:.1f}). Vos mots partagent davantage de contextes sΓ©mantiques."
    elif gap < 5:
        template = "β **Dans la moyenne du groupe** ({:.1f}). Bonne exploration sΓ©mantique."
    elif gap < 15:
        template = "π **Au-dessus de la moyenne du groupe** ({:.1f}). PensΓ©e divergente bien dΓ©veloppΓ©e."
    else:
        template = "π **Nettement au-dessus de la moyenne du groupe** ({:.1f}). Excellent score !"
    return template.format(mean)
def top_pairs(emb: np.ndarray, words: list, n: int = 3):
    """Return (farthest, closest): markdown lists of the n most distant and
    n closest word pairs, each formatted as "**a** β **b** `dist*100`"."""
    scored = sorted(
        (
            (i, j, cosine_dist(emb[i], emb[j]))
            for i, j in combinations(range(len(words)), 2)
        ),
        key=lambda triple: triple[2],
        reverse=True,
    )

    def fmt(pairs):
        return "\n".join(
            f"**{words[i]}** β **{words[j]}** `{d*100:.1f}`" for i, j, d in pairs
        )

    far = fmt(scored[:n])
    close = fmt(list(reversed(scored[-n:])))
    return far, close
# ── Main entry point ─────────────────────────────────────────────────────────
def run_dat(w1, w2, w3, w4, w5, w6, w7, w8, w9, w10):
    """Validate the 10 submitted words, compute the DAT score, persist it,
    and build the three Gradio outputs.

    Returns (result markdown, HTML distance matrix, stats markdown); on a
    validation failure the matrix slot is "" and the stats slot is None.
    """
    words = [w.strip().lower() for w in [w1, w2, w3, w4, w5, w6, w7, w8, w9, w10]]
    if any(w == "" for w in words):
        return "β Remplissez les 10 mots.", "", None
    if len(set(words)) < N_WORDS:
        return "β Tous les mots doivent Γͺtre diffΓ©rents.", "", None
    invalid = [w for w in words if not is_valid_word(w)]
    if invalid:
        liste = ", ".join(f"**{w}**" for w in invalid)
        return (
            f"β Mot(s) non reconnu(s) en franΓ§ais : {liste}\n\n"
            f"Utilisez uniquement des noms communs franΓ§ais valides.",
            "", None
        )
    # Unit-normalized embeddings so cosine distance reduces to 1 - dot.
    emb = np.array([normalize(model.get_word_vector(w)) for w in words])
    # Symmetric pairwise distance matrix over all 10 words (for the display).
    n = len(words)
    dist_mat = np.zeros((n, n))
    for i, j in combinations(range(n), 2):
        d = cosine_dist(emb[i], emb[j])
        dist_mat[i][j] = dist_mat[j][i] = d
    # The score itself only uses the first SCORE_WORDS words (DAT protocol).
    score = compute_dat_score(emb)
    # Group mean BEFORE this submission, used for the comparison comment.
    try:
        df_tmp = pd.read_csv(SCORES_FILE)
        current_mean = df_tmp["score"].mean() if not df_tmp.empty else None
    except Exception:
        current_mean = None
    comment = score_comment(score, current_mean)
    far, close = top_pairs(emb, words)
    html_matrix = build_html_matrix(words, dist_mat)
    # Persist (append + HF upload) before refreshing the group stats.
    save_score(score, words)
    stats = load_stats()
    result = f"""## Votre score DAT : **{score:.1f} / 100**
*(maximum thΓ©orique : 200)*
{comment}
---
**π’ Paires les plus Γ©loignΓ©es**
{far}
**π΄ Paires les plus proches**
{close}
"""
    return result, html_matrix, stats
def refresh_stats():
    """Gradio callback: re-read the CSV and return fresh group stats."""
    return load_stats()
# ── Interface ────────────────────────────────────────────────────────────────
# Fetch (or create) the persistent scores CSV before building the UI.
init_scores_file()
css = ".center { text-align: center !important; }"
with gr.Blocks(title="DAT β CrΓ©ativitΓ© Divergente", css=css,
               theme=gr.themes.Soft()) as demo:
    # Title + instructions, centered via the custom CSS class.
    gr.Markdown(
        "# π§ Test de CrΓ©ativitΓ© Divergente\n### Divergent Association Task (DAT)",
        elem_classes="center"
    )
    gr.Markdown(
        "Trouvez **10 mots aussi diffΓ©rents que possible** les uns des autres. \n"
        "Uniquement des **noms communs**, au singulier. Pas de noms propres ni de termes techniques. \n"
        "Travaillez depuis votre mΓ©moire β sans regarder autour de vous.",
        elem_classes="center"
    )
    gr.Markdown("---\n### Vos 10 mots")
    # Two rows of five word-input boxes (w1..w10 feed run_dat in order).
    with gr.Row():
        w1 = gr.Textbox(label="Mot 1", placeholder="")
        w2 = gr.Textbox(label="Mot 2", placeholder="")
        w3 = gr.Textbox(label="Mot 3", placeholder="")
        w4 = gr.Textbox(label="Mot 4", placeholder="")
        w5 = gr.Textbox(label="Mot 5", placeholder="")
    with gr.Row():
        w6 = gr.Textbox(label="Mot 6", placeholder="")
        w7 = gr.Textbox(label="Mot 7", placeholder="")
        w8 = gr.Textbox(label="Mot 8", placeholder="")
        w9 = gr.Textbox(label="Mot 9", placeholder="")
        w10 = gr.Textbox(label="Mot 10", placeholder="")
    submit_btn = gr.Button("Calculer mon score β", variant="primary", size="lg")
    with gr.Row():
        with gr.Column(scale=2):
            # Left column: personal result + colored distance matrix.
            result_md = gr.Markdown()
            matrix_html = gr.HTML(label="Matrice des distances sΓ©mantiques")
        with gr.Column(scale=1):
            # Right column: live group statistics with a manual refresh.
            gr.Markdown("### π Moyenne du groupe")
            stats_md = gr.Markdown(value=load_stats())
            refresh_btn = gr.Button("π Actualiser", size="sm")
    # Wire up the two interactions.
    submit_btn.click(
        fn=run_dat,
        inputs=[w1, w2, w3, w4, w5, w6, w7, w8, w9, w10],
        outputs=[result_md, matrix_html, stats_md]
    )
    refresh_btn.click(fn=refresh_stats, outputs=[stats_md])
    # Footer: credits and reference to the original DAT paper.
    gr.Markdown(
        "---\n"
        "*BasΓ© sur [Olson et al. (2021), PNAS](https://doi.org/10.1073/pnas.2022340118) Β· "
        "Modèle : FastText fr (Meta/Wikipedia) · "
        "Un outil [Fouloscopie](https://www.mehdimoussaid.com/)*"
    )
demo.launch()