# Spaces: Running
# Running
| import gradio as gr | |
| import torch | |
| from PIL import Image | |
| from transformers import AutoProcessor, PaliGemmaForConditionalGeneration | |
| import numpy as np | |
| import cv2 | |
| import threading | |
| import traceback | |
| import time | |
# ======================================================
# 1. SETUP (GOOGLE PALIGEMMA)
# ======================================================
print("⏳ Avvio MUSIC4D - PaliGemma (Smart Mode)")

# The 224px checkpoint is lighter on CPU than the 448px variant.
MODEL_ID = "google/paligemma-3b-mix-224"

# Upper-case keywords scanned for (in this priority order) in the model's
# decoded answer; earlier entries win when several appear.
VALID_EMOTIONS = [
    "JOY", "HAPPY", "SMILING", "LAUGHING",
    "ANGER", "ANGRY", "FROWNING", "FURIOUS",
    "FEAR", "SCARED", "TERRIFIED",
    "DISGUST", "DISGUSTED",
    "SURPRISE", "SHOCKED", "AMAZED",
    "SADNESS", "SAD", "CRYING", "DEPRESSED",
    "BOREDOM", "BORED", "TIRED",
    "NEUTRAL", "SERIOUS", "CALM",
]

# Shared mutable state: written by the loader / analysis thread,
# read by the frame-processing callback.
model = None
processor = None
current_emotion = "CARICAMENTO..."
analysis_thread = None
# ======================================================
# 2. MODEL LOADING
# ======================================================
def load_paligemma():
    """Load the PaliGemma model and processor into module globals (CPU only).

    On failure the error is printed and `model`/`processor` stay None; the
    rest of the app keeps running in a degraded (no-inference) mode.
    """
    global model, processor
    try:
        print("📦 Caricamento PaliGemma (3B Params)...")
        # PaliGemma requires its dedicated conditional-generation class.
        loaded = PaliGemmaForConditionalGeneration.from_pretrained(
            MODEL_ID,
            device_map="cpu",
            torch_dtype=torch.float32,  # float32 is the safe choice on older CPUs
            revision="bfloat16",        # download the lightweight weights, convert on load
        )
        model = loaded.eval()
        processor = AutoProcessor.from_pretrained(MODEL_ID)
        print("✅ PaliGemma Pronto! (Sarà lento ma intelligente)")
    except Exception as e:
        print(f"❌ Errore caricamento: {e}")


load_paligemma()
# ======================================================
# 3. AI LOGIC (PaliGemma)
# ======================================================
def analyze_emotion_task(pil_image):
    """Run one PaliGemma inference on *pil_image* and publish the detected
    emotion into the shared `current_emotion` global.

    Designed to run inside a worker thread; all failures are caught, logged,
    and reported as "ERRORE" rather than propagated.
    """
    global current_emotion
    start = time.time()
    if model is None:
        return
    try:
        # Question posed to the VLM (runtime string: kept verbatim).
        prompt = """<image>Which is the general emotion in this picture? Answer the question
using a single word for each emotion you can find. Follow this
example: ’emotions: [em1, em2, em3, ...]’. Choose from: [’joy’,
’anger’, ’fear’, ’disgust’, ’surprise’, ’sadness’, ’boredom’, ’neutral’,
’contentment’]. \n"""

        # Processor expects a 3-channel image.
        if pil_image.mode != "RGB":
            pil_image = pil_image.convert("RGB")
        inputs = processor(text=prompt, images=pil_image, return_tensors="pt")
        inputs = {name: tensor.to("cpu") for name, tensor in inputs.items()}

        # Greedy, short generation: a few words is all we need.
        with torch.no_grad():
            generated_ids = model.generate(
                **inputs,
                max_new_tokens=20,
                do_sample=False,
            )

        decoded = processor.batch_decode(generated_ids, skip_special_tokens=True)[0]

        # The decoded text still contains the prompt, and the prompt itself
        # mentions 'joy' — matching against the whole string made every frame
        # read as JOY. Cut at the end of the question and keep only the answer.
        description = decoded.upper()
        marker = "CONTENTMENT’]."
        if marker in description:
            description = description.split(marker)[-1].strip()
        else:
            # Safety fallback: the answer is short, the prompt is long.
            description = description[-50:].strip()
        description = description.replace("\n", "").strip()
        print(f"🧠 PaliGemma dice: {description}")

        # Map the first keyword found (priority = VALID_EMOTIONS order) onto
        # its canonical label; keywords absent from the table (ANGER, DISGUST,
        # SADNESS, BOREDOM, ...) pass through unchanged.
        canonical = {
            "HAPPY": "JOY", "SMILING": "JOY", "LAUGHING": "JOY", "JOY": "JOY",
            "ANGRY": "ANGER", "FROWNING": "ANGER", "FURIOUS": "ANGER",
            "SCARED": "FEAR", "TERRIFIED": "FEAR", "FEAR": "FEAR",
            "SAD": "SADNESS", "CRYING": "SADNESS", "DEPRESSED": "SADNESS",
            "SHOCKED": "SURPRISE", "AMAZED": "SURPRISE", "SURPRISE": "SURPRISE",
            "BORED": "BOREDOM", "TIRED": "BOREDOM",
            "SERIOUS": "NEUTRAL", "CALM": "NEUTRAL", "NEUTRAL": "NEUTRAL",
            "CONTENTMENT": "JOY",
        }
        hit = next((kw for kw in VALID_EMOTIONS if kw in description), None)
        detected = canonical.get(hit, hit) if hit is not None else "UNKNOWN"

        # Smart fallback: surface the raw (trimmed) answer instead of UNKNOWN.
        if detected == "UNKNOWN" and len(description) > 2:
            detected = description
        current_emotion = detected
    except Exception:
        print("⚠️ ERRORE CRITICO NEL THREAD:")
        traceback.print_exc()
        current_emotion = "ERRORE"
    elapsed = time.time() - start
    print(f"⏱️ Tempo inferenza: {elapsed:.2f}s")
