Spaces:
Sleeping
Sleeping
| # app.py | |
| import os, glob, uuid, random, pymysql, gradio as gr | |
| from typing import List, Dict, Any | |
# --- Poll configuration -----------------------------------------------------
# Directory scanned for poll images; overridable via the IMAGE_DIR env var.
IMAGE_DIR = os.environ.get("IMAGE_DIR", "assets")
# Lower-case filename suffixes accepted as images.
IMG_EXTS = (".png", ".jpg", ".jpeg", ".webp", ".bmp", ".gif")
# Value used for the poll_type column when filtering poll_response rows.
POLL_TYPE = "thumbs"
# (label shown in the UI, value stored in the database) pairs.
CHOICES = [("👍 ThumbUp", "up"), ("👎 ThumbDown", "down")]

# --- Database settings, all read from environment variables ------------------
DB_HOST = os.environ.get("DB_HOST")
DB_PORT = int(os.environ.get("DB_PORT", "3306"))
DB_NAME = os.environ.get("DB_NAME")
DB_USER = os.environ.get("DB_USER")
DB_PASSWORD = os.environ.get("DB_PASSWORD")
# Boolean flags: only the (case-insensitive) string "true" enables them.
DB_SSL = os.environ.get("DB_SSL", "false").lower() == "true"
ALLOW_DDL = os.environ.get("ALLOW_DDL", "false").lower() == "true"
def db_connect():
    """Open a new PyMySQL connection from the module-level DB_* settings.

    The connection is requested with the utf8mb4 charset and autocommit
    enabled. When DB_SSL is true, an ``ssl`` argument is passed as well.

    Returns:
        The connection object returned by ``pymysql.connect``.
    """
    params = {
        "host": DB_HOST,
        "port": DB_PORT,
        "user": DB_USER,
        "password": DB_PASSWORD,
        "database": DB_NAME,
        "charset": "utf8mb4",
        "autocommit": True,
    }
    if DB_SSL:
        # NOTE(review): this value is kept exactly as originally written;
        # confirm against the PyMySQL docs that it yields the intended TLS
        # verification level for the target server.
        params["ssl"] = {"ssl": {}}
    return pymysql.connect(**params)
def ensure_tables():
    """Create the ``poll_response`` table if it does not exist yet.

    Does nothing unless ALLOW_DDL is true. The connection is always closed,
    even when the CREATE TABLE statement fails.
    """
    if not ALLOW_DDL:
        return
    sql = (
        "CREATE TABLE IF NOT EXISTS poll_response ("
        "id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT, "
        "poll_type VARCHAR(32) NOT NULL, "
        "session_uuid CHAR(36) NOT NULL, "
        "item_index INT NOT NULL, "
        "item_file VARCHAR(255) NOT NULL, "
        "choice VARCHAR(64) NOT NULL, "
        "user_agent VARCHAR(255) DEFAULT NULL, "
        "ip_hash CHAR(64) DEFAULT NULL, "
        "created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, "
        "PRIMARY KEY (id), "
        "KEY idx_poll_item (poll_type, item_index), "
        "KEY idx_created (created_at)"
        ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;"
    )
    conn = db_connect()
    try:
        with conn.cursor() as cur:
            cur.execute(sql)
    finally:
        # Release the connection even if the DDL statement raised.
        conn.close()
def load_images() -> List[Dict[str, Any]]:
    """List the image files found directly inside IMAGE_DIR.

    Entries are kept when their lower-cased path ends with one of IMG_EXTS,
    and are returned sorted by path.

    Returns:
        One dict per image with keys ``"path"`` (the globbed path) and
        ``"file"`` (its base name).

    Raises:
        RuntimeError: If no matching file is found.
    """
    candidates = glob.glob(os.path.join(IMAGE_DIR, "*"))
    image_paths = sorted(
        path for path in candidates if path.lower().endswith(IMG_EXTS)
    )
    if not image_paths:
        raise RuntimeError(f"Aucune image trouvée dans {IMAGE_DIR}")
    entries = []
    for path in image_paths:
        entries.append({"path": path, "file": os.path.basename(path)})
    return entries
def aggregate(conn, items):
    """Compute the global up/down vote totals for each item.

    Votes are looked up by file name rather than by position: the display
    order of the items can differ between sessions (shuffle), so a
    positional index does not reliably identify an image.

    Args:
        conn: An open database connection whose cursors return rows as
            sequences (the default PyMySQL cursor).
        items: Sequence of dicts that each have a ``"file"`` key.

    Returns:
        One row per item, in the order given:
        ``[position, file, up, down, total, pct_up]`` where ``position`` is
        1-based, the counts are plain ints and ``pct_up`` is a string such
        as ``"67%"`` (``"0%"`` when there are no votes).
    """
    rows = []
    with conn.cursor() as cur:
        for position, item in enumerate(items, start=1):
            cur.execute(
                "SELECT SUM(choice='up'), SUM(choice='down'), COUNT(*) "
                "FROM poll_response WHERE poll_type=%s AND item_file=%s",
                (POLL_TYPE, item["file"]),
            )
            up, down, total = cur.fetchone() or (0, 0, 0)
            # SUM() yields NULL when no row matches and may come back as a
            # Decimal; normalise to plain ints so the values are safe to
            # hand to the UI table.
            up, down, total = int(up or 0), int(down or 0), int(total or 0)
            pct = f"{round(100 * up / total)}%" if total > 0 else "0%"
            rows.append([position, item["file"], up, down, total, pct])
    return rows
