Spaces:
Sleeping
Sleeping
wanghao76
fix: adapt app.py for Gradio 6.x API (remove Chatbot type, move theme/css to launch)
a8c7472 | """ | |
| app.py - StoryWeaver Gradio 交互界面 | |
| 职责: | |
| 1. 构建游戏的 Web 前端界面 (Gradio) | |
| 2. 串联 NLU 引擎、叙事引擎、状态管理器 | |
| 3. 管理用户交互流程(文本输入 + 选项点击) | |
| 4. 展示游戏状态(HP、背包、任务等) | |
| 数据流转: | |
| 用户输入 → NLU 引擎(意图识别) → 叙事引擎(两阶段生成) | |
| ↕ ↕ | |
| Gradio UI ← 状态管理器(校验 + 更新) ← 叙事引擎(文本 + 选项) | |
| """ | |
| import copy | |
| from collections import Counter | |
| import html | |
| import json | |
| import logging | |
| from time import perf_counter | |
| import gradio as gr | |
| from state_manager import GameState | |
| from nlu_engine import NLUEngine | |
| from scene_assets import get_scene_image_path, IMAGE_DIR | |
| from story_engine import StoryEngine | |
| from telemetry import append_turn_log, create_session_metadata | |
| from utils import logger | |
| APP_UI_CSS = """ | |
| .story-chat {min-height: 500px;} | |
| .status-panel { | |
| font-family: "Microsoft YaHei UI", "Noto Sans SC", sans-serif; | |
| font-size: 0.9em; | |
| line-height: 1.5; | |
| background: transparent !important; | |
| border: none !important; | |
| border-radius: 0 !important; | |
| padding: 10px 12px !important; | |
| box-shadow: none !important; | |
| overflow: visible !important; | |
| } | |
| .status-panel > div, | |
| .status-panel [class*="prose"], | |
| .status-panel .markdown-body, | |
| .status-panel [class*="wrap"] { | |
| background: transparent !important; | |
| border: none !important; | |
| box-shadow: none !important; | |
| padding: 0 !important; | |
| overflow: visible !important; | |
| } | |
| .status-panel * { | |
| word-break: break-word; | |
| overflow-wrap: anywhere; | |
| } | |
| .option-btn {min-height: 50px !important;} | |
| .side-action-btn, | |
| .side-action-btn button {min-height: 50px !important;} | |
| .backpack-btn button { | |
| min-height: 50px !important; | |
| background: #ffffff !important; | |
| color: #0f172a !important; | |
| border: 1px solid #d1d5db !important; | |
| } | |
| .scene-sidebar {gap: 12px;} | |
| .scene-card { | |
| border: 1px solid #e5e7eb !important; | |
| border-radius: 12px !important; | |
| background: #fcfcfd !important; | |
| box-shadow: 0 4px 14px rgba(15, 23, 42, 0.04) !important; | |
| } | |
| .scene-image { | |
| min-height: 260px; | |
| padding: 10px !important; | |
| } | |
| .scene-image > div, | |
| .scene-image img, | |
| .scene-image button, | |
| .scene-image [class*="image"], | |
| .scene-image [class*="wrap"], | |
| .scene-image [class*="frame"], | |
| .scene-image [class*="preview"] { | |
| border: none !important; | |
| box-shadow: none !important; | |
| background: transparent !important; | |
| } | |
| .scene-image img { | |
| width: 100%; | |
| height: 100%; | |
| object-fit: contain !important; | |
| border-radius: 10px; | |
| padding: 4px; | |
| background: #ffffff !important; | |
| } | |
| """ | |
| # ============================================================ | |
| # 全局游戏实例(每个会话独立) | |
| # ============================================================ | |
| # 使用 Gradio State 管理每个用户的游戏状态 | |
| # 这里先定义工厂函数 | |
| def create_new_game(player_name: str = "旅人") -> dict: | |
| """创建新游戏实例,返回包含所有引擎的字典""" | |
| game_state = GameState(player_name=player_name) | |
| nlu = NLUEngine(game_state) | |
| story = StoryEngine(game_state, enable_rule_text_polish=True) | |
| return { | |
| "game_state": game_state, | |
| "nlu": nlu, | |
| "story": story, | |
| "current_options": [], | |
| "started": False, | |
| **create_session_metadata(), | |
| } | |
| def _json_safe(value): | |
| """Convert nested values into JSON-serializable data for logs.""" | |
| if value is None or isinstance(value, (str, int, float, bool)): | |
| return value | |
| if isinstance(value, dict): | |
| return {str(key): _json_safe(val) for key, val in value.items()} | |
| if isinstance(value, (list, tuple, set)): | |
| return [_json_safe(item) for item in value] | |
| if hasattr(value, "model_dump"): | |
| return _json_safe(value.model_dump()) | |
| return str(value) | |
| def _build_state_snapshot(gs: GameState) -> dict: | |
| """Build a compact state snapshot for reproducible evaluation logs.""" | |
| active_quests = [] | |
| effective_stats = gs.get_effective_player_stats() | |
| equipment_bonuses = gs.get_equipment_stat_bonuses() | |
| environment_snapshot = gs.get_environment_snapshot(limit=3) | |
| for quest in gs.world.quests.values(): | |
| if quest.status == "active": | |
| active_quests.append( | |
| { | |
| "quest_id": quest.quest_id, | |
| "title": quest.title, | |
| "status": quest.status, | |
| "objectives": _json_safe(quest.objectives), | |
| } | |
| ) | |
| return { | |
| "turn": gs.turn, | |
| "game_mode": gs.game_mode, | |
| "location": gs.player.location, | |
| "scene": gs.world.current_scene, | |
| "day": gs.world.day_count, | |
| "time_of_day": gs.world.time_of_day, | |
| "weather": gs.world.weather, | |
| "light_level": gs.world.light_level, | |
| "environment": _json_safe(environment_snapshot), | |
| "player": { | |
| "name": gs.player.name, | |
| "level": gs.player.level, | |
| "hp": gs.player.hp, | |
| "max_hp": gs.player.max_hp, | |
| "mp": gs.player.mp, | |
| "max_mp": gs.player.max_mp, | |
| "attack": gs.player.attack, | |
| "defense": gs.player.defense, | |
| "speed": gs.player.speed, | |
| "luck": gs.player.luck, | |
| "perception": gs.player.perception, | |
| "gold": gs.player.gold, | |
| "morale": gs.player.morale, | |
| "sanity": gs.player.sanity, | |
| "hunger": gs.player.hunger, | |
| "karma": gs.player.karma, | |
| "effective_stats": _json_safe(effective_stats), | |
| "equipment_bonuses": _json_safe(equipment_bonuses), | |
| "inventory": list(gs.player.inventory), | |
| "equipment": copy.deepcopy(gs.player.equipment), | |
| "skills": list(gs.player.skills), | |
| "status_effects": [effect.name for effect in gs.player.status_effects], | |
| }, | |
| "active_quests": active_quests, | |
| "event_log_size": len(gs.event_log), | |
| } | |
| def _record_interaction_log( | |
| game_session: dict, | |
| *, | |
| input_source: str, | |
| user_input: str, | |
| intent_result: dict | None, | |
| output_text: str, | |
| latency_ms: float, | |
| nlu_latency_ms: float | None = None, | |
| generation_latency_ms: float | None = None, | |
| final_result: dict | None = None, | |
| selected_option: dict | None = None, | |
| ): | |
| """Append a structured interaction log without affecting gameplay.""" | |
| if not game_session or "game_state" not in game_session: | |
| return | |
| final_result = final_result or {} | |
| telemetry = _json_safe(final_result.get("telemetry", {})) or {} | |
| record = { | |
| "input_source": input_source, | |
| "user_input": user_input, | |
| "selected_option": _json_safe(selected_option), | |
| "nlu_result": _json_safe(intent_result), | |
| "latency_ms": round(latency_ms, 2), | |
| "nlu_latency_ms": None if nlu_latency_ms is None else round(nlu_latency_ms, 2), | |
| "generation_latency_ms": None if generation_latency_ms is None else round(generation_latency_ms, 2), | |
| "used_fallback": bool(telemetry.get("used_fallback", False)), | |
| "fallback_reason": telemetry.get("fallback_reason"), | |
| "engine_mode": telemetry.get("engine_mode"), | |
| "state_changes": _json_safe(final_result.get("state_changes", {})), | |
| "change_log": _json_safe(final_result.get("change_log", [])), | |
| "consistency_issues": _json_safe(final_result.get("consistency_issues", [])), | |
| "output_text": output_text, | |
| "story_text": final_result.get("story_text"), | |
| "options": _json_safe(final_result.get("options", game_session.get("current_options", []))), | |
| "post_turn_snapshot": _build_state_snapshot(game_session["game_state"]), | |
| } | |
| try: | |
| append_turn_log(game_session, record) | |
| except Exception as exc: | |
| logger.warning(f"Failed to append interaction log: {exc}") | |
| def _build_option_intent(selected_option: dict) -> dict: | |
| """Represent button clicks in the same schema as free-text NLU output.""" | |
| option_text = selected_option.get("text", "") | |
| return { | |
| "intent": selected_option.get("action_type", "EXPLORE"), | |
| "target": selected_option.get("target"), | |
| "details": option_text, | |
| "raw_input": option_text, | |
| "parser_source": "option_click", | |
| } | |
| def _get_scene_image_value(gs: GameState) -> str | None: | |
| focus_npc = getattr(gs, "last_interacted_npc", None) | |
| return get_scene_image_path(gs, focus_npc=focus_npc) | |
| def _get_scene_image_update(gs: GameState): | |
| image_value = _get_scene_image_value(gs) | |
| return gr.update(value=image_value, visible=bool(image_value)) | |
| def _build_map_graph_data(gs: GameState) -> dict: | |
| """基于已发现地点与连接关系构建地图拓扑数据。""" | |
| world_locations = getattr(getattr(gs, "world", None), "locations", {}) or {} | |
| discovered = list(getattr(getattr(gs, "world", None), "discovered_locations", []) or []) | |
| history = list(getattr(gs, "location_history", []) or []) | |
| current_location = str(getattr(gs, "current_location", None) or "").strip() | |
| if not current_location: | |
| current_location = str(getattr(getattr(gs, "player", None), "location", None) or "未知之地") | |
| visible_set: set[str] = set(discovered) | set(history) | |
| if current_location: | |
| visible_set.add(current_location) | |
| # 使用世界注册顺序保证地图输出稳定,便于玩家快速扫描。 | |
| ordered_nodes: list[str] = [name for name in world_locations.keys() if name in visible_set] | |
| for name in discovered + history + [current_location]: | |
| if name and name in visible_set and name not in ordered_nodes: | |
| ordered_nodes.append(name) | |
| visited_set = set(history) | |
| if current_location: | |
| visited_set.add(current_location) | |
| adjacency: dict[str, list[str]] = {} | |
| for node in ordered_nodes: | |
| loc_info = world_locations.get(node) | |
| if not loc_info: | |
| adjacency[node] = [] | |
| continue | |
| neighbors = [] | |
| for neighbor in list(getattr(loc_info, "connected_to", []) or []): | |
| if neighbor in visible_set and neighbor != node: | |
| neighbors.append(neighbor) | |
| adjacency[node] = neighbors | |
| node_state: dict[str, str] = {} | |
| for node in ordered_nodes: | |
| if node == current_location: | |
| node_state[node] = "current" | |
| elif node in visited_set: | |
| node_state[node] = "visited" | |
| else: | |
| node_state[node] = "known" | |
| return { | |
| "current_location": current_location, | |
| "nodes": ordered_nodes, | |
| "adjacency": adjacency, | |
| "node_state": node_state, | |
| } | |
| def _build_location_hover_text(gs: GameState, location_name: str) -> str: | |
| """构造地点 hover 提示:展示 NPC 与怪物。""" | |
| world = getattr(gs, "world", None) | |
| locations = getattr(world, "locations", {}) or {} | |
| npcs = getattr(world, "npcs", {}) or {} | |
| loc = locations.get(location_name) | |
| if not loc: | |
| return f"{location_name}\nNPC: 无\n怪物: 无" | |
| npc_names: set[str] = set(getattr(loc, "npcs_present", []) or []) | |
| for npc in npcs.values(): | |
| if getattr(npc, "location", None) == location_name and getattr(npc, "is_alive", True): | |
| npc_names.add(getattr(npc, "name", "")) | |
| npc_names = {name for name in npc_names if name} | |
| enemy_names = [str(name) for name in list(getattr(loc, "enemies", []) or []) if str(name)] | |
| npc_text = "、".join(sorted(npc_names)) if npc_names else "无" | |
| enemy_text = "、".join(enemy_names) if enemy_names else "无" | |
| return f"{location_name}\nNPC: {npc_text}\n怪物: {enemy_text}" | |
| def _truncate_map_label(name: str, max_len: int = 8) -> str: | |
| text = str(name or "") | |
| return text if len(text) <= max_len else f"{text[:max_len]}..." | |
| def _build_fixed_branch_layout(nodes: list[str], adjacency: dict[str, list[str]]) -> dict[str, tuple[int, int]]: | |
| """固定起点的分层布局:保持总体顺序稳定,同时展示分支。""" | |
| if not nodes: | |
| return {} | |
| node_set = set(nodes) | |
| layers: dict[str, int] = {} | |
| def _bfs(seed: str, base_layer: int) -> None: | |
| if seed not in node_set or seed in layers: | |
| return | |
| queue: list[str] = [seed] | |
| layers[seed] = base_layer | |
| cursor = 0 | |
| while cursor < len(queue): | |
| node = queue[cursor] | |
| cursor += 1 | |
| next_layer = layers[node] + 1 | |
| for nxt in adjacency.get(node, []): | |
| if nxt in node_set and nxt not in layers: | |
| layers[nxt] = next_layer | |
| queue.append(nxt) | |
| # 第一出现地点作为固定起点,避免因当前位置变化而重排。 | |
| _bfs(nodes[0], 0) | |
| for name in nodes: | |
| if name not in layers: | |
| base = (max(layers.values()) + 1) if layers else 0 | |
| _bfs(name, base) | |
| level_nodes: dict[int, list[str]] = {} | |
| for name in nodes: | |
| level = layers.get(name, 0) | |
| level_nodes.setdefault(level, []).append(name) | |
| positions: dict[str, tuple[int, int]] = {} | |
| for col_idx, level in enumerate(sorted(level_nodes.keys())): | |
| for row_idx, name in enumerate(level_nodes[level]): | |
| positions[name] = (col_idx, row_idx) | |
| return positions | |
| def _render_text_map(gs: GameState | None) -> str: | |
| """拓扑地图:从左到右显示地点关系图。""" | |
| if gs is None: | |
| return "地图关系图\n(未开始)" | |
| graph = _build_map_graph_data(gs) | |
| nodes = graph["nodes"] | |
| adjacency = graph["adjacency"] | |
| node_state = graph["node_state"] | |
| current_location = graph["current_location"] | |
| if not nodes: | |
| current = current_location or "未知之地" | |
| return f"地图关系图\n当前位置:{current}" | |
| positions = _build_fixed_branch_layout(nodes, adjacency) | |
| if not positions: | |
| return "地图关系图\n(暂无可显示节点)" | |
| node_width = 110 | |
| node_height = 34 | |
| col_gap = 140 | |
| row_gap = 50 | |
| x_margin = 16 | |
| y_margin = 16 | |
| max_col = max(col for col, _ in positions.values()) | |
| max_row = max(row for _, row in positions.values()) | |
| canvas_width = x_margin * 2 + max_col * col_gap + node_width | |
| canvas_height = y_margin * 2 + max_row * row_gap + node_height | |
| canvas_height = max(canvas_height, 74) | |
| node_boxes: dict[str, tuple[int, int, int, int]] = {} | |
| centers: dict[str, tuple[int, int]] = {} | |
| for name, (col, row) in positions.items(): | |
| x = x_margin + col * col_gap | |
| y = y_margin + row * row_gap | |
| node_boxes[name] = (x, y, node_width, node_height) | |
| centers[name] = (x + node_width // 2, y + node_height // 2) | |
| edge_pairs: set[tuple[str, str]] = set() | |
| for source, neighbors in adjacency.items(): | |
| for target in neighbors: | |
| if source in positions and target in positions and source != target: | |
| edge_pairs.add(tuple(sorted((source, target)))) | |
| edge_svg: list[str] = [] | |
| def _segment_hits_box_horizontal(y: float, x_start: float, x_end: float, box: tuple[int, int, int, int]) -> bool: | |
| bx, by, bw, bh = box | |
| left = min(x_start, x_end) | |
| right = max(x_start, x_end) | |
| return (by + 1) <= y <= (by + bh - 1) and not (right <= bx + 1 or left >= bx + bw - 1) | |
| def _segment_hits_box_vertical(x: float, y_start: float, y_end: float, box: tuple[int, int, int, int]) -> bool: | |
| bx, by, bw, bh = box | |
| top = min(y_start, y_end) | |
| bottom = max(y_start, y_end) | |
| return (bx + 1) <= x <= (bx + bw - 1) and not (bottom <= by + 1 or top >= by + bh - 1) | |
| for source, target in sorted(edge_pairs): | |
| sx, sy, sw, sh = node_boxes[source] | |
| tx, ty, tw, th = node_boxes[target] | |
| source_exit_x = sx + sw | |
| source_exit_y = sy + sh / 2 | |
| target_entry_x = tx | |
| target_entry_y = ty + th / 2 | |
| mid_x = (source_exit_x + target_entry_x) / 2 | |
| needs_detour = False | |
| for name, box in node_boxes.items(): | |
| if name in {source, target}: | |
| continue | |
| if ( | |
| _segment_hits_box_horizontal(source_exit_y, source_exit_x, mid_x, box) | |
| or _segment_hits_box_vertical(mid_x, source_exit_y, target_entry_y, box) | |
| or _segment_hits_box_horizontal(target_entry_y, mid_x, target_entry_x, box) | |
| ): | |
| needs_detour = True | |
| break | |
| if not needs_detour: | |
| points = ( | |
| f"{source_exit_x},{source_exit_y} " | |
| f"{mid_x},{source_exit_y} " | |
| f"{mid_x},{target_entry_y} " | |
| f"{target_entry_x},{target_entry_y}" | |
| ) | |
| else: | |
| # 局部上绕:仅在必要时走上方,减少杂乱感同时避免压到节点。 | |
| route_y = max(4, min(source_exit_y, target_entry_y) - node_height / 2 - 10) | |
| route_left_x = source_exit_x + 6 | |
| route_right_x = target_entry_x - 6 | |
| points = ( | |
| f"{source_exit_x},{source_exit_y} " | |
| f"{route_left_x},{source_exit_y} " | |
| f"{route_left_x},{route_y} " | |
| f"{route_right_x},{route_y} " | |
| f"{route_right_x},{target_entry_y} " | |
| f"{target_entry_x},{target_entry_y}" | |
| ) | |
| edge_svg.append( | |
| f"<polyline points='{points}' " | |
| "fill='none' stroke='#94a3b8' stroke-width='1.7' " | |
| "stroke-linecap='round' stroke-linejoin='round' />" | |
| ) | |
| node_svg: list[str] = [] | |
| for name in nodes: | |
| col, row = positions[name] | |
| x = x_margin + col * col_gap | |
| y = y_margin + row * row_gap | |
| state = node_state.get(name, "known") | |
| escaped_name = html.escape(_truncate_map_label(name)) | |
| hover_text = html.escape(_build_location_hover_text(gs, name)) | |
| if state == "current": | |
| fill = "#fff7ed" | |
| stroke = "#f97316" | |
| text_color = "#9a3412" | |
| stroke_width = 1.8 | |
| display_name = escaped_name | |
| elif state == "visited": | |
| fill = "#f1f5f9" | |
| stroke = "#64748b" | |
| text_color = "#334155" | |
| stroke_width = 1.4 | |
| display_name = escaped_name | |
| else: | |
| fill = "#ffffff" | |
| stroke = "#cbd5e1" | |
| text_color = "#334155" | |
| stroke_width = 1.2 | |
| display_name = escaped_name | |
| rect_class_attr = " class='map-current-node'" if state == "current" else "" | |
| node_svg.append( | |
| "<g cursor='help'>" | |
| f"<title>{hover_text}</title>" | |
| f"<rect x='{x}' y='{y}' width='{node_width}' height='{node_height}' " | |
| f"rx='8' ry='8' fill='{fill}' stroke='{stroke}' stroke-width='{stroke_width}'{rect_class_attr} />" | |
| f"<text x='{x + node_width / 2}' y='{y + node_height / 2 + 5}' " | |
| "text-anchor='middle' font-size='11.5' font-family='Microsoft YaHei UI, Noto Sans SC, sans-serif' " | |
| f"fill='{text_color}'>{display_name}</text>" | |
| "</g>" | |
| ) | |
| svg = ( | |
| f"<svg width='{canvas_width}' height='{canvas_height}' viewBox='0 0 {canvas_width} {canvas_height}' " | |
| "xmlns='http://www.w3.org/2000/svg'>" | |
| "<style>" | |
| "@keyframes mapNodePulse{" | |
| "0%{fill:#fff7ed;stroke:#f97316;stroke-width:1.8;opacity:0.9;}" | |
| "50%{fill:#fdba74;stroke:#c2410c;stroke-width:1.8;opacity:1;}" | |
| "100%{fill:#fff7ed;stroke:#f97316;stroke-width:1.8;opacity:0.9;}" | |
| "}" | |
| ".map-current-node{animation:mapNodePulse 1.2s ease-in-out infinite;}" | |
| "</style>" | |
| + "".join(edge_svg) | |
| + "".join(node_svg) | |
| + "</svg>" | |
| ) | |
| return ( | |
| "<div style='font-size:0.9em;'>" | |
| "<details>" | |
| "<summary style='cursor:pointer;font-weight:700;'>展开地图关系图</summary>" | |
| "<div style='font-size:0.8em;color:#475569;margin:8px 0 6px 0;'>" | |
| "鼠标悬停于地点格可查看NPC与怪物。" | |
| "</div>" | |
| "<div style='overflow-x:auto;padding-bottom:2px;'>" | |
| + svg | |
| + "</div>" | |
| "</details>" | |
| "</div>" | |
| ) | |
| def restart_game() -> tuple: | |
| """ | |
| 重启冒险:清空所有数据,回到初始输入名称阶段。 | |
| Returns: | |
| (空聊天历史, 初始状态面板, 隐藏选项按钮×3, 空游戏会话, | |
| 禁用文本输入框, 重置角色名称) | |
| """ | |
| loading = _get_loading_button_updates() | |
| return ( | |
| [], # 清空聊天历史 | |
| _format_world_info_panel(None), # 重置世界信息 | |
| "## 等待开始...\n\n请输入角色名称并点击「开始冒险」", # 重置状态面板 | |
| "地图关系图\n(未开始)", # 清空地图 | |
| gr.update(value=None, visible=False), # 清空场景图片 | |
| *loading, # 占位选项按钮 | |
| {}, # 清空游戏会话 | |
| gr.update(value="", interactive=False), # 禁用并清空文本输入 | |
| gr.update(value="旅人"), # 重置角色名称 | |
| ) | |
| # ============================================================ | |
| # 核心交互函数 | |
| # ============================================================ | |
| def start_game(player_name: str, game_session: dict): | |
| """ | |
| 开始新游戏:流式生成开场叙事。 | |
| 使用生成器 yield 实现流式输出,让用户看到文字逐步出现。 | |
| """ | |
| if not player_name.strip(): | |
| player_name = "旅人" | |
| # 创建新游戏 | |
| game_session = create_new_game(player_name) | |
| game_session["started"] = True | |
| # 初始 yield:显示加载状态,按钮保持可见但禁用 | |
| chat_history = [{"role": "assistant", "content": "⏳ 正在生成开场..."}] | |
| world_info_text = _format_world_info_panel(game_session["game_state"]) | |
| status_text = _format_status_panel(game_session["game_state"]) | |
| loading = _get_loading_button_updates() | |
| yield ( | |
| chat_history, | |
| world_info_text, | |
| status_text, | |
| _render_text_map(game_session["game_state"]), | |
| _get_scene_image_update(game_session["game_state"]), | |
| *loading, | |
| game_session, | |
| gr.update(interactive=False), | |
| ) | |
| # 流式生成开场(选项仅在流结束后从 final 事件中提取,流式期间不解析选项) | |
| turn_started = perf_counter() | |
| story_text = "" | |
| final_result = None | |
| for update in game_session["story"].generate_opening_stream(): | |
| if update["type"] == "story_chunk": | |
| story_text = update["text"] | |
| chat_history[-1]["content"] = story_text | |
| yield ( | |
| chat_history, | |
| world_info_text, | |
| status_text, | |
| _render_text_map(game_session["game_state"]), | |
| _get_scene_image_update(game_session["game_state"]), | |
| *loading, | |
| game_session, | |
| gr.update(interactive=False), | |
| ) | |
| elif update["type"] == "final": | |
| final_result = update | |
| generation_latency_ms = (perf_counter() - turn_started) * 1000 | |
| # ★ 只在数据流完全结束后,从 final_result 中提取选项 | |
| if final_result: | |
| story_text = final_result.get("story_text", story_text) | |
| options = final_result.get("options", []) | |
| else: | |
| options = [] | |
| # ★ 安全兜底:强制确保恰好 3 个选项 | |
| options = _finalize_session_options(options) | |
| # 最终 yield:显示完整文本 + 选项 + 启用按钮 | |
| game_session["current_options"] = options | |
| full_message = story_text | |
| if not final_result: | |
| final_result = { | |
| "story_text": story_text, | |
| "options": options, | |
| "state_changes": {}, | |
| "change_log": [], | |
| "consistency_issues": [], | |
| "telemetry": { | |
| "engine_mode": "opening_app", | |
| "used_fallback": True, | |
| "fallback_reason": "missing_final_event", | |
| }, | |
| } | |
| chat_history[-1]["content"] = full_message | |
| world_info_text = _format_world_info_panel(game_session["game_state"]) | |
| status_text = _format_status_panel(game_session["game_state"]) | |
| btn_updates = _get_button_updates(options) | |
| _record_interaction_log( | |
| game_session, | |
| input_source="system_opening", | |
| user_input="", | |
| intent_result=None, | |
| output_text=full_message, | |
| latency_ms=generation_latency_ms, | |
| generation_latency_ms=generation_latency_ms, | |
| final_result=final_result, | |
| ) | |
| yield ( | |
| chat_history, | |
| world_info_text, | |
| status_text, | |
| _render_text_map(game_session["game_state"]), | |
| _get_scene_image_update(game_session["game_state"]), | |
| *btn_updates, | |
| game_session, | |
| gr.update(interactive=True), | |
| ) | |
| def process_user_input(user_input: str, chat_history: list, game_session: dict): | |
| """ | |
| 处理用户文本输入(流式版本)。 | |
| 流程: | |
| 1. NLU 引擎解析意图 | |
| 2. 叙事引擎流式生成故事 | |
| 3. 逐步更新 UI | |
| """ | |
| if not game_session or not game_session.get("started"): | |
| chat_history = chat_history or [] | |
| chat_history.append({"role": "assistant", "content": "请先点击「开始冒险」按钮!"}) | |
| loading = _get_loading_button_updates() | |
| yield ( | |
| chat_history, | |
| _format_world_info_panel(None), | |
| "", | |
| "", | |
| gr.update(value=None, visible=False), | |
| *loading, | |
| game_session, | |
| ) | |
| return | |
| if not user_input.strip(): | |
| btn_updates = _get_button_updates(game_session.get("current_options", [])) | |
| yield ( | |
| chat_history, | |
| _format_world_info_panel(game_session["game_state"]), | |
| _format_status_panel(game_session["game_state"]), | |
| _render_text_map(game_session["game_state"]), | |
| _get_scene_image_update(game_session["game_state"]), | |
| *btn_updates, | |
| game_session, | |
| ) | |
| return | |
| gs: GameState = game_session["game_state"] | |
| nlu: NLUEngine = game_session["nlu"] | |
| story: StoryEngine = game_session["story"] | |
| turn_started = perf_counter() | |
| # 检查游戏是否已结束 | |
| if gs.is_game_over(): | |
| chat_history.append({"role": "user", "content": user_input}) | |
| chat_history.append({"role": "assistant", "content": "游戏已结束。请点击「重新开始」按钮开始新的冒险。"}) | |
| restart_buttons = _get_button_updates( | |
| [ | |
| {"id": 1, "text": "重新开始", "action_type": "RESTART"}, | |
| ] | |
| ) | |
| yield ( | |
| chat_history, | |
| _format_world_info_panel(gs), | |
| _format_status_panel(gs), | |
| _render_text_map(gs), | |
| _get_scene_image_update(gs), | |
| *restart_buttons, | |
| game_session, | |
| ) | |
| return | |
| # 1. NLU 解析 | |
| nlu_started = perf_counter() | |
| intent = nlu.parse_intent(user_input) | |
| nlu_latency_ms = (perf_counter() - nlu_started) * 1000 | |
| # 1.5 预校验:立即驳回违反一致性的操作(不调用 LLM,不消耗回合) | |
| is_valid, rejection_msg = gs.pre_validate_action(intent) | |
| if not is_valid: | |
| chat_history.append({"role": "user", "content": user_input}) | |
| options = game_session.get("current_options", []) | |
| options = _finalize_session_options(options) | |
| rejection_content = ( | |
| f"⚠️ **行动被驳回**:{rejection_msg}\n\n" | |
| f"请重新选择行动,或输入其他指令。" | |
| ) | |
| chat_history.append({"role": "assistant", "content": rejection_content}) | |
| rejection_result = { | |
| "story_text": rejection_content, | |
| "options": options, | |
| "state_changes": {}, | |
| "change_log": [], | |
| "consistency_issues": [], | |
| "telemetry": { | |
| "engine_mode": "pre_validation", | |
| "used_fallback": False, | |
| "fallback_reason": None, | |
| }, | |
| } | |
| _record_interaction_log( | |
| game_session, | |
| input_source="text_input", | |
| user_input=user_input, | |
| intent_result=intent, | |
| output_text=rejection_content, | |
| latency_ms=(perf_counter() - turn_started) * 1000, | |
| nlu_latency_ms=nlu_latency_ms, | |
| generation_latency_ms=0.0, | |
| final_result=rejection_result, | |
| ) | |
| btn_updates = _get_button_updates(options) | |
| yield ( | |
| chat_history, | |
| _format_world_info_panel(gs), | |
| _format_status_panel(gs), | |
| _render_text_map(gs), | |
| _get_scene_image_update(gs), | |
| *btn_updates, | |
| game_session, | |
| ) | |
| return | |
| # 2. 添加用户消息 + 空的 assistant 消息(用于流式填充) | |
| chat_history.append({"role": "user", "content": user_input}) | |
| chat_history.append({"role": "assistant", "content": "⏳ 正在生成..."}) | |
| # 按钮保持可见但禁用,防止流式期间点击 | |
| loading = _get_loading_button_updates( | |
| max(len(game_session.get("current_options", [])), MIN_OPTION_BUTTONS) | |
| ) | |
| yield ( | |
| chat_history, | |
| _format_world_info_panel(gs), | |
| _format_status_panel(gs), | |
| _render_text_map(gs), | |
| _get_scene_image_update(gs), | |
| *loading, | |
| game_session, | |
| ) | |
| # 3. 流式生成故事 | |
| generation_started = perf_counter() | |
| final_result = None | |
| for update in story.generate_story_stream(intent): | |
| if update["type"] == "story_chunk": | |
| chat_history[-1]["content"] = update["text"] | |
| yield ( | |
| chat_history, | |
| _format_world_info_panel(gs), | |
| _format_status_panel(gs), | |
| _render_text_map(gs), | |
| _get_scene_image_update(gs), | |
| *loading, | |
| game_session, | |
| ) | |
| elif update["type"] == "final": | |
| final_result = update | |
| generation_latency_ms = (perf_counter() - generation_started) * 1000 | |
| # 4. 最终更新:完整文本 + 状态变化 + 选项 + 按钮 | |
| if final_result: | |
| # ★ 安全兜底:强制确保恰好 3 个选项 | |
| options = _finalize_session_options(final_result.get("options", [])) | |
| game_session["current_options"] = options | |
| change_log = final_result.get("change_log", []) | |
| log_text = "" | |
| if change_log: | |
| log_text = "\n".join(f" {c}" for c in change_log) | |
| log_text = f"\n\n**状态变化:**\n{log_text}" | |
| issues = final_result.get("consistency_issues", []) | |
| issues_text = "" | |
| if issues: | |
| issues_text = "\n".join(f" {i}" for i in issues) | |
| issues_text = f"\n\n**一致性提示:**\n{issues_text}" | |
| full_message = f"{final_result['story_text']}{log_text}{issues_text}" | |
| chat_history[-1]["content"] = full_message | |
| status_text = _format_status_panel(gs) | |
| btn_updates = _get_button_updates(options) | |
| _record_interaction_log( | |
| game_session, | |
| input_source="text_input", | |
| user_input=user_input, | |
| intent_result=intent, | |
| output_text=full_message, | |
| latency_ms=(perf_counter() - turn_started) * 1000, | |
| nlu_latency_ms=nlu_latency_ms, | |
| generation_latency_ms=generation_latency_ms, | |
| final_result=final_result, | |
| ) | |
| yield ( | |
| chat_history, | |
| _format_world_info_panel(gs), | |
| status_text, | |
| _render_text_map(gs), | |
| _get_scene_image_update(gs), | |
| *btn_updates, | |
| game_session, | |
| ) | |
| else: | |
| # ★ 兜底:final_result 为空,说明流式生成未产生 final 事件 | |
| logger.warning("流式生成未产生 final 事件,使用兜底文本") | |
| fallback_text = "你环顾四周,思考着接下来该做什么..." | |
| fallback_options = _finalize_session_options([]) | |
| game_session["current_options"] = fallback_options | |
| full_message = fallback_text | |
| fallback_result = { | |
| "story_text": fallback_text, | |
| "options": fallback_options, | |
| "state_changes": {}, | |
| "change_log": [], | |
| "consistency_issues": [], | |
| "telemetry": { | |
| "engine_mode": "app_fallback", | |
| "used_fallback": True, | |
| "fallback_reason": "missing_final_event", | |
| }, | |
| } | |
| chat_history[-1]["content"] = full_message | |
| status_text = _format_status_panel(gs) | |
| btn_updates = _get_button_updates(fallback_options) | |
| _record_interaction_log( | |
| game_session, | |
| input_source="text_input", | |
| user_input=user_input, | |
| intent_result=intent, | |
| output_text=full_message, | |
| latency_ms=(perf_counter() - turn_started) * 1000, | |
| nlu_latency_ms=nlu_latency_ms, | |
| generation_latency_ms=generation_latency_ms, | |
| final_result=fallback_result, | |
| ) | |
| yield ( | |
| chat_history, | |
| _format_world_info_panel(gs), | |
| status_text, | |
| _render_text_map(gs), | |
| _get_scene_image_update(gs), | |
| *btn_updates, | |
| game_session, | |
| ) | |
| return | |
| def process_option_click(option_idx: int, chat_history: list, game_session: dict): | |
| """ | |
| 处理玩家点击选项按钮(流式版本)。 | |
| Args: | |
| option_idx: 选项索引 (0-5) | |
| """ | |
| if not game_session or not game_session.get("started"): | |
| chat_history = chat_history or [] | |
| chat_history.append({"role": "assistant", "content": "请先点击「开始冒险」按钮!"}) | |
| loading = _get_loading_button_updates() | |
| yield ( | |
| chat_history, | |
| _format_world_info_panel(None), | |
| "", | |
| "", | |
| gr.update(value=None, visible=False), | |
| *loading, | |
| game_session, | |
| ) | |
| return | |
| options = game_session.get("current_options", []) | |
| if option_idx >= len(options): | |
| btn_updates = _get_button_updates(options) | |
| yield ( | |
| chat_history, | |
| _format_world_info_panel(game_session["game_state"]), | |
| _format_status_panel(game_session["game_state"]), | |
| _render_text_map(game_session["game_state"]), | |
| _get_scene_image_update(game_session["game_state"]), | |
| *btn_updates, | |
| game_session, | |
| ) | |
| return | |
| selected_option = options[option_idx] | |
| gs: GameState = game_session["game_state"] | |
| story: StoryEngine = game_session["story"] | |
| option_intent = _build_option_intent(selected_option) | |
| turn_started = perf_counter() | |
| # 检查特殊选项:退出背包(不消耗回合) | |
| if selected_option.get("action_type") == "BACKPACK_EXIT": | |
| chat_history.append({"role": "user", "content": f"选择: {selected_option['text']}"}) | |
| chat_history.append({"role": "assistant", "content": "你合上背包,把注意力重新放回当前局势。"}) | |
| restored_options = game_session.pop("backpack_return_options", None) | |
| if not isinstance(restored_options, list): | |
| restored_options = _finalize_session_options([]) | |
| game_session["current_options"] = restored_options | |
| btn_updates = _get_button_updates(restored_options) | |
| yield ( | |
| chat_history, | |
| _format_world_info_panel(gs), | |
| _format_status_panel(gs), | |
| _render_text_map(gs), | |
| _get_scene_image_update(gs), | |
| *btn_updates, | |
| game_session, | |
| ) | |
| return | |
| from_backpack_menu = str(selected_option.get("menu", "")) == "backpack" | |
| # 检查特殊选项:重新开始 | |
| if selected_option.get("action_type") == "RESTART": | |
| # 重新开始时使用流式开场 | |
| game_session = create_new_game(gs.player.name) | |
| game_session["started"] = True | |
| chat_history = [{"role": "assistant", "content": "⏳ 正在重新生成开场..."}] | |
| world_info_text = _format_world_info_panel(game_session["game_state"]) | |
| status_text = _format_status_panel(game_session["game_state"]) | |
| loading = _get_loading_button_updates() | |
| yield ( | |
| chat_history, | |
| world_info_text, | |
| status_text, | |
| _render_text_map(game_session["game_state"]), | |
| _get_scene_image_update(game_session["game_state"]), | |
| *loading, | |
| game_session, | |
| ) | |
| story_text = "" | |
| restart_final = None | |
| for update in game_session["story"].generate_opening_stream(): | |
| if update["type"] == "story_chunk": | |
| story_text = update["text"] | |
| chat_history[-1]["content"] = story_text | |
| yield ( | |
| chat_history, | |
| world_info_text, | |
| status_text, | |
| _render_text_map(game_session["game_state"]), | |
| _get_scene_image_update(game_session["game_state"]), | |
| *loading, | |
| game_session, | |
| ) | |
| elif update["type"] == "final": | |
| restart_final = update | |
| # ★ 只在流完全结束后提取选项 | |
| if restart_final: | |
| story_text = restart_final.get("story_text", story_text) | |
| restart_options = restart_final.get("options", []) | |
| else: | |
| restart_options = [] | |
| # ★ 安全兜底:强制确保恰好 3 个选项 | |
| restart_options = _finalize_session_options(restart_options) | |
| game_session["current_options"] = restart_options | |
| full_message = story_text | |
| chat_history[-1]["content"] = full_message | |
| status_text = _format_status_panel(game_session["game_state"]) | |
| btn_updates = _get_button_updates(restart_options) | |
| yield ( | |
| chat_history, | |
| _format_world_info_panel(game_session["game_state"]), | |
| status_text, | |
| _render_text_map(game_session["game_state"]), | |
| _get_scene_image_update(game_session["game_state"]), | |
| *btn_updates, | |
| game_session, | |
| ) | |
| return | |
| # 检查特殊选项:退出 | |
| if selected_option.get("action_type") == "QUIT": | |
| chat_history.append({"role": "user", "content": f"选择: {selected_option['text']}"}) | |
| chat_history.append({"role": "assistant", "content": "感谢游玩 StoryWeaver!\n你的冒险到此结束,但故事永远不会真正终结...\n\n点击「开始冒险」可以重新开始。"}) | |
| quit_buttons = _get_button_updates( | |
| [ | |
| {"id": 1, "text": "重新开始", "action_type": "RESTART"}, | |
| ] | |
| ) | |
| yield ( | |
| chat_history, | |
| _format_world_info_panel(gs), | |
| _format_status_panel(gs), | |
| _render_text_map(gs), | |
| _get_scene_image_update(gs), | |
| *quit_buttons, | |
| game_session, | |
| ) | |
| return | |
| # 正常选项处理:流式生成 | |
| chat_history.append({"role": "user", "content": f"选择: {selected_option['text']}"}) | |
| chat_history.append({"role": "assistant", "content": "⏳ 正在生成..."}) | |
| # 按钮保持可见但禁用 | |
| loading = _get_loading_button_updates(max(len(options), MIN_OPTION_BUTTONS)) | |
| yield ( | |
| chat_history, | |
| _format_world_info_panel(gs), | |
| _format_status_panel(gs), | |
| _render_text_map(gs), | |
| _get_scene_image_update(gs), | |
| *loading, | |
| game_session, | |
| ) | |
| generation_started = perf_counter() | |
| final_result = None | |
| for update in story.process_option_selection_stream(selected_option): | |
| if update["type"] == "story_chunk": | |
| chat_history[-1]["content"] = update["text"] | |
| yield ( | |
| chat_history, | |
| _format_world_info_panel(gs), | |
| _format_status_panel(gs), | |
| _render_text_map(gs), | |
| _get_scene_image_update(gs), | |
| *loading, | |
| game_session, | |
| ) | |
| elif update["type"] == "final": | |
| final_result = update | |
| generation_latency_ms = (perf_counter() - generation_started) * 1000 | |
| if final_result: | |
| # 背包菜单内执行使用/装备后,继续停留在背包菜单 | |
| if from_backpack_menu: | |
| options = _build_backpack_options(gs) | |
| else: | |
| options = _finalize_session_options(final_result.get("options", [])) | |
| game_session["current_options"] = options | |
| change_log = final_result.get("change_log", []) | |
| log_text = "" | |
| if change_log: | |
| log_text = "\n".join(f" {c}" for c in change_log) | |
| log_text = f"\n\n**状态变化:**\n{log_text}" | |
| full_message = f"{final_result['story_text']}{log_text}" | |
| chat_history[-1]["content"] = full_message | |
| status_text = _format_status_panel(gs) | |
| btn_updates = _get_button_updates(options) | |
| _record_interaction_log( | |
| game_session, | |
| input_source="option_click", | |
| user_input=selected_option.get("text", ""), | |
| intent_result=option_intent, | |
| output_text=full_message, | |
| latency_ms=(perf_counter() - turn_started) * 1000, | |
| generation_latency_ms=generation_latency_ms, | |
| final_result=final_result, | |
| selected_option=selected_option, | |
| ) | |
| yield ( | |
| chat_history, | |
| _format_world_info_panel(gs), | |
| status_text, | |
| _render_text_map(gs), | |
| _get_scene_image_update(gs), | |
| *btn_updates, | |
| game_session, | |
| ) | |
| else: | |
| # ★ 兜底:final_result 为空,说明流式生成未产生 final 事件 | |
| logger.warning("[选项点击] 流式生成未产生 final 事件,使用兜底文本") | |
| if from_backpack_menu: | |
| fallback_text = "你整理了一下背包,却一时没想好先使用哪件物品。" | |
| fallback_options = _build_backpack_options(gs) | |
| else: | |
| fallback_text = "你环顾四周,思考着接下来该做什么..." | |
| fallback_options = _finalize_session_options([]) | |
| game_session["current_options"] = fallback_options | |
| full_message = fallback_text | |
| fallback_result = { | |
| "story_text": fallback_text, | |
| "options": fallback_options, | |
| "state_changes": {}, | |
| "change_log": [], | |
| "consistency_issues": [], | |
| "telemetry": { | |
| "engine_mode": "app_fallback", | |
| "used_fallback": True, | |
| "fallback_reason": "missing_final_event", | |
| }, | |
| } | |
| chat_history[-1]["content"] = full_message | |
| status_text = _format_status_panel(gs) | |
| btn_updates = _get_button_updates(fallback_options) | |
| _record_interaction_log( | |
| game_session, | |
| input_source="option_click", | |
| user_input=selected_option.get("text", ""), | |
| intent_result=option_intent, | |
| output_text=full_message, | |
| latency_ms=(perf_counter() - turn_started) * 1000, | |
| generation_latency_ms=generation_latency_ms, | |
| final_result=fallback_result, | |
| selected_option=selected_option, | |
| ) | |
| yield ( | |
| chat_history, | |
| _format_world_info_panel(gs), | |
| status_text, | |
| _render_text_map(gs), | |
| _get_scene_image_update(gs), | |
| *btn_updates, | |
| game_session, | |
| ) | |
| return | |
| # ============================================================ | |
| # UI 辅助函数 | |
| # ============================================================ | |
| MIN_OPTION_BUTTONS = 3 | |
| MAX_OPTION_BUTTONS = 6 | |
| # 兜底默认选项(当解析出的选项为空时使用) | |
| _FALLBACK_BUTTON_OPTIONS = [ | |
| {"id": 1, "text": "查看周围", "action_type": "EXPLORE"}, | |
| {"id": 2, "text": "等待一会", "action_type": "REST"}, | |
| {"id": 3, "text": "检查状态", "action_type": "EXPLORE"}, | |
| ] | |
| def _normalize_options( | |
| options: list[dict], | |
| *, | |
| minimum: int = 0, | |
| maximum: int = MAX_OPTION_BUTTONS, | |
| ) -> list[dict]: | |
| """ | |
| 规范化选项列表: | |
| - 至多保留 maximum 个选项 | |
| - 仅当 minimum > 0 时补充兜底项 | |
| - 始终重新编号 | |
| """ | |
| if not isinstance(options, list): | |
| options = [] | |
| normalized = [opt for opt in options if isinstance(opt, dict)][:maximum] | |
| for fb in _FALLBACK_BUTTON_OPTIONS: | |
| if len(normalized) >= minimum: | |
| break | |
| if not any(o.get("text") == fb["text"] for o in normalized): | |
| normalized.append(fb.copy()) | |
| while len(normalized) < minimum: | |
| normalized.append({ | |
| "id": len(normalized) + 1, | |
| "text": "继续探索", | |
| "action_type": "EXPLORE", | |
| }) | |
| for i, opt in enumerate(normalized[:maximum], 1): | |
| if isinstance(opt, dict): | |
| opt["id"] = i | |
| return normalized[:maximum] | |
| def _finalize_session_options(options: list[dict]) -> list[dict]: | |
| minimum = MIN_OPTION_BUTTONS if not options else 0 | |
| return _normalize_options(options, minimum=minimum) | |
| def _format_options(options: list[dict]) -> str: | |
| """将选项列表格式化为可读的文本(纯文字,绝不显示 JSON)""" | |
| if not options: | |
| return "" | |
| lines = ["---", "**你的选择:**"] | |
| for i, opt in enumerate(options): | |
| # 安全提取:兼容 dict 和异常情况 | |
| if isinstance(opt, dict): | |
| idx = opt.get("id", i + 1) | |
| text = opt.get("text", "未知选项") | |
| else: | |
| idx = i + 1 | |
| text = str(opt) | |
| lines.append(f" **[{idx}]** {text}") | |
| return "\n".join(lines) | |
| def _is_backpack_menu_active(options: list[dict]) -> bool: | |
| return any( | |
| isinstance(opt, dict) | |
| and str(opt.get("action_type", "")).upper() == "BACKPACK_EXIT" | |
| and str(opt.get("menu", "")) == "backpack" | |
| for opt in (options or []) | |
| ) | |
| def _format_item_function(item_info) -> str: | |
| if item_info is None: | |
| return "功能未知" | |
| if item_info.use_effect: | |
| return f"效果:{item_info.use_effect}" | |
| if item_info.stat_bonus: | |
| bonus_text = ",".join( | |
| f"{stat}{'+' if int(value) >= 0 else ''}{int(value)}" | |
| for stat, value in item_info.stat_bonus.items() | |
| ) | |
| return f"装备加成:{bonus_text}" | |
| if item_info.lore_text: | |
| return f"线索:{item_info.lore_text}" | |
| return "暂无可用效果" | |
| def _build_backpack_options(gs: GameState) -> list[dict]: | |
| inventory = list(gs.player.inventory) | |
| if not inventory: | |
| return [ | |
| {"id": 1, "text": "退出背包", "action_type": "BACKPACK_EXIT", "menu": "backpack"}, | |
| ] | |
| inventory_order = list(dict.fromkeys(inventory)) | |
| equip_types = {"weapon", "armor", "accessory", "helmet", "boots"} | |
| consumable_options: list[dict] = [] | |
| equip_options: list[dict] = [] | |
| for item_name in inventory_order: | |
| item_info = gs.world.item_registry.get(item_name) | |
| if item_info and gs.is_item_consumable(item_name): | |
| consumable_options.append( | |
| { | |
| "text": f"使用{item_name}", | |
| "action_type": "USE_ITEM", | |
| "target": item_name, | |
| "menu": "backpack", | |
| } | |
| ) | |
| continue | |
| if item_info and item_info.item_type in equip_types: | |
| equip_options.append( | |
| { | |
| "text": f"装备{item_name}", | |
| "action_type": "EQUIP", | |
| "target": item_name, | |
| "menu": "backpack", | |
| } | |
| ) | |
| max_action_slots = MAX_OPTION_BUTTONS - 1 | |
| merged_actions = (consumable_options + equip_options)[:max_action_slots] | |
| merged_actions.append( | |
| {"text": "退出背包", "action_type": "BACKPACK_EXIT", "menu": "backpack"} | |
| ) | |
| return _normalize_options(merged_actions, minimum=0, maximum=MAX_OPTION_BUTTONS) | |
| def _format_backpack_story(gs: GameState) -> str: | |
| inventory = list(gs.player.inventory) | |
| if not inventory: | |
| return "你打开背包,里面空空如也。" | |
| inventory_counter = Counter(inventory) | |
| inventory_order = list(dict.fromkeys(inventory)) | |
| lines = ["你打开背包,快速检查随身物资:"] | |
| for item_name in inventory_order: | |
| item_info = gs.world.item_registry.get(item_name) | |
| quantity = inventory_counter.get(item_name, 1) | |
| quantity_text = f"x{quantity} " if quantity > 1 else "" | |
| description = item_info.description if item_info else "暂无描述" | |
| function_text = _format_item_function(item_info) | |
| lines.append(f"- {quantity_text}**{item_name}**:{description}({function_text})") | |
| lines.append("\n你可以直接在下方选择“使用/装备”对应物品,或退出背包。") | |
| return "\n".join(lines) | |
| def open_backpack(chat_history: list, game_session: dict): | |
| chat_history = chat_history or [] | |
| if not game_session or not game_session.get("started"): | |
| chat_history.append({"role": "assistant", "content": "请先点击「开始冒险」按钮!"}) | |
| loading = _get_loading_button_updates() | |
| return ( | |
| chat_history, | |
| _format_world_info_panel(None), | |
| "", | |
| "", | |
| gr.update(value=None, visible=False), | |
| *loading, | |
| game_session, | |
| ) | |
| gs: GameState = game_session["game_state"] | |
| current_options = game_session.get("current_options", []) | |
| if not _is_backpack_menu_active(current_options): | |
| game_session["backpack_return_options"] = copy.deepcopy(current_options) | |
| backpack_story = _format_backpack_story(gs) | |
| backpack_options = _build_backpack_options(gs) | |
| game_session["current_options"] = backpack_options | |
| chat_history.append({"role": "user", "content": "打开背包"}) | |
| chat_history.append({"role": "assistant", "content": backpack_story}) | |
| _record_interaction_log( | |
| game_session, | |
| input_source="backpack_button", | |
| user_input="打开背包", | |
| intent_result={"intent": "OPEN_BACKPACK", "target": None}, | |
| output_text=backpack_story, | |
| latency_ms=0.0, | |
| generation_latency_ms=0.0, | |
| final_result={ | |
| "story_text": backpack_story, | |
| "options": backpack_options, | |
| "state_changes": {}, | |
| "change_log": [], | |
| "consistency_issues": [], | |
| "telemetry": { | |
| "engine_mode": "backpack_menu", | |
| "used_fallback": False, | |
| "fallback_reason": None, | |
| }, | |
| }, | |
| ) | |
| btn_updates = _get_button_updates(backpack_options) | |
| return ( | |
| chat_history, | |
| _format_world_info_panel(gs), | |
| _format_status_panel(gs), | |
| _render_text_map(gs), | |
| _get_scene_image_update(gs), | |
| *btn_updates, | |
| game_session, | |
| ) | |
| def _get_loading_button_updates(visible_count: int = MIN_OPTION_BUTTONS) -> list: | |
| """返回加载中占位按钮更新,支持最多 6 个选项槽位。""" | |
| visible_count = max(0, min(int(visible_count or 0), MAX_OPTION_BUTTONS)) | |
| updates = [] | |
| for index in range(MAX_OPTION_BUTTONS): | |
| updates.append( | |
| gr.update( | |
| value="...", | |
| visible=index < visible_count, | |
| interactive=False, | |
| ) | |
| ) | |
| return updates | |
| def _get_button_updates(options: list[dict]) -> list: | |
| """从选项列表生成按钮更新,始终返回 6 个槽位。""" | |
| options = _normalize_options(options, minimum=0) | |
| updates = [] | |
| for i in range(MAX_OPTION_BUTTONS): | |
| opt = options[i] if i < len(options) else None | |
| if isinstance(opt, dict): | |
| text = opt.get("text", "...") | |
| visible = True | |
| else: | |
| text = "..." | |
| visible = False | |
| updates.append(gr.update(value=text, visible=visible, interactive=visible)) | |
| return updates | |
| def _format_status_panel(gs: GameState) -> str: | |
| """格式化状态面板文本(双列 HTML 布局,减少滚动)""" | |
| p = gs.player | |
| w = gs.world | |
| effective_stats = gs.get_effective_player_stats() | |
| equipment_bonuses = gs.get_equipment_stat_bonuses() | |
| env_snapshot = gs.get_environment_snapshot(limit=3) | |
| survival_snapshot = gs.get_survival_state_snapshot() | |
| scene_summary = gs.get_scene_summary().replace("\n", "<br>") | |
| clock_display = gs.get_clock_display() | |
| # 属性进度条 | |
| hp_bar = _progress_bar(p.hp, p.max_hp, "HP") | |
| mp_bar = _progress_bar(p.mp, p.max_mp, "MP") | |
| stamina_bar = _progress_bar(p.stamina, p.max_stamina, "体力") | |
| hunger_bar = _progress_bar(p.hunger, 100, "饱食") | |
| sanity_bar = _progress_bar(p.sanity, 100, "理智") | |
| morale_bar = _progress_bar(p.morale, 100, "士气") | |
| # 装备 | |
| slot_names = { | |
| "weapon": "武器", "armor": "护甲", "accessory": "饰品", | |
| "helmet": "头盔", "boots": "靴子", | |
| } | |
| equip_lines = [] | |
| for slot, item in p.equipment.items(): | |
| equip_lines.append(f"{slot_names.get(slot, slot)}: {item or '无'}") | |
| equip_text = "<br>".join(equip_lines) | |
| def render_stat(stat_key: str, label: str) -> str: | |
| base_value = int(getattr(p, stat_key)) | |
| bonus_value = int(equipment_bonuses.get(stat_key, 0)) | |
| effective_value = int(effective_stats.get(stat_key, base_value)) | |
| if bonus_value > 0: | |
| return f"{label}: {effective_value} <span style='color:#4a6;'>(+{bonus_value} 装备)</span>" | |
| if bonus_value < 0: | |
| return f"{label}: {effective_value} <span style='color:#b44;'>({bonus_value} 装备)</span>" | |
| return f"{label}: {base_value}" | |
| def badge(text: str, bg: str, fg: str = "#1f2937") -> str: | |
| return ( | |
| f"<span style='display:inline-block;margin:0 6px 6px 0;padding:3px 10px;" | |
| f"border-radius:999px;background:{bg};color:{fg};font-size:0.8em;" | |
| f"font-weight:600;'>{text}</span>" | |
| ) | |
| # 状态效果 | |
| if p.status_effects: | |
| effect_lines = "<br>".join( | |
| f"{e.name}({e.duration}回合)" for e in p.status_effects | |
| ) | |
| else: | |
| effect_lines = "无" | |
| # 背包 | |
| if p.inventory: | |
| inventory_text = "<br>".join(p.inventory) | |
| else: | |
| inventory_text = "空" | |
| weather_colors = { | |
| "晴朗": "#fef3c7", | |
| "多云": "#e5e7eb", | |
| "小雨": "#dbeafe", | |
| "浓雾": "#e0e7ff", | |
| "暴风雨": "#c7d2fe", | |
| "大雪": "#f3f4f6", | |
| } | |
| light_colors = { | |
| "明亮": "#fde68a", | |
| "柔和": "#fcd34d", | |
| "昏暗": "#cbd5e1", | |
| "幽暗": "#94a3b8", | |
| "漆黑": "#334155", | |
| } | |
| danger_level = int(env_snapshot.get("danger_level", 0)) | |
| if danger_level >= 7: | |
| danger_badge = badge(f"危险 {danger_level}/10", "#fecaca", "#7f1d1d") | |
| elif danger_level >= 4: | |
| danger_badge = badge(f"危险 {danger_level}/10", "#fed7aa", "#9a3412") | |
| else: | |
| danger_badge = badge(f"危险 {danger_level}/10", "#dcfce7", "#166534") | |
| env_badges = "".join( | |
| [ | |
| badge(f"天气 {w.weather}", weather_colors.get(w.weather, "#e5e7eb")), | |
| badge( | |
| f"光照 {w.light_level}", | |
| light_colors.get(w.light_level, "#e5e7eb"), | |
| "#0f172a" if w.light_level not in {"幽暗", "漆黑"} else "#f8fafc", | |
| ), | |
| danger_badge, | |
| badge(f"场景 {env_snapshot.get('location_type', 'unknown')}", "#ede9fe", "#4c1d95"), | |
| ] | |
| ) | |
| recent_env_events = env_snapshot.get("recent_events", []) | |
| if recent_env_events: | |
| latest_event = recent_env_events[-1] | |
| latest_event_html = ( | |
| f"<div style='padding:8px 10px;border-radius:10px;background:#f8fafc;" | |
| f"border:1px solid #dbeafe;margin-bottom:6px;'>" | |
| f"<b>{latest_event.get('title', '环境事件')}</b>" | |
| f"<br><span style='font-size:0.82em;color:#475569;'>{latest_event.get('description', '')}</span>" | |
| f"</div>" | |
| ) | |
| recent_event_lines = "<br>".join( | |
| f"- {event.get('title', '环境事件')}" | |
| for event in reversed(recent_env_events[-3:]) | |
| ) | |
| else: | |
| latest_event_html = ( | |
| "<div style='padding:8px 10px;border-radius:10px;background:#f8fafc;" | |
| "border:1px dashed #cbd5e1;color:#64748b;'>本回合暂无显式环境事件</div>" | |
| ) | |
| recent_event_lines = "无" | |
| # 活跃任务(完整展示:描述、子目标、奖励、来源) | |
| active_quests = [q for q in w.quests.values() if q.status == "active"] | |
| if active_quests: | |
| quest_blocks = [] | |
| for q in active_quests: | |
| done = sum(1 for v in q.objectives.values() if v) | |
| total = len(q.objectives) | |
| tag = "主线" if q.quest_type == "main" else "支线" if q.quest_type == "side" else "🟡 " + q.quest_type | |
| # 子目标列表 | |
| obj_lines = "".join( | |
| f"<br> {'✅' if v else '⬜'} {k}" | |
| for k, v in q.objectives.items() | |
| ) | |
| # 奖励摘要 | |
| reward_parts = [] | |
| if q.rewards.gold: | |
| reward_parts.append(f"{q.rewards.gold}💰") | |
| if q.rewards.experience: | |
| reward_parts.append(f"{q.rewards.experience}经验") | |
| if q.rewards.items: | |
| reward_parts.append("、".join(q.rewards.items)) | |
| if q.rewards.unlock_skill: | |
| reward_parts.append(f"技能:{q.rewards.unlock_skill}") | |
| if q.rewards.title: | |
| reward_parts.append(f"称号:{q.rewards.title}") | |
| reward_str = " | ".join(reward_parts) if reward_parts else "无" | |
| block = ( | |
| f"<details open><summary><b>{tag} {q.title}</b>({done}/{total})</summary>" | |
| f"<span style='font-size:0.9em;color:#666;'>来源: {q.giver_npc or '未知'}</span><br>" | |
| f"<span style='font-size:0.9em;'>{q.description}</span>" | |
| f"{obj_lines}" | |
| f"<br><span style='font-size:0.9em;color:#2f7a4a;'>奖励: {reward_str}</span>" | |
| f"</details>" | |
| ) | |
| quest_blocks.append(block) | |
| quest_text = "".join(quest_blocks) | |
| else: | |
| quest_text = "无活跃任务" | |
| # 使用 HTML 双列布局 | |
| status = f"""<div style="font-size:0.9em;"> | |
| <h3 style="margin:0 0 4px 0;text-align:center;">{p.name} — {p.title}</h3> | |
| <p style="text-align:center;margin:2px 0 6px 0;">等级 {p.level} | 经验 {p.experience}/{p.exp_to_next_level}</p> | |
| <div style="display:grid;grid-template-columns:1fr 1fr;gap:6px 12px;"> | |
| <div> | |
| <h4 style="margin:4px 0 2px 0;">🩸 生命与状态</h4> | |
| <span style="font-size:0.85em;"> | |
| {hp_bar}<br> | |
| {mp_bar}<br> | |
| {stamina_bar}<br> | |
| {hunger_bar}<br> | |
| {sanity_bar}<br> | |
| {morale_bar} | |
| </span> | |
| </div> | |
| <div> | |
| <h4 style="margin:4px 0 2px 0;">🎒 背包</h4> | |
| <span style="font-size:0.85em;"> | |
| {inventory_text} | |
| </span> | |
| </div> | |
| <div> | |
| <h4 style="margin:4px 0 2px 0;">⚔️ 战斗属性</h4> | |
| <span style="font-size:0.85em;"> | |
| {render_stat("attack", "攻击")}<br> | |
| {render_stat("defense", "防御")}<br> | |
| {render_stat("speed", "速度")}<br> | |
| {render_stat("luck", "幸运")}<br> | |
| {render_stat("perception", "感知")} | |
| </span> | |
| </div> | |
| <div> | |
| <h4 style="margin:4px 0 2px 0;">🛡️ 装备</h4> | |
| <span style="font-size:0.85em;"> | |
| {equip_text} | |
| </span> | |
| </div> | |
| <div> | |
| <h4 style="margin:4px 0 2px 0;">💰 资源</h4> | |
| <span style="font-size:0.85em;"> | |
| 金币: {p.gold}<br> | |
| 善恶值: {p.karma} | |
| </span> | |
| </div> | |
| <div> | |
| <h4 style="margin:4px 0 2px 0;">✨ 状态效果</h4> | |
| <span style="font-size:0.85em;"> | |
| {effect_lines} | |
| </span> | |
| </div> | |
| <div style="grid-column: 1 / -1;"> | |
| <h4 style="margin:4px 0 2px 0;">📜 任务</h4> | |
| <span style="font-size:0.9em;line-height:1.55;"> | |
| {quest_text} | |
| </span> | |
| </div> | |
| <div style="grid-column: 1 / -1;"> | |
| <h4 style="margin:4px 0 2px 0;">🧭 当前场景信息</h4> | |
| <div style="font-size:0.85em;line-height:1.5;padding:8px 10px;border-radius:12px;background:#fff7ed;border:1px solid #fed7aa;"> | |
| {env_badges} | |
| <div style="margin:6px 0 8px 0;color:#475569;"> | |
| 时间 {clock_display} | 场景 {w.current_scene} | 状态系数 {survival_snapshot.get('combined_multiplier', 1.0)} | |
| </div> | |
| {latest_event_html} | |
| <div style="margin-top:8px;">{scene_summary}</div> | |
| <div style="margin-top:6px;color:#475569;">最近环境事件: {recent_event_lines}</div> | |
| </div> | |
| </div> | |
| </div> | |
| </div>""" | |
| return status | |
| def _format_world_info_panel(gs: GameState | None) -> str: | |
| """格式化世界信息面板(放在故事框上方)。""" | |
| if gs is None: | |
| return "🌍 **世界信息**:未开始冒险" | |
| w = gs.world | |
| if hasattr(gs, "get_clock_display"): | |
| clock_display = gs.get_clock_display() | |
| else: | |
| minute_of_day = int(getattr(w, "time_progress_units", 0)) * 10 % (24 * 60) | |
| clock_display = f"{minute_of_day // 60:02d}:{minute_of_day % 60:02d}" | |
| current_scene = getattr(w, "current_scene", "未知地点") | |
| day_count = getattr(w, "day_count", 1) | |
| time_of_day = getattr(w, "time_of_day", "未知时段") | |
| weather = getattr(w, "weather", "未知") | |
| light_level = getattr(w, "light_level", "未知") | |
| season = getattr(w, "season", "未知") | |
| return ( | |
| "🌍 **世界信息**:" | |
| f"位置 {current_scene} | " | |
| f"第{day_count}天 {time_of_day}({clock_display}) | " | |
| f"天气 {weather} | 光照 {light_level} | " | |
| f"季节 {season} | 回合 {getattr(gs, 'turn', 0)}" | |
| ) | |
| def _progress_bar(current: int, maximum: int, label: str, length: int = 10) -> str: | |
| """生成 HTML 进度条(数值单独一行,低于 40% 高亮红色)""" | |
| ratio = current / maximum if maximum > 0 else 0 | |
| filled = int(ratio * length) | |
| empty = length - filled | |
| bar = "█" * filled + "░" * empty | |
| value_color = "#b91c1c" if ratio < 0.4 else "#0f172a" | |
| return ( | |
| f"{label}: <span style='font-family:monospace;'>{bar}</span>" | |
| f"<br><span style='color:{value_color};font-weight:600;'>{current}/{maximum}</span>" | |
| ) | |
| # ============================================================ | |
| # Gradio 界面构建 | |
| # ============================================================ | |
| def build_app() -> gr.Blocks: | |
| """构建 Gradio 界面""" | |
| with gr.Blocks( | |
| title="StoryWeaver - 交互式叙事系统", | |
| ) as app: | |
| app.css = APP_UI_CSS | |
| gr.Markdown( | |
| """ | |
| # StoryWeaver — 交互式叙事系统 | |
| *基于 AI 的动态分支剧情 RPG 体验* | |
| """ | |
| ) | |
| # 游戏会话状态(Gradio State) | |
| game_session = gr.State(value={}) | |
| with gr.Row(): | |
| # ================== | |
| # 左侧:聊天区域 | |
| # ================== | |
| with gr.Column(scale=10): | |
| # 玩家姓名输入 + 开始按钮 | |
| with gr.Row(): | |
| player_name_input = gr.Textbox( | |
| label="角色名称", | |
| placeholder="输入你的角色名称(默认: 旅人)", | |
| value="旅人", | |
| scale=3, | |
| ) | |
| start_btn = gr.Button( | |
| "开始冒险", | |
| variant="primary", | |
| scale=2, | |
| ) | |
| restart_btn = gr.Button( | |
| "重启冒险", | |
| variant="stop", | |
| scale=2, | |
| ) | |
| world_info_panel = gr.Markdown( | |
| value=_format_world_info_panel(None), | |
| ) | |
| # 聊天窗口 | |
| chatbot = gr.Chatbot( | |
| label="故事", | |
| height=480, | |
| ) | |
| location_map_panel = gr.Markdown( | |
| elem_classes=["scene-card", "status-panel"], | |
| value="地图关系图\n(未开始)", | |
| label="地图", | |
| ) | |
| # 选项按钮(最多 6 个,分两行显示) | |
| with gr.Column(): | |
| with gr.Row(): | |
| option_btn_1 = gr.Button( | |
| "...", | |
| visible=True, | |
| interactive=False, | |
| elem_classes=["option-btn"], | |
| ) | |
| option_btn_2 = gr.Button( | |
| "...", | |
| visible=True, | |
| interactive=False, | |
| elem_classes=["option-btn"], | |
| ) | |
| option_btn_3 = gr.Button( | |
| "...", | |
| visible=True, | |
| interactive=False, | |
| elem_classes=["option-btn"], | |
| ) | |
| with gr.Row(): | |
| option_btn_4 = gr.Button( | |
| "...", | |
| visible=False, | |
| interactive=False, | |
| elem_classes=["option-btn"], | |
| ) | |
| option_btn_5 = gr.Button( | |
| "...", | |
| visible=False, | |
| interactive=False, | |
| elem_classes=["option-btn"], | |
| ) | |
| option_btn_6 = gr.Button( | |
| "...", | |
| visible=False, | |
| interactive=False, | |
| elem_classes=["option-btn"], | |
| ) | |
| option_buttons = [ | |
| option_btn_1, | |
| option_btn_2, | |
| option_btn_3, | |
| option_btn_4, | |
| option_btn_5, | |
| option_btn_6, | |
| ] | |
| # 自由输入 | |
| with gr.Row(): | |
| user_input = gr.Textbox( | |
| label="自由输入(也可以直接点击上方选项)", | |
| placeholder="输入你想做的事情,例如:和村长说话、攻击哥布林、搜索这个区域...", | |
| scale=5, | |
| interactive=False, | |
| ) | |
| with gr.Column(scale=1): | |
| send_btn = gr.Button("发送", variant="primary", elem_classes=["side-action-btn"]) | |
| open_backpack_btn = gr.Button( | |
| "打开背包", | |
| variant="secondary", | |
| elem_classes=["side-action-btn", "backpack-btn"], | |
| ) | |
| # ================== | |
| # 右侧:状态面板 | |
| # ================== | |
| with gr.Column(scale=2, min_width=320, elem_classes=["scene-sidebar"]): | |
| scene_image = gr.Image( | |
| value=None, | |
| type="filepath", | |
| label="场景画面", | |
| show_label=False, | |
| container=False, | |
| interactive=False, | |
| height=260, | |
| visible=False, | |
| elem_classes=["scene-card", "scene-image"], | |
| ) | |
| status_panel = gr.Markdown( | |
| elem_classes=["scene-card", "status-panel"], | |
| value="## 等待开始...\n\n请输入角色名称并点击「开始冒险」", | |
| label="角色状态", | |
| ) | |
| # ============================================================ | |
| # 事件绑定 | |
| # ============================================================ | |
| # 开始游戏 | |
| start_btn.click( | |
| fn=start_game, | |
| inputs=[player_name_input, game_session], | |
| outputs=[ | |
| chatbot, world_info_panel, status_panel, location_map_panel, scene_image, | |
| *option_buttons, | |
| game_session, user_input, | |
| ], | |
| ) | |
| # 重启冒险 | |
| restart_btn.click( | |
| fn=restart_game, | |
| inputs=[], | |
| outputs=[ | |
| chatbot, world_info_panel, status_panel, location_map_panel, scene_image, | |
| *option_buttons, | |
| game_session, user_input, player_name_input, | |
| ], | |
| ) | |
| # 文本输入发送 | |
| send_btn.click( | |
| fn=process_user_input, | |
| inputs=[user_input, chatbot, game_session], | |
| outputs=[ | |
| chatbot, world_info_panel, status_panel, location_map_panel, scene_image, | |
| *option_buttons, | |
| game_session, | |
| ], | |
| ).then( | |
| fn=lambda: "", | |
| outputs=[user_input], | |
| ) | |
| # 打开背包(常驻按钮) | |
| open_backpack_btn.click( | |
| fn=open_backpack, | |
| inputs=[chatbot, game_session], | |
| outputs=[ | |
| chatbot, world_info_panel, status_panel, location_map_panel, scene_image, | |
| *option_buttons, | |
| game_session, | |
| ], | |
| ) | |
| # 回车发送 | |
| user_input.submit( | |
| fn=process_user_input, | |
| inputs=[user_input, chatbot, game_session], | |
| outputs=[ | |
| chatbot, world_info_panel, status_panel, location_map_panel, scene_image, | |
| *option_buttons, | |
| game_session, | |
| ], | |
| ).then( | |
| fn=lambda: "", | |
| outputs=[user_input], | |
| ) | |
| # 选项按钮点击(需要使用 yield from 的生成器包装函数, | |
| # 使 Gradio 能正确识别为流式输出) | |
| def _make_option_click_handler(index: int): | |
| def _handler(ch, gs): | |
| yield from process_option_click(index, ch, gs) | |
| return _handler | |
| for index, option_button in enumerate(option_buttons): | |
| option_button.click( | |
| fn=_make_option_click_handler(index), | |
| inputs=[chatbot, game_session], | |
| outputs=[ | |
| chatbot, world_info_panel, status_panel, location_map_panel, scene_image, | |
| *option_buttons, | |
| game_session, | |
| ], | |
| ) | |
| return app | |
| # ============================================================ | |
| # 启动入口 | |
| # ============================================================ | |
| if __name__ == "__main__": | |
| logger.info("启动 StoryWeaver 交互式叙事系统...") | |
| app = build_app() | |
| app.launch( | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| share=False, | |
| show_error=True, | |
| theme=gr.themes.Soft( | |
| primary_hue="emerald", | |
| secondary_hue="blue", | |
| ), | |
| css=APP_UI_CSS, | |
| allowed_paths=[str(IMAGE_DIR)], | |
| ) | |