# creativite / app.py
# Mehdi MOUSSAID
# Update app.py
# 38d0f27 verified
import gradio as gr
import numpy as np
import pandas as pd
import csv
import os
import threading
from itertools import combinations
from datetime import datetime
import fasttext
import fasttext.util
import enchant
from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.utils import EntryNotFoundError
# ── Config ────────────────────────────────────────────────────────────────────
# Local CSV file holding one row per submitted result.
SCORES_FILE = "scores.csv"
# File name handed to fasttext.load_model() below.
MODEL_PATH = "cc.fr.300.bin"
# Number of distinct words run_dat() requires from a participant.
N_WORDS = 10
# compute_dat_score() only looks at the first SCORE_WORDS embeddings.
SCORE_WORDS = 7
# Hugging Face dataset repository used to persist scores.csv.
DATASET_REPO = "MendoToto/dat-score"
# Read from the environment; an empty string when the variable is unset.
HF_TOKEN = os.environ.get("HF_TOKEN", "")
hf_api = HfApi(token=HF_TOKEN)
# Taken by save_score() around the CSV append.
csv_lock = threading.Lock()
print("Téléchargement du modèle FastText français...")
# NOTE(review): presumably fetches cc.fr.300.bin into the working directory and
# reuses an existing copy because of if_exists='ignore' -- confirm against the
# fasttext.util documentation.
fasttext.util.download_model('fr', if_exists='ignore')
print("Chargement du modèle...")
model = fasttext.load_model(MODEL_PATH)
print(f"Modèle chargé. Dimension : {model.get_dimension()}")
# French spell-check dictionary queried by is_valid_word().
dic = enchant.Dict("fr_FR")
def is_valid_word(word: str) -> bool:
    """Return whether *word* is accepted by the French spell-check dictionary.

    Empty input is reported as invalid up front: pyenchant's ``Dict.check``
    raises ``ValueError`` for an empty string instead of returning ``False``.

    Args:
        word: Candidate word, already stripped/lower-cased by the caller.

    Returns:
        The result of ``dic.check(word)``, or ``False`` for empty input.
    """
    if not word:
        return False
    return dic.check(word)
# ── Persistance via HuggingFace Dataset ──────────────────────────────────────
def init_scores_file():
    """Fetch ``scores.csv`` from the HF dataset, or start a fresh local file.

    Three outcomes:

    * the file exists in the dataset repo: it is copied to ``SCORES_FILE``;
    * the repo has no such file (``EntryNotFoundError``): a header-only CSV
      is written locally;
    * any other failure (network, auth, ...): the error is printed and a
      header-only CSV is written only if no local file exists yet, so a
      previously downloaded copy is never overwritten.
    """
    import shutil

    def _write_empty_scores_file():
        # Single source of truth for the header. Every row written by
        # save_score() has score + timestamp + N_WORDS words; a shorter
        # header would make pandas treat the leading columns as an index
        # and mislabel the remaining ones.
        header = ["score", "timestamp"] + [f"mot{i}" for i in range(1, N_WORDS + 1)]
        # Explicit UTF-8 keeps the file readable by pd.read_csv (UTF-8 by
        # default) whatever the process locale is.
        with open(SCORES_FILE, "w", newline="", encoding="utf-8") as f:
            csv.writer(f).writerow(header)

    try:
        path = hf_hub_download(
            repo_id=DATASET_REPO,
            filename="scores.csv",
            repo_type="dataset",
            token=HF_TOKEN
        )
        shutil.copy(path, SCORES_FILE)
        print("CSV chargé depuis le dataset HF.")
    except EntryNotFoundError:
        _write_empty_scores_file()
        print("Nouveau CSV créé.")
    except Exception as e:
        # Best effort: the app must still start when the Hub is unreachable.
        print(f"Impossible de charger le CSV depuis HF : {e}")
        if not os.path.exists(SCORES_FILE):
            _write_empty_scores_file()
