from __future__ import annotations

import io
import json
import os
import time
from pathlib import Path
from typing import Dict, List

import gradio as gr
import numpy as np
import requests
from PIL import Image
from pydub import AudioSegment

from utils_media import video_to_frame_audio, load_audio_16k, log_inference

HERE = Path(__file__).parent
LABEL_ITEMS = json.loads((HERE / "labels.json").read_text())["labels"]
LABELS = [x["name"] for x in LABEL_ITEMS]
PROMPTS = [x["prompt"] for x in LABEL_ITEMS]
CSV_API = HERE / "runs_api.csv"

CLIP_MODEL = "openai/clip-vit-base-patch32"
W2V2_MODEL = "facebook/wav2vec2-base"

HF_TOKEN = os.getenv("HF_TOKEN")
if not HF_TOKEN:
    print("Warning: Hugging Face token not found. API functions will not work.")


def _img_to_jpeg_bytes(pil: Image.Image) -> bytes:
    buf = io.BytesIO()
    pil.convert("RGB").save(buf, format="JPEG", quality=90)
    return buf.getvalue()


def clip_api_probs(pil: Image.Image, prompts: List[str] = PROMPTS) -> np.ndarray:
    """Zero-shot image classification over `prompts` via the HF Inference API."""
    if HF_TOKEN is None:
        raise RuntimeError(
            "HuggingFace token not available. Please set the HF_TOKEN environment variable."
        )
    try:
        # Direct requests call instead of InferenceClient.
        img_bytes = _img_to_jpeg_bytes(pil)
        url = f"https://api-inference.huggingface.co/models/{CLIP_MODEL}"
        headers = {"Authorization": f"Bearer {HF_TOKEN}"}
        # Labels are full prompt sentences, so the hypothesis template is the identity.
        parameters = {"candidate_labels": prompts, "hypothesis_template": "{}"}
        files = {"file": ("image.jpg", img_bytes, "image/jpeg")}
        data = {"inputs": "", "parameters": json.dumps(parameters)}
        response = requests.post(url, headers=headers, files=files, data=data, timeout=60)
        response.raise_for_status()
        result = response.json()

        # Expected response: a list of {"label": ..., "score": ...} dicts.
        if isinstance(result, list) and len(result) > 0:
            scores = {item["label"]: item["score"] for item in result}
        else:
            # Fallback: equal probabilities.
            scores = {p: 1.0 / len(prompts) for p in prompts}

        arr = np.array([scores.get(p, 0.0) for p in prompts], dtype=np.float32)
        s = arr.sum()
        return arr / s if s > 0 else np.ones_like(arr) / len(arr)
    except Exception as e:
        print(f"CLIP API error: {e}")
        # Return a uniform distribution as a fallback.
        return np.ones(len(prompts), dtype=np.float32) / len(prompts)


def _wave_float32_to_wav_bytes(wave_16k: np.ndarray, sr: int = 16000) -> bytes:
    samples = (np.clip(wave_16k, -1, 1) * 32767.0).astype(np.int16)
    seg = AudioSegment(samples.tobytes(), frame_rate=sr, sample_width=2, channels=1)
    out = io.BytesIO()
    seg.export(out, format="wav")
    return out.getvalue()


def w2v2_api_embed(wave_16k: np.ndarray) -> np.ndarray:
    """Mean-pooled, L2-normalized wav2vec2 embedding via the HF Inference API."""
    if HF_TOKEN is None:
        raise RuntimeError("HuggingFace token not available.")
    wav_bytes = _wave_float32_to_wav_bytes(wave_16k)
    url = f"https://api-inference.huggingface.co/models/{W2V2_MODEL}"
    hdrs = {"Authorization": f"Bearer {HF_TOKEN}"}
    r = requests.post(url, headers=hdrs, data=wav_bytes, timeout=60)
    r.raise_for_status()
    arr = np.asarray(r.json(), dtype=np.float32)  # shape [T, 768]
    if arr.ndim == 3:  # [batch, T, D]
        arr = arr[0]
    vec = arr.mean(axis=0)  # [768]
    # L2-normalize so dot products downstream are cosine similarities.
    n = np.linalg.norm(vec) + 1e-8
    return (vec / n).astype(np.float32)
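
# Optional robustness sketch, not wired into the calls above: the serverless
# Inference API answers HTTP 503 while a model is cold-starting, so callers
# often retry (Hugging Face also documents an "x-wait-for-model" header as an
# alternative). The helper name and backoff policy here are illustrative
# assumptions, not part of the original app.
def _post_with_retry(url: str, *, retries: int = 3, backoff_s: float = 5.0, **kwargs):
    r = None
    for attempt in range(retries):
        r = requests.post(url, **kwargs)
        if r.status_code != 503:  # anything but "model loading" ends the loop
            break
        time.sleep(backoff_s * (attempt + 1))  # linear backoff between retries
    return r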

_PROTO_EMBS: Dict[str, np.ndarray] | None = None


def _sine(sr, freq, dur, amp=0.2):
    t = np.linspace(0, dur, int(sr * dur), endpoint=False, dtype=np.float32)
    return (amp * np.sin(2 * np.pi * freq * t)).astype(np.float32)


def _burst_noise(sr, dur, amp=0.2):
    # White noise with a fast 5% attack ramp and a 15% linear decay tail.
    x = np.random.randn(int(sr * dur)).astype(np.float32)
    n = x.size
    env = np.linspace(0, 1, int(0.05 * n), dtype=np.float32)
    env = np.pad(env, (0, n - env.size), constant_values=1.0)
    env[-int(0.15 * n):] = np.linspace(1, 0, int(0.15 * n), dtype=np.float32)
    return (amp * x * env).astype(np.float32)


def _triad(sr, base, minor=False, dur=2.0, amp=0.18):
    # Just-intonation thirds: 6/5 for minor, 5/4 for major.
    third = 6 / 5 if minor else 5 / 4
    w = (_sine(sr, base, dur, amp)
         + _sine(sr, base * third, dur, amp * 0.7)
         + _sine(sr, base * 2, dur, amp * 0.5))
    return (w / (np.max(np.abs(w)) + 1e-6)).astype(np.float32)


def _synthesize_audio_prototypes(sr=16000, dur=2.0):
    """One synthetic audio 'anchor' per mood label."""
    return {
        "calm": _sine(sr, 220, dur, amp=0.08),
        "energetic": _burst_noise(sr, dur, amp=0.35),
        "suspense": _sine(sr, 70, dur, amp=0.18) + _sine(sr, 80, dur, amp=0.12),
        "joyful": _triad(sr, 262, minor=False, dur=dur, amp=0.22),
        "sad": _triad(sr, 262, minor=True, dur=dur, amp=0.20),
    }


def _ensure_proto_embs():
    global _PROTO_EMBS
    if _PROTO_EMBS is not None:
        return
    waves = _synthesize_audio_prototypes()
    embs = {}
    for lbl, wav in waves.items():
        embs[lbl] = w2v2_api_embed(wav)  # API embeddings are already L2-normalized
    _PROTO_EMBS = embs


def w2v2_api_zero_shot_probs(wave_16k: np.ndarray, temperature: float = 1.0) -> np.ndarray:
    """Softmax over cosine similarities between the clip and the label prototypes."""
    _ensure_proto_embs()
    emb = w2v2_api_embed(wave_16k)  # [768], L2-normalized
    sims = np.array([float(np.dot(emb, _PROTO_EMBS[lbl])) for lbl in LABELS], dtype=np.float32)
    z = sims / max(1e-6, float(temperature))
    z = z - z.max()  # numerical stability before exp
    p = np.exp(z)
    p /= p.sum() + 1e-8
    return p.astype(np.float32)


def fuse_probs(p_img: np.ndarray, p_aud: np.ndarray, alpha: float) -> np.ndarray:
    # Convex combination of the two renormalized distributions.
    p_img = p_img / (p_img.sum() + 1e-8)
    p_aud = p_aud / (p_aud.sum() + 1e-8)
    p = alpha * p_img + (1 - alpha) * p_aud
    return p / (p.sum() + 1e-8)


def top1_label(p: np.ndarray) -> str:
    return LABELS[int(np.argmax(p))]


def predict_video(video, alpha=0.7):
    if HF_TOKEN is None:
        return (
            "Error: HuggingFace token required",
            {"error": "Please set the HF_TOKEN environment variable to use API features"},
            {"error": "No token available"},
        )
    t0 = time.time()

    # Full-video analysis: sample frames and extract 16 kHz mono audio.
    frames, wave, meta = video_to_frame_audio(video, target_frames=24, fps_cap=2.0)

    # Image branch: average CLIP probabilities over the sampled frames.
    t_img0 = time.time()
    per_frame = [clip_api_probs(pil) for pil in frames]
    p_img = np.mean(np.stack(per_frame, axis=0), axis=0)
    t_img = time.time() - t_img0

    # Audio branch.
    t_aud0 = time.time()
    p_aud = w2v2_api_zero_shot_probs(wave, temperature=1.0)
    t_aud = time.time() - t_aud0

    # Fusion.
    t_fus0 = time.time()
    p = fuse_probs(p_img, p_aud, alpha=float(alpha))
    t_fus = time.time() - t_fus0

    pred = top1_label(p)
    probs = {k: round(float(v), 4) for k, v in zip(LABELS, p)}
    lat = {
        "t_image_ms": int(t_img * 1000),
        "t_audio_ms": int(t_aud * 1000),
        "t_fuse_ms": int(t_fus * 1000),
        "t_total_ms": int((time.time() - t0) * 1000),
        "n_frames": meta.get("n_frames"),
        "fps_used": meta.get("fps_used"),
        "duration_s": meta.get("duration_s"),
    }
    log_inference(engine="api", mode="video", alpha=float(alpha),
                  lat=lat, pred=pred, probs=probs, csv_path=CSV_API)
    return pred, probs, lat


def predict_image_audio(image: Image.Image, audio_path: str, alpha=0.7):
    if HF_TOKEN is None:
        return (
            "Error: HuggingFace token required",
            {"error": "Please set the HF_TOKEN environment variable to use API features"},
            {"error": "No token available"},
        )
    t0 = time.time()
    wave = load_audio_16k(audio_path)

    # Image branch.
    t_img0 = time.time()
    p_img = clip_api_probs(image)
    t_img = time.time() - t_img0

    # Audio branch.
    t_aud0 = time.time()
    p_aud = w2v2_api_zero_shot_probs(wave, temperature=1.0)
    t_aud = time.time() - t_aud0

    # Fusion.
    t_fus0 = time.time()
    p = fuse_probs(p_img, p_aud, alpha=float(alpha))
    t_fus = time.time() - t_fus0

    pred = top1_label(p)
    probs = {k: round(float(v), 4) for k, v in zip(LABELS, p)}
    lat = {
        "t_image_ms": int(t_img * 1000),
        "t_audio_ms": int(t_aud * 1000),
        "t_fuse_ms": int(t_fus * 1000),
        "t_total_ms": int((time.time() - t0) * 1000),
    }
    log_inference(engine="api", mode="image_audio", alpha=float(alpha),
                  lat=lat, pred=pred, probs=probs, csv_path=CSV_API)
    return pred, probs, lat
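
# A minimal offline sanity check of the fusion step (synthetic inputs, and a
# hypothetical helper the app never calls): with alpha=0.7 the fused
# distribution should follow the image branch, matching the slider help text.
def _fusion_smoke_test() -> None:
    p_img = np.array([0.70, 0.10, 0.10, 0.05, 0.05], dtype=np.float32)
    p_aud = np.array([0.05, 0.05, 0.10, 0.10, 0.70], dtype=np.float32)
    fused = fuse_probs(p_img, p_aud, alpha=0.7)
    assert abs(float(fused.sum()) - 1.0) < 1e-5  # still a distribution
    assert int(np.argmax(fused)) == 0            # image's top class wins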

# Gradio UI, mirroring the local app's interface.
with gr.Blocks(title="Scene Mood (API)") as demo:
    gr.Markdown(
        "# Scene Mood Classifier (API version)\n"
        "Upload a short **video** or an **image + audio** pair."
    )
    with gr.Tab("Video"):
        v = gr.Video(sources=["upload"], height=240)
        alpha_v = gr.Slider(
            0.0, 1.0, value=0.7, step=0.05,
            label="Fusion weight α (image ↔ audio)",
            info="α=1 trusts image only; α=0 trusts audio only.",
        )
        btn_v = gr.Button("Analyze")
        out_v1, out_v2, out_v3 = gr.Label(), gr.JSON(), gr.JSON()
        btn_v.click(predict_video, inputs=[v, alpha_v],
                    outputs=[out_v1, out_v2, out_v3])
    with gr.Tab("Image + Audio"):
        img = gr.Image(type="pil", height=240, label="Image")
        aud = gr.Audio(sources=["upload"], type="filepath", label="Audio")
        alpha_ia = gr.Slider(
            0.0, 1.0, value=0.7, step=0.05,
            label="Fusion weight α (image ↔ audio)",
            info="α=1 trusts image only; α=0 trusts audio only.",
        )
        btn_ia = gr.Button("Analyze")
        out_i1, out_i2, out_i3 = gr.Label(), gr.JSON(), gr.JSON()
        btn_ia.click(predict_image_audio, inputs=[img, aud, alpha_ia],
                     outputs=[out_i1, out_i2, out_i3])

if __name__ == "__main__":
    demo.launch()
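
# Usage note (assuming this file is saved as app_api.py; the token value is a
# placeholder):
#
#   HF_TOKEN=hf_xxx python app_api.py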