# Hugging Face Space app file (uploaded via huggingface_hub, revision aedd0b6).
# NOTE: scraped web-page header residue removed so the module parses as Python.
from __future__ import annotations
import io, os, time, json
from pathlib import Path
from typing import List, Dict
import numpy as np
from PIL import Image
import gradio as gr
import requests
from pydub import AudioSegment
from utils_media import video_to_frame_audio, load_audio_16k, log_inference
# Paths and label metadata, loaded once at import time.
HERE = Path(__file__).parent
LABEL_ITEMS = json.loads((HERE / "labels.json").read_text())["labels"]  # list of {"name": ..., "prompt": ...}
LABELS = [x["name"] for x in LABEL_ITEMS]    # label names; their order defines probability-vector order
PROMPTS = [x["prompt"] for x in LABEL_ITEMS] # CLIP zero-shot prompts, aligned index-for-index with LABELS
CSV_API = HERE / "runs_api.csv"              # inference log written by log_inference()
# Hugging Face Inference API model ids.
CLIP_MODEL = "openai/clip-vit-base-patch32"
W2V2_MODEL = "facebook/wav2vec2-base"
HF_TOKEN = os.getenv("HF_TOKEN")
if not HF_TOKEN:
    # Soft warning only: the UI still loads, but API-backed functions will refuse to run.
    print("Warning: HuggingFace token not found. API functions will not work.")
def _img_to_jpeg_bytes(pil: Image.Image) -> bytes:
    """Serialize a PIL image to JPEG-encoded bytes (forced RGB, quality 90)."""
    rgb = pil.convert("RGB")
    sink = io.BytesIO()
    rgb.save(sink, format="JPEG", quality=90)
    return sink.getvalue()
def clip_api_probs(pil: Image.Image, prompts: List[str] = PROMPTS) -> np.ndarray:
    """Zero-shot image classification via the HF Inference API (CLIP).

    Args:
        pil: input image.
        prompts: candidate label prompts; defaults to the module-level PROMPTS.

    Returns:
        float32 probability distribution over `prompts` (sums to 1). On any
        API/parse error a uniform distribution is returned so the pipeline
        keeps running (deliberate best-effort behavior).

    Raises:
        RuntimeError: if no HF_TOKEN is configured.
    """
    if HF_TOKEN is None:
        raise RuntimeError("HuggingFace token not available. Please set HF_TOKEN environment variable.")
    try:
        img_bytes = _img_to_jpeg_bytes(pil)
        url = f"https://api-inference.huggingface.co/models/{CLIP_MODEL}"
        headers = {"Authorization": f"Bearer {HF_TOKEN}"}
        # Fix: previous code built a nested `payload` dict only to immediately
        # unwrap payload["parameters"]; build the parameters dict directly.
        # The prompts already read as full sentences, hence template "{}".
        parameters = {"candidate_labels": prompts, "hypothesis_template": "{}"}
        files = {"file": ("image.jpg", img_bytes, "image/jpeg")}
        data = {"inputs": "", "parameters": json.dumps(parameters)}
        response = requests.post(url, headers=headers, files=files, data=data, timeout=60)
        response.raise_for_status()
        result = response.json()
        # Expected response: a list of {"label": ..., "score": ...} items.
        if isinstance(result, list) and len(result) > 0:
            scores = {item["label"]: item["score"] for item in result}
        else:
            # Unexpected response shape: assume nothing, spread mass evenly.
            scores = {p: 1.0 / len(prompts) for p in prompts}
        arr = np.array([scores.get(p, 0.0) for p in prompts], dtype=np.float32)
        total = arr.sum()
        return arr / total if total > 0 else np.ones_like(arr) / len(arr)
    except Exception as e:  # deliberate broad catch: degrade to uniform, never crash the UI
        print(f"CLIP API error: {e}")
        return np.ones(len(prompts), dtype=np.float32) / len(prompts)
def _wave_float32_to_wav_bytes(wave_16k: np.ndarray, sr=16000) -> bytes:
    """Encode a float32 mono waveform in [-1, 1] as 16-bit PCM WAV bytes."""
    pcm = (np.clip(wave_16k, -1, 1) * 32767.0).astype(np.int16)
    segment = AudioSegment(
        pcm.tobytes(),
        frame_rate=sr,
        sample_width=2,  # 16-bit samples
        channels=1,
    )
    sink = io.BytesIO()
    segment.export(sink, format="wav")
    return sink.getvalue()
def w2v2_api_embed(wave_16k: np.ndarray) -> np.ndarray:
    """Mean-pooled, L2-normalized wav2vec2 embedding via the HF Inference API.

    Sends the waveform as WAV bytes and pools the returned frame features
    into a single [768] float32 vector.
    """
    if HF_TOKEN is None:
        raise RuntimeError("HuggingFace token not available.")
    endpoint = f"https://api-inference.huggingface.co/models/{W2V2_MODEL}"
    resp = requests.post(
        endpoint,
        headers={"Authorization": f"Bearer {HF_TOKEN}"},
        data=_wave_float32_to_wav_bytes(wave_16k),
        timeout=60,
    )
    resp.raise_for_status()
    feats = np.asarray(resp.json(), dtype=np.float32)  # [T, 768], or [1, T, 768] with a batch dim
    if feats.ndim == 3:
        feats = feats[0]
    pooled = feats.mean(axis=0)
    norm = np.linalg.norm(pooled) + 1e-8  # epsilon guards against a zero vector
    return (pooled / norm).astype(np.float32)
# Lazy cache of audio prototype embeddings, populated by _ensure_proto_embs().
_PROTO_EMBS: Dict[str, np.ndarray] | None = None
def _sine(sr, freq, dur, amp=0.2):
t = np.linspace(0, dur, int(sr*dur), endpoint=False, dtype=np.float32)
return (amp * np.sin(2*np.pi*freq*t)).astype(np.float32)
def _burst_noise(sr, dur, amp=0.2):
x = np.random.randn(int(sr*dur)).astype(np.float32)
n = x.size
env = np.linspace(0, 1, int(0.05*n), dtype=np.float32)
env = np.pad(env, (0, n-env.size), constant_values=1.0)
env[-int(0.15*n):] = np.linspace(1, 0, int(0.15*n), dtype=np.float32)
return (amp * x * env).astype(np.float32)
def _triad(sr, base, minor=False, dur=2.0, amp=0.18):
    """Three-tone chord on `base` Hz, peak-normalized to ~1.0.

    NOTE(review): the interval ratios (3/2 when minor, 4/3 otherwise) are a
    perfect fifth/fourth rather than true minor/major thirds — confirm this
    is intentional before "fixing".
    """
    interval = 3 / 2 if minor else 4 / 3
    chord = (_sine(sr, base, dur, amp)
             + _sine(sr, base * interval, dur, amp * 0.7)
             + _sine(sr, base * 2, dur, amp * 0.5))
    peak = np.max(np.abs(chord)) + 1e-6  # epsilon avoids division by zero
    return (chord / peak).astype(np.float32)
def _synthesize_audio_prototypes(sr=16000, dur=2.0):
    """Synthesize one reference waveform per mood label.

    Keys must match the label names used by the classifier (labels.json).
    """
    protos = {}
    protos["calm"] = _sine(sr, 220, dur, amp=0.08)               # soft low tone
    protos["energetic"] = _burst_noise(sr, dur, amp=0.35)        # loud noise burst
    protos["suspense"] = _sine(sr, 70, dur, amp=0.18) + _sine(sr, 80, dur, amp=0.12)  # beating low drone
    protos["joyful"] = _triad(sr, 262, minor=False, dur=dur, amp=0.22)
    protos["sad"] = _triad(sr, 262, minor=True, dur=dur, amp=0.20)
    return protos
def _ensure_proto_embs():
    """Lazily populate the module-level _PROTO_EMBS cache.

    On first call, synthesizes the prototype waveforms and embeds each one
    via the API; subsequent calls are no-ops.
    """
    global _PROTO_EMBS
    if _PROTO_EMBS is None:
        _PROTO_EMBS = {
            label: w2v2_api_embed(wave)  # embeddings come back L2-normalized
            for label, wave in _synthesize_audio_prototypes().items()
        }
def w2v2_api_zero_shot_probs(wave_16k: np.ndarray, temperature: float = 1.0) -> np.ndarray:
    """Zero-shot audio label probabilities via prototype similarity.

    Embeds the waveform, takes its dot product with each cached prototype
    embedding, and softmaxes the similarities (divided by `temperature`).
    Output is ordered as LABELS.
    """
    _ensure_proto_embs()
    query = w2v2_api_embed(wave_16k)  # [768], L2-normalized
    sims = np.array(
        [float(np.dot(query, _PROTO_EMBS[name])) for name in LABELS],
        dtype=np.float32,
    )
    logits = sims / max(1e-6, float(temperature))
    logits = logits - logits.max()  # shift for numerical stability
    probs = np.exp(logits)
    probs = probs / (probs.sum() + 1e-8)
    return probs.astype(np.float32)
def fuse_probs(p_img: np.ndarray, p_aud: np.ndarray, alpha: float) -> np.ndarray:
    """Convex fusion of image and audio distributions.

    Both inputs are renormalized first; alpha=1 trusts the image only,
    alpha=0 the audio only. The result is renormalized to sum to 1.
    """
    img = p_img / (p_img.sum() + 1e-8)
    aud = p_aud / (p_aud.sum() + 1e-8)
    fused = alpha * img + (1 - alpha) * aud
    return fused / (fused.sum() + 1e-8)
def top1_label(p: np.ndarray) -> str:
    """Return the name of the highest-probability label (ordered as LABELS)."""
    best = int(np.argmax(p))
    return LABELS[best]
def predict_video(video, alpha=0.7):
    """Classify scene mood from a video: CLIP on sampled frames, wav2vec2 on audio.

    Args:
        video: path to the uploaded video file (from gr.Video).
        alpha: fusion weight; 1 = image only, 0 = audio only.

    Returns:
        (predicted_label, {label: prob}, latency/meta dict) — or an error
        triple when inputs/token are missing.
    """
    if HF_TOKEN is None:
        # Bug fix: the message previously said "HF_Token"; the env var read is HF_TOKEN.
        return "Error: HuggingFace token required", {"error": "Please set HF_TOKEN environment variable to use API features"}, {"error": "No token available"}
    if video is None:
        # Robustness: Gradio passes None when no file was uploaded.
        return "Error: no video provided", {"error": "Please upload a video file"}, {}
    t0 = time.time()
    # Decode the full video into sampled frames plus its audio track.
    frames, wave, meta = video_to_frame_audio(video, target_frames=24, fps_cap=2.0)
    # IMAGE: average the CLIP distribution over all sampled frames.
    t_img0 = time.time()
    per_frame = [clip_api_probs(pil) for pil in frames]
    p_img = np.mean(np.stack(per_frame, axis=0), axis=0)
    t_img = time.time() - t_img0
    # AUDIO: prototype-similarity zero-shot over the soundtrack.
    t_aud0 = time.time()
    p_aud = w2v2_api_zero_shot_probs(wave, temperature=1.0)
    t_aud = time.time() - t_aud0
    # FUSION: convex combination of the two modality distributions.
    t_fus0 = time.time()
    p = fuse_probs(p_img, p_aud, alpha=float(alpha))
    t_fus = time.time() - t_fus0
    pred = top1_label(p)
    probs = {k: round(float(v), 4) for k, v in zip(LABELS, p)}
    lat = {
        "t_image_ms": int(t_img * 1000),
        "t_audio_ms": int(t_aud * 1000),
        "t_fuse_ms": int(t_fus * 1000),
        "t_total_ms": int((time.time() - t0) * 1000),
        "n_frames": meta.get("n_frames"),
        "fps_used": meta.get("fps_used"),
        "duration_s": meta.get("duration_s"),
    }
    log_inference(engine="api", mode="video", alpha=float(alpha), lat=lat, pred=pred, probs=probs, csv_path=CSV_API)
    return pred, probs, lat
