File size: 15,452 Bytes
6bcddd0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
from __future__ import annotations

import base64
import json
import mimetypes
import os
import re
import time
from collections.abc import Iterator
from pathlib import Path
from typing import Any

from src.character_registry import build_tavern_system_prompt, get_character
from src.persona_skills import select_skill
from src.stream_protocol import normalize_event, parse_sse_lines
from src.tts_engine import synthesize_sentence


# OpenAI-compatible vLLM deployment used when VC_MODAL_VLLM_URL is unset or empty.
DEFAULT_VLLM_BASE_URL = "https://veronicaulises0--virtual-characters-vllm-gemma-serve.modal.run"

# Stage-direction tag a reply may open with, e.g.
#   <stage expression="smile" motion="talk" intensity="0.55">
# Attributes must appear in exactly this order; names are lowercase/underscore only
# and intensity is captured as a raw string of digits and dots.
STAGE_TAG_RE = re.compile(
    r"<stage\s+"
    r"expression=\"(?P<expression>[a-z_]+)\"\s+"
    r"motion=\"(?P<motion>[a-z_]+)\"\s+"
    r"intensity=\"(?P<intensity>[0-9.]+)\"\s*>"
)


def stream_reply(
    user_text: str,
    history: list[dict],
    state: dict,
    media_inputs: dict[str, list[dict[str, Any]]] | None = None,
    voice_state: dict[str, Any] | None = None,
) -> Iterator[dict]:
    """Stream reply events for *user_text* from the first configured backend.

    Backend priority:

    1. ``VC_USE_MOCK == "1"``: the offline canned reply.
    2. ``VC_MODAL_LLM_URL`` set (non-empty): the custom Modal SSE endpoint.
    3. ``VC_MODAL_VLLM_URL``, else ``DEFAULT_VLLM_BASE_URL``: the vLLM endpoint.
    4. Otherwise: the offline canned reply.

    Args:
        user_text: The user's message for this turn.
        history: Previous chat turns.
        state: Session state (character, vision note, stage, voice, ...).
        media_inputs: Attached media; a falsy value is treated as ``{"images": []}``.
        voice_state: Per-request voice settings, forwarded to the backend.

    Yields:
        Event dicts produced by the selected backend.
    """
    media = media_inputs or {"images": []}

    if os.environ.get("VC_USE_MOCK") == "1":
        yield from _stream_mock_reply(user_text, state, voice_state)
        return

    custom_endpoint = os.environ.get("VC_MODAL_LLM_URL")
    if custom_endpoint:
        yield from _stream_modal_reply(custom_endpoint, user_text, history, state, media, voice_state)
        return

    # DEFAULT_VLLM_BASE_URL is a non-empty literal, so this resolves to a URL and the
    # mock fallback below is only reachable if that default is ever blanked out.
    vllm_endpoint = os.environ.get("VC_MODAL_VLLM_URL") or DEFAULT_VLLM_BASE_URL
    if not vllm_endpoint:
        yield from _stream_mock_reply(user_text, state, voice_state)
        return
    yield from _stream_vllm_reply(vllm_endpoint, user_text, history, state, media, voice_state)


def _stream_modal_reply(
    url: str,
    user_text: str,
    history: list[dict],
    state: dict,
    media_inputs: dict[str, list[dict[str, Any]]],
    voice_state: dict[str, Any] | None,
) -> Iterator[dict]:
    """Relay a reply from the custom Modal SSE endpoint at *url*.

    POSTs the user text, the last 8 history turns, the character card, the last
    vision note and any attached images (as base64 ``data:`` URLs). Every event
    decoded by ``parse_sse_lines`` is re-yielded unchanged. After each
    ``sentence_end`` event, TTS events follow when voice is enabled. ``stage`` events
    are merged so that a settling stage can be emitted once the stream closes. HTTP
    and network errors are not caught here and propagate to the caller.
    """
    import httpx

    character = state.get("character") or get_character(state.get("character_id", "star_knight"))
    request_body = {
        "text": user_text,
        "history": history[-8:],
        "character": character,
        "vision_note": state.get("last_vision_note"),
        "image_urls": [
            _data_url(attachment["path"])
            for attachment in media_inputs.get("images", [])
            if attachment.get("path")
        ],
        "max_new_tokens": 180,
    }
    merged_stage = state.get("stage", {})

    with httpx.stream("POST", url, json=request_body, timeout=180) as response:
        response.raise_for_status()
        sse_events = parse_sse_lines(response.iter_lines())
        for event in sse_events:
            if event.get("type") == "stage":
                # Later stage events override earlier fields but keep unspecified ones.
                merged_stage = {**merged_stage, **event}
            yield event
            wants_audio = event.get("type") == "sentence_end" and _voice_enabled(voice_state)
            if wants_audio:
                spoken = event.get("text", "")
                yield from _synthesize_audio_event(spoken, character, voice_state or state.get("voice", {}))

    settled = _settled_stage(merged_stage)
    if settled:
        yield settled