def push_scores_to_hf():
    """Send the local scores CSV to the HF dataset repository.

    Failures are reported on stdout and otherwise ignored, so a failed
    upload never propagates to the caller.
    """
    upload_args = {
        "path_or_fileobj": SCORES_FILE,
        "path_in_repo": "scores.csv",
        "repo_id": DATASET_REPO,
        "repo_type": "dataset",
        "token": HF_TOKEN,
    }
    try:
        hf_api.upload_file(**upload_args)
    except Exception as e:
        print(f"Erreur upload HF : {e}")
def save_score(score: float, words: list):
    """Append one result row to the local CSV, then sync it to the HF dataset.

    Args:
        score: DAT score; stored rounded to one decimal.
        words: The participant's words, written after the score and the
            timestamp. Any sequence is accepted.
    """
    row = [round(score, 1), datetime.now().strftime("%Y-%m-%d %H:%M")] + list(words)
    with csv_lock:
        # Explicit UTF-8: the words are French and routinely carry accents,
        # and pd.read_csv decodes the file as UTF-8 by default.
        with open(SCORES_FILE, "a", newline="", encoding="utf-8") as f:
            csv.writer(f).writerow(row)
        # Upload while still holding the lock so another request cannot be
        # mid-append while the file is being read for the upload.
        push_scores_to_hf()
def load_stats() -> str:
    """Return a one-line Markdown summary of the recorded scores.

    Returns:
        Participant count, mean, min and max of the ``score`` column, or a
        "no score yet" message when the CSV is missing, empty, header-only
        or has no ``score`` column.
    """
    no_scores = "Aucun score enregistré pour l'instant."
    if not os.path.exists(SCORES_FILE):
        return no_scores
    try:
        df = pd.read_csv(SCORES_FILE)
    except pd.errors.EmptyDataError:
        # Zero-byte file (e.g. an interrupted write): nothing to summarise.
        return no_scores
    if df.empty or "score" not in df.columns:
        return no_scores
    scores = df["score"]
    return (
        f"**{len(df)} participant(s)** — "
        f"Moyenne : **{scores.mean():.1f}** — "
        f"Min : {scores.min():.1f} — "
        f"Max : {scores.max():.1f}"
    )
# ── Calcul ────────────────────────────────────────────────────────────────────
def normalize(v: np.ndarray) -> np.ndarray:
    """Scale *v* to unit Euclidean length.

    A vector whose norm is not strictly positive is returned unchanged
    (the very same object), which avoids a division by zero.
    """
    length = np.linalg.norm(v)
    if length > 0:
        return v / length
    return v
def cosine_dist(a: np.ndarray, b: np.ndarray) -> float:
    """Return ``1 - a.b`` as a Python float.

    This equals the cosine distance only when both inputs are already
    unit-length, since the dot product is used directly as the similarity.
    """
    similarity = np.dot(a, b)
    return float(1 - similarity)
def compute_dat_score(emb: np.ndarray) -> float:
    """Return 100 times the mean pairwise distance of the leading embeddings.

    Only the first ``SCORE_WORDS`` rows of *emb* take part; every unordered
    pair among them is measured with ``cosine_dist``.
    """
    scored = emb[:SCORE_WORDS]
    pair_distances = [
        cosine_dist(scored[first], scored[second])
        for first, second in combinations(range(len(scored)), 2)
    ]
    return float(np.mean(pair_distances) * 100)
