""" app.py - StoryWeaver Gradio 交互界面 职责: 1. 构建游戏的 Web 前端界面 (Gradio) 2. 串联 NLU 引擎、叙事引擎、状态管理器 3. 管理用户交互流程(文本输入 + 选项点击) 4. 展示游戏状态(HP、背包、任务等) 数据流转: 用户输入 → NLU 引擎(意图识别) → 叙事引擎(两阶段生成) ↕ ↕ Gradio UI ← 状态管理器(校验 + 更新) ← 叙事引擎(文本 + 选项) """ import copy from collections import Counter import html import json import logging from time import perf_counter import gradio as gr from state_manager import GameState from nlu_engine import NLUEngine from scene_assets import get_scene_image_path, IMAGE_DIR from story_engine import StoryEngine from telemetry import append_turn_log, create_session_metadata from utils import logger APP_UI_CSS = """ .story-chat {min-height: 500px;} .status-panel { font-family: "Microsoft YaHei UI", "Noto Sans SC", sans-serif; font-size: 0.9em; line-height: 1.5; background: transparent !important; border: none !important; border-radius: 0 !important; padding: 10px 12px !important; box-shadow: none !important; overflow: visible !important; } .status-panel > div, .status-panel [class*="prose"], .status-panel .markdown-body, .status-panel [class*="wrap"] { background: transparent !important; border: none !important; box-shadow: none !important; padding: 0 !important; overflow: visible !important; } .status-panel * { word-break: break-word; overflow-wrap: anywhere; } .option-btn {min-height: 50px !important;} .side-action-btn, .side-action-btn button {min-height: 50px !important;} .backpack-btn button { min-height: 50px !important; background: #ffffff !important; color: #0f172a !important; border: 1px solid #d1d5db !important; } .scene-sidebar {gap: 12px;} .scene-card { border: 1px solid #e5e7eb !important; border-radius: 12px !important; background: #fcfcfd !important; box-shadow: 0 4px 14px rgba(15, 23, 42, 0.04) !important; } .scene-image { min-height: 260px; padding: 10px !important; } .scene-image > div, .scene-image img, .scene-image button, .scene-image [class*="image"], .scene-image [class*="wrap"], .scene-image [class*="frame"], .scene-image [class*="preview"] { border: none !important; box-shadow: none !important; background: transparent !important; } .scene-image img { width: 100%; height: 100%; object-fit: contain !important; border-radius: 10px; padding: 4px; background: #ffffff !important; } """ # ============================================================ # 全局游戏实例(每个会话独立) # ============================================================ # 使用 Gradio State 管理每个用户的游戏状态 # 这里先定义工厂函数 def create_new_game(player_name: str = "旅人") -> dict: """创建新游戏实例,返回包含所有引擎的字典""" game_state = GameState(player_name=player_name) nlu = NLUEngine(game_state) story = StoryEngine(game_state, enable_rule_text_polish=True) return { "game_state": game_state, "nlu": nlu, "story": story, "current_options": [], "started": False, **create_session_metadata(), } def _json_safe(value): """Convert nested values into JSON-serializable data for logs.""" if value is None or isinstance(value, (str, int, float, bool)): return value if isinstance(value, dict): return {str(key): _json_safe(val) for key, val in value.items()} if isinstance(value, (list, tuple, set)): return [_json_safe(item) for item in value] if hasattr(value, "model_dump"): return _json_safe(value.model_dump()) return str(value) def _build_state_snapshot(gs: GameState) -> dict: """Build a compact state snapshot for reproducible evaluation logs.""" active_quests = [] effective_stats = gs.get_effective_player_stats() equipment_bonuses = gs.get_equipment_stat_bonuses() environment_snapshot = gs.get_environment_snapshot(limit=3) for quest in gs.world.quests.values(): if quest.status == "active": active_quests.append( { "quest_id": quest.quest_id, "title": quest.title, "status": quest.status, "objectives": _json_safe(quest.objectives), } ) return { "turn": gs.turn, "game_mode": gs.game_mode, "location": gs.player.location, "scene": gs.world.current_scene, "day": gs.world.day_count, "time_of_day": gs.world.time_of_day, "weather": gs.world.weather, "light_level": gs.world.light_level, "environment": _json_safe(environment_snapshot), "player": { "name": gs.player.name, "level": gs.player.level, "hp": gs.player.hp, "max_hp": gs.player.max_hp, "mp": gs.player.mp, "max_mp": gs.player.max_mp, "attack": gs.player.attack, "defense": gs.player.defense, "speed": gs.player.speed, "luck": gs.player.luck, "perception": gs.player.perception, "gold": gs.player.gold, "morale": gs.player.morale, "sanity": gs.player.sanity, "hunger": gs.player.hunger, "karma": gs.player.karma, "effective_stats": _json_safe(effective_stats), "equipment_bonuses": _json_safe(equipment_bonuses), "inventory": list(gs.player.inventory), "equipment": copy.deepcopy(gs.player.equipment), "skills": list(gs.player.skills), "status_effects": [effect.name for effect in gs.player.status_effects], }, "active_quests": active_quests, "event_log_size": len(gs.event_log), } def _record_interaction_log( game_session: dict, *, input_source: str, user_input: str, intent_result: dict | None, output_text: str, latency_ms: float, nlu_latency_ms: float | None = None, generation_latency_ms: float | None = None, final_result: dict | None = None, selected_option: dict | None = None, ): """Append a structured interaction log without affecting gameplay.""" if not game_session or "game_state" not in game_session: return final_result = final_result or {} telemetry = _json_safe(final_result.get("telemetry", {})) or {} record = { "input_source": input_source, "user_input": user_input, "selected_option": _json_safe(selected_option), "nlu_result": _json_safe(intent_result), "latency_ms": round(latency_ms, 2), "nlu_latency_ms": None if nlu_latency_ms is None else round(nlu_latency_ms, 2), "generation_latency_ms": None if generation_latency_ms is None else round(generation_latency_ms, 2), "used_fallback": bool(telemetry.get("used_fallback", False)), "fallback_reason": telemetry.get("fallback_reason"), "engine_mode": telemetry.get("engine_mode"), "state_changes": _json_safe(final_result.get("state_changes", {})), "change_log": _json_safe(final_result.get("change_log", [])), "consistency_issues": _json_safe(final_result.get("consistency_issues", [])), "output_text": output_text, "story_text": final_result.get("story_text"), "options": _json_safe(final_result.get("options", game_session.get("current_options", []))), "post_turn_snapshot": _build_state_snapshot(game_session["game_state"]), } try: append_turn_log(game_session, record) except Exception as exc: logger.warning(f"Failed to append interaction log: {exc}") def _build_option_intent(selected_option: dict) -> dict: """Represent button clicks in the same schema as free-text NLU output.""" option_text = selected_option.get("text", "") return { "intent": selected_option.get("action_type", "EXPLORE"), "target": selected_option.get("target"), "details": option_text, "raw_input": option_text, "parser_source": "option_click", } def _get_scene_image_value(gs: GameState) -> str | None: focus_npc = getattr(gs, "last_interacted_npc", None) return get_scene_image_path(gs, focus_npc=focus_npc) def _get_scene_image_update(gs: GameState): image_value = _get_scene_image_value(gs) return gr.update(value=image_value, visible=bool(image_value)) def _build_map_graph_data(gs: GameState) -> dict: """基于已发现地点与连接关系构建地图拓扑数据。""" world_locations = getattr(getattr(gs, "world", None), "locations", {}) or {} discovered = list(getattr(getattr(gs, "world", None), "discovered_locations", []) or []) history = list(getattr(gs, "location_history", []) or []) current_location = str(getattr(gs, "current_location", None) or "").strip() if not current_location: current_location = str(getattr(getattr(gs, "player", None), "location", None) or "未知之地") visible_set: set[str] = set(discovered) | set(history) if current_location: visible_set.add(current_location) # 使用世界注册顺序保证地图输出稳定,便于玩家快速扫描。 ordered_nodes: list[str] = [name for name in world_locations.keys() if name in visible_set] for name in discovered + history + [current_location]: if name and name in visible_set and name not in ordered_nodes: ordered_nodes.append(name) visited_set = set(history) if current_location: visited_set.add(current_location) adjacency: dict[str, list[str]] = {} for node in ordered_nodes: loc_info = world_locations.get(node) if not loc_info: adjacency[node] = [] continue neighbors = [] for neighbor in list(getattr(loc_info, "connected_to", []) or []): if neighbor in visible_set and neighbor != node: neighbors.append(neighbor) adjacency[node] = neighbors node_state: dict[str, str] = {} for node in ordered_nodes: if node == current_location: node_state[node] = "current" elif node in visited_set: node_state[node] = "visited" else: node_state[node] = "known" return { "current_location": current_location, "nodes": ordered_nodes, "adjacency": adjacency, "node_state": node_state, } def _build_location_hover_text(gs: GameState, location_name: str) -> str: """构造地点 hover 提示:展示 NPC 与怪物。""" world = getattr(gs, "world", None) locations = getattr(world, "locations", {}) or {} npcs = getattr(world, "npcs", {}) or {} loc = locations.get(location_name) if not loc: return f"{location_name}\nNPC: 无\n怪物: 无" npc_names: set[str] = set(getattr(loc, "npcs_present", []) or []) for npc in npcs.values(): if getattr(npc, "location", None) == location_name and getattr(npc, "is_alive", True): npc_names.add(getattr(npc, "name", "")) npc_names = {name for name in npc_names if name} enemy_names = [str(name) for name in list(getattr(loc, "enemies", []) or []) if str(name)] npc_text = "、".join(sorted(npc_names)) if npc_names else "无" enemy_text = "、".join(enemy_names) if enemy_names else "无" return f"{location_name}\nNPC: {npc_text}\n怪物: {enemy_text}" def _truncate_map_label(name: str, max_len: int = 8) -> str: text = str(name or "") return text if len(text) <= max_len else f"{text[:max_len]}..." def _build_fixed_branch_layout(nodes: list[str], adjacency: dict[str, list[str]]) -> dict[str, tuple[int, int]]: """固定起点的分层布局:保持总体顺序稳定,同时展示分支。""" if not nodes: return {} node_set = set(nodes) layers: dict[str, int] = {} def _bfs(seed: str, base_layer: int) -> None: if seed not in node_set or seed in layers: return queue: list[str] = [seed] layers[seed] = base_layer cursor = 0 while cursor < len(queue): node = queue[cursor] cursor += 1 next_layer = layers[node] + 1 for nxt in adjacency.get(node, []): if nxt in node_set and nxt not in layers: layers[nxt] = next_layer queue.append(nxt) # 第一出现地点作为固定起点,避免因当前位置变化而重排。 _bfs(nodes[0], 0) for name in nodes: if name not in layers: base = (max(layers.values()) + 1) if layers else 0 _bfs(name, base) level_nodes: dict[int, list[str]] = {} for name in nodes: level = layers.get(name, 0) level_nodes.setdefault(level, []).append(name) positions: dict[str, tuple[int, int]] = {} for col_idx, level in enumerate(sorted(level_nodes.keys())): for row_idx, name in enumerate(level_nodes[level]): positions[name] = (col_idx, row_idx) return positions def _render_text_map(gs: GameState | None) -> str: """拓扑地图:从左到右显示地点关系图。""" if gs is None: return "地图关系图\n(未开始)" graph = _build_map_graph_data(gs) nodes = graph["nodes"] adjacency = graph["adjacency"] node_state = graph["node_state"] current_location = graph["current_location"] if not nodes: current = current_location or "未知之地" return f"地图关系图\n当前位置:{current}" positions = _build_fixed_branch_layout(nodes, adjacency) if not positions: return "地图关系图\n(暂无可显示节点)" node_width = 110 node_height = 34 col_gap = 140 row_gap = 50 x_margin = 16 y_margin = 16 max_col = max(col for col, _ in positions.values()) max_row = max(row for _, row in positions.values()) canvas_width = x_margin * 2 + max_col * col_gap + node_width canvas_height = y_margin * 2 + max_row * row_gap + node_height canvas_height = max(canvas_height, 74) node_boxes: dict[str, tuple[int, int, int, int]] = {} centers: dict[str, tuple[int, int]] = {} for name, (col, row) in positions.items(): x = x_margin + col * col_gap y = y_margin + row * row_gap node_boxes[name] = (x, y, node_width, node_height) centers[name] = (x + node_width // 2, y + node_height // 2) edge_pairs: set[tuple[str, str]] = set() for source, neighbors in adjacency.items(): for target in neighbors: if source in positions and target in positions and source != target: edge_pairs.add(tuple(sorted((source, target)))) edge_svg: list[str] = [] def _segment_hits_box_horizontal(y: float, x_start: float, x_end: float, box: tuple[int, int, int, int]) -> bool: bx, by, bw, bh = box left = min(x_start, x_end) right = max(x_start, x_end) return (by + 1) <= y <= (by + bh - 1) and not (right <= bx + 1 or left >= bx + bw - 1) def _segment_hits_box_vertical(x: float, y_start: float, y_end: float, box: tuple[int, int, int, int]) -> bool: bx, by, bw, bh = box top = min(y_start, y_end) bottom = max(y_start, y_end) return (bx + 1) <= x <= (bx + bw - 1) and not (bottom <= by + 1 or top >= by + bh - 1) for source, target in sorted(edge_pairs): sx, sy, sw, sh = node_boxes[source] tx, ty, tw, th = node_boxes[target] source_exit_x = sx + sw source_exit_y = sy + sh / 2 target_entry_x = tx target_entry_y = ty + th / 2 mid_x = (source_exit_x + target_entry_x) / 2 needs_detour = False for name, box in node_boxes.items(): if name in {source, target}: continue if ( _segment_hits_box_horizontal(source_exit_y, source_exit_x, mid_x, box) or _segment_hits_box_vertical(mid_x, source_exit_y, target_entry_y, box) or _segment_hits_box_horizontal(target_entry_y, mid_x, target_entry_x, box) ): needs_detour = True break if not needs_detour: points = ( f"{source_exit_x},{source_exit_y} " f"{mid_x},{source_exit_y} " f"{mid_x},{target_entry_y} " f"{target_entry_x},{target_entry_y}" ) else: # 局部上绕:仅在必要时走上方,减少杂乱感同时避免压到节点。 route_y = max(4, min(source_exit_y, target_entry_y) - node_height / 2 - 10) route_left_x = source_exit_x + 6 route_right_x = target_entry_x - 6 points = ( f"{source_exit_x},{source_exit_y} " f"{route_left_x},{source_exit_y} " f"{route_left_x},{route_y} " f"{route_right_x},{route_y} " f"{route_right_x},{target_entry_y} " f"{target_entry_x},{target_entry_y}" ) edge_svg.append( f"" ) node_svg: list[str] = [] for name in nodes: col, row = positions[name] x = x_margin + col * col_gap y = y_margin + row * row_gap state = node_state.get(name, "known") escaped_name = html.escape(_truncate_map_label(name)) hover_text = html.escape(_build_location_hover_text(gs, name)) if state == "current": fill = "#fff7ed" stroke = "#f97316" text_color = "#9a3412" stroke_width = 1.8 display_name = escaped_name elif state == "visited": fill = "#f1f5f9" stroke = "#64748b" text_color = "#334155" stroke_width = 1.4 display_name = escaped_name else: fill = "#ffffff" stroke = "#cbd5e1" text_color = "#334155" stroke_width = 1.2 display_name = escaped_name rect_class_attr = " class='map-current-node'" if state == "current" else "" node_svg.append( "" f"{hover_text}" f"" f"{display_name}" "" ) svg = ( f"" "" + "".join(edge_svg) + "".join(node_svg) + "" ) return ( "
" "
" "展开地图关系图" "
" "鼠标悬停于地点格可查看NPC与怪物。" "
" "
" + svg + "
" "
" "
" ) def restart_game() -> tuple: """ 重启冒险:清空所有数据,回到初始输入名称阶段。 Returns: (空聊天历史, 初始状态面板, 隐藏选项按钮×3, 空游戏会话, 禁用文本输入框, 重置角色名称) """ loading = _get_loading_button_updates() return ( [], # 清空聊天历史 _format_world_info_panel(None), # 重置世界信息 "## 等待开始...\n\n请输入角色名称并点击「开始冒险」", # 重置状态面板 "地图关系图\n(未开始)", # 清空地图 gr.update(value=None, visible=False), # 清空场景图片 *loading, # 占位选项按钮 {}, # 清空游戏会话 gr.update(value="", interactive=False), # 禁用并清空文本输入 gr.update(value="旅人"), # 重置角色名称 ) # ============================================================ # 核心交互函数 # ============================================================ def start_game(player_name: str, game_session: dict): """ 开始新游戏:流式生成开场叙事。 使用生成器 yield 实现流式输出,让用户看到文字逐步出现。 """ if not player_name.strip(): player_name = "旅人" # 创建新游戏 game_session = create_new_game(player_name) game_session["started"] = True # 初始 yield:显示加载状态,按钮保持可见但禁用 chat_history = [{"role": "assistant", "content": "⏳ 正在生成开场..."}] world_info_text = _format_world_info_panel(game_session["game_state"]) status_text = _format_status_panel(game_session["game_state"]) loading = _get_loading_button_updates() yield ( chat_history, world_info_text, status_text, _render_text_map(game_session["game_state"]), _get_scene_image_update(game_session["game_state"]), *loading, game_session, gr.update(interactive=False), ) # 流式生成开场(选项仅在流结束后从 final 事件中提取,流式期间不解析选项) turn_started = perf_counter() story_text = "" final_result = None for update in game_session["story"].generate_opening_stream(): if update["type"] == "story_chunk": story_text = update["text"] chat_history[-1]["content"] = story_text yield ( chat_history, world_info_text, status_text, _render_text_map(game_session["game_state"]), _get_scene_image_update(game_session["game_state"]), *loading, game_session, gr.update(interactive=False), ) elif update["type"] == "final": final_result = update generation_latency_ms = (perf_counter() - turn_started) * 1000 # ★ 只在数据流完全结束后,从 final_result 中提取选项 if final_result: story_text = final_result.get("story_text", story_text) options = final_result.get("options", []) else: options = [] # ★ 安全兜底:强制确保恰好 3 个选项 options = _finalize_session_options(options) # 最终 yield:显示完整文本 + 选项 + 启用按钮 game_session["current_options"] = options full_message = story_text if not final_result: final_result = { "story_text": story_text, "options": options, "state_changes": {}, "change_log": [], "consistency_issues": [], "telemetry": { "engine_mode": "opening_app", "used_fallback": True, "fallback_reason": "missing_final_event", }, } chat_history[-1]["content"] = full_message world_info_text = _format_world_info_panel(game_session["game_state"]) status_text = _format_status_panel(game_session["game_state"]) btn_updates = _get_button_updates(options) _record_interaction_log( game_session, input_source="system_opening", user_input="", intent_result=None, output_text=full_message, latency_ms=generation_latency_ms, generation_latency_ms=generation_latency_ms, final_result=final_result, ) yield ( chat_history, world_info_text, status_text, _render_text_map(game_session["game_state"]), _get_scene_image_update(game_session["game_state"]), *btn_updates, game_session, gr.update(interactive=True), ) def process_user_input(user_input: str, chat_history: list, game_session: dict): """ 处理用户文本输入(流式版本)。 流程: 1. NLU 引擎解析意图 2. 叙事引擎流式生成故事 3. 逐步更新 UI """ if not game_session or not game_session.get("started"): chat_history = chat_history or [] chat_history.append({"role": "assistant", "content": "请先点击「开始冒险」按钮!"}) loading = _get_loading_button_updates() yield ( chat_history, _format_world_info_panel(None), "", "", gr.update(value=None, visible=False), *loading, game_session, ) return if not user_input.strip(): btn_updates = _get_button_updates(game_session.get("current_options", [])) yield ( chat_history, _format_world_info_panel(game_session["game_state"]), _format_status_panel(game_session["game_state"]), _render_text_map(game_session["game_state"]), _get_scene_image_update(game_session["game_state"]), *btn_updates, game_session, ) return gs: GameState = game_session["game_state"] nlu: NLUEngine = game_session["nlu"] story: StoryEngine = game_session["story"] turn_started = perf_counter() # 检查游戏是否已结束 if gs.is_game_over(): chat_history.append({"role": "user", "content": user_input}) chat_history.append({"role": "assistant", "content": "游戏已结束。请点击「重新开始」按钮开始新的冒险。"}) restart_buttons = _get_button_updates( [ {"id": 1, "text": "重新开始", "action_type": "RESTART"}, ] ) yield ( chat_history, _format_world_info_panel(gs), _format_status_panel(gs), _render_text_map(gs), _get_scene_image_update(gs), *restart_buttons, game_session, ) return # 1. NLU 解析 nlu_started = perf_counter() intent = nlu.parse_intent(user_input) nlu_latency_ms = (perf_counter() - nlu_started) * 1000 # 1.5 预校验:立即驳回违反一致性的操作(不调用 LLM,不消耗回合) is_valid, rejection_msg = gs.pre_validate_action(intent) if not is_valid: chat_history.append({"role": "user", "content": user_input}) options = game_session.get("current_options", []) options = _finalize_session_options(options) rejection_content = ( f"⚠️ **行动被驳回**:{rejection_msg}\n\n" f"请重新选择行动,或输入其他指令。" ) chat_history.append({"role": "assistant", "content": rejection_content}) rejection_result = { "story_text": rejection_content, "options": options, "state_changes": {}, "change_log": [], "consistency_issues": [], "telemetry": { "engine_mode": "pre_validation", "used_fallback": False, "fallback_reason": None, }, } _record_interaction_log( game_session, input_source="text_input", user_input=user_input, intent_result=intent, output_text=rejection_content, latency_ms=(perf_counter() - turn_started) * 1000, nlu_latency_ms=nlu_latency_ms, generation_latency_ms=0.0, final_result=rejection_result, ) btn_updates = _get_button_updates(options) yield ( chat_history, _format_world_info_panel(gs), _format_status_panel(gs), _render_text_map(gs), _get_scene_image_update(gs), *btn_updates, game_session, ) return # 2. 添加用户消息 + 空的 assistant 消息(用于流式填充) chat_history.append({"role": "user", "content": user_input}) chat_history.append({"role": "assistant", "content": "⏳ 正在生成..."}) # 按钮保持可见但禁用,防止流式期间点击 loading = _get_loading_button_updates( max(len(game_session.get("current_options", [])), MIN_OPTION_BUTTONS) ) yield ( chat_history, _format_world_info_panel(gs), _format_status_panel(gs), _render_text_map(gs), _get_scene_image_update(gs), *loading, game_session, ) # 3. 流式生成故事 generation_started = perf_counter() final_result = None for update in story.generate_story_stream(intent): if update["type"] == "story_chunk": chat_history[-1]["content"] = update["text"] yield ( chat_history, _format_world_info_panel(gs), _format_status_panel(gs), _render_text_map(gs), _get_scene_image_update(gs), *loading, game_session, ) elif update["type"] == "final": final_result = update generation_latency_ms = (perf_counter() - generation_started) * 1000 # 4. 最终更新:完整文本 + 状态变化 + 选项 + 按钮 if final_result: # ★ 安全兜底:强制确保恰好 3 个选项 options = _finalize_session_options(final_result.get("options", [])) game_session["current_options"] = options change_log = final_result.get("change_log", []) log_text = "" if change_log: log_text = "\n".join(f" {c}" for c in change_log) log_text = f"\n\n**状态变化:**\n{log_text}" issues = final_result.get("consistency_issues", []) issues_text = "" if issues: issues_text = "\n".join(f" {i}" for i in issues) issues_text = f"\n\n**一致性提示:**\n{issues_text}" full_message = f"{final_result['story_text']}{log_text}{issues_text}" chat_history[-1]["content"] = full_message status_text = _format_status_panel(gs) btn_updates = _get_button_updates(options) _record_interaction_log( game_session, input_source="text_input", user_input=user_input, intent_result=intent, output_text=full_message, latency_ms=(perf_counter() - turn_started) * 1000, nlu_latency_ms=nlu_latency_ms, generation_latency_ms=generation_latency_ms, final_result=final_result, ) yield ( chat_history, _format_world_info_panel(gs), status_text, _render_text_map(gs), _get_scene_image_update(gs), *btn_updates, game_session, ) else: # ★ 兜底:final_result 为空,说明流式生成未产生 final 事件 logger.warning("流式生成未产生 final 事件,使用兜底文本") fallback_text = "你环顾四周,思考着接下来该做什么..." fallback_options = _finalize_session_options([]) game_session["current_options"] = fallback_options full_message = fallback_text fallback_result = { "story_text": fallback_text, "options": fallback_options, "state_changes": {}, "change_log": [], "consistency_issues": [], "telemetry": { "engine_mode": "app_fallback", "used_fallback": True, "fallback_reason": "missing_final_event", }, } chat_history[-1]["content"] = full_message status_text = _format_status_panel(gs) btn_updates = _get_button_updates(fallback_options) _record_interaction_log( game_session, input_source="text_input", user_input=user_input, intent_result=intent, output_text=full_message, latency_ms=(perf_counter() - turn_started) * 1000, nlu_latency_ms=nlu_latency_ms, generation_latency_ms=generation_latency_ms, final_result=fallback_result, ) yield ( chat_history, _format_world_info_panel(gs), status_text, _render_text_map(gs), _get_scene_image_update(gs), *btn_updates, game_session, ) return def process_option_click(option_idx: int, chat_history: list, game_session: dict): """ 处理玩家点击选项按钮(流式版本)。 Args: option_idx: 选项索引 (0-5) """ if not game_session or not game_session.get("started"): chat_history = chat_history or [] chat_history.append({"role": "assistant", "content": "请先点击「开始冒险」按钮!"}) loading = _get_loading_button_updates() yield ( chat_history, _format_world_info_panel(None), "", "", gr.update(value=None, visible=False), *loading, game_session, ) return options = game_session.get("current_options", []) if option_idx >= len(options): btn_updates = _get_button_updates(options) yield ( chat_history, _format_world_info_panel(game_session["game_state"]), _format_status_panel(game_session["game_state"]), _render_text_map(game_session["game_state"]), _get_scene_image_update(game_session["game_state"]), *btn_updates, game_session, ) return selected_option = options[option_idx] gs: GameState = game_session["game_state"] story: StoryEngine = game_session["story"] option_intent = _build_option_intent(selected_option) turn_started = perf_counter() # 检查特殊选项:退出背包(不消耗回合) if selected_option.get("action_type") == "BACKPACK_EXIT": chat_history.append({"role": "user", "content": f"选择: {selected_option['text']}"}) chat_history.append({"role": "assistant", "content": "你合上背包,把注意力重新放回当前局势。"}) restored_options = game_session.pop("backpack_return_options", None) if not isinstance(restored_options, list): restored_options = _finalize_session_options([]) game_session["current_options"] = restored_options btn_updates = _get_button_updates(restored_options) yield ( chat_history, _format_world_info_panel(gs), _format_status_panel(gs), _render_text_map(gs), _get_scene_image_update(gs), *btn_updates, game_session, ) return from_backpack_menu = str(selected_option.get("menu", "")) == "backpack" # 检查特殊选项:重新开始 if selected_option.get("action_type") == "RESTART": # 重新开始时使用流式开场 game_session = create_new_game(gs.player.name) game_session["started"] = True chat_history = [{"role": "assistant", "content": "⏳ 正在重新生成开场..."}] world_info_text = _format_world_info_panel(game_session["game_state"]) status_text = _format_status_panel(game_session["game_state"]) loading = _get_loading_button_updates() yield ( chat_history, world_info_text, status_text, _render_text_map(game_session["game_state"]), _get_scene_image_update(game_session["game_state"]), *loading, game_session, ) story_text = "" restart_final = None for update in game_session["story"].generate_opening_stream(): if update["type"] == "story_chunk": story_text = update["text"] chat_history[-1]["content"] = story_text yield ( chat_history, world_info_text, status_text, _render_text_map(game_session["game_state"]), _get_scene_image_update(game_session["game_state"]), *loading, game_session, ) elif update["type"] == "final": restart_final = update # ★ 只在流完全结束后提取选项 if restart_final: story_text = restart_final.get("story_text", story_text) restart_options = restart_final.get("options", []) else: restart_options = [] # ★ 安全兜底:强制确保恰好 3 个选项 restart_options = _finalize_session_options(restart_options) game_session["current_options"] = restart_options full_message = story_text chat_history[-1]["content"] = full_message status_text = _format_status_panel(game_session["game_state"]) btn_updates = _get_button_updates(restart_options) yield ( chat_history, _format_world_info_panel(game_session["game_state"]), status_text, _render_text_map(game_session["game_state"]), _get_scene_image_update(game_session["game_state"]), *btn_updates, game_session, ) return # 检查特殊选项:退出 if selected_option.get("action_type") == "QUIT": chat_history.append({"role": "user", "content": f"选择: {selected_option['text']}"}) chat_history.append({"role": "assistant", "content": "感谢游玩 StoryWeaver!\n你的冒险到此结束,但故事永远不会真正终结...\n\n点击「开始冒险」可以重新开始。"}) quit_buttons = _get_button_updates( [ {"id": 1, "text": "重新开始", "action_type": "RESTART"}, ] ) yield ( chat_history, _format_world_info_panel(gs), _format_status_panel(gs), _render_text_map(gs), _get_scene_image_update(gs), *quit_buttons, game_session, ) return # 正常选项处理:流式生成 chat_history.append({"role": "user", "content": f"选择: {selected_option['text']}"}) chat_history.append({"role": "assistant", "content": "⏳ 正在生成..."}) # 按钮保持可见但禁用 loading = _get_loading_button_updates(max(len(options), MIN_OPTION_BUTTONS)) yield ( chat_history, _format_world_info_panel(gs), _format_status_panel(gs), _render_text_map(gs), _get_scene_image_update(gs), *loading, game_session, ) generation_started = perf_counter() final_result = None for update in story.process_option_selection_stream(selected_option): if update["type"] == "story_chunk": chat_history[-1]["content"] = update["text"] yield ( chat_history, _format_world_info_panel(gs), _format_status_panel(gs), _render_text_map(gs), _get_scene_image_update(gs), *loading, game_session, ) elif update["type"] == "final": final_result = update generation_latency_ms = (perf_counter() - generation_started) * 1000 if final_result: # 背包菜单内执行使用/装备后,继续停留在背包菜单 if from_backpack_menu: options = _build_backpack_options(gs) else: options = _finalize_session_options(final_result.get("options", [])) game_session["current_options"] = options change_log = final_result.get("change_log", []) log_text = "" if change_log: log_text = "\n".join(f" {c}" for c in change_log) log_text = f"\n\n**状态变化:**\n{log_text}" full_message = f"{final_result['story_text']}{log_text}" chat_history[-1]["content"] = full_message status_text = _format_status_panel(gs) btn_updates = _get_button_updates(options) _record_interaction_log( game_session, input_source="option_click", user_input=selected_option.get("text", ""), intent_result=option_intent, output_text=full_message, latency_ms=(perf_counter() - turn_started) * 1000, generation_latency_ms=generation_latency_ms, final_result=final_result, selected_option=selected_option, ) yield ( chat_history, _format_world_info_panel(gs), status_text, _render_text_map(gs), _get_scene_image_update(gs), *btn_updates, game_session, ) else: # ★ 兜底:final_result 为空,说明流式生成未产生 final 事件 logger.warning("[选项点击] 流式生成未产生 final 事件,使用兜底文本") if from_backpack_menu: fallback_text = "你整理了一下背包,却一时没想好先使用哪件物品。" fallback_options = _build_backpack_options(gs) else: fallback_text = "你环顾四周,思考着接下来该做什么..." fallback_options = _finalize_session_options([]) game_session["current_options"] = fallback_options full_message = fallback_text fallback_result = { "story_text": fallback_text, "options": fallback_options, "state_changes": {}, "change_log": [], "consistency_issues": [], "telemetry": { "engine_mode": "app_fallback", "used_fallback": True, "fallback_reason": "missing_final_event", }, } chat_history[-1]["content"] = full_message status_text = _format_status_panel(gs) btn_updates = _get_button_updates(fallback_options) _record_interaction_log( game_session, input_source="option_click", user_input=selected_option.get("text", ""), intent_result=option_intent, output_text=full_message, latency_ms=(perf_counter() - turn_started) * 1000, generation_latency_ms=generation_latency_ms, final_result=fallback_result, selected_option=selected_option, ) yield ( chat_history, _format_world_info_panel(gs), status_text, _render_text_map(gs), _get_scene_image_update(gs), *btn_updates, game_session, ) return # ============================================================ # UI 辅助函数 # ============================================================ MIN_OPTION_BUTTONS = 3 MAX_OPTION_BUTTONS = 6 # 兜底默认选项(当解析出的选项为空时使用) _FALLBACK_BUTTON_OPTIONS = [ {"id": 1, "text": "查看周围", "action_type": "EXPLORE"}, {"id": 2, "text": "等待一会", "action_type": "REST"}, {"id": 3, "text": "检查状态", "action_type": "EXPLORE"}, ] def _normalize_options( options: list[dict], *, minimum: int = 0, maximum: int = MAX_OPTION_BUTTONS, ) -> list[dict]: """ 规范化选项列表: - 至多保留 maximum 个选项 - 仅当 minimum > 0 时补充兜底项 - 始终重新编号 """ if not isinstance(options, list): options = [] normalized = [opt for opt in options if isinstance(opt, dict)][:maximum] for fb in _FALLBACK_BUTTON_OPTIONS: if len(normalized) >= minimum: break if not any(o.get("text") == fb["text"] for o in normalized): normalized.append(fb.copy()) while len(normalized) < minimum: normalized.append({ "id": len(normalized) + 1, "text": "继续探索", "action_type": "EXPLORE", }) for i, opt in enumerate(normalized[:maximum], 1): if isinstance(opt, dict): opt["id"] = i return normalized[:maximum] def _finalize_session_options(options: list[dict]) -> list[dict]: minimum = MIN_OPTION_BUTTONS if not options else 0 return _normalize_options(options, minimum=minimum) def _format_options(options: list[dict]) -> str: """将选项列表格式化为可读的文本(纯文字,绝不显示 JSON)""" if not options: return "" lines = ["---", "**你的选择:**"] for i, opt in enumerate(options): # 安全提取:兼容 dict 和异常情况 if isinstance(opt, dict): idx = opt.get("id", i + 1) text = opt.get("text", "未知选项") else: idx = i + 1 text = str(opt) lines.append(f" **[{idx}]** {text}") return "\n".join(lines) def _is_backpack_menu_active(options: list[dict]) -> bool: return any( isinstance(opt, dict) and str(opt.get("action_type", "")).upper() == "BACKPACK_EXIT" and str(opt.get("menu", "")) == "backpack" for opt in (options or []) ) def _format_item_function(item_info) -> str: if item_info is None: return "功能未知" if item_info.use_effect: return f"效果:{item_info.use_effect}" if item_info.stat_bonus: bonus_text = ",".join( f"{stat}{'+' if int(value) >= 0 else ''}{int(value)}" for stat, value in item_info.stat_bonus.items() ) return f"装备加成:{bonus_text}" if item_info.lore_text: return f"线索:{item_info.lore_text}" return "暂无可用效果" def _build_backpack_options(gs: GameState) -> list[dict]: inventory = list(gs.player.inventory) if not inventory: return [ {"id": 1, "text": "退出背包", "action_type": "BACKPACK_EXIT", "menu": "backpack"}, ] inventory_order = list(dict.fromkeys(inventory)) equip_types = {"weapon", "armor", "accessory", "helmet", "boots"} consumable_options: list[dict] = [] equip_options: list[dict] = [] for item_name in inventory_order: item_info = gs.world.item_registry.get(item_name) if item_info and gs.is_item_consumable(item_name): consumable_options.append( { "text": f"使用{item_name}", "action_type": "USE_ITEM", "target": item_name, "menu": "backpack", } ) continue if item_info and item_info.item_type in equip_types: equip_options.append( { "text": f"装备{item_name}", "action_type": "EQUIP", "target": item_name, "menu": "backpack", } ) max_action_slots = MAX_OPTION_BUTTONS - 1 merged_actions = (consumable_options + equip_options)[:max_action_slots] merged_actions.append( {"text": "退出背包", "action_type": "BACKPACK_EXIT", "menu": "backpack"} ) return _normalize_options(merged_actions, minimum=0, maximum=MAX_OPTION_BUTTONS) def _format_backpack_story(gs: GameState) -> str: inventory = list(gs.player.inventory) if not inventory: return "你打开背包,里面空空如也。" inventory_counter = Counter(inventory) inventory_order = list(dict.fromkeys(inventory)) lines = ["你打开背包,快速检查随身物资:"] for item_name in inventory_order: item_info = gs.world.item_registry.get(item_name) quantity = inventory_counter.get(item_name, 1) quantity_text = f"x{quantity} " if quantity > 1 else "" description = item_info.description if item_info else "暂无描述" function_text = _format_item_function(item_info) lines.append(f"- {quantity_text}**{item_name}**:{description}({function_text})") lines.append("\n你可以直接在下方选择“使用/装备”对应物品,或退出背包。") return "\n".join(lines) def open_backpack(chat_history: list, game_session: dict): chat_history = chat_history or [] if not game_session or not game_session.get("started"): chat_history.append({"role": "assistant", "content": "请先点击「开始冒险」按钮!"}) loading = _get_loading_button_updates() return ( chat_history, _format_world_info_panel(None), "", "", gr.update(value=None, visible=False), *loading, game_session, ) gs: GameState = game_session["game_state"] current_options = game_session.get("current_options", []) if not _is_backpack_menu_active(current_options): game_session["backpack_return_options"] = copy.deepcopy(current_options) backpack_story = _format_backpack_story(gs) backpack_options = _build_backpack_options(gs) game_session["current_options"] = backpack_options chat_history.append({"role": "user", "content": "打开背包"}) chat_history.append({"role": "assistant", "content": backpack_story}) _record_interaction_log( game_session, input_source="backpack_button", user_input="打开背包", intent_result={"intent": "OPEN_BACKPACK", "target": None}, output_text=backpack_story, latency_ms=0.0, generation_latency_ms=0.0, final_result={ "story_text": backpack_story, "options": backpack_options, "state_changes": {}, "change_log": [], "consistency_issues": [], "telemetry": { "engine_mode": "backpack_menu", "used_fallback": False, "fallback_reason": None, }, }, ) btn_updates = _get_button_updates(backpack_options) return ( chat_history, _format_world_info_panel(gs), _format_status_panel(gs), _render_text_map(gs), _get_scene_image_update(gs), *btn_updates, game_session, ) def _get_loading_button_updates(visible_count: int = MIN_OPTION_BUTTONS) -> list: """返回加载中占位按钮更新,支持最多 6 个选项槽位。""" visible_count = max(0, min(int(visible_count or 0), MAX_OPTION_BUTTONS)) updates = [] for index in range(MAX_OPTION_BUTTONS): updates.append( gr.update( value="...", visible=index < visible_count, interactive=False, ) ) return updates def _get_button_updates(options: list[dict]) -> list: """从选项列表生成按钮更新,始终返回 6 个槽位。""" options = _normalize_options(options, minimum=0) updates = [] for i in range(MAX_OPTION_BUTTONS): opt = options[i] if i < len(options) else None if isinstance(opt, dict): text = opt.get("text", "...") visible = True else: text = "..." visible = False updates.append(gr.update(value=text, visible=visible, interactive=visible)) return updates def _format_status_panel(gs: GameState) -> str: """格式化状态面板文本(双列 HTML 布局,减少滚动)""" p = gs.player w = gs.world effective_stats = gs.get_effective_player_stats() equipment_bonuses = gs.get_equipment_stat_bonuses() env_snapshot = gs.get_environment_snapshot(limit=3) survival_snapshot = gs.get_survival_state_snapshot() scene_summary = gs.get_scene_summary().replace("\n", "
") clock_display = gs.get_clock_display() # 属性进度条 hp_bar = _progress_bar(p.hp, p.max_hp, "HP") mp_bar = _progress_bar(p.mp, p.max_mp, "MP") stamina_bar = _progress_bar(p.stamina, p.max_stamina, "体力") hunger_bar = _progress_bar(p.hunger, 100, "饱食") sanity_bar = _progress_bar(p.sanity, 100, "理智") morale_bar = _progress_bar(p.morale, 100, "士气") # 装备 slot_names = { "weapon": "武器", "armor": "护甲", "accessory": "饰品", "helmet": "头盔", "boots": "靴子", } equip_lines = [] for slot, item in p.equipment.items(): equip_lines.append(f"{slot_names.get(slot, slot)}: {item or '无'}") equip_text = "
".join(equip_lines) def render_stat(stat_key: str, label: str) -> str: base_value = int(getattr(p, stat_key)) bonus_value = int(equipment_bonuses.get(stat_key, 0)) effective_value = int(effective_stats.get(stat_key, base_value)) if bonus_value > 0: return f"{label}: {effective_value} (+{bonus_value} 装备)" if bonus_value < 0: return f"{label}: {effective_value} ({bonus_value} 装备)" return f"{label}: {base_value}" def badge(text: str, bg: str, fg: str = "#1f2937") -> str: return ( f"{text}" ) # 状态效果 if p.status_effects: effect_lines = "
".join( f"{e.name}({e.duration}回合)" for e in p.status_effects ) else: effect_lines = "无" # 背包 if p.inventory: inventory_text = "
".join(p.inventory) else: inventory_text = "空" weather_colors = { "晴朗": "#fef3c7", "多云": "#e5e7eb", "小雨": "#dbeafe", "浓雾": "#e0e7ff", "暴风雨": "#c7d2fe", "大雪": "#f3f4f6", } light_colors = { "明亮": "#fde68a", "柔和": "#fcd34d", "昏暗": "#cbd5e1", "幽暗": "#94a3b8", "漆黑": "#334155", } danger_level = int(env_snapshot.get("danger_level", 0)) if danger_level >= 7: danger_badge = badge(f"危险 {danger_level}/10", "#fecaca", "#7f1d1d") elif danger_level >= 4: danger_badge = badge(f"危险 {danger_level}/10", "#fed7aa", "#9a3412") else: danger_badge = badge(f"危险 {danger_level}/10", "#dcfce7", "#166534") env_badges = "".join( [ badge(f"天气 {w.weather}", weather_colors.get(w.weather, "#e5e7eb")), badge( f"光照 {w.light_level}", light_colors.get(w.light_level, "#e5e7eb"), "#0f172a" if w.light_level not in {"幽暗", "漆黑"} else "#f8fafc", ), danger_badge, badge(f"场景 {env_snapshot.get('location_type', 'unknown')}", "#ede9fe", "#4c1d95"), ] ) recent_env_events = env_snapshot.get("recent_events", []) if recent_env_events: latest_event = recent_env_events[-1] latest_event_html = ( f"
" f"{latest_event.get('title', '环境事件')}" f"
{latest_event.get('description', '')}" f"
" ) recent_event_lines = "
".join( f"- {event.get('title', '环境事件')}" for event in reversed(recent_env_events[-3:]) ) else: latest_event_html = ( "
本回合暂无显式环境事件
" ) recent_event_lines = "无" # 活跃任务(完整展示:描述、子目标、奖励、来源) active_quests = [q for q in w.quests.values() if q.status == "active"] if active_quests: quest_blocks = [] for q in active_quests: done = sum(1 for v in q.objectives.values() if v) total = len(q.objectives) tag = "主线" if q.quest_type == "main" else "支线" if q.quest_type == "side" else "🟡 " + q.quest_type # 子目标列表 obj_lines = "".join( f"
  {'✅' if v else '⬜'} {k}" for k, v in q.objectives.items() ) # 奖励摘要 reward_parts = [] if q.rewards.gold: reward_parts.append(f"{q.rewards.gold}💰") if q.rewards.experience: reward_parts.append(f"{q.rewards.experience}经验") if q.rewards.items: reward_parts.append("、".join(q.rewards.items)) if q.rewards.unlock_skill: reward_parts.append(f"技能:{q.rewards.unlock_skill}") if q.rewards.title: reward_parts.append(f"称号:{q.rewards.title}") reward_str = " | ".join(reward_parts) if reward_parts else "无" block = ( f"
{tag} {q.title}({done}/{total})" f"来源: {q.giver_npc or '未知'}
" f"{q.description}" f"{obj_lines}" f"
奖励: {reward_str}" f"
" ) quest_blocks.append(block) quest_text = "".join(quest_blocks) else: quest_text = "无活跃任务" # 使用 HTML 双列布局 status = f"""

{p.name} — {p.title}

等级 {p.level} | 经验 {p.experience}/{p.exp_to_next_level}

🩸 生命与状态

{hp_bar}
{mp_bar}
{stamina_bar}
{hunger_bar}
{sanity_bar}
{morale_bar}

🎒 背包

{inventory_text}

⚔️ 战斗属性

{render_stat("attack", "攻击")}
{render_stat("defense", "防御")}
{render_stat("speed", "速度")}
{render_stat("luck", "幸运")}
{render_stat("perception", "感知")}

🛡️ 装备

{equip_text}

💰 资源

金币: {p.gold}
善恶值: {p.karma}

✨ 状态效果

{effect_lines}

📜 任务

{quest_text}

🧭 当前场景信息

{env_badges}
时间 {clock_display} | 场景 {w.current_scene} | 状态系数 {survival_snapshot.get('combined_multiplier', 1.0)}
{latest_event_html}
{scene_summary}
最近环境事件: {recent_event_lines}
""" return status def _format_world_info_panel(gs: GameState | None) -> str: """格式化世界信息面板(放在故事框上方)。""" if gs is None: return "🌍 **世界信息**:未开始冒险" w = gs.world if hasattr(gs, "get_clock_display"): clock_display = gs.get_clock_display() else: minute_of_day = int(getattr(w, "time_progress_units", 0)) * 10 % (24 * 60) clock_display = f"{minute_of_day // 60:02d}:{minute_of_day % 60:02d}" current_scene = getattr(w, "current_scene", "未知地点") day_count = getattr(w, "day_count", 1) time_of_day = getattr(w, "time_of_day", "未知时段") weather = getattr(w, "weather", "未知") light_level = getattr(w, "light_level", "未知") season = getattr(w, "season", "未知") return ( "🌍 **世界信息**:" f"位置 {current_scene} | " f"第{day_count}天 {time_of_day}({clock_display}) | " f"天气 {weather} | 光照 {light_level} | " f"季节 {season} | 回合 {getattr(gs, 'turn', 0)}" ) def _progress_bar(current: int, maximum: int, label: str, length: int = 10) -> str: """生成 HTML 进度条(数值单独一行,低于 40% 高亮红色)""" ratio = current / maximum if maximum > 0 else 0 filled = int(ratio * length) empty = length - filled bar = "█" * filled + "░" * empty value_color = "#b91c1c" if ratio < 0.4 else "#0f172a" return ( f"{label}: {bar}" f"
{current}/{maximum}" ) # ============================================================ # Gradio 界面构建 # ============================================================ def build_app() -> gr.Blocks: """构建 Gradio 界面""" with gr.Blocks( title="StoryWeaver - 交互式叙事系统", theme=gr.themes.Soft( primary_hue="emerald", secondary_hue="blue", ), css=APP_UI_CSS, ) as app: gr.Markdown( """ # StoryWeaver — 交互式叙事系统 *基于 AI 的动态分支剧情 RPG 体验* """ ) # 游戏会话状态(Gradio State) game_session = gr.State(value={}) with gr.Row(): # ================== # 左侧:聊天区域 # ================== with gr.Column(scale=10): # 玩家姓名输入 + 开始按钮 with gr.Row(): player_name_input = gr.Textbox( label="角色名称", placeholder="输入你的角色名称(默认: 旅人)", value="旅人", scale=3, ) start_btn = gr.Button( "开始冒险", variant="primary", scale=2, ) restart_btn = gr.Button( "重启冒险", variant="stop", scale=2, ) world_info_panel = gr.Markdown( value=_format_world_info_panel(None), ) # 聊天窗口 chatbot = gr.Chatbot( label="故事", height=480, type="messages", ) location_map_panel = gr.Markdown( elem_classes=["scene-card", "status-panel"], value="地图关系图\n(未开始)", label="地图", ) # 选项按钮(最多 6 个,分两行显示) with gr.Column(): with gr.Row(): option_btn_1 = gr.Button( "...", visible=True, interactive=False, elem_classes=["option-btn"], ) option_btn_2 = gr.Button( "...", visible=True, interactive=False, elem_classes=["option-btn"], ) option_btn_3 = gr.Button( "...", visible=True, interactive=False, elem_classes=["option-btn"], ) with gr.Row(): option_btn_4 = gr.Button( "...", visible=False, interactive=False, elem_classes=["option-btn"], ) option_btn_5 = gr.Button( "...", visible=False, interactive=False, elem_classes=["option-btn"], ) option_btn_6 = gr.Button( "...", visible=False, interactive=False, elem_classes=["option-btn"], ) option_buttons = [ option_btn_1, option_btn_2, option_btn_3, option_btn_4, option_btn_5, option_btn_6, ] # 自由输入 with gr.Row(): user_input = gr.Textbox( label="自由输入(也可以直接点击上方选项)", placeholder="输入你想做的事情,例如:和村长说话、攻击哥布林、搜索这个区域...", scale=5, interactive=False, ) with gr.Column(scale=1): send_btn = gr.Button("发送", variant="primary", elem_classes=["side-action-btn"]) open_backpack_btn = gr.Button( "打开背包", variant="secondary", elem_classes=["side-action-btn", "backpack-btn"], ) # ================== # 右侧:状态面板 # ================== with gr.Column(scale=2, min_width=320, elem_classes=["scene-sidebar"]): scene_image = gr.Image( value=None, type="filepath", label="场景画面", show_label=False, container=False, interactive=False, height=260, visible=False, elem_classes=["scene-card", "scene-image"], ) status_panel = gr.Markdown( elem_classes=["scene-card", "status-panel"], value="## 等待开始...\n\n请输入角色名称并点击「开始冒险」", label="角色状态", ) # ============================================================ # 事件绑定 # ============================================================ # 开始游戏 start_btn.click( fn=start_game, inputs=[player_name_input, game_session], outputs=[ chatbot, world_info_panel, status_panel, location_map_panel, scene_image, *option_buttons, game_session, user_input, ], ) # 重启冒险 restart_btn.click( fn=restart_game, inputs=[], outputs=[ chatbot, world_info_panel, status_panel, location_map_panel, scene_image, *option_buttons, game_session, user_input, player_name_input, ], ) # 文本输入发送 send_btn.click( fn=process_user_input, inputs=[user_input, chatbot, game_session], outputs=[ chatbot, world_info_panel, status_panel, location_map_panel, scene_image, *option_buttons, game_session, ], ).then( fn=lambda: "", outputs=[user_input], ) # 打开背包(常驻按钮) open_backpack_btn.click( fn=open_backpack, inputs=[chatbot, game_session], outputs=[ chatbot, world_info_panel, status_panel, location_map_panel, scene_image, *option_buttons, game_session, ], ) # 回车发送 user_input.submit( fn=process_user_input, inputs=[user_input, chatbot, game_session], outputs=[ chatbot, world_info_panel, status_panel, location_map_panel, scene_image, *option_buttons, game_session, ], ).then( fn=lambda: "", outputs=[user_input], ) # 选项按钮点击(需要使用 yield from 的生成器包装函数, # 使 Gradio 能正确识别为流式输出) def _make_option_click_handler(index: int): def _handler(ch, gs): yield from process_option_click(index, ch, gs) return _handler for index, option_button in enumerate(option_buttons): option_button.click( fn=_make_option_click_handler(index), inputs=[chatbot, game_session], outputs=[ chatbot, world_info_panel, status_panel, location_map_panel, scene_image, *option_buttons, game_session, ], ) return app # ============================================================ # 启动入口 # ============================================================ demo = build_app() if __name__ == "__main__": logger.info("启动 StoryWeaver 交互式叙事系统...") demo.launch( server_name="0.0.0.0", server_port=7860, share=False, show_error=True, allowed_paths=[str(IMAGE_DIR)], )