def _stream_vllm_reply(
    base_url: str,
    user_text: str,
    history: list[dict],
    state: dict,
    media_inputs: dict[str, list[dict[str, Any]]] | None,
    voice_state: dict[str, Any] | None,
) -> Iterator[dict]:
    """Stream a character reply from an OpenAI-compatible vLLM chat endpoint.

    Event order:

    1. ``skill``, an initial "thinking" ``stage``, ``voice``, and a ``debug`` event
       naming the endpoint.
    2. ``stage`` / ``text_delta`` events parsed from the model stream.
    3. ``sentence_end`` with the whole reply, followed by TTS events when voice is
       enabled.
    4. An optional settling ``stage``, and finally ``done``.

    Request settings come from the environment:

    - ``VC_VLLM_SERVED_MODEL`` (default ``"llm"``)
    - ``VC_VLLM_MAX_TOKENS`` (220)
    - ``VC_VLLM_TEMPERATURE`` (0.75)
    - ``VC_VLLM_READ_TIMEOUT`` in seconds (600)
    - ``VC_VLLM_RETRIES``: total attempts (2), clamped to at least 1

    A failed attempt is retried after a linear back-off (2s, 4s, ...), but only while
    no reply text has been emitted yet. When the last attempt fails, the generator
    yields a "worried" stage, an ``error`` event, and then the offline mock reply
    (which ends with its own ``done``).

    Args:
        base_url: Endpoint root; ``/v1/chat/completions`` is appended.
        user_text: The user's message for this turn.
        history: Previous chat turns.
        state: Session state (character, last vision note, voice, ...).
        media_inputs: Attached media; a falsy value is treated as ``{"images": []}``.
        voice_state: Per-request voice settings.

    Yields:
        Event dicts as described above.
    """
    import httpx

    media_inputs = media_inputs or {"images": []}
    character = state.get("character") or get_character(state.get("character_id", "star_knight"))
    skill = select_skill(user_text, character)
    voice = {**character.get("voice", {}), **(voice_state or {})}
    reply_text = ""
    last_stage = {"type": "stage", "expression": "thinking", "motion": "look_at_user", "intensity": 0.55}

    yield normalize_event({"type": "skill", "name": skill})
    yield normalize_event(last_stage)
    yield normalize_event({"type": "voice", "style": voice.get("style", "soft"), "speed": voice.get("speed", 0.96), "energy": voice.get("energy", 0.5)})
    yield {"type": "debug", "message": f"Modal vLLM endpoint: {base_url.rstrip('/')}"}

    payload = {
        "model": os.environ.get("VC_VLLM_SERVED_MODEL", "llm"),
        "messages": _build_openai_messages(user_text, history, character, skill, state.get("last_vision_note"), media_inputs),
        "max_tokens": int(os.environ.get("VC_VLLM_MAX_TOKENS", "220")),
        "temperature": float(os.environ.get("VC_VLLM_TEMPERATURE", "0.75")),
        "stream": True,
        "chat_template_kwargs": {"enable_thinking": False},
    }
    timeout = httpx.Timeout(connect=20, read=float(os.environ.get("VC_VLLM_READ_TIMEOUT", "600")), write=20, pool=20)
    parser_state = {"pending_stage": True, "buffer": "", "started_text": False}
    url = base_url.rstrip("/") + "/v1/chat/completions"

    # Clamp: with "0" or a negative value range() would be empty, no request would be
    # sent at all, and the caller would silently receive an empty reply.
    max_attempts = max(1, int(os.environ.get("VC_VLLM_RETRIES", "2")))
    for attempt in range(1, max_attempts + 1):
        try:
            # trust_env=False: ignore proxy/certificate settings from the environment.
            with httpx.Client(timeout=timeout, trust_env=False) as client:
                with client.stream("POST", url, json=payload) as response:
                    response.raise_for_status()
                    for raw_line in response.iter_lines():
                        line = raw_line.strip()
                        # Blank separators and SSE comment lines (": keep-alive") carry no data.
                        if not line or line.startswith(":"):
                            continue
                        if line.startswith("data:"):
                            line = line[5:].strip()
                        if line == "[DONE]":
                            break
                        try:
                            chunk = json.loads(line)
                        except json.JSONDecodeError:
                            chunk = None
                        # Only a JSON object can carry "choices"; anything else is reported
                        # instead of crashing the delta extraction (and forcing a retry).
                        if not isinstance(chunk, dict):
                            yield {"type": "debug", "message": "invalid vLLM stream chunk", "raw": line[:200]}
                            continue

                        for text in _extract_delta_text(chunk):
                            for event in _events_from_vllm_delta(text, parser_state):
                                if event["type"] == "text_delta":
                                    reply_text += event["text"]
                                elif event["type"] == "stage":
                                    last_stage = {**last_stage, **event}
                                yield event
            break
        except Exception as exc:
            # Retrying after text was shown would duplicate it, so only retry while
            # nothing has been emitted yet.
            if attempt < max_attempts and not reply_text:
                parser_state = {"pending_stage": True, "buffer": "", "started_text": False}
                yield {"type": "debug", "message": f"Modal vLLM retry {attempt}/{max_attempts}: {exc}"}
                time.sleep(2 * attempt)
                continue
            worried = normalize_event({"type": "stage", "expression": "worried", "motion": "gentle_blink", "intensity": 0.8})
            yield worried
            yield {"type": "error", "message": f"Modal vLLM 调用失败:{exc}"}
            yield from _stream_mock_reply(user_text, state, voice_state)
            return

    # Release anything still held back while the parser waited for a stage tag.
    for event in _flush_vllm_parser(parser_state):
        if event["type"] == "text_delta":
            reply_text += event["text"]
        elif event["type"] == "stage":
            last_stage = {**last_stage, **event}
        yield event

    clean_reply = reply_text.strip()
    if clean_reply:
        yield normalize_event({"type": "sentence_end", "text": clean_reply})
        if _voice_enabled(voice_state):
            yield from _synthesize_audio_event(clean_reply, character, voice_state or state.get("voice", {}))
    settle = _settled_stage(last_stage)
    if settle:
        yield settle
    yield {"type": "done"}


def _build_openai_messages(
    user_text: str,
    history: list[dict],
    character: dict,
    skill: str,
    vision_note: str | None,
    media_inputs: dict[str, list[dict[str, Any]]] | None = None,
) -> list[dict[str, Any]]:
    """Assemble the OpenAI-style ``messages`` list for one chat turn.

    The list contains, in order:

    1. A system prompt from ``build_tavern_system_prompt``. Example dialogue is
       included only while *history* has fewer than 8 entries.
    2. Those of the last 10 history entries that are ``user``/``assistant`` turns
       whose content is not a list and is non-blank once stringified and stripped.
    3. The current user turn (text plus any attached images).
    """
    prompt = build_tavern_system_prompt(
        character,
        skill=skill,
        vision_note=vision_note,
        include_examples=len(history) < 8,
    )

    recent = ((turn.get("role"), turn.get("content")) for turn in history[-10:])
    prior_turns = [
        {"role": speaker, "content": str(body or "").strip()}
        for speaker, body in recent
        # List-valued content (multimodal parts from earlier turns) is skipped.
        if speaker in {"user", "assistant"} and not isinstance(body, list)
    ]

    messages: list[dict[str, Any]] = [{"role": "system", "content": prompt}]
    messages += [turn for turn in prior_turns if turn["content"]]
    messages.append({"role": "user", "content": _build_user_content(user_text, media_inputs or {"images": []})})
    return messages


def _build_user_content(user_text: str, media_inputs: dict[str, list[dict[str, Any]]]) -> str | list[dict[str, Any]]:
    content: list[dict[str, Any]] = []
    if user_text.strip():
        content.append({"type": "text", "text": user_text.strip()})
    for item in media_inputs.get("images", []):
        path = item.get("path")
        if path:
            content.append({"type": "image_url", "image_url": {"url": _data_url(path)}})
    if not content:
        return user_text
    return content


def _extract_delta_text(chunk: dict) -> Iterator[str]:
    for choice in chunk.get("choices") or []:
        delta = choice.get("delta") or {}
        text = delta.get("content") or delta.get("reasoning") or delta.get("reasoning_content")
        if text:
            yield str(text)


def _events_from_vllm_delta(text: str, parser_state: dict) -> Iterator[dict]:
    """Turn one streamed text fragment into ``stage`` / ``text_delta`` events.

    A reply may open with a ``<stage expression=... motion=... intensity=...>`` tag.
    While ``parser_state["pending_stage"]`` is set, fragments are accumulated in
    ``parser_state["buffer"]`` until the parser can decide:

    * Tag found (a ``>`` arrives after a ``<stage`` prefix): emit the parsed stage
      event, then any text that followed the tag.
    * The buffered output cannot be a stage tag, or more than 180 characters
      arrived without a ``>``: emit the default "smile"/"talk" stage (intensity
      0.55) and release the buffer as text.

    Once resolved, fragments pass straight through as ``text_delta`` events. Leading
    whitespace is stripped until the first non-empty text has been emitted.

    Args:
        text: One content fragment from the model stream.
        parser_state: Mutable state shared across calls for one reply. Its keys are
            ``pending_stage`` (still deciding about the leading tag), ``buffer``
            (text held back while pending) and ``started_text`` (visible text has
            already been emitted).

    Yields:
        Normalized ``stage`` and ``text_delta`` event dicts.
    """
    # Stage already resolved: plain pass-through, trimming leading whitespace only
    # until the first visible text has gone out.
    if not parser_state.get("pending_stage"):
        if not parser_state.get("started_text"):
            text = text.lstrip()
            if not text:
                return
            parser_state["started_text"] = True
        yield normalize_event({"type": "text_delta", "text": text})
        return

    parser_state["buffer"] += text
    buffered = parser_state["buffer"]
    stripped = buffered.lstrip()
    # Keep buffering while the output is (or may still become) "<stage...". An empty
    # `stripped` is a prefix of "<stage", so whitespace-only output keeps waiting.
    if not (stripped.startswith("<stage") or "<stage".startswith(stripped)):
        # Not a stage tag: default stage, then flush the buffer as visible text.
        parser_state["pending_stage"] = False
        yield normalize_event({"type": "stage", "expression": "smile", "motion": "talk", "intensity": 0.55})
        text = buffered.lstrip()
        if text:
            parser_state["started_text"] = True
            yield normalize_event({"type": "text_delta", "text": text})
        parser_state["buffer"] = ""
        return

    if ">" not in stripped:
        # Tag not closed yet; give up after 180 characters so a malformed or
        # unterminated tag cannot stall the visible reply forever.
        if len(stripped) > 180:
            parser_state["pending_stage"] = False
            yield normalize_event({"type": "stage", "expression": "smile", "motion": "talk", "intensity": 0.55})
            text = buffered.lstrip()
            if text:
                parser_state["started_text"] = True
                yield normalize_event({"type": "text_delta", "text": text})
            parser_state["buffer"] = ""
        return

    # Complete tag available: split at the first ">" and parse the tag.
    # _parse_stage_tag falls back to the default stage for malformed tags.
    tag, rest = stripped.split(">", 1)
    tag = tag + ">"
    parser_state["pending_stage"] = False
    parser_state["buffer"] = ""
    yield _parse_stage_tag(tag)
    text = rest.lstrip()
    if text:
        parser_state["started_text"] = True
        yield normalize_event({"type": "text_delta", "text": text})


def _flush_vllm_parser(parser_state: dict) -> Iterator[dict]:
    if parser_state.get("pending_stage") and parser_state.get("buffer"):
        parser_state["pending_stage"] = False
        yield normalize_event({"type": "stage", "expression": "smile", "motion": "talk", "intensity": 0.55})
        text = parser_state["buffer"].lstrip()
        if text:
            parser_state["started_text"] = True
            yield normalize_event({"type": "text_delta", "text": text})
        parser_state["buffer"] = ""


def _parse_stage_tag(tag: str) -> dict:
    """Convert a raw ``<stage ...>`` tag into a normalized ``stage`` event.

    A tag that does not fully match ``STAGE_TAG_RE`` (after stripping surrounding
    whitespace) yields the default "smile"/"talk" stage at intensity 0.55. On a
    match, the captured ``intensity`` is handed to ``normalize_event`` as the
    matched string (presumably converted there; verify in ``stream_protocol``).
    """
    found = STAGE_TAG_RE.fullmatch(tag.strip())
    if found is None:
        event = {"type": "stage", "expression": "smile", "motion": "talk", "intensity": 0.55}
    else:
        # groupdict() preserves pattern order: expression, motion, intensity.
        event = {"type": "stage", **found.groupdict()}
    return normalize_event(event)


def _stream_mock_reply(user_text: str, state: dict, voice_state: dict[str, Any] | None = None) -> Iterator[dict]:
    """Stream a canned, offline reply shaped like a real backend's event stream.

    Emits ``skill``, a "thinking" stage, ``voice``, and a skill-specific stage. It
    then emits the reply in 4-character ``text_delta`` chunks, followed by
    ``sentence_end`` and TTS events (when voice is enabled). It ends with an
    optional settling stage and ``done``. Skills without a scripted line use the
    character's first catchphrase (default "我在。").
    """
    character = state.get("character") or get_character(state.get("character_id", "star_knight"))
    skill = select_skill(user_text, character)
    voice = {**character.get("voice", {}), **(voice_state or {})}
    yield normalize_event({"type": "skill", "name": skill})
    yield normalize_event({"type": "stage", "expression": "thinking", "motion": "look_at_user", "intensity": 0.45})
    yield normalize_event({"type": "voice", "style": voice.get("style", "soft"), "speed": voice.get("speed", 0.96), "energy": voice.get("energy", 0.5)})

    # (skill name, canned reply, stage spec), checked in order.
    scripts = (
        (
            "emotional_support",
            "我在听。你不用马上把自己整理好,先把这口气慢慢呼出来。",
            {"type": "stage", "expression": "worried", "motion": "gentle_blink", "intensity": 0.7},
        ),
        (
            "battle_focus",
            "收到。先确认边界,再判断风险。你站在我身后就好。",
            {"type": "stage", "expression": "thinking", "motion": "focus", "intensity": 0.8},
        ),
        (
            "playful_reframe",
            "这听起来像一个支线任务。目标很小,但奖励可能意外地不错。",
            {"type": "stage", "expression": "smile", "motion": "soft_sway", "intensity": 0.7},
        ),
    )
    for name, canned, stage_spec in scripts:
        if skill == name:
            reply = canned
            break
    else:
        catchphrase = (character.get("dialogue_style", {}).get("catchphrases") or ["我在。"])[0]
        reply = f"{catchphrase} 你刚才说的我记下了,我们可以从最容易的一步开始。"
        stage_spec = {"type": "stage", "expression": "smile", "motion": "gentle_blink", "intensity": 0.5}

    last_stage = normalize_event(stage_spec)
    yield last_stage

    for piece in _chunk_text(reply):
        yield normalize_event({"type": "text_delta", "text": piece})
    yield normalize_event({"type": "sentence_end", "text": reply})
    if _voice_enabled(voice_state):
        yield from _synthesize_audio_event(reply, character, voice_state or state.get("voice", {}))
    settled = _settled_stage(last_stage)
    if settled:
        yield settled
    yield {"type": "done"}


def _chunk_text(text: str, size: int = 4) -> Iterator[str]:
    for index in range(0, len(text), size):
        yield text[index : index + size]


def _settled_stage(stage: dict[str, Any] | None) -> dict[str, Any] | None:
    if not stage:
        return None
    if stage.get("motion") != "talk":
        return None
    return normalize_event(
        {
            "type": "stage",
            "expression": stage.get("expression", "smile"),
            "motion": "gentle_blink",
            "intensity": min(float(stage.get("intensity", 0.55)), 0.6),
        }
    )


def _voice_enabled(voice_state: dict[str, Any] | None) -> bool:
    if not voice_state:
        return True
    return bool(voice_state.get("enabled", True))


def _synthesize_audio_event(text: str, character: dict, voice_state: dict[str, Any]) -> Iterator[dict]:
    """Run TTS for *text* and yield the resulting event, if any.

    Yields ``{"type": "audio", "path": ...}`` when ``synthesize_sentence`` returns a
    truthy result. Yields nothing when it returns a falsy result. When it raises any
    ``Exception``, yields an ``error`` event instead of propagating it.
    """
    try:
        clip = synthesize_sentence(text, character, voice_state)
    except Exception as exc:
        yield {"type": "error", "message": f"TTS 调用失败:{exc}"}
    else:
        if clip:
            yield {"type": "audio", "path": clip}


def _data_url(path_value: str | Path) -> str:
    """Return the file at *path_value* encoded as a base64 ``data:`` URL.

    The MIME type is guessed from the file name. Unknown types fall back to
    ``application/octet-stream``. The whole file is read into memory.
    """
    file_path = Path(path_value)
    guessed_type, _encoding = mimetypes.guess_type(file_path.name)
    mime_type = guessed_type or "application/octet-stream"
    encoded = _base64_file(file_path)
    return f"data:{mime_type};base64,{encoded}"


def _base64_file(path_value: str | Path) -> str:
    return base64.b64encode(Path(path_value).read_bytes()).decode("ascii")