File size: 5,591 Bytes
a2c5081
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# app.py
import os, glob, uuid, random, pymysql, gradio as gr
from typing import List, Dict, Any

def _env_flag(name: str) -> bool:
    """Return True only when env var *name* is set to "true" (any letter case)."""
    return os.environ.get(name, "false").lower() == "true"


# Directory scanned for poll images; defaults to "assets" when IMAGE_DIR is unset.
IMAGE_DIR = os.environ.get("IMAGE_DIR", "assets")
# File-name suffixes (compared in lower case) that count as images.
IMG_EXTS = (".png", ".jpg", ".jpeg", ".webp", ".bmp", ".gif")
# Value written to / filtered on in the poll_type column.
POLL_TYPE = "thumbs"
# (label shown in the UI, value stored in the database) pairs.
CHOICES = [("👍 ThumbUp", "up"), ("👎 ThumbDown", "down")]

# Database connection settings, all read from the environment.
DB_HOST = os.environ.get("DB_HOST")
DB_PORT = int(os.environ.get("DB_PORT", "3306"))
DB_NAME = os.environ.get("DB_NAME")
DB_USER = os.environ.get("DB_USER")
DB_PASSWORD = os.environ.get("DB_PASSWORD")
# Boolean switches: enabled only by the literal string "true".
DB_SSL = _env_flag("DB_SSL")
ALLOW_DDL = _env_flag("ALLOW_DDL")

def db_connect():
    """Open a new PyMySQL connection from the module-level DB_* settings.

    The connection is requested with the utf8mb4 charset and autocommit
    enabled. When DB_SSL is true an ``ssl`` argument is added as well.

    Returns:
        Whatever ``pymysql.connect`` returns; the caller must close it.
    """
    params = {
        "host": DB_HOST,
        "port": DB_PORT,
        "user": DB_USER,
        "password": DB_PASSWORD,
        "database": DB_NAME,
        "charset": "utf8mb4",
        "autocommit": True,
    }
    if DB_SSL:
        # NOTE(review): presumably a non-empty dict is what switches TLS on
        # in the driver — confirm against the installed PyMySQL version.
        params["ssl"] = {"ssl": {}}
    return pymysql.connect(**params)

def ensure_tables():
    """Create the ``poll_response`` table if it does not exist yet.

    Does nothing unless ALLOW_DDL is true. Otherwise opens a connection via
    ``db_connect()``, runs a single ``CREATE TABLE IF NOT EXISTS`` statement
    and closes the connection again, even when the statement fails.

    Raises:
        Any exception raised by ``db_connect()`` or by executing the DDL.
    """
    if not ALLOW_DDL:
        return
    sql = (
        "CREATE TABLE IF NOT EXISTS poll_response ("
        "id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT, "
        "poll_type VARCHAR(32) NOT NULL, "
        "session_uuid CHAR(36) NOT NULL, "
        "item_index INT NOT NULL, "
        "item_file VARCHAR(255) NOT NULL, "
        "choice VARCHAR(64) NOT NULL, "
        "user_agent VARCHAR(255) DEFAULT NULL, "
        "ip_hash CHAR(64) DEFAULT NULL, "
        "created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, "
        "PRIMARY KEY (id), "
        "KEY idx_poll_item (poll_type, item_index), "
        "KEY idx_created (created_at)"
        ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;"
    )
    conn = db_connect()
    try:
        with conn.cursor() as cur:
            cur.execute(sql)
    finally:
        # Release the connection even if the DDL statement raised.
        conn.close()

def load_images() -> List[Dict[str, Any]]:
    """List the image files found directly inside IMAGE_DIR.

    Entries whose lower-cased path ends with one of IMG_EXTS are kept and
    sorted by path.

    Returns:
        One dict per image with keys ``"path"`` (the globbed path) and
        ``"file"`` (its base name), in sorted path order.

    Raises:
        RuntimeError: if no matching file is found.
    """
    candidates = glob.glob(os.path.join(IMAGE_DIR, "*"))
    image_paths = sorted(
        path for path in candidates if path.lower().endswith(IMG_EXTS)
    )
    if not image_paths:
        raise RuntimeError(f"Aucune image trouvée dans {IMAGE_DIR}")

    entries = []
    for path in image_paths:
        entries.append({"path": path, "file": os.path.basename(path)})
    return entries

def aggregate(conn, items):
    """Compute the global up/down tallies for each image in *items*.

    Votes are matched on the stored file name rather than on the display
    position: a position only identifies an image within one session's
    (possibly shuffled) ordering, so grouping by position would merge votes
    that were cast for different images.

    Args:
        conn: Open database connection whose ``cursor()`` works as a context
            manager and yields tuple rows.
        items: Sequence of dicts carrying a ``"file"`` key, in display order.

    Returns:
        One list per item:
        ``[position (1-based), file name, up, down, total, "<pct>%"]`` where
        the percentage is the rounded share of "up" votes ("0%" if no votes).
    """
    query = (
        "SELECT SUM(choice='up'), SUM(choice='down'), COUNT(*) "
        "FROM poll_response WHERE poll_type=%s AND item_file=%s"
    )
    rows = []
    with conn.cursor() as cur:
        for position, item in enumerate(items, start=1):
            cur.execute(query, (POLL_TYPE, item["file"]))
            up, down, total = cur.fetchone() or (0, 0, 0)
            # SUM() yields NULL when no row matches and may come back as a
            # non-int numeric type; normalise everything to plain ints.
            up, down, total = int(up or 0), int(down or 0), int(total or 0)
            pct = f"{round(100 * up / total)}%" if total > 0 else "0%"
            rows.append([position, item["file"], up, down, total, pct])
    return rows

