import gradio as gr import spaces import os import sys import json import re import zipfile import asyncio import tempfile import base64 import threading from pathlib import Path import subprocess # ── Install deps ────────────────────────────────────────────────────────────── def install_packages(): packages = [ "transformers>=4.40.0", "torch>=2.1.0", "accelerate>=0.27.0", "edge-tts", "sentencepiece", ] for pkg in packages: subprocess.run( [sys.executable, "-m", "pip", "install", pkg, "-q"], check=False, ) install_packages() import torch from transformers import AutoTokenizer, AutoModelForCausalLM import edge_tts # ── Constants ───────────────────────────────────────────────────────────────── MODEL_ID = "Qwen/Qwen2.5-0.5B-Instruct" # small, fast, reliable VRM_PATH = "model/Ani.vrm" ANIM_ZIP = "animation/all_vrma.zip" TTS_VOICE = "en-US-AriaNeural" # reliable fallback TTS_VOICE2 = "zh-CN-XiaoyiNeural" EMOTIONS_MAP = { "happy": {"blinking": True, "icon": "😊"}, "sad": {"blinking": True, "icon": "😢"}, "angry": {"blinking": False, "icon": "😠"}, "surprised": {"blinking": False, "icon": "😲"}, "fearful": {"blinking": True, "icon": "😨"}, "disgusted": {"blinking": False, "icon": "🤢"}, "neutral": {"blinking": True, "icon": "😐"}, "excited": {"blinking": True, "icon": "🤩"}, "confused": {"blinking": True, "icon": "😕"}, "thinking": {"blinking": True, "icon": "🤔"}, "laughing": {"blinking": False, "icon": "😄"}, "crying": {"blinking": False, "icon": "😭"}, "love": {"blinking": True, "icon": "🥰"}, "shy": {"blinking": True, "icon": "😊"}, "bored": {"blinking": True, "icon": "😑"}, "sleepy": {"blinking": False, "icon": "😴"}, "determined": {"blinking": True, "icon": "💪"}, "proud": {"blinking": True, "icon": "🎉"}, "embarrassed": {"blinking": True, "icon": "😳"}, "grateful": {"blinking": True, "icon": "🙏"}, "playful": {"blinking": True, "icon": "😜"}, "serious": {"blinking": True, "icon": "😤"}, } # ── VRMA list ───────────────────────────────────────────────────────────────── def get_vrma_list(): if not os.path.exists(ANIM_ZIP): return [] with zipfile.ZipFile(ANIM_ZIP, "r") as z: return [f for f in z.namelist() if f.endswith(".vrma")] VRMA_LIST = get_vrma_list() # ── Model ───────────────────────────────────────────────────────────────────── _tokenizer = None _model = None _lock = threading.Lock() def load_model(): global _tokenizer, _model if _model is not None: return with _lock: if _model is not None: return print(f"[ANI] Loading {MODEL_ID} …") _tokenizer = AutoTokenizer.from_pretrained(MODEL_ID) _model = AutoModelForCausalLM.from_pretrained( MODEL_ID, torch_dtype=torch.float16, device_map="auto", low_cpu_mem_usage=True, ) _model.eval() print("[ANI] Model ready.") def build_system_prompt(): emo_str = ", ".join(EMOTIONS_MAP.keys()) anim_str = ", ".join(VRMA_LIST[:30]) if VRMA_LIST else "null" return ( "You are Ani, a friendly AI anime companion.\n" "Reply ONLY with valid JSON – no markdown, no extra text:\n" '{"response":"...", "emotion":"...", "animation":"...", "intensity":0.8}\n' f"Emotions: {emo_str}\n" f"Animations: {anim_str}\n" "Pick the best emotion and animation. Keep replies to 1-3 sentences." ) @spaces.GPU(duration=60) def generate_response(user_message: str, history: list) -> dict: load_model() system = build_system_prompt() messages = [{"role": "system", "content": system}] for h, a in history[-4:]: if h: messages.append({"role": "user", "content": h}) if a: try: messages.append({"role": "assistant", "content": json.loads(a).get("response", a)}) except Exception: messages.append({"role": "assistant", "content": a}) messages.append({"role": "user", "content": user_message}) text = _tokenizer.apply_chat_template( messages, tokenize=False, add_generation_prompt=True ) inputs = _tokenizer(text, return_tensors="pt").to(_model.device) with torch.no_grad(): out = _model.generate( **inputs, max_new_tokens=200, temperature=0.7, top_p=0.9, do_sample=True, pad_token_id=_tokenizer.eos_token_id, ) raw = _tokenizer.decode( out[0][inputs["input_ids"].shape[1]:], skip_special_tokens=True ).strip() try: m = re.search(r"\{.*\}", raw, re.DOTALL) result = json.loads(m.group()) if m else {} except Exception: result = {} result.setdefault("response", raw or "Hello!") result.setdefault("emotion", "neutral") result.setdefault("animation", None) result.setdefault("intensity", 0.7) if result["emotion"] not in EMOTIONS_MAP: result["emotion"] = "neutral" return result # ── TTS ─────────────────────────────────────────────────────────────────────── async def _tts(text, voice, path): await edge_tts.Communicate(text, voice).save(path) def run_tts(text: str) -> str: tmp = tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) tmp.close() for voice in [TTS_VOICE2, TTS_VOICE]: try: asyncio.run(_tts(text, voice, tmp.name)) if os.path.getsize(tmp.name) > 0: with open(tmp.name, "rb") as f: b64 = base64.b64encode(f.read()).decode() os.unlink(tmp.name) return b64 except Exception as e: print(f"[TTS] {voice} failed: {e}") return "" # ── VRM helpers ─────────────────────────────────────────────────────────────── def get_vrm_b64() -> str: if os.path.exists(VRM_PATH): with open(VRM_PATH, "rb") as f: return base64.b64encode(f.read()).decode() return "" def get_vrma_b64(name: str) -> str: if not name or not os.path.exists(ANIM_ZIP): return "" with zipfile.ZipFile(ANIM_ZIP, "r") as z: for n in z.namelist(): if n == name or os.path.basename(n) == name: return base64.b64encode(z.read(n)).decode() return "" def get_idle_b64(): for p in ["natural2.vrma", "natural.vrma", "idle.vrma", "stand.vrma"]: b = get_vrma_b64(p) if b: return b if VRMA_LIST: return get_vrma_b64(VRMA_LIST[0]) return "" VRM_B64 = get_vrm_b64() IDLE_B64 = get_idle_b64() # ── Chat handler ────────────────────────────────────────────────────────────── def chat(user_msg: str, history: list): if not user_msg.strip(): return history, "{}" try: result = generate_response(user_msg, history) except Exception as e: result = {"response": f"Sorry, error: {e}", "emotion": "sad", "animation": None, "intensity": 0.5} text = result.get("response", "…") emotion = result.get("emotion", "neutral") animation = result.get("animation") intensity = result.get("intensity", 0.7) audio_b64 = run_tts(text) anim_b64 = get_vrma_b64(animation) if animation else "" blinking = EMOTIONS_MAP.get(emotion, EMOTIONS_MAP["neutral"])["blinking"] payload = json.dumps({ "type": "chat_response", "response": text, "emotion": emotion, "blinking": blinking, "animation_b64": anim_b64, "audio_b64": audio_b64, "intensity": intensity, }) new_history = history + [[user_msg, json.dumps(result)]] return new_history, payload # ══════════════════════════════════════════════════════════════════════════════ # HTML for the iframe-like inner panel # ══════════════════════════════════════════════════════════════════════════════ INNER_CSS = """ """ INNER_JS = """ """ def build_inner_html(): vrm_b64_escaped = VRM_B64 if VRM_B64 else "" idle_b64_escaped = IDLE_B64 if IDLE_B64 else "" return f"""
{INNER_CSS}