# ======================================================
# 4. FRAME PROCESSING
# ======================================================
def process_frame(image_array):
    """Annotate the incoming webcam frame with the latest detected emotion.

    Also launches a new background analysis thread whenever the previous one
    has finished, so at most one inference runs at a time.
    """
    global analysis_thread, current_emotion
    if image_array is None:
        return image_array

    annotated = image_array.copy()
    frame_h, frame_w, _ = annotated.shape

    # Thread gate: only start a new analysis when no worker is running.
    worker_idle = analysis_thread is None or not analysis_thread.is_alive()
    if worker_idle and model is not None:
        try:
            # Downscale to the model's native 224x224 input size.
            small = cv2.resize(image_array, (224, 224))
            analysis_thread = threading.Thread(
                target=analyze_emotion_task,
                args=(Image.fromarray(small),),
            )
            analysis_thread.start()
        except Exception as e:
            print(f"Errore prep immagine: {e}")

    # Overlay: current label, with "(*)" appended while inference is in flight.
    is_working = analysis_thread is not None and analysis_thread.is_alive()
    status_symbol = " (*)" if is_working else ""
    text_display = f"{current_emotion}{status_symbol}"
    color_bg = (0, 0, 0)
    color_txt = (0, 165, 255) if is_working else (0, 255, 0)
    if "ERRORE" in current_emotion:
        color_txt = (255, 0, 0)

    font = cv2.FONT_HERSHEY_SIMPLEX
    scale = 0.8
    thick = 2
    (tw, th), base = cv2.getTextSize(text_display, font, scale, thick)
    x, y = 20, frame_h - 20
    cv2.rectangle(annotated, (x - 10, y - th - 10), (x + tw + 10, y + base + 5), color_bg, -1)
    cv2.putText(annotated, text_display, (x, y), font, scale, color_txt, thick, cv2.LINE_AA)
    return annotated
# ======================================================
# 5. UI
# ======================================================
with gr.Blocks(title="MUSIC4D - PaliGemma Final") as demo:
    gr.Markdown("## 🎵 MUSIC4D – PaliGemma CPU")
    with gr.Row():
        webcam_feed = gr.Image(sources=["webcam"], type="numpy", label="Webcam")
        live_view = gr.Image(type="numpy", label="Output Live")
    # Poll the webcam slowly: CPU inference takes several seconds per frame.
    webcam_feed.stream(
        process_frame,
        inputs=webcam_feed,
        outputs=live_view,
        stream_every=11.5,
        time_limit=350,
    )

if __name__ == "__main__":
    demo.launch(ssr_mode=False)