def app():
    """Build the Gradio Blocks UI for the thumbs-up / thumbs-down image poll.

    Calls ``ensure_tables()`` and ``load_images()`` first, then lays out:

    * a poll group with shuffle / reset / submit buttons, a hidden warning
      line, and a grid (four per row) of images each paired with a radio;
    * a results group (hidden initially) with the aggregate table and a
      button to go back to the poll.

    Returns:
        The constructed ``gr.Blocks`` object (not launched).

    Raises:
        Whatever ``ensure_tables()`` or ``load_images()`` raise.
    """
    ensure_tables()
    items = load_images()
    label_to_value = dict(CHOICES)
    radio_labels = [label for label, _ in CHOICES]
    per_row = 4

    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown("# Poll 👍 / 👎")
        gr.Markdown("Faites vos choix puis validez. Le tableau affiche les **agrégats** globaux.")
        state_items = gr.State(items)
        # The initial value is computed once at build time, so on its own it
        # would be identical for every visitor; the demo.load handler below
        # replaces it with a fresh UUID on each page load.
        session_id = gr.State(str(uuid.uuid4()))

        with gr.Group(visible=True) as poll_group:
            with gr.Row():
                btn_shuffle = gr.Button("🔀 Mélanger l’ordre")
                btn_reset = gr.Button("♻️ Réinitialiser les choix")
                btn_submit = gr.Button("✅ Valider mes choix", variant="primary")
            warn = gr.Markdown(visible=False)
            imgs = []
            radios = []
            with gr.Column():
                for row_start in range(0, len(items), per_row):
                    with gr.Row():
                        row_end = min(row_start + per_row, len(items))
                        for idx in range(row_start, row_end):
                            with gr.Column():
                                imgs.append(gr.Image(value=items[idx]["path"], label=f"Image {idx + 1}", interactive=False))
                                radios.append(gr.Radio(choices=radio_labels, label="Votre choix"))

        with gr.Group(visible=False) as result_group:
            gr.Markdown("## Agrégats (tous les usagers)")
            df = gr.Dataframe(headers=["#", "Fichier", "Up", "Down", "Total", "%Up"], interactive=False)
            btn_back = gr.Button("↩️ Revenir au poll")

        def new_session_id():
            """Return a fresh UUID string for the session that just loaded."""
            return str(uuid.uuid4())

        demo.load(new_session_id, inputs=None, outputs=[session_id])

        def on_shuffle(state):
            """Shuffle the items, refresh every image, clear radios and warning."""
            shuffled = list(state)
            random.shuffle(shuffled)
            image_updates = [
                gr.update(value=item["path"], label=f"Image {pos}")
                for pos, item in enumerate(shuffled, start=1)
            ]
            radio_updates = [gr.update(value=None) for _ in shuffled]
            return [*image_updates, *radio_updates, gr.update(value="", visible=False), shuffled]

        btn_shuffle.click(on_shuffle, inputs=[state_items], outputs=[*imgs, *radios, warn, state_items])

        def on_reset():
            """Clear every radio and hide the warning."""
            return [gr.update(value=None) for _ in radios] + [gr.update(value="", visible=False)]

        btn_reset.click(on_reset, inputs=None, outputs=[*radios, warn])

        def on_submit(*args):
            """Validate the choices, store them, and switch to the results view.

            Inputs arrive positionally in the order wired below: one value
            per radio, then the items state, then the session id. The return
            tuple follows ``outputs=[poll_group, df, result_group, warn]``.
            """
            *vals, state, sess = args
            choices = [None if v is None else label_to_value.get(v) for v in vals]
            missing = sum(1 for choice in choices if choice is None)
            if missing:
                # Stay on the poll, leave the table alone, show the warning.
                return (
                    gr.update(visible=True),
                    gr.update(),
                    gr.update(visible=False),
                    gr.update(value=f"❗ {missing} choix manquent.", visible=True),
                )

            params = [
                (POLL_TYPE, sess, pos, item["file"], choice)
                for pos, (item, choice) in enumerate(zip(state, choices), start=1)
            ]
            conn = db_connect()
            try:
                with conn.cursor() as cur:
                    cur.executemany(
                        "INSERT INTO poll_response (poll_type,session_uuid,item_index,item_file,choice) VALUES (%s,%s,%s,%s,%s)",
                        params,
                    )
                rows = aggregate(conn, state)
            finally:
                # Always release the connection, even if a query failed.
                conn.close()
            return (
                gr.update(visible=False),
                gr.update(value=rows),
                gr.update(visible=True),
                gr.update(value="", visible=False),
            )

        btn_submit.click(on_submit, inputs=[*radios, state_items, session_id], outputs=[poll_group, df, result_group, warn])
        btn_back.click(lambda: (gr.update(visible=True), gr.update(visible=False)), inputs=None, outputs=[poll_group, result_group])
    return demo
# Build the UI at import time and expose it as the module-level name `demo`.
demo = app()

# Launch the Gradio server only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch()