# -*- coding: utf-8 -*- """ normalizer_v2.py — 语义标准化(兼容多布局,提取 mechanics 名称/阶段/时机) """ from __future__ import annotations from typing import Any, Dict, List from copy import deepcopy def _as_list(x): if x is None: return [] return x if isinstance(x, list) else [x] def _as_dict(x): return x if isinstance(x, dict) else {} def _scalar(x: Any) -> str: if isinstance(x, str): return x if isinstance(x, (int, float)): return str(x) if isinstance(x, dict): if "_" in x and isinstance(x["_"], list) and x["_"] and isinstance(x["_"][0], (str,int,float)): return _scalar(x["_"][0]) if "name" in x and isinstance(x["name"], str): return x["name"] if len(x)==1: k, v = next(iter(x.items())) if v in (None, {}) and isinstance(k, str): return k if isinstance(x, list) and x: return _scalar(x[0]) return "" def _is_standard_mechanic_structure(node: Any) -> bool: """ 检查一个节点是否是标准的 {"mechanic": {...}} 结构。 这是 GDL 中 (mechanic ...) 被 _to_obj 解析后的直接结果。 """ return isinstance(node, dict) and len(node) == 1 and 'mechanic' in node and isinstance(node['mechanic'], dict) def _extract_mechanics_from_sm(sm_node: Any) -> List[Dict[str, Any]]: """ 专门从 special_mechanics 的节点中提取 mechanic 定义。 sm_node 是 special_mechanics 键对应的值。 这个函数严格处理 special_mechanics 的内容,避免误判内部结构。 """ mechanics = [] # sm_node 应该是 {'mechanic': [...]} 或 {'mechanic': {...}} 或 [{'mechanic': {...}}, ...] sm_dict = _as_dict(sm_node) if 'mechanic' in sm_dict: mech_content = sm_dict.get("mechanic") if isinstance(mech_content, dict): # Case 1: {"mechanic": {"mechanic_name": {...}, "mechanic_name2": {...}}} # 这种情况发生在 _to_obj 合并了多个同名 mechanic 时 for name, definition in mech_content.items(): if isinstance(definition, dict): # 构造标准的 mechanic 结构 mechanic_def = definition.copy() # 确保有 name 字段 if 'name' not in mechanic_def: mechanic_def['name'] = name mechanics.append({"mechanic": mechanic_def}) elif isinstance(mech_content, list): # Case 2: {"mechanic": [...]} for item in mech_content: # item 应该是 {"_": ["Name", ...], ...} 或 {"name": "Name", ...} 这样的结构 inside the mechanic # Check if item itself is the content of a mechanic (i.e., has 'name' or '_') if isinstance(item, dict) and ('name' in item or ('_' in item and isinstance(item.get('_'), list) and item['_'] and isinstance(item['_'][0], str))): # Wrap it in the standard {"mechanic": {...}} format mechanics.append({"mechanic": item}) elif isinstance(sm_node, list): # Case 3: [{"mechanic": {...}}, ...] - A list of standard structures for item in sm_node: if _is_standard_mechanic_structure(item): mechanics.append(item) # item is {"mechanic": {...}} elif _is_standard_mechanic_structure(sm_node): # Case 4: {"mechanic": {...}} - A single standard structure mechanics.append(sm_node) # sm_node is {"mechanic": {...}} # If sm_node is just a dict like {"_": ["Name", ...]}, it's likely an error or edge case from _merge_kv_list_to_dict # We ignore this case as it shouldn't happen if special_mechanics is structured correctly in GDL. return mechanics def _walk_collect_mechs(node, bucket, inside_special_mechanics=False): """ 递归收集 special_mechanics / actions.special inside_special_mechanics 标志用于防止在 mechanic 内部结构中递归查找其他 mechanic。 """ if isinstance(node, dict): # 专门处理 special_mechanics 键 - 这是主要的 mechanic 来源 if "special_mechanics" in node: sm_node = node.get("special_mechanics") # Extract mechanics only from this specific key using the dedicated function extracted = _extract_mechanics_from_sm(sm_node) bucket.extend(extracted) # Mark that we are now processing inside special_mechanics context for recursion # We don't pass this flag deeper here, as _extract_mechanics_from_sm handles the context. # 处理 actions.special elif "actions" in node and not inside_special_mechanics: # Only process actions if not already inside special_mechanics a = _as_dict(node.get("actions")) special_actions = _as_list(a.get("special")) # 检查 special_actions 中是否包含 mechanic 定义 (This is less common than in special_mechanics) for sa in special_actions: # sa 可能是 {"name": "...", ...} or {"_": ["Name", ...], ...} # Check if it matches a basic mechanic-like structure if isinstance(sa, dict) and ("name" in sa or ("_" in sa and isinstance(sa.get("_"), list) and sa["_"] and isinstance(sa["_"][0], str))): bucket.append({"mechanic": sa}) # Wrap in standard format # Recurse into other keys, but avoid recursing into known internal structures of a mechanic # like transfer_path, visibility_change, etc., which are unlikely to contain top-level mechanics. # The main risk was from 'phases' -> 'mechanic_conditions', which we handled by using _extract_mechanics_from_sm. # We can now recurse more safely, but mark if we are inside a mechanic's content. for k, v in node.items(): if k in ("special_mechanics", "actions"): # Already handled these above continue # Do not recurse into known internal structures that are not top-level mechanic containers if k in ("transfer_path", "visibility_change", "params"): # Add other known internal keys if needed continue _walk_collect_mechs(v, bucket, inside_special_mechanics=inside_special_mechanics) elif isinstance(node, list): # If the top-level is a list containing special_mechanics, handle it # Or if it's a list inside a mechanic, recurse carefully for v in node: _walk_collect_mechs(v, bucket, inside_special_mechanics) def _collect_mechanics(parsed: Dict[str, Any]) -> List[Dict[str, Any]]: items: List[Any] = [] # Start the walk from the top-level game structure _walk_collect_mechs(parsed, items, inside_special_mechanics=False) mechs: List[Dict[str, Any]] = [] for i, raw in enumerate(items): m = _as_dict(raw) if not m: continue # --- Process standard structure {"mechanic": {...}} --- m_content = m if len(m) == 1 and 'mechanic' in m and isinstance(m['mechanic'], dict): m_content = m['mechanic'] else: # If the raw item is not in the expected {"mechanic": {...}} format, # it means the extraction logic failed or it was collected incorrectly. # For safety, skip. continue # Now m_content holds the actual mechanic definition like {"_": ["Name", ...], ...} or {"name": "Name", ...} name = _scalar(m_content.get("name") or (m_content.get("_") or [None])[0] or "") if not name: name = f"mechanic@{i}" phase = _scalar(m_content.get("phase") or "playing_phase") # 先拿原词 timing = _scalar(m_content.get("timing") or "") # 修改:将 phase 映射为标准名称,例如 playing_phase -> playing standard_phase = phase.replace("_phase", "") mech = { "name": name, "enabled": True if _scalar(m_content.get("enabled")) in ("true","1","yes","True") else True, "description": _scalar(m_content.get("description")), "phase": standard_phase, # 使用标准名称 "timing": timing, "trigger_condition": _scalar(m_content.get("trigger_condition")), "raw_definition": { k: v for k, v in m_content.items() if k not in ("name","enabled","description","phase","timing","trigger_condition") } } mechs.append(mech) return mechs def _collect_mechanics_from_sexpr(parsed: Dict[str, Any]) -> List[Dict[str, Any]]: """ 直接从 S-expression 中收集 mechanics,避免 _to_obj 的键合并问题 """ from gdl_parser_v2 import parse_sexpr mechanics = [] def find_and_extract_mechanics(obj, path=''): if isinstance(obj, dict): for k, v in obj.items(): if k == 'special_mechanics': # 找到 special_mechanics,尝试提取其中的 mechanics if isinstance(v, dict) and 'mechanic' in v: mech_content = v['mechanic'] if isinstance(mech_content, dict): # 处理合并后的情况:{"mechanic_name": {...}, "mechanic_name2": {...}} for name, definition in mech_content.items(): if isinstance(definition, dict): mechanics.append(_process_mechanic_definition(name, definition)) else: find_and_extract_mechanics(v, f'{path}.{k}' if path else k) elif isinstance(obj, list): for i, item in enumerate(obj): find_and_extract_mechanics(item, f'{path}[{i}]') find_and_extract_mechanics(parsed) return mechanics def _collect_mechanics_from_raw_sexpr(gdl_text: str) -> List[Dict[str, Any]]: """ 直接从原始 GDL 文本中收集 mechanics,避免解析器的键合并问题 """ from gdl_parser_v2 import parse_sexpr mechanics = [] # 找到 special_mechanics 部分 lines = gdl_text.split('\n') start_idx = -1 end_idx = -1 paren_count = 0 for i, line in enumerate(lines): if '(special_mechanics' in line: start_idx = i paren_count = line.count('(') - line.count(')') break if start_idx != -1: for i in range(start_idx + 1, len(lines)): paren_count += lines[i].count('(') - lines[i].count(')') if paren_count == 0: end_idx = i break if start_idx != -1 and end_idx != -1: special_mechanics_lines = lines[start_idx:end_idx + 1] special_mechanics_text = '\n'.join(special_mechanics_lines) try: sexpr = parse_sexpr(special_mechanics_text) if len(sexpr) > 0 and isinstance(sexpr[0], list): for item in sexpr[0]: if isinstance(item, list) and len(item) > 0 and item[0] == 'mechanic': # 这是一个 mechanic 定义 name = item[1] if len(item) > 1 else "unknown" # 解析 mechanic 的属性 definition = {} for i in range(2, len(item)): if isinstance(item[i], list) and len(item[i]) >= 2: key = item[i][0] value = item[i][1] if len(item[i]) == 2 else item[i][1:] definition[key] = value mechanics.append(_process_mechanic_definition(name, definition)) except Exception as e: print(f"Error parsing special_mechanics: {e}") return mechanics def _process_mechanic_definition(name: str, definition: Dict[str, Any]) -> Dict[str, Any]: """处理单个 mechanic 定义""" phase = _scalar(definition.get("phase") or "playing_phase") timing = _scalar(definition.get("timing") or "") standard_phase = phase.replace("_phase", "") return { "name": name, "enabled": True if _scalar(definition.get("enabled")) in ("true","1","yes","True") else True, "description": _scalar(definition.get("description")), "phase": standard_phase, "timing": timing, "trigger_condition": _scalar(definition.get("trigger_condition")), "raw_definition": { k: v for k, v in definition.items() if k not in ("name","enabled","description","phase","timing","trigger_condition") } } def normalize_ir(parsed: Dict[str, Any], gdl_text: str = None) -> Dict[str, Any]: p = _as_dict(parsed) game = _as_dict(p.get("game") or p) out: Dict[str, Any] = {"game": {}} # ---- name ---- nm = _scalar(game.get("name") or game.get("_") or "") out["game"]["name"] = nm or "UnnamedGame" # ---- players count ---- players_count = game.get("players", 0) out["game"]["players_count"] = players_count # 为 mapper_v2 提供原始玩家数量 # ---- roles ---- roles: List[Dict[str, Any]] = [] if "roles" in game: flat = [] for r in _as_list(game["roles"]): if isinstance(r, list): flat.extend(r) else: flat.append(r) for r in flat: rd = _as_dict(r) if "role" in rd and isinstance(rd["role"], dict): u = rd["role"].get("_") if isinstance(u, list) and u: name = _scalar(u[0]); cnt = 1 if len(u) >= 2: try: cnt = int(u[1]) except: cnt = 1 roles.append({"name": name or "Player", "count": max(1, cnt)}); continue name = _scalar(rd.get("name") or rd.get("role") or "Player") cnt = rd.get("count", 1) try: cnt=int(cnt) except: cnt=1 roles.append({"name": name, "count": max(1,cnt)}) out["game"]["roles"] = roles or [{"name":"Player","count": game.get("players") or 0}] # ---- turns.order from turn_order ---- order = [] to = _as_list(game.get("turn_order")) if to: flat=[] for elem in to: if isinstance(elem, list): flat.extend(elem) else: flat.append(elem) # {"Landlord":{"_":[...]} } if flat and isinstance(flat[0], dict) and len(flat[0])==1: k, v = next(iter(flat[0].items())) seq = [k] if isinstance(v, dict) and isinstance(v.get("_"), list): seq.extend(v.get("_")) order = [ _scalar(x) for x in seq ] else: order = [ _scalar(x) for x in flat ] if order: out["turns"] = {"order": order} # ---- phases ---- 兼容 *_phase 键名 phases = _as_list(game.get("phases")) phase_names: List[str] = [] if phases: for sub in phases: xs = _as_list(sub) if isinstance(sub, list) else [sub] for x in xs: s = _scalar(x) if s: phase_names.append(s) elif isinstance(x, dict) and len(x)==1: k = next(iter(x.keys())) if isinstance(k, str) and k.endswith("_phase"): # 修改:将 phase 映射为标准名称 standard_name = k.replace("_phase", "") phase_names.append(standard_name) out["phases"] = phase_names # Ensure 'phases' key is set in output # ---- deck info ---- deck_info = _as_dict(game.get("deck")) if deck_info: # Extract deck type, shuffling, deal_pattern deck_type = _scalar(deck_info.get("_") or deck_info.get("type") or "Standard54") # Default to Standard54 shuffling = _scalar(deck_info.get("shuffling")) deal_pattern = _scalar(deck_info.get("deal_pattern")) out["game"]["deck"] = { "type": deck_type, "shuffling": shuffling, "deal_pattern": deal_pattern } # ---- setup details ---- setup_info = _as_dict(game.get("setup")) if setup_info: # Extract zones, card_relations, deal count zones_info = _as_dict(setup_info.get("zones")) card_relations_info = _as_dict(setup_info.get("card_relations")) deal_count = setup_info.get("deal") if zones_info: zone_defs = [] if "hand" in zones_info: zone_defs.append({"type": "hand", **_as_dict(zones_info.get("hand"))}) if "field" in zones_info: zone_defs.append({"type": "field", **_as_dict(zones_info.get("field"))}) if "discard_pile" in zones_info: zone_defs.append({"type": "discard_pile", **_as_dict(zones_info.get("discard_pile"))}) if "main_deck" in zones_info: zone_defs.append({"type": "main_deck", **_as_dict(zones_info.get("main_deck"))}) if "special_deck" in zones_info: # Handle special_deck which can be a list or dict sd = zones_info.get("special_deck") if isinstance(sd, dict): # If it's a dict like { (UNO_Cards) ... } # We need to extract the name and details # The structure is likely {"UNO_Cards": {details}} for name, details in sd.items(): if isinstance(details, dict): zone_defs.append({"type": "special_deck", "name": name, **details}) else: zone_defs.append({"type": "special_deck", "name": name, "initial_cards": details}) elif isinstance(sd, list): # If it's a list, handle each element for item in sd: if isinstance(item, dict) and len(item) == 1: name, details = next(iter(item.items())) zone_defs.append({"type": "special_deck", "name": name, **_as_dict(details)}) out["zones"] = zone_defs # Ensure 'zones' key is set in output if card_relations_info: out["card_relations"] = card_relations_info if deal_count is not None: out["setup"] = {"deal": deal_count} # ---- combinations ---- combinations_info = _as_dict(game.get("combinations")) if combinations_info: # Extract custom_combinations custom_combs = [] if "custom_combination" in combinations_info: raw_customs = _as_list(combinations_info.get("custom_combination")) for cc in raw_customs: cc_dict = _as_dict(cc) if "_" in cc_dict and isinstance(cc_dict["_"], list) and cc_dict["_"]: name = _scalar(cc_dict["_"][0]) spec = {k: v for k, v in cc_dict.items() if k != "_"} custom_combs.append({"name": name, "spec": spec}) elif "name" in cc_dict: name = _scalar(cc_dict.get("name")) spec = {k: v for k, v in cc_dict.items() if k != "name"} custom_combs.append({"name": name, "spec": spec}) out["combinations"] = {"custom": custom_combs} # Ensure 'combinations' key is set in output with custom part # ---- mechanics ---- # 如果有原始 GDL 文本,尝试直接从文本中收集 mechanics if gdl_text: try: out["special_mechanics"] = _collect_mechanics_from_raw_sexpr(gdl_text) if not out["special_mechanics"]: # 如果新方法没有找到,回退到旧方法 out["special_mechanics"] = _collect_mechanics({"game": game}) except: # 如果新方法出错,回退到旧方法 out["special_mechanics"] = _collect_mechanics({"game": game}) else: # 没有原始文本,使用旧方法 out["special_mechanics"] = _collect_mechanics({"game": game}) # ---- 提取 actions(可能在 setup 中)---- actions = None if "actions" in game: actions = deepcopy(game["actions"]) elif "setup" in game: setup = _as_list(game["setup"]) for item in setup: if isinstance(item, dict) and "actions" in item: actions = deepcopy(item["actions"]) break if actions: out["actions"] = actions # ---- 保留原始信息(由 mapper 兜底)---- for k in ("visibility","invariants","scoring","extensions"): if k in p: out[k] = deepcopy(p[k]) elif k in game: out[k] = deepcopy(game[k]) return out