def color_for(val: float, vmin: float, vmax: float) -> str:
    """Map *val* onto a red -> yellow -> green CSS colour.

    The three colour stops match the legend rendered by
    ``build_html_matrix``: ``rgb(220,60,60)`` for the closest pair,
    ``rgb(220,200,60)`` at the midpoint and ``rgb(60,200,60)`` for the most
    distant one. The scale is continuous across the midpoint.

    Args:
        val: Value to colour.
        vmin: Value mapped to the red end.
        vmax: Value mapped to the green end.

    Returns:
        A ``"rgb(r,g,b)"`` string. Values outside ``[vmin, vmax]`` are
        clamped; a degenerate range (``vmax <= vmin``) yields the midpoint.
    """
    t = (val - vmin) / (vmax - vmin) if vmax > vmin else 0.5
    t = max(0.0, min(1.0, t))
    if t < 0.5:
        # red -> yellow: green channel rises from 60 to 200
        r, g = 220, int(round(60 + (t * 2) * 140))
    else:
        # yellow -> green: red channel falls from 220 to 60
        r, g = int(round(220 - ((t - 0.5) * 2) * 160)), 200
    return f"rgb({r},{g},60)"
def build_html_matrix(words: list, distances: np.ndarray) -> str:
    """Render the pairwise distance matrix as a colour-coded HTML table.

    Args:
        words: Row/column labels. They come from user input, so they are
            HTML-escaped before being placed in the markup.
        distances: Square matrix indexed as ``distances[i][j]``; the
            diagonal is ignored. Cells show the value multiplied by 100.

    Returns:
        An HTML fragment: a scrollable ``<div>`` holding the table and a
        colour legend.
    """
    import html  # function-scope import: only this block needs escaping

    n = len(words)
    # Colour scale bounds come from off-diagonal cells only.
    off_diag = [distances[i][j] for i in range(n) for j in range(n) if i != j]
    # Fewer than two words means no off-diagonal cell; min()/max() would
    # raise on the empty list, and no cell needs a colour anyway.
    vmin, vmax = (min(off_diag), max(off_diag)) if off_diag else (0.0, 0.0)
    labels = [html.escape(str(w)) for w in words]

    # Header row: empty corner cell followed by one cell per word.
    header_cells = ["<th style='background:#f0f0f0;font-size:12px;padding:6px 10px;border:1px solid #ddd;'></th>"]
    header_cells.extend(
        f"<th style='background:#f0f0f0;font-size:12px;padding:6px 10px;"
        f"border:1px solid #ddd;text-align:center;'>{label}</th>"
        for label in labels
    )
    rows = ["<tr>" + "".join(header_cells) + "</tr>"]

    for i, label in enumerate(labels):
        cells = [f"<td style='background:#f0f0f0;font-weight:600;font-size:12px;"
                 f"padding:6px 10px;border:1px solid #ddd;white-space:nowrap;'>{label}</td>"]
        for j in range(n):
            if i == j:
                # A word's distance to itself is not meaningful: show a dash.
                cells.append(
                    "<td style='background:#e8e8e8;color:#aaa;text-align:center;"
                    "font-size:12px;padding:6px 10px;border:1px solid #ddd;'>—</td>"
                )
            else:
                val = distances[i][j]
                col = color_for(val, vmin, vmax)
                cells.append(
                    f"<td style='background:{col};text-align:center;font-size:12px;"
                    f"padding:6px 10px;border:1px solid #ddd;color:#222;"
                    f"font-weight:500;'>{val*100:.1f}</td>"
                )
        rows.append("<tr>" + "".join(cells) + "</tr>")

    legend = (
        "<div style='margin-top:10px;font-size:12px;color:#666;display:flex;"
        "align-items:center;gap:8px;'>"
        "<span>Proche</span>"
        "<div style='width:120px;height:12px;border-radius:4px;"
        "background:linear-gradient(to right,rgb(220,60,60),rgb(220,200,60),rgb(60,200,60));'></div>"
        "<span>Éloigné</span></div>"
    )
    return (
        "<div style='overflow-x:auto;'>"
        "<table style='border-collapse:collapse;font-family:monospace;'>"
        + "".join(rows)
        + "</table>"
        + legend
        + "</div>"
    )
