File size: 6,159 Bytes
38272dd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# app.py
import os, glob, uuid, random, json, pymysql, gradio as gr
from typing import List, Dict, Any

IMAGE_DIR = os.getenv("IMAGE_DIR","assets")
SENTIMENTS_PATH = os.getenv("SENTIMENTS_PATH", os.path.join(IMAGE_DIR,"sentiments.json"))
IMG_EXTS = (".png",".jpg",".jpeg",".webp",".bmp",".gif")
POLL_TYPE="sentiment"
DEFAULT_SENTIMENTS=["Très négatif","Négatif","Neutre","Positif","Très positif"]

DB_HOST=os.getenv("DB_HOST"); DB_PORT=int(os.getenv("DB_PORT","3306"))
DB_NAME=os.getenv("DB_NAME"); DB_USER=os.getenv("DB_USER"); DB_PASSWORD=os.getenv("DB_PASSWORD")
DB_SSL=os.getenv("DB_SSL","false").lower()=="true"; ALLOW_DDL=os.getenv("ALLOW_DDL","false").lower()=="true"

def db_connect():
    kw=dict(host=DB_HOST,port=DB_PORT,user=DB_USER,password=DB_PASSWORD,database=DB_NAME,charset="utf8mb4",autocommit=True)
    if DB_SSL: kw["ssl"]={"ssl":{}}
    return pymysql.connect(**kw)

def ensure_tables():
    if not ALLOW_DDL: return
    sql="CREATE TABLE IF NOT EXISTS poll_response (id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT, poll_type VARCHAR(32) NOT NULL, session_uuid CHAR(36) NOT NULL, item_index INT NOT NULL, item_file VARCHAR(255) NOT NULL, choice VARCHAR(64) NOT NULL, user_agent VARCHAR(255) DEFAULT NULL, ip_hash CHAR(64) DEFAULT NULL, created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (id), KEY idx_poll_item (poll_type, item_index), KEY idx_created (created_at)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;"
    conn=db_connect(); 
    with conn.cursor() as cur: cur.execute(sql)
    conn.close()

def load_sentiments():
    try:
        if os.path.exists(SENTIMENTS_PATH):
            with open(SENTIMENTS_PATH,"r",encoding="utf-8") as f:
                data=json.load(f)
            if isinstance(data,list) and data: return [str(x) for x in data]
            if isinstance(data,dict) and isinstance(data.get("sentiments"),list): return [str(x) for x in data["sentiments"]]
    except Exception as e:
        print("[warn] sentiments.json illisible:", e)
    return DEFAULT_SENTIMENTS

def load_images()->List[Dict[str,Any]]:
    files=[p for p in glob.glob(os.path.join(IMAGE_DIR,"*")) if p.lower().endswith(IMG_EXTS)]
    files.sort()
    if not files: raise RuntimeError(f"Aucune image trouvée dans {IMAGE_DIR}")
    return [{"path":p,"file":os.path.basename(p)} for p in files]

def aggregate(conn, items, sentiments):
    rows=[]
    with conn.cursor() as cur:
        for i,it in enumerate(items, start=1):
            cur.execute("SELECT choice, COUNT(*) FROM poll_response WHERE poll_type=%s AND item_index=%s GROUP BY choice", (POLL_TYPE,i))
            counts={c:cnt for c,cnt in (cur.fetchall() or [])}
            total=sum(counts.values()); row=[i,it["file"],total]
            for s in sentiments: row.append(counts.get(s,0))
            rows.append(row)
    return rows

def app():
    ensure_tables()
    sentiments=load_sentiments()
    items=load_images()
    headers=["#","Fichier","Total"]+sentiments
    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown("# Poll Sentiments")
        gr.Markdown("Choisissez un sentiment pour chaque image, validez, puis consultez les agrégats globaux.")
        state_items=gr.State(items); state_sent=gr.State(sentiments); session_id=gr.State(str(uuid.uuid4()))
        with gr.Group(visible=True) as poll_group:
            with gr.Row():
                btn_shuffle=gr.Button("🔀 Mélanger l’ordre")
                btn_reset=gr.Button("♻️ Réinitialiser les choix")
                btn_submit=gr.Button("✅ Valider mes choix", variant="primary")
            warn=gr.Markdown(visible=False)
            imgs=[]; dds=[]
            rows=(len(items)+3)//4; idx=0
            with gr.Column():
                for r in range(rows):
                    with gr.Row():
                        for c in range(4):
                            if idx>=len(items): break
                            with gr.Column():
                                imgs.append(gr.Image(value=items[idx]["path"], label=f"Image {idx+1}", interactive=False))
                                dds.append(gr.Dropdown(choices=sentiments, label="Votre choix"))
                            idx+=1
        with gr.Group(visible=False) as result_group:
            gr.Markdown("## Agrégats (tous les usagers)")
            df=gr.Dataframe(headers=headers, interactive=False)
            btn_back=gr.Button("↩️ Revenir au poll")
        def on_shuffle(state, sentiments):
            items=list(state); import random; random.shuffle(items)
            return [*([gr.update(value=items[i]['path'],label=f'Image {i+1}') for i in range(len(items))]), *([gr.update(value=None, choices=sentiments) for _ in items]), gr.update(value='',visible=False), items]
        btn_shuffle.click(on_shuffle, inputs=[state_items,state_sent], outputs=[*imgs,*dds,warn,state_items])
        def on_reset(): return [gr.update(value=None) for _ in dds]+[gr.update(value='',visible=False)]
        btn_reset.click(on_reset, inputs=None, outputs=[*dds,warn])
        def on_submit(*vals, state, sentiments, sess):
            if any(v in (None,"") for v in vals):
                missing=sum(1 for v in vals if v in (None,""))
                return gr.update(value=f"❗ {missing} choix manquent.", visible=True), gr.update(visible=True), gr.update(visible=False), None
            conn=db_connect()
            with conn.cursor() as cur:
                for i,v in enumerate(vals, start=1):
                    cur.execute("INSERT INTO poll_response (poll_type,session_uuid,item_index,item_file,choice) VALUES (%s,%s,%s,%s,%s)", ("sentiment",sess,i,state[i-1]['file'],v))
            rows=aggregate(conn,state,sentiments); conn.close()
            return gr.update(visible=False), gr.update(value=rows), gr.update(visible=True), None
        btn_submit.click(on_submit, inputs=[*dds,state_items,state_sent,session_id], outputs=[poll_group,df,result_group,warn])
        btn_back.click(lambda: (gr.update(visible=True), gr.update(visible=False)), inputs=None, outputs=[poll_group, result_group])
    return demo
demo=app()
if __name__=="__main__": demo.launch()