from __future__ import annotations import base64 import json import mimetypes import os import re import time from collections.abc import Iterator from pathlib import Path from typing import Any from src.character_registry import build_tavern_system_prompt, get_character from src.persona_skills import select_skill from src.stream_protocol import normalize_event, parse_sse_lines from src.tts_engine import synthesize_sentence DEFAULT_VLLM_BASE_URL = "https://veronicaulises0--virtual-characters-vllm-gemma-serve.modal.run" STAGE_TAG_RE = re.compile( r"[a-z_]+)\"\s+motion=\"(?P[a-z_]+)\"\s+intensity=\"(?P[0-9.]+)\"\s*>" ) def stream_reply( user_text: str, history: list[dict], state: dict, media_inputs: dict[str, list[dict[str, Any]]] | None = None, voice_state: dict[str, Any] | None = None, ) -> Iterator[dict]: media_inputs = media_inputs or {"images": []} if os.environ.get("VC_USE_MOCK") == "1": yield from _stream_mock_reply(user_text, state, voice_state) return modal_url = os.environ.get("VC_MODAL_LLM_URL") if modal_url: yield from _stream_modal_reply(modal_url, user_text, history, state, media_inputs, voice_state) return vllm_url = os.environ.get("VC_MODAL_VLLM_URL") or DEFAULT_VLLM_BASE_URL if vllm_url: yield from _stream_vllm_reply(vllm_url, user_text, history, state, media_inputs, voice_state) return yield from _stream_mock_reply(user_text, state, voice_state) def _stream_modal_reply( url: str, user_text: str, history: list[dict], state: dict, media_inputs: dict[str, list[dict[str, Any]]], voice_state: dict[str, Any] | None, ) -> Iterator[dict]: import httpx character = state.get("character") or get_character(state.get("character_id", "star_knight")) payload = { "text": user_text, "history": history[-8:], "character": character, "vision_note": state.get("last_vision_note"), "image_urls": [_data_url(item["path"]) for item in media_inputs.get("images", []) if item.get("path")], "max_new_tokens": 180, } last_stage = state.get("stage", {}) with httpx.stream("POST", url, json=payload, timeout=180) as response: response.raise_for_status() for event in parse_sse_lines(response.iter_lines()): if event.get("type") == "stage": last_stage = {**last_stage, **event} yield event if event.get("type") == "sentence_end" and _voice_enabled(voice_state): yield from _synthesize_audio_event(event.get("text", ""), character, voice_state or state.get("voice", {})) settle = _settled_stage(last_stage) if settle: yield settle def _stream_vllm_reply( base_url: str, user_text: str, history: list[dict], state: dict, media_inputs: dict[str, list[dict[str, Any]]] | None, voice_state: dict[str, Any] | None, ) -> Iterator[dict]: import httpx media_inputs = media_inputs or {"images": []} character = state.get("character") or get_character(state.get("character_id", "star_knight")) skill = select_skill(user_text, character) voice = {**character.get("voice", {}), **(voice_state or {})} reply_text = "" last_stage = {"type": "stage", "expression": "thinking", "motion": "look_at_user", "intensity": 0.55} yield normalize_event({"type": "skill", "name": skill}) yield normalize_event(last_stage) yield normalize_event({"type": "voice", "style": voice.get("style", "soft"), "speed": voice.get("speed", 0.96), "energy": voice.get("energy", 0.5)}) yield {"type": "debug", "message": f"Modal vLLM endpoint: {base_url.rstrip('/')}"} payload = { "model": os.environ.get("VC_VLLM_SERVED_MODEL", "llm"), "messages": _build_openai_messages(user_text, history, character, skill, state.get("last_vision_note"), media_inputs), "max_tokens": int(os.environ.get("VC_VLLM_MAX_TOKENS", "220")), "temperature": float(os.environ.get("VC_VLLM_TEMPERATURE", "0.75")), "stream": True, "chat_template_kwargs": {"enable_thinking": False}, } timeout = httpx.Timeout(connect=20, read=float(os.environ.get("VC_VLLM_READ_TIMEOUT", "600")), write=20, pool=20) parser_state = {"pending_stage": True, "buffer": "", "started_text": False} url = base_url.rstrip("/") + "/v1/chat/completions" max_attempts = int(os.environ.get("VC_VLLM_RETRIES", "2")) for attempt in range(1, max_attempts + 1): try: with httpx.Client(timeout=timeout, trust_env=False) as client: with client.stream("POST", url, json=payload) as response: response.raise_for_status() for raw_line in response.iter_lines(): if not raw_line: continue line = raw_line.strip() if line.startswith("data:"): line = line[5:].strip() if line == "[DONE]": break try: chunk = json.loads(line) except json.JSONDecodeError: yield {"type": "debug", "message": "invalid vLLM stream chunk", "raw": line[:200]} continue for text in _extract_delta_text(chunk): for event in _events_from_vllm_delta(text, parser_state): if event["type"] == "text_delta": reply_text += event["text"] elif event["type"] == "stage": last_stage = {**last_stage, **event} yield event break except Exception as exc: if attempt < max_attempts and not reply_text: parser_state = {"pending_stage": True, "buffer": "", "started_text": False} yield {"type": "debug", "message": f"Modal vLLM retry {attempt}/{max_attempts}: {exc}"} time.sleep(2 * attempt) continue worried = normalize_event({"type": "stage", "expression": "worried", "motion": "gentle_blink", "intensity": 0.8}) yield worried yield {"type": "error", "message": f"Modal vLLM 调用失败:{exc}"} yield from _stream_mock_reply(user_text, state, voice_state) return for event in _flush_vllm_parser(parser_state): if event["type"] == "text_delta": reply_text += event["text"] elif event["type"] == "stage": last_stage = {**last_stage, **event} yield event clean_reply = reply_text.strip() if clean_reply: yield normalize_event({"type": "sentence_end", "text": clean_reply}) if _voice_enabled(voice_state): yield from _synthesize_audio_event(clean_reply, character, voice_state or state.get("voice", {})) settle = _settled_stage(last_stage) if settle: yield settle yield {"type": "done"} def _build_openai_messages( user_text: str, history: list[dict], character: dict, skill: str, vision_note: str | None, media_inputs: dict[str, list[dict[str, Any]]] | None = None, ) -> list[dict[str, Any]]: system_prompt = build_tavern_system_prompt( character, skill=skill, vision_note=vision_note, include_examples=len(history) < 8, ) messages: list[dict[str, Any]] = [{"role": "system", "content": system_prompt}] for item in history[-10:]: role = item.get("role") content = item.get("content") if role not in {"user", "assistant"}: continue if isinstance(content, list): continue content_text = str(content or "").strip() if content_text: messages.append({"role": role, "content": content_text}) user_content = _build_user_content(user_text, media_inputs or {"images": []}) messages.append({"role": "user", "content": user_content}) return messages def _build_user_content(user_text: str, media_inputs: dict[str, list[dict[str, Any]]]) -> str | list[dict[str, Any]]: content: list[dict[str, Any]] = [] if user_text.strip(): content.append({"type": "text", "text": user_text.strip()}) for item in media_inputs.get("images", []): path = item.get("path") if path: content.append({"type": "image_url", "image_url": {"url": _data_url(path)}}) if not content: return user_text return content def _extract_delta_text(chunk: dict) -> Iterator[str]: for choice in chunk.get("choices") or []: delta = choice.get("delta") or {} text = delta.get("content") or delta.get("reasoning") or delta.get("reasoning_content") if text: yield str(text) def _events_from_vllm_delta(text: str, parser_state: dict) -> Iterator[dict]: if not parser_state.get("pending_stage"): if not parser_state.get("started_text"): text = text.lstrip() if not text: return parser_state["started_text"] = True yield normalize_event({"type": "text_delta", "text": text}) return parser_state["buffer"] += text buffered = parser_state["buffer"] stripped = buffered.lstrip() if not (stripped.startswith("" not in stripped: if len(stripped) > 180: parser_state["pending_stage"] = False yield normalize_event({"type": "stage", "expression": "smile", "motion": "talk", "intensity": 0.55}) text = buffered.lstrip() if text: parser_state["started_text"] = True yield normalize_event({"type": "text_delta", "text": text}) parser_state["buffer"] = "" return tag, rest = stripped.split(">", 1) tag = tag + ">" parser_state["pending_stage"] = False parser_state["buffer"] = "" yield _parse_stage_tag(tag) text = rest.lstrip() if text: parser_state["started_text"] = True yield normalize_event({"type": "text_delta", "text": text}) def _flush_vllm_parser(parser_state: dict) -> Iterator[dict]: if parser_state.get("pending_stage") and parser_state.get("buffer"): parser_state["pending_stage"] = False yield normalize_event({"type": "stage", "expression": "smile", "motion": "talk", "intensity": 0.55}) text = parser_state["buffer"].lstrip() if text: parser_state["started_text"] = True yield normalize_event({"type": "text_delta", "text": text}) parser_state["buffer"] = "" def _parse_stage_tag(tag: str) -> dict: match = STAGE_TAG_RE.fullmatch(tag.strip()) if not match: return normalize_event({"type": "stage", "expression": "smile", "motion": "talk", "intensity": 0.55}) return normalize_event( { "type": "stage", "expression": match.group("expression"), "motion": match.group("motion"), "intensity": match.group("intensity"), } ) def _stream_mock_reply(user_text: str, state: dict, voice_state: dict[str, Any] | None = None) -> Iterator[dict]: character = state.get("character") or get_character(state.get("character_id", "star_knight")) skill = select_skill(user_text, character) voice = {**character.get("voice", {}), **(voice_state or {})} yield normalize_event({"type": "skill", "name": skill}) yield normalize_event({"type": "stage", "expression": "thinking", "motion": "look_at_user", "intensity": 0.45}) yield normalize_event({"type": "voice", "style": voice.get("style", "soft"), "speed": voice.get("speed", 0.96), "energy": voice.get("energy", 0.5)}) if skill == "emotional_support": reply = "我在听。你不用马上把自己整理好,先把这口气慢慢呼出来。" last_stage = normalize_event({"type": "stage", "expression": "worried", "motion": "gentle_blink", "intensity": 0.7}) yield last_stage elif skill == "battle_focus": reply = "收到。先确认边界,再判断风险。你站在我身后就好。" last_stage = normalize_event({"type": "stage", "expression": "thinking", "motion": "focus", "intensity": 0.8}) yield last_stage elif skill == "playful_reframe": reply = "这听起来像一个支线任务。目标很小,但奖励可能意外地不错。" last_stage = normalize_event({"type": "stage", "expression": "smile", "motion": "soft_sway", "intensity": 0.7}) yield last_stage else: catchphrase = (character.get("dialogue_style", {}).get("catchphrases") or ["我在。"])[0] reply = f"{catchphrase} 你刚才说的我记下了,我们可以从最容易的一步开始。" last_stage = normalize_event({"type": "stage", "expression": "smile", "motion": "gentle_blink", "intensity": 0.5}) yield last_stage for part in _chunk_text(reply): yield normalize_event({"type": "text_delta", "text": part}) yield normalize_event({"type": "sentence_end", "text": reply}) if _voice_enabled(voice_state): yield from _synthesize_audio_event(reply, character, voice_state or state.get("voice", {})) settle = _settled_stage(last_stage) if settle: yield settle yield {"type": "done"} def _chunk_text(text: str, size: int = 4) -> Iterator[str]: for index in range(0, len(text), size): yield text[index : index + size] def _settled_stage(stage: dict[str, Any] | None) -> dict[str, Any] | None: if not stage: return None if stage.get("motion") != "talk": return None return normalize_event( { "type": "stage", "expression": stage.get("expression", "smile"), "motion": "gentle_blink", "intensity": min(float(stage.get("intensity", 0.55)), 0.6), } ) def _voice_enabled(voice_state: dict[str, Any] | None) -> bool: if not voice_state: return True return bool(voice_state.get("enabled", True)) def _synthesize_audio_event(text: str, character: dict, voice_state: dict[str, Any]) -> Iterator[dict]: try: audio = synthesize_sentence(text, character, voice_state) except Exception as exc: yield {"type": "error", "message": f"TTS 调用失败:{exc}"} return if audio: yield {"type": "audio", "path": audio} def _data_url(path_value: str | Path) -> str: path = Path(path_value) mime_type = mimetypes.guess_type(path.name)[0] or "application/octet-stream" return f"data:{mime_type};base64,{_base64_file(path)}" def _base64_file(path_value: str | Path) -> str: return base64.b64encode(Path(path_value).read_bytes()).decode("ascii")