def app():
    """Build the Gradio Blocks UI for the thumbs up/down image poll.

    The page has two groups: the poll itself (a grid of images, four per
    row, each with a radio choice) and a results table that is shown after a
    successful submission.

    Returns:
        The constructed ``gr.Blocks`` instance.

    Raises:
        RuntimeError: Propagated from ``load_images`` when no image exists.
    """
    ensure_tables()
    items = load_images()
    # Stable 1-based index of every image in the canonical (sorted) order, so
    # the stored item_index does not depend on a per-session shuffle.
    canonical_index = {it["file"]: n for n, it in enumerate(items, start=1)}
    label_to_value = dict(CHOICES)
    per_row = 4

    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown("# Poll 👍 / 👎")
        gr.Markdown("Faites vos choix puis validez. Le tableau affiche les **agrégats** globaux.")
        state_items = gr.State(items)
        # Filled in by the load event below so each page load gets its own id.
        session_id = gr.State("")

        with gr.Group(visible=True) as poll_group:
            with gr.Row():
                btn_shuffle = gr.Button("🔀 Mélanger l’ordre")
                btn_reset = gr.Button("♻️ Réinitialiser les choix")
                btn_submit = gr.Button("✅ Valider mes choix", variant="primary")
            warn = gr.Markdown(visible=False)
            imgs = []
            radios = []
            with gr.Column():
                for start in range(0, len(items), per_row):
                    with gr.Row():
                        for idx in range(start, min(start + per_row, len(items))):
                            with gr.Column():
                                imgs.append(
                                    gr.Image(
                                        value=items[idx]["path"],
                                        label=f"Image {idx + 1}",
                                        interactive=False,
                                    )
                                )
                                radios.append(
                                    gr.Radio(
                                        choices=[lbl for lbl, _ in CHOICES],
                                        label="Votre choix",
                                    )
                                )

        with gr.Group(visible=False) as result_group:
            gr.Markdown("## Agrégats (tous les usagers)")
            df = gr.Dataframe(
                headers=["#", "Fichier", "Up", "Down", "Total", "%Up"],
                interactive=False,
            )
            btn_back = gr.Button("↩️ Revenir au poll")

        # A UUID computed while building the UI would be shared by every
        # visitor; generate one when the page loads instead.
        demo.load(lambda: str(uuid.uuid4()), inputs=None, outputs=session_id)

        def on_shuffle(state):
            """Shuffle the display order and clear every choice."""
            shuffled = list(state)
            random.shuffle(shuffled)
            image_updates = [
                gr.update(value=it["path"], label=f"Image {pos}")
                for pos, it in enumerate(shuffled, start=1)
            ]
            radio_updates = [gr.update(value=None) for _ in shuffled]
            return [
                *image_updates,
                *radio_updates,
                gr.update(value="", visible=False),
                shuffled,
            ]

        btn_shuffle.click(
            on_shuffle,
            inputs=[state_items],
            outputs=[*imgs, *radios, warn, state_items],
        )

        def on_reset():
            """Clear every radio choice and hide the warning."""
            return [gr.update(value=None) for _ in radios] + [
                gr.update(value="", visible=False)
            ]

        btn_reset.click(on_reset, inputs=None, outputs=[*radios, warn])

        def on_submit(*args):
            """Validate the choices, store them and show the aggregates.

            Gradio passes every input positionally, so the trailing two
            arguments (items state, session id) are split off here.
            Returned values follow the order of
            ``outputs=[poll_group, df, result_group, warn]``.
            """
            *vals, state, sess = args
            choices = [label_to_value.get(v) for v in vals]
            missing = sum(1 for choice in choices if choice is None)
            if missing:
                return (
                    gr.update(visible=True),
                    gr.update(),
                    gr.update(visible=False),
                    gr.update(value=f"❗ {missing} choix manquent.", visible=True),
                )
            # Fallback in case the submit arrives before the load event ran.
            sess = sess or str(uuid.uuid4())
            params = [
                (POLL_TYPE, sess, canonical_index[it["file"]], it["file"], choice)
                for it, choice in zip(state, choices)
            ]
            conn = db_connect()
            try:
                with conn.cursor() as cur:
                    cur.executemany(
                        "INSERT INTO poll_response (poll_type,session_uuid,item_index,item_file,choice) VALUES (%s,%s,%s,%s,%s)",
                        params,
                    )
                # Aggregate over the canonical order so row numbers match the
                # stored item_index values.
                rows = aggregate(conn, items)
            finally:
                conn.close()
            return (
                gr.update(visible=False),
                gr.update(value=rows),
                gr.update(visible=True),
                gr.update(value="", visible=False),
            )

        btn_submit.click(
            on_submit,
            inputs=[*radios, state_items, session_id],
            outputs=[poll_group, df, result_group, warn],
        )
        btn_back.click(
            lambda: (gr.update(visible=True), gr.update(visible=False)),
            inputs=None,
            outputs=[poll_group, result_group],
        )
    return demo
# Build the UI at import time so that `demo` exists as a module attribute.
demo = app()

if __name__ == "__main__":
    # Start the Gradio server only when this file is executed as a script.
    demo.launch()