def predict_image_audio(image: Image.Image, audio_path: str, alpha=0.7):
    """Classify scene mood from an image + audio pair.

    Args:
        image: the uploaded image (PIL).
        audio_path: filesystem path to the uploaded audio clip.
        alpha: fusion weight; 1 = image only, 0 = audio only.

    Returns:
        (predicted_label, {label: prob}, latency dict) — or an error triple
        when inputs/token are missing.
    """
    if HF_TOKEN is None:
        # Bug fix: the message previously said "HF_Token"; the env var read is HF_TOKEN.
        return "Error: HuggingFace token required", {"error": "Please set HF_TOKEN environment variable to use API features"}, {"error": "No token available"}
    if image is None or audio_path is None:
        # Robustness: Gradio passes None when either upload is missing.
        return "Error: image and audio required", {"error": "Please upload both an image and an audio clip"}, {}
    t0 = time.time()
    wave = load_audio_16k(audio_path)
    # IMAGE: single-frame CLIP distribution.
    t_img0 = time.time()
    p_img = clip_api_probs(image)
    t_img = time.time() - t_img0
    # AUDIO: prototype-similarity zero-shot over the clip.
    t_aud0 = time.time()
    p_aud = w2v2_api_zero_shot_probs(wave, temperature=1.0)
    t_aud = time.time() - t_aud0
    # FUSION: convex combination of the two modality distributions.
    t_fus0 = time.time()
    p = fuse_probs(p_img, p_aud, alpha=float(alpha))
    t_fus = time.time() - t_fus0
    pred = top1_label(p)
    probs = {k: round(float(v), 4) for k, v in zip(LABELS, p)}
    lat = {
        "t_image_ms": int(t_img * 1000),
        "t_audio_ms": int(t_aud * 1000),
        "t_fuse_ms": int(t_fus * 1000),
        "t_total_ms": int((time.time() - t0) * 1000),
    }
    log_inference(engine="api", mode="image_audio", alpha=float(alpha), lat=lat, pred=pred, probs=probs, csv_path=CSV_API)
    return pred, probs, lat
# Attribution: the Gradio interface below was drafted with ChatGPT to mirror the local app.
# --- Gradio UI: two tabs (whole video, or image + audio pair) ---
with gr.Blocks(title="Scene Mood (API)") as demo:
    gr.Markdown("# Scene Mood Classifier - API Version. Upload a short **video** or an **image + audio** pair.")
    with gr.Tab("Video"):
        v = gr.Video(sources=["upload"], height=240)
        # Slider value is passed straight through to fuse_probs() as alpha.
        alpha_v = gr.Slider(0.0, 1.0, value=0.7, step=0.05,
                            label="Fusion weight α (image ↔ audio)",
                            info="α=1 trusts image only; α=0 trusts audio only.")
        btn_v = gr.Button("Analyze")
        # Outputs: top-1 label, per-label probabilities, latency/meta JSON.
        out_v1, out_v2, out_v3 = gr.Label(), gr.JSON(), gr.JSON()
        btn_v.click(predict_video, inputs=[v, alpha_v], outputs=[out_v1, out_v2, out_v3])
    with gr.Tab("Image + Audio"):
        img = gr.Image(type="pil", height=240, label="Image")
        aud = gr.Audio(sources=["upload"], type="filepath", label="Audio")
        # Same fusion slider as the video tab.
        alpha_ia = gr.Slider(0.0, 1.0, value=0.7, step=0.05,
                             label="Fusion weight α (image ↔ audio)",
                             info="α=1 trusts image only; α=0 trusts audio only.")
        btn_ia = gr.Button("Analyze")
        # Outputs: top-1 label, per-label probabilities, latency JSON.
        out_i1, out_i2, out_i3 = gr.Label(), gr.JSON(), gr.JSON()
        btn_ia.click(predict_image_audio, inputs=[img, aud, alpha_ia], outputs=[out_i1, out_i2, out_i3])
# Standard script entry point: launch the Gradio server only when run directly.
if __name__ == "__main__":
    demo.launch()