MahjongGameDesigner / design_state.py
zhongchuyi
Initial deployment
20984d5
"""
DesignState 解析与差分(用于"多轮可控迭代")。
约定:
- 模型需在回答中输出一个 ```json ...``` 代码块作为 DesignState
- 该 JSON 应包含 new_variant_name / mechanics 等关键字段
"""
import json
import re
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
# ==================== 版本历史管理 ====================
class DesignStateHistory:
"""
管理 DesignState 的版本历史,支持回滚
每个版本包含:
- version: 版本号
- state_obj: DesignState 字典
- state_raw: 原始 JSON 字符串
- timestamp: 时间戳
- summary: 紧凑版摘要
"""
def __init__(self, max_versions: int = 20):
self.max_versions = max_versions
self.versions: List[Dict[str, Any]] = []
self.current_index: int = -1 # 当前版本在 versions 中的索引
def add_version(
self,
version: int,
state_obj: Dict[str, Any],
state_raw: str,
summary: str = ""
) -> None:
"""添加新版本"""
if not state_obj:
return
record = {
"version": version,
"state_obj": state_obj.copy() if state_obj else {},
"state_raw": state_raw,
"timestamp": datetime.now().strftime("%H:%M:%S"),
"summary": summary or self._generate_summary(state_obj),
}
# 如果当前不在最新位置,截断后面的版本(类似 git 的分支)
if self.current_index >= 0 and self.current_index < len(self.versions) - 1:
self.versions = self.versions[:self.current_index + 1]
self.versions.append(record)
self.current_index = len(self.versions) - 1
# 超出最大版本数时,删除最早的
if len(self.versions) > self.max_versions:
self.versions.pop(0)
self.current_index = len(self.versions) - 1
def _generate_summary(self, state_obj: Dict[str, Any]) -> str:
"""生成简短摘要"""
name = state_obj.get("new_variant_name", "(未命名)")
mechanics = state_obj.get("mechanics", [])
mech_count = len(mechanics) if isinstance(mechanics, list) else 0
return f"{name} ({mech_count}个机制)"
def get_version(self, version: int) -> Optional[Dict[str, Any]]:
"""获取指定版本"""
for record in self.versions:
if record["version"] == version:
return record
return None
def get_version_by_index(self, index: int) -> Optional[Dict[str, Any]]:
"""通过索引获取版本"""
if 0 <= index < len(self.versions):
return self.versions[index]
return None
def rollback_to(self, version: int) -> Optional[Tuple[Dict[str, Any], str]]:
"""
回滚到指定版本
返回:(state_obj, state_raw) 或 None
"""
for i, record in enumerate(self.versions):
if record["version"] == version:
self.current_index = i
return record["state_obj"].copy(), record["state_raw"]
return None
def get_version_list(self) -> List[Dict[str, Any]]:
"""获取所有版本列表(用于 UI 显示)"""
return [
{
"version": r["version"],
"timestamp": r["timestamp"],
"summary": r["summary"],
"is_current": i == self.current_index,
}
for i, r in enumerate(self.versions)
]
def get_version_choices(self) -> List[str]:
"""生成下拉选项列表"""
choices = []
for i, r in enumerate(self.versions):
marker = " ← 当前" if i == self.current_index else ""
choices.append(f"v{r['version']} [{r['timestamp']}] {r['summary']}{marker}")
return choices
def get_current_version(self) -> int:
"""获取当前版本号"""
if self.current_index >= 0 and self.current_index < len(self.versions):
return self.versions[self.current_index]["version"]
return 0
def clear(self) -> None:
"""清空历史"""
self.versions = []
self.current_index = -1
def to_serializable(self) -> Dict[str, Any]:
"""转为可序列化格式(用于 Gradio State)"""
return {
"versions": self.versions,
"current_index": self.current_index,
"max_versions": self.max_versions,
}
@classmethod
def from_serializable(cls, data: Dict[str, Any]) -> "DesignStateHistory":
"""从序列化格式恢复"""
if not data or not isinstance(data, dict):
return cls()
history = cls(max_versions=data.get("max_versions", 20))
history.versions = data.get("versions", [])
history.current_index = data.get("current_index", -1)
return history
def create_empty_history() -> Dict[str, Any]:
"""创建空的历史记录(用于初始化 State)"""
return DesignStateHistory().to_serializable()
def add_to_history(
history_data: Dict[str, Any],
version: int,
state_obj: Dict[str, Any],
state_raw: str,
summary: str = ""
) -> Dict[str, Any]:
"""添加版本到历史"""
history = DesignStateHistory.from_serializable(history_data)
history.add_version(version, state_obj, state_raw, summary)
return history.to_serializable()
def rollback_history(
history_data: Dict[str, Any],
version: int
) -> Tuple[Dict[str, Any], Optional[Dict[str, Any]], Optional[str]]:
"""
回滚到指定版本
返回:(更新后的 history_data, state_obj, state_raw)
"""
history = DesignStateHistory.from_serializable(history_data)
result = history.rollback_to(version)
if result:
return history.to_serializable(), result[0], result[1]
return history_data, None, None
def get_history_choices(history_data: Dict[str, Any]) -> List[str]:
"""获取版本下拉选项"""
history = DesignStateHistory.from_serializable(history_data)
return history.get_version_choices()
def parse_version_from_choice(choice: str) -> int:
"""从选项字符串中解析版本号"""
if not choice:
return 0
# 格式: "v3 [14:23:45] 玩法名 (2个机制)"
match = re.match(r"v(\d+)", choice)
if match:
return int(match.group(1))
return 0
_JSON_FENCE_RE = re.compile(r"```json\s*(\{.*?\})\s*```", re.DOTALL | re.IGNORECASE)
_READY_RE = re.compile(r"READY_TO_GENERATE\s*:\s*(true|false)", re.IGNORECASE)
def extract_design_state(text: str) -> Tuple[Optional[Dict[str, Any]], str]:
"""
返回:(state_dict_or_none, raw_json_or_empty)
"""
if not text:
return None, ""
candidates = _JSON_FENCE_RE.findall(text)
if not candidates:
return None, ""
# 选择“最像 DesignState”的 JSON:包含几个关键字段
best_raw = ""
best_score = -1
best_obj = None
for raw in candidates:
raw = (raw or "").strip()
try:
obj = json.loads(raw)
except Exception:
continue
score = 0
for k in ["new_variant_name", "mechanics", "tileset", "game_variant", "scoring_mode"]:
if k in obj:
score += 1
if score > best_score:
best_score = score
best_raw = raw
best_obj = obj
return best_obj, best_raw
def extract_ready_to_generate(text: str) -> Optional[bool]:
"""
从文本中提取 READY_TO_GENERATE: true/false
返回 None 表示未提供该标记。
"""
if not text:
return None
m = _READY_RE.search(text)
if not m:
return None
return m.group(1).lower() == "true"
def summarize_design_state(state: Dict[str, Any]) -> str:
if not state:
return ""
name = str(state.get("new_variant_name") or "").strip()
base = state.get("base_variants") or []
fusion = state.get("fusion_variants") or []
mechanics = state.get("mechanics") or []
return "玩法:{0} | 底座:{1} | 融合:{2} | 机制数:{3}".format(
name or "(未命名)",
",".join(base) if isinstance(base, list) else str(base),
",".join(fusion) if isinstance(fusion, list) else str(fusion),
len(mechanics) if isinstance(mechanics, list) else 0,
)
def diff_keys(prev: Dict[str, Any], cur: Dict[str, Any]) -> List[str]:
"""
粗粒度:仅比较顶层 key 的变化(新增/删除/值变更)。
"""
if prev is None or cur is None:
return []
changed = set()
prev_keys = set(prev.keys())
cur_keys = set(cur.keys())
for k in prev_keys.symmetric_difference(cur_keys):
changed.add(k)
for k in prev_keys.intersection(cur_keys):
if prev.get(k) != cur.get(k):
changed.add(k)
return sorted(changed)
def diff_mechanics(prev: Dict[str, Any], cur: Dict[str, Any]) -> List[str]:
"""
简单机制差分:按 mechanic.name 对齐,输出变更摘要字符串列表。
"""
prev_list = prev.get("mechanics") if isinstance(prev, dict) else None
cur_list = cur.get("mechanics") if isinstance(cur, dict) else None
if not isinstance(prev_list, list) or not isinstance(cur_list, list):
return []
def to_map(lst: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
out = {}
for it in lst:
if not isinstance(it, dict):
continue
name = str(it.get("name") or "").strip()
if not name:
continue
out[name] = it
return out
pm = to_map(prev_list)
cm = to_map(cur_list)
changes = []
for name in sorted(set(pm.keys()) | set(cm.keys())):
if name not in pm:
changes.append("新增机制: {0}".format(name))
continue
if name not in cm:
changes.append("删除机制: {0}".format(name))
continue
if pm[name] == cm[name]:
continue
# 只输出变了哪些字段(不展开值)
fields = sorted(set(pm[name].keys()) | set(cm[name].keys()))
touched = [f for f in fields if pm[name].get(f) != cm[name].get(f)]
changes.append("机制变更: {0} 字段={1}".format(name, ",".join(touched)))
return changes
def is_change_within_scope(changed_top_keys: List[str], scope: str) -> bool:
"""
软约束:判断顶层改动是否落在预设范围内。
"""
if not scope or scope == "自由迭代(仍保最小修改)":
return True
allowed = set()
if scope == "仅优化创新机制":
allowed.update(["mechanics", "open_questions", "core_constraints"])
elif scope == "仅优化计分与番型":
allowed.update(["scoring_mode"])
elif scope == "仅优化流程与阶段":
allowed.update(["game_variant"])
elif scope == "仅修复校验问题":
# 允许改动更宽一些:常见缺失字段补齐
allowed.update([
"mechanics",
"tileset",
"core_constraints",
"players",
"game_variant",
"scoring_mode",
])
else:
return True
# 名称/来源类字段默认不允许在"仅优化"里被改(除非自由迭代)
forbidden = {"new_variant_name", "base_variants", "fusion_variants"}
for k in changed_top_keys:
if k in forbidden:
return False
if k not in allowed:
return False
return True
# ==================== 差异可视化(新增) ====================
# 字段中文名映射
_FIELD_NAMES = {
"new_variant_name": "玩法名称",
"base_variants": "底座玩法",
"fusion_variants": "融合玩法",
"game_variant": "游戏模式",
"players": "玩家人数",
"scoring_mode": "计分模式",
"tileset": "牌组配置",
"core_constraints": "核心约束",
"mechanics": "机制列表",
"open_questions": "待定问题",
}
def _format_value(val: Any, max_len: int = 40) -> str:
"""格式化值为简短字符串"""
if val is None:
return "(空)"
if isinstance(val, bool):
return "是" if val else "否"
if isinstance(val, (int, float)):
return str(val)
if isinstance(val, str):
s = val.strip()
return s if len(s) <= max_len else s[:max_len] + "..."
if isinstance(val, list):
if not val:
return "(空列表)"
if len(val) <= 3:
return ", ".join(str(v) for v in val)
return ", ".join(str(v) for v in val[:3]) + f"... 共{len(val)}项"
if isinstance(val, dict):
keys = list(val.keys())
if len(keys) <= 3:
return "{" + ", ".join(keys) + "}"
return "{" + ", ".join(keys[:3]) + f"... 共{len(keys)}项" + "}"
return str(val)[:max_len]
def diff_open_questions(prev: Dict[str, Any], cur: Dict[str, Any]) -> Tuple[List[str], List[str]]:
"""
对比 open_questions 的变化
返回:(已解决的问题列表, 新增的问题列表)
"""
prev_qs = prev.get("open_questions") if isinstance(prev, dict) else None
cur_qs = cur.get("open_questions") if isinstance(cur, dict) else None
if not isinstance(prev_qs, list):
prev_qs = []
if not isinstance(cur_qs, list):
cur_qs = []
prev_set = set(str(q) for q in prev_qs)
cur_set = set(str(q) for q in cur_qs)
resolved = sorted(prev_set - cur_set)
added = sorted(cur_set - prev_set)
return resolved, added
def generate_diff_summary(
prev: Optional[Dict[str, Any]],
cur: Optional[Dict[str, Any]],
prev_version: int = 0,
cur_version: int = 0
) -> str:
"""
生成人可读的 DesignState 变更摘要(Markdown 格式)
"""
if not cur:
return ""
if not prev:
# 首次建立
name = cur.get("new_variant_name", "(未命名)")
mechanics = cur.get("mechanics", [])
mech_count = len(mechanics) if isinstance(mechanics, list) else 0
return f"**DesignState v{cur_version} 已建立**\n\n玩法:{name}\n机制数:{mech_count}"
lines = []
version_str = f"v{prev_version} → v{cur_version}" if prev_version and cur_version else ""
lines.append(f"**DesignState 变更摘要** {version_str}")
lines.append("")
# 1. 顶层字段变更(排除 mechanics 和 open_questions,单独处理)
skip_keys = {"mechanics", "open_questions"}
changed_fields = []
all_keys = set(prev.keys()) | set(cur.keys())
for key in sorted(all_keys):
if key in skip_keys:
continue
prev_val = prev.get(key)
cur_val = cur.get(key)
if prev_val != cur_val:
field_name = _FIELD_NAMES.get(key, key)
if key not in prev:
changed_fields.append(f" • **[新增]** {field_name}: {_format_value(cur_val)}")
elif key not in cur:
changed_fields.append(f" • **[删除]** {field_name}")
else:
changed_fields.append(f" • {field_name}: `{_format_value(prev_val)}` → `{_format_value(cur_val)}`")
if changed_fields:
lines.append("📝 **顶层字段变更:**")
lines.extend(changed_fields)
lines.append("")
# 2. 机制变更
mech_changes = diff_mechanics(prev, cur)
if mech_changes:
lines.append("🔧 **机制变更:**")
for change in mech_changes:
if change.startswith("新增机制"):
lines.append(f" • **[新增]** {change.replace('新增机制: ', '')}")
elif change.startswith("删除机制"):
lines.append(f" • **[删除]** {change.replace('删除机制: ', '')}")
else:
# 机制变更: XXX 字段=a,b,c
parts = change.replace("机制变更: ", "").split(" 字段=")
if len(parts) == 2:
lines.append(f" • **[修改]** {parts[0]}: {parts[1]}")
else:
lines.append(f" • {change}")
lines.append("")
# 3. 待定问题变化
resolved, added = diff_open_questions(prev, cur)
if resolved or added:
lines.append("❓ **待定问题变化:**")
for q in resolved:
lines.append(f" • ~~[已解决]~~ {q[:50]}{'...' if len(q) > 50 else ''}")
for q in added:
lines.append(f" • **[新增]** {q[:50]}{'...' if len(q) > 50 else ''}")
lines.append("")
# 如果没有任何变更
if len(lines) <= 2:
lines.append("_(无变更)_")
return "\n".join(lines).strip()
def generate_diff_summary_compact(
prev: Optional[Dict[str, Any]],
cur: Optional[Dict[str, Any]]
) -> str:
"""
生成紧凑版变更摘要(单行,用于状态栏)
"""
if not cur:
return "无变更"
if not prev:
name = cur.get("new_variant_name", "(未命名)")
return f"新建: {name}"
changes = []
# 顶层字段变更数
field_changes = diff_keys(prev, cur)
skip_keys = {"mechanics", "open_questions"}
field_changes = [k for k in field_changes if k not in skip_keys]
if field_changes:
changes.append(f"{len(field_changes)}个字段")
# 机制变更数
mech_changes = diff_mechanics(prev, cur)
if mech_changes:
changes.append(f"{len(mech_changes)}个机制")
# 问题变化
resolved, added = diff_open_questions(prev, cur)
if resolved:
changes.append(f"解决{len(resolved)}个问题")
if added:
changes.append(f"新增{len(added)}个问题")
if not changes:
return "无变更"
return "变更: " + ", ".join(changes)
# ==================== 多阶段引导式交互 ====================
class InteractionPhase:
"""交互阶段枚举"""
INITIAL = "initial" # 初始阶段:用户提出需求
UNDERSTAND = "understand" # 理解确认:确认对已有玩法的理解
DIVERGE = "diverge" # 方案发散:生成多种机制组合方案
SELECT = "select" # 方案选择:用户选择具体方案
ELABORATE = "elaborate" # 深入展开:对选定方案进行详细设计
ITERATE = "iterate" # 迭代优化:基于反馈优化方案
# 方案提取正则
_PROPOSAL_BLOCK_RE = re.compile(
r"(?:###?\s*)?(?:方案|选项|Option)\s*([A-Z\d一二三四五六七八九十]+)[::\s]*(.+?)(?=(?:###?\s*)?(?:方案|选项|Option)\s*[A-Z\d一二三四五六七八九十]+[::\s]|$)",
re.DOTALL | re.IGNORECASE
)
# 理解确认问题提取
_CLARIFY_QUESTION_RE = re.compile(
r"(?:❓|🤔|【确认】|【问题】|\[确认\]|\[问题\]|请确认|请问|是否是指)\s*(.+?\?)",
re.DOTALL
)
def extract_proposals(text: str) -> List[Dict[str, Any]]:
"""
从模型输出中提取多个候选方案
返回: [{"id": "A", "title": "...", "description": "...", "highlights": [...]}]
"""
if not text:
return []
proposals = []
matches = _PROPOSAL_BLOCK_RE.findall(text)
for idx, (proposal_id, content) in enumerate(matches):
content = content.strip()
# 提取标题(第一行或冒号前的部分)
lines = content.split("\n")
title = lines[0].strip() if lines else f"方案 {proposal_id}"
# 清理标题中的 markdown 标记
title = re.sub(r"^[#\-\*]+\s*", "", title)
title = re.sub(r"\*+", "", title)
# 提取描述
description = "\n".join(lines[1:]).strip() if len(lines) > 1 else ""
# 提取亮点/创新点
highlights = []
highlight_patterns = [
r"[★✦⭐🌟]\s*(.+)",
r"(?:创新点|亮点|特色)[::]\s*(.+)",
r"[-•]\s*(?:创新|特色|核心)[::]\s*(.+)",
]
for pattern in highlight_patterns:
hl_matches = re.findall(pattern, content)
highlights.extend([h.strip() for h in hl_matches])
proposals.append({
"id": proposal_id.strip(),
"title": title[:50], # 限制标题长度
"description": description[:200] + ("..." if len(description) > 200 else ""),
"highlights": highlights[:3], # 最多3个亮点
"full_content": content,
})
return proposals
def extract_clarify_questions(text: str) -> List[str]:
"""
从模型输出中提取需要用户确认的问题
"""
if not text:
return []
questions = []
matches = _CLARIFY_QUESTION_RE.findall(text)
for q in matches:
q = q.strip()
if q and len(q) > 5: # 过滤太短的
questions.append(q)
# 也尝试提取 open_questions 中的内容
try:
ds_obj, _ = extract_design_state(text)
if ds_obj and "open_questions" in ds_obj:
for q in ds_obj.get("open_questions", []):
if isinstance(q, str) and q not in questions:
questions.append(q)
except Exception:
pass
return questions[:5] # 最多5个问题
def detect_interaction_phase(text: str, ds_obj: Optional[Dict[str, Any]] = None) -> str:
"""
根据模型输出内容检测当前交互阶段
"""
if not text:
return InteractionPhase.INITIAL
text_lower = text.lower()
# 检测是否有多个方案供选择
proposals = extract_proposals(text)
if len(proposals) >= 2:
return InteractionPhase.DIVERGE
# 检测是否有需要确认的问题
questions = extract_clarify_questions(text)
if questions:
return InteractionPhase.UNDERSTAND
# 检测是否有完整的 DesignState 且 READY_TO_GENERATE
ready = extract_ready_to_generate(text)
if ready is True:
return InteractionPhase.ELABORATE
# 检测是否在迭代中
if ds_obj and ds_obj.get("mechanics"):
return InteractionPhase.ITERATE
return InteractionPhase.INITIAL
def format_proposals_for_display(proposals: List[Dict[str, Any]]) -> str:
"""
将方案列表格式化为 Markdown 显示
"""
if not proposals:
return ""
lines = ["## 🎯 请选择一个方案深入展开\n"]
for p in proposals:
lines.append(f"### 方案 {p['id']}: {p['title']}")
if p.get("highlights"):
for hl in p["highlights"]:
lines.append(f" ✦ {hl}")
if p.get("description"):
lines.append(f"\n{p['description'][:150]}...")
lines.append("")
lines.append("\n💡 请在下方选择方案编号,或输入「其他」描述你的想法。")
return "\n".join(lines)
def generate_phase_prompt_hint(phase: str, ds_obj: Optional[Dict[str, Any]] = None) -> str:
"""
根据当前阶段生成提示词补充
"""
hints = {
InteractionPhase.INITIAL: "",
InteractionPhase.UNDERSTAND: (
"\n\n【阶段提示】当前处于「理解确认」阶段。"
"请确保你完全理解用户描述的已有玩法,如有不明确之处请提出确认问题。"
),
InteractionPhase.DIVERGE: (
"\n\n【阶段提示】当前处于「方案发散」阶段。"
"请发散思维,给出 2-4 种不同方向的机制组合方案,每个方案需包含:\n"
"1. 方案编号(A/B/C/D)\n"
"2. 方案名称\n"
"3. 核心创新点(1-2句话)\n"
"4. 机制组合简述\n"
"不要深入展开,等用户选择后再详细设计。"
),
InteractionPhase.SELECT: (
"\n\n【阶段提示】用户已选择方案,请对该方案进行深入展开设计。"
),
InteractionPhase.ELABORATE: (
"\n\n【阶段提示】当前处于「深入展开」阶段。"
"请输出完整的玩法设计(含 DesignState、mGDL、自检报告)。"
),
InteractionPhase.ITERATE: (
"\n\n【阶段提示】当前处于「迭代优化」阶段。"
"请基于用户反馈对现有方案进行最小修改。"
),
}
return hints.get(phase, "")
def create_phase_state() -> Dict[str, Any]:
"""创建阶段状态(用于 Gradio State)"""
return {
"current_phase": InteractionPhase.INITIAL,
"confirmed_understanding": [], # 已确认的理解点
"proposals": [], # 当前可选方案
"selected_proposal": None, # 用户选择的方案
"phase_history": [], # 阶段历史
}
def update_phase_state(
phase_state: Dict[str, Any],
new_phase: str,
proposals: Optional[List[Dict[str, Any]]] = None,
selected: Optional[str] = None,
confirmed: Optional[List[str]] = None,
) -> Dict[str, Any]:
"""更新阶段状态"""
state = phase_state.copy() if phase_state else create_phase_state()
# 记录阶段变化历史
if state["current_phase"] != new_phase:
state["phase_history"].append({
"from": state["current_phase"],
"to": new_phase,
"timestamp": datetime.now().strftime("%H:%M:%S"),
})
state["current_phase"] = new_phase
if proposals is not None:
state["proposals"] = proposals
if selected is not None:
state["selected_proposal"] = selected
if confirmed is not None:
state["confirmed_understanding"].extend(confirmed)
return state
def get_phase_display_name(phase: str) -> str:
"""获取阶段的显示名称"""
names = {
InteractionPhase.INITIAL: "🚀 初始",
InteractionPhase.UNDERSTAND: "🔍 理解确认",
InteractionPhase.DIVERGE: "💡 方案发散",
InteractionPhase.SELECT: "✅ 方案选择",
InteractionPhase.ELABORATE: "📝 深入展开",
InteractionPhase.ITERATE: "🔄 迭代优化",
}
return names.get(phase, phase)
# ==================== 细粒度范围控制 ====================
# DesignState 可控字段定义
CONTROLLABLE_FIELDS = {
"new_variant_name": {"label": "玩法名称", "category": "基础信息", "lockable": True},
"base_variants": {"label": "底座玩法", "category": "基础信息", "lockable": True},
"fusion_variants": {"label": "融合玩法", "category": "基础信息", "lockable": True},
"game_variant": {"label": "游戏模式", "category": "游戏规则", "lockable": True},
"players": {"label": "玩家人数", "category": "游戏规则", "lockable": True},
"scoring_mode": {"label": "计分模式", "category": "计分系统", "lockable": True},
"tileset": {"label": "牌组配置", "category": "游戏规则", "lockable": True},
"core_constraints": {"label": "核心约束", "category": "游戏规则", "lockable": False},
"mechanics": {"label": "机制列表", "category": "创新机制", "lockable": False},
"open_questions": {"label": "待定问题", "category": "其他", "lockable": False},
}
# 字段分类
FIELD_CATEGORIES = {
"基础信息": ["new_variant_name", "base_variants", "fusion_variants"],
"游戏规则": ["game_variant", "players", "tileset", "core_constraints"],
"计分系统": ["scoring_mode"],
"创新机制": ["mechanics"],
"其他": ["open_questions"],
}
class ScopeConstraint:
"""范围约束类型"""
SOFT = "soft" # 软约束:检测到越界时提示,但不阻断
HARD = "hard" # 硬约束:检测到越界时阻断并要求重做
def create_scope_config() -> Dict[str, Any]:
"""创建默认的范围配置"""
return {
"mode": "preset", # preset(预设模式)或 custom(自定义模式)
"preset": "自由迭代(仍保最小修改)", # 预设选项
"constraint_level": ScopeConstraint.SOFT, # 约束级别
"locked_fields": [], # 锁定的顶层字段
"allowed_fields": [], # 允许修改的顶层字段(custom 模式下使用)
"locked_mechanics": [], # 锁定的机制名称(不允许修改这些机制)
"allowed_mechanics": [], # 只允许修改这些机制(为空表示不限制)
}
def get_scope_preset_options() -> List[str]:
"""获取预设范围选项"""
return [
"自由迭代(仍保最小修改)",
"仅优化创新机制",
"仅优化计分与番型",
"仅优化流程与阶段",
"仅修复校验问题",
"锁定核心(仅微调细节)",
]
def get_allowed_fields_for_preset(preset: str) -> List[str]:
"""根据预设获取允许修改的字段"""
presets = {
"自由迭代(仍保最小修改)": list(CONTROLLABLE_FIELDS.keys()),
"仅优化创新机制": ["mechanics", "open_questions", "core_constraints"],
"仅优化计分与番型": ["scoring_mode", "open_questions"],
"仅优化流程与阶段": ["game_variant", "open_questions"],
"仅修复校验问题": ["mechanics", "tileset", "core_constraints", "players", "game_variant", "scoring_mode", "open_questions"],
"锁定核心(仅微调细节)": ["open_questions", "core_constraints"],
}
return presets.get(preset, list(CONTROLLABLE_FIELDS.keys()))
def get_forbidden_fields_for_preset(preset: str) -> List[str]:
"""根据预设获取禁止修改的字段"""
if preset == "自由迭代(仍保最小修改)":
return []
allowed = set(get_allowed_fields_for_preset(preset))
all_fields = set(CONTROLLABLE_FIELDS.keys())
return list(all_fields - allowed)
def validate_scope_compliance(
prev: Dict[str, Any],
cur: Dict[str, Any],
scope_config: Dict[str, Any],
) -> Dict[str, Any]:
"""
验证变更是否符合范围约束
返回:
{
"compliant": bool, # 是否符合约束
"violations": [ # 违规列表
{"field": "xxx", "type": "field_locked|field_forbidden|mechanic_locked", "detail": "..."}
],
"warnings": [], # 警告(软约束时)
"summary": "..." # 摘要文本
}
"""
if not prev or not cur:
return {"compliant": True, "violations": [], "warnings": [], "summary": "无变更"}
result = {
"compliant": True,
"violations": [],
"warnings": [],
"summary": "",
}
mode = scope_config.get("mode", "preset")
constraint_level = scope_config.get("constraint_level", ScopeConstraint.SOFT)
# 确定允许和禁止的字段
if mode == "preset":
preset = scope_config.get("preset", "自由迭代(仍保最小修改)")
allowed_fields = set(get_allowed_fields_for_preset(preset))
forbidden_fields = set(get_forbidden_fields_for_preset(preset))
else:
allowed_fields = set(scope_config.get("allowed_fields", []))
forbidden_fields = set(scope_config.get("locked_fields", []))
locked_mechanics = set(scope_config.get("locked_mechanics", []))
allowed_mechanics = set(scope_config.get("allowed_mechanics", []))
# 检查顶层字段变更
changed_fields = diff_keys(prev, cur)
for field in changed_fields:
# 检查是否在禁止列表中
if field in forbidden_fields:
violation = {
"field": field,
"type": "field_forbidden",
"detail": f"字段「{CONTROLLABLE_FIELDS.get(field, {}).get('label', field)}」不在允许修改范围内",
}
result["violations"].append(violation)
continue
# 检查是否被显式锁定
if field in scope_config.get("locked_fields", []):
violation = {
"field": field,
"type": "field_locked",
"detail": f"字段「{CONTROLLABLE_FIELDS.get(field, {}).get('label', field)}」已被锁定",
}
result["violations"].append(violation)
continue
# 检查机制级别变更
if "mechanics" in changed_fields:
mech_changes = diff_mechanics(prev, cur)
for change in mech_changes:
# 提取机制名称
mech_name = ""
if "新增机制:" in change:
mech_name = change.replace("新增机制: ", "").strip()
elif "删除机制:" in change:
mech_name = change.replace("删除机制: ", "").strip()
elif "机制变更:" in change:
mech_name = change.split(" 字段=")[0].replace("机制变更: ", "").strip()
# 检查机制是否被锁定
if mech_name and mech_name in locked_mechanics:
violation = {
"field": "mechanics",
"type": "mechanic_locked",
"detail": f"机制「{mech_name}」已被锁定,不允许修改",
}
result["violations"].append(violation)
# 检查是否只允许修改特定机制
if allowed_mechanics and mech_name and mech_name not in allowed_mechanics:
# 只对修改和删除进行限制,新增通常是允许的
if "删除机制:" in change or "机制变更:" in change:
violation = {
"field": "mechanics",
"type": "mechanic_not_allowed",
"detail": f"机制「{mech_name}」不在允许修改的机制列表中",
}
result["violations"].append(violation)
# 根据约束级别处理违规
if result["violations"]:
if constraint_level == ScopeConstraint.HARD:
result["compliant"] = False
result["summary"] = f"❌ 发现 {len(result['violations'])} 处范围越界(硬约束),请重新生成"
else:
result["warnings"] = result["violations"]
result["violations"] = []
result["summary"] = f"⚠️ 发现 {len(result['warnings'])} 处范围越界(软约束),建议检查"
else:
result["summary"] = "✅ 变更符合范围约束"
return result
def format_scope_violations(validation_result: Dict[str, Any]) -> str:
"""格式化范围违规为 Markdown"""
if validation_result.get("compliant", True) and not validation_result.get("warnings"):
return validation_result.get("summary", "")
lines = [validation_result.get("summary", "")]
items = validation_result.get("violations", []) or validation_result.get("warnings", [])
if items:
lines.append("")
for item in items:
field_label = CONTROLLABLE_FIELDS.get(item["field"], {}).get("label", item["field"])
lines.append(f" • **{field_label}**: {item['detail']}")
return "\n".join(lines)
def get_mechanics_from_state(ds_obj: Dict[str, Any]) -> List[str]:
"""从 DesignState 中提取机制名称列表"""
if not ds_obj:
return []
mechanics = ds_obj.get("mechanics", [])
if not isinstance(mechanics, list):
return []
names = []
for m in mechanics:
if isinstance(m, dict) and "name" in m:
names.append(m["name"])
return names
def generate_scope_prompt_hint(scope_config: Dict[str, Any], ds_obj: Optional[Dict[str, Any]] = None) -> str:
"""生成范围约束的提示词补充"""
mode = scope_config.get("mode", "preset")
if mode == "preset":
preset = scope_config.get("preset", "自由迭代(仍保最小修改)")
if preset == "自由迭代(仍保最小修改)":
return ""
allowed = get_allowed_fields_for_preset(preset)
forbidden = get_forbidden_fields_for_preset(preset)
allowed_labels = [CONTROLLABLE_FIELDS.get(f, {}).get("label", f) for f in allowed]
forbidden_labels = [CONTROLLABLE_FIELDS.get(f, {}).get("label", f) for f in forbidden]
hint = f"\n\n【范围约束】当前模式:{preset}\n"
hint += f"允许修改:{', '.join(allowed_labels)}\n"
if forbidden_labels:
hint += f"禁止修改:{', '.join(forbidden_labels)}\n"
return hint
# 自定义模式
locked_fields = scope_config.get("locked_fields", [])
locked_mechanics = scope_config.get("locked_mechanics", [])
if not locked_fields and not locked_mechanics:
return ""
hint = "\n\n【范围约束】自定义模式\n"
if locked_fields:
locked_labels = [CONTROLLABLE_FIELDS.get(f, {}).get("label", f) for f in locked_fields]
hint += f"锁定字段(禁止修改):{', '.join(locked_labels)}\n"
if locked_mechanics:
hint += f"锁定机制(禁止修改):{', '.join(locked_mechanics)}\n"
return hint