def score_comment(score: float, mean: float) -> str:
    """Return a Markdown comment placing *score* relative to the group mean.

    Args:
        score: The participant's DAT score.
        mean: Current group mean, or ``None`` when no score exists yet. A
            NaN mean is handled like ``None``; otherwise every comparison
            below would be false and the "far above average" message would
            be shown next to a meaningless mean.

    Returns:
        One of five messages, chosen from ``score - mean``: below -10,
        below 5, below 15, or anything higher, plus the first-participant
        message when there is no usable mean.
    """
    if mean is None or np.isnan(mean):
        return "🎯 Premier participant — revenez après que d'autres aient joué pour comparer !"
    diff = score - mean
    if diff < -10:
        return f"📊 **En dessous de la moyenne du groupe** ({mean:.1f}). Vos mots partagent davantage de contextes sémantiques."
    if diff < 5:
        return f"✅ **Dans la moyenne du groupe** ({mean:.1f}). Bonne exploration sémantique."
    if diff < 15:
        return f"🌟 **Au-dessus de la moyenne du groupe** ({mean:.1f}). Pensée divergente bien développée."
    return f"🚀 **Nettement au-dessus de la moyenne du groupe** ({mean:.1f}). Excellent score !"
def top_pairs(emb: np.ndarray, words: list, n: int = 3):
    """Describe the *n* most distant and the *n* closest word pairs.

    Every unordered pair of words is measured with ``cosine_dist`` and
    ranked from most to least distant.

    Returns:
        ``(far, close)``: two newline-joined Markdown strings. ``far`` lists
        the top of the ranking, ``close`` the bottom of it, closest first.
    """
    ranked = sorted(
        (
            (a, b, cosine_dist(emb[a], emb[b]))
            for a, b in combinations(range(len(words)), 2)
        ),
        key=lambda entry: -entry[2],
    )

    def describe(entry):
        i, j, d = entry
        return f"**{words[i]}** ↔ **{words[j]}** `{d*100:.1f}`"

    far = "\n".join(describe(entry) for entry in ranked[:n])
    close = "\n".join(describe(entry) for entry in reversed(ranked[-n:]))
    return far, close
# ── Fonction principale ───────────────────────────────────────────────────────
def run_dat(w1, w2, w3, w4, w5, w6, w7, w8, w9, w10):
    """Validate the ten words, compute the DAT score and record it.

    Args:
        w1..w10: Raw textbox contents.

    Returns:
        ``(result_markdown, matrix_html, stats_markdown)``. On a validation
        error the first element is the error message, the matrix is empty,
        and the stats element is the current group summary, so the stats
        panel is not blanked by a rejected submission.
    """
    # `or ""` tolerates a None value coming from an untouched textbox.
    words = [(w or "").strip().lower() for w in (w1, w2, w3, w4, w5, w6, w7, w8, w9, w10)]
    if any(w == "" for w in words):
        return "❌ Remplissez les 10 mots.", "", load_stats()
    if len(set(words)) < N_WORDS:
        return "❌ Tous les mots doivent être différents.", "", load_stats()
    invalid = [w for w in words if not is_valid_word(w)]
    if invalid:
        liste = ", ".join(f"**{w}**" for w in invalid)
        return (
            f"❌ Mot(s) non reconnu(s) en français : {liste}\n\n"
            f"Utilisez uniquement des noms communs français valides.",
            "", load_stats()
        )

    # Unit-length embeddings, as cosine_dist() relies on a plain dot product.
    emb = np.array([normalize(model.get_word_vector(w)) for w in words])

    # Symmetric distance matrix; the diagonal stays at zero.
    n = len(words)
    dist_mat = np.zeros((n, n))
    for i, j in combinations(range(n), 2):
        dist_mat[i][j] = dist_mat[j][i] = cosine_dist(emb[i], emb[j])

    score = compute_dat_score(emb)

    # Group mean *before* this submission is recorded, for the comment.
    # Deliberately best-effort: an unreadable CSV only removes the comparison.
    try:
        df_tmp = pd.read_csv(SCORES_FILE)
        current_mean = df_tmp["score"].mean() if not df_tmp.empty else None
    except Exception:
        current_mean = None

    comment = score_comment(score, current_mean)
    far, close = top_pairs(emb, words)
    html_matrix = build_html_matrix(words, dist_mat)
    save_score(score, words)
    stats = load_stats()

    # Blank lines separate the Markdown blocks; in particular a `---` placed
    # directly under a text line would turn that line into a heading.
    result = (
        f"## Votre score DAT : **{score:.1f} / 100**\n"
        "*(maximum théorique : 200)*\n\n"
        f"{comment}\n\n"
        "---\n\n"
        "**🟢 Paires les plus éloignées**\n\n"
        f"{far}\n\n"
        "**🔴 Paires les plus proches**\n\n"
        f"{close}\n"
    )
    return result, html_matrix, stats
def refresh_stats():
    """Callback of the refresh button: return the current group summary."""
    summary = load_stats()
    return summary
# ── Interface ─────────────────────────────────────────────────────────────────
# Make sure a local scores CSV exists before the UI reads it.
init_scores_file()

css = ".center { text-align: center !important; }"

with gr.Blocks(title="DAT — Créativité Divergente", css=css,
               theme=gr.themes.Soft()) as demo:
    # Title and instructions.
    gr.Markdown(
        "# 🧠 Test de Créativité Divergente\n### Divergent Association Task (DAT)",
        elem_classes="center"
    )
    gr.Markdown(
        "Trouvez **10 mots aussi différents que possible** les uns des autres. \n"
        "Uniquement des **noms communs**, au singulier. Pas de noms propres ni de termes techniques. \n"
        "Travaillez depuis votre mémoire — sans regarder autour de vous.",
        elem_classes="center"
    )
    gr.Markdown("---\n### Vos 10 mots")

    # Ten word inputs, laid out as two rows of five.
    with gr.Row():
        w1 = gr.Textbox(label="Mot 1", placeholder="")
        w2 = gr.Textbox(label="Mot 2", placeholder="")
        w3 = gr.Textbox(label="Mot 3", placeholder="")
        w4 = gr.Textbox(label="Mot 4", placeholder="")
        w5 = gr.Textbox(label="Mot 5", placeholder="")
    with gr.Row():
        w6 = gr.Textbox(label="Mot 6", placeholder="")
        w7 = gr.Textbox(label="Mot 7", placeholder="")
        w8 = gr.Textbox(label="Mot 8", placeholder="")
        w9 = gr.Textbox(label="Mot 9", placeholder="")
        w10 = gr.Textbox(label="Mot 10", placeholder="")

    submit_btn = gr.Button("Calculer mon score →", variant="primary", size="lg")

    # Results on the left, group statistics on the right.
    with gr.Row():
        with gr.Column(scale=2):
            result_md = gr.Markdown()
            matrix_html = gr.HTML(label="Matrice des distances sémantiques")
        with gr.Column(scale=1):
            gr.Markdown("### 📊 Moyenne du groupe")
            stats_md = gr.Markdown(value=load_stats())
            refresh_btn = gr.Button("🔄 Actualiser", size="sm")

    # The output order matches the tuple returned by run_dat().
    submit_btn.click(
        fn=run_dat,
        inputs=[w1, w2, w3, w4, w5, w6, w7, w8, w9, w10],
        outputs=[result_md, matrix_html, stats_md]
    )
    refresh_btn.click(fn=refresh_stats, outputs=[stats_md])

    gr.Markdown(
        "---\n"
        "*Basé sur [Olson et al. (2021), PNAS](https://doi.org/10.1073/pnas.2022340118) · "
        "Modèle : FastText fr (Meta/Wikipedia) · "
        "Un outil [Fouloscopie](https://www.mehdimoussaid.com/)*"
    )

demo.launch()