# GDL2IR_V2 / normalizer_v2.py
# Estazz's picture
# Upload folder using huggingface_hub
# bc46b62 verified
# -*- coding: utf-8 -*-
"""
normalizer_v2.py — 语义标准化(兼容多布局,提取 mechanics 名称/阶段/时机)
"""
from __future__ import annotations
from typing import Any, Dict, List
from copy import deepcopy
def _as_list(x):
if x is None: return []
return x if isinstance(x, list) else [x]
def _as_dict(x):
return x if isinstance(x, dict) else {}
def _scalar(x: Any) -> str:
if isinstance(x, str): return x
if isinstance(x, (int, float)): return str(x)
if isinstance(x, dict):
if "_" in x and isinstance(x["_"], list) and x["_"] and isinstance(x["_"][0], (str,int,float)):
return _scalar(x["_"][0])
if "name" in x and isinstance(x["name"], str): return x["name"]
if len(x)==1:
k, v = next(iter(x.items()))
if v in (None, {}) and isinstance(k, str):
return k
if isinstance(x, list) and x:
return _scalar(x[0])
return ""
def _is_standard_mechanic_structure(node: Any) -> bool:
"""
检查一个节点是否是标准的 {"mechanic": {...}} 结构。
这是 GDL 中 (mechanic ...) 被 _to_obj 解析后的直接结果。
"""
return isinstance(node, dict) and len(node) == 1 and 'mechanic' in node and isinstance(node['mechanic'], dict)
def _extract_mechanics_from_sm(sm_node: Any) -> List[Dict[str, Any]]:
"""
专门从 special_mechanics 的节点中提取 mechanic 定义。
sm_node 是 special_mechanics 键对应的值。
这个函数严格处理 special_mechanics 的内容,避免误判内部结构。
"""
mechanics = []
# sm_node 应该是 {'mechanic': [...]} 或 {'mechanic': {...}} 或 [{'mechanic': {...}}, ...]
sm_dict = _as_dict(sm_node)
if 'mechanic' in sm_dict:
mech_content = sm_dict.get("mechanic")
if isinstance(mech_content, dict):
# Case 1: {"mechanic": {"mechanic_name": {...}, "mechanic_name2": {...}}}
# 这种情况发生在 _to_obj 合并了多个同名 mechanic 时
for name, definition in mech_content.items():
if isinstance(definition, dict):
# 构造标准的 mechanic 结构
mechanic_def = definition.copy()
# 确保有 name 字段
if 'name' not in mechanic_def:
mechanic_def['name'] = name
mechanics.append({"mechanic": mechanic_def})
elif isinstance(mech_content, list):
# Case 2: {"mechanic": [...]}
for item in mech_content:
# item 应该是 {"_": ["Name", ...], ...} 或 {"name": "Name", ...} 这样的结构 inside the mechanic
# Check if item itself is the content of a mechanic (i.e., has 'name' or '_')
if isinstance(item, dict) and ('name' in item or ('_' in item and isinstance(item.get('_'), list) and item['_'] and isinstance(item['_'][0], str))):
# Wrap it in the standard {"mechanic": {...}} format
mechanics.append({"mechanic": item})
elif isinstance(sm_node, list):
# Case 3: [{"mechanic": {...}}, ...] - A list of standard structures
for item in sm_node:
if _is_standard_mechanic_structure(item):
mechanics.append(item) # item is {"mechanic": {...}}
elif _is_standard_mechanic_structure(sm_node):
# Case 4: {"mechanic": {...}} - A single standard structure
mechanics.append(sm_node) # sm_node is {"mechanic": {...}}
# If sm_node is just a dict like {"_": ["Name", ...]}, it's likely an error or edge case from _merge_kv_list_to_dict
# We ignore this case as it shouldn't happen if special_mechanics is structured correctly in GDL.
return mechanics
def _walk_collect_mechs(node, bucket, inside_special_mechanics=False):
    """Recursively gather mechanic wrappers from ``node`` into ``bucket``.

    For every dict encountered:
      * a ``"special_mechanics"`` key is handed to ``_extract_mechanics_from_sm``;
      * otherwise, when ``inside_special_mechanics`` is false, each entry of
        ``actions.special`` that carries a ``"name"`` key or a leading string
        in ``"_"`` is wrapped as ``{"mechanic": entry}``;
      * all remaining values are walked, except those under
        ``special_mechanics``, ``actions``, ``transfer_path``,
        ``visibility_change`` and ``params``.
    Lists are walked element by element; other values are ignored.

    ``bucket`` is mutated in place; nothing is returned.

    NOTE(review): ``inside_special_mechanics`` is only ever forwarded
    unchanged here, so it stays False unless a caller passes True.
    NOTE(review): because the two sources are tested with if/elif, a dict
    holding both ``special_mechanics`` and ``actions`` never contributes its
    ``actions.special`` entries - confirm this is the intended precedence.
    """
    if isinstance(node, list):
        for child in node:
            _walk_collect_mechs(child, bucket, inside_special_mechanics)
        return
    if not isinstance(node, dict):
        return

    if "special_mechanics" in node:
        bucket.extend(_extract_mechanics_from_sm(node.get("special_mechanics")))
    elif "actions" in node and not inside_special_mechanics:
        specials = _as_list(_as_dict(node.get("actions")).get("special"))
        for candidate in specials:
            if not isinstance(candidate, dict):
                continue
            positional = candidate.get("_")
            leads_with_name = (
                isinstance(positional, list)
                and bool(positional)
                and isinstance(positional[0], str)
            )
            if "name" in candidate or leads_with_name:
                bucket.append({"mechanic": candidate})

    # Keys handled above, plus mechanic-internal structures that are not
    # searched for further mechanic definitions.
    skipped_keys = (
        "special_mechanics", "actions",
        "transfer_path", "visibility_change", "params",
    )
    for key, child in node.items():
        if key in skipped_keys:
            continue
        _walk_collect_mechs(child, bucket, inside_special_mechanics=inside_special_mechanics)
def _collect_mechanics(parsed: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Collect and normalise every mechanic found in the parsed object tree.

    The tree is walked with ``_walk_collect_mechs``; each collected item that
    has the canonical ``{"mechanic": {...}}`` shape is converted into a
    mechanic record by ``_process_mechanic_definition``. Items of any other
    shape are skipped.

    The mechanic name is taken from the definition's ``"name"`` value, then
    from its ``"_"`` value, and finally defaults to ``"mechanic@<index>"``
    where ``<index>`` is the item's position among all collected items.

    Args:
        parsed: Parsed game object (the caller in this file passes
            ``{"game": game}``).

    Returns:
        One normalised mechanic record per well-formed collected item.
    """
    items: List[Any] = []
    _walk_collect_mechs(parsed, items, inside_special_mechanics=False)

    mechs: List[Dict[str, Any]] = []
    for index, raw in enumerate(items):
        if not _is_standard_mechanic_structure(raw):
            # Not a {"mechanic": {...}} wrapper: collected by mistake, skip.
            continue
        content = raw['mechanic']
        # _scalar copes with str / list / dict values, so "_" is never
        # indexed blindly (a string would otherwise yield its first
        # character and a dict would raise KeyError).
        name = (
            _scalar(content.get("name"))
            or _scalar(content.get("_"))
            or f"mechanic@{index}"
        )
        mechs.append(_process_mechanic_definition(name, content))
    return mechs
def _collect_mechanics_from_sexpr(parsed: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
直接从 S-expression 中收集 mechanics,避免 _to_obj 的键合并问题
"""
from gdl_parser_v2 import parse_sexpr
mechanics = []
def find_and_extract_mechanics(obj, path=''):
if isinstance(obj, dict):
for k, v in obj.items():
if k == 'special_mechanics':
# 找到 special_mechanics,尝试提取其中的 mechanics
if isinstance(v, dict) and 'mechanic' in v:
mech_content = v['mechanic']
if isinstance(mech_content, dict):
# 处理合并后的情况:{"mechanic_name": {...}, "mechanic_name2": {...}}
for name, definition in mech_content.items():
if isinstance(definition, dict):
mechanics.append(_process_mechanic_definition(name, definition))
else:
find_and_extract_mechanics(v, f'{path}.{k}' if path else k)
elif isinstance(obj, list):
for i, item in enumerate(obj):
find_and_extract_mechanics(item, f'{path}[{i}]')
find_and_extract_mechanics(parsed)
return mechanics
def _collect_mechanics_from_raw_sexpr(gdl_text: str) -> List[Dict[str, Any]]:
"""
直接从原始 GDL 文本中收集 mechanics,避免解析器的键合并问题
"""
from gdl_parser_v2 import parse_sexpr
mechanics = []
# 找到 special_mechanics 部分
lines = gdl_text.split('\n')
start_idx = -1
end_idx = -1
paren_count = 0
for i, line in enumerate(lines):
if '(special_mechanics' in line:
start_idx = i
paren_count = line.count('(') - line.count(')')
break
if start_idx != -1:
for i in range(start_idx + 1, len(lines)):
paren_count += lines[i].count('(') - lines[i].count(')')
if paren_count == 0:
end_idx = i
break
if start_idx != -1 and end_idx != -1:
special_mechanics_lines = lines[start_idx:end_idx + 1]
special_mechanics_text = '\n'.join(special_mechanics_lines)
try:
sexpr = parse_sexpr(special_mechanics_text)
if len(sexpr) > 0 and isinstance(sexpr[0], list):
for item in sexpr[0]:
if isinstance(item, list) and len(item) > 0 and item[0] == 'mechanic':
# 这是一个 mechanic 定义
name = item[1] if len(item) > 1 else "unknown"
# 解析 mechanic 的属性
definition = {}
for i in range(2, len(item)):
if isinstance(item[i], list) and len(item[i]) >= 2:
key = item[i][0]
value = item[i][1] if len(item[i]) == 2 else item[i][1:]
definition[key] = value
mechanics.append(_process_mechanic_definition(name, definition))
except Exception as e:
print(f"Error parsing special_mechanics: {e}")
return mechanics
def _process_mechanic_definition(name: str, definition: Dict[str, Any]) -> Dict[str, Any]:
    """Build the normalised record for a single mechanic.

    Args:
        name: Mechanic name, stored unchanged.
        definition: Raw attribute mapping of the mechanic.

    Returns:
        A dict with the keys:
          * ``name``;
          * ``enabled`` - True when ``definition`` has no usable ``enabled``
            value, otherwise True only for ``true`` / ``1`` / ``yes``
            (case-insensitive);
          * ``description`` and ``trigger_condition`` - scalar strings,
            ``""`` when absent;
          * ``phase`` - the ``phase`` value (default ``"playing_phase"``)
            with every ``"_phase"`` occurrence removed, e.g.
            ``playing_phase`` -> ``playing``;
          * ``timing`` - scalar string, ``""`` when absent;
          * ``raw_definition`` - all remaining entries of ``definition``.
    """
    phase = _scalar(definition.get("phase") or "playing_phase")
    timing = _scalar(definition.get("timing") or "")

    # Absent/blank means "enabled"; an explicit value must be a truthy token.
    # (The previous expression evaluated to True on both branches.)
    enabled_token = _scalar(definition.get("enabled")).strip().lower()
    enabled = enabled_token in ("true", "1", "yes") if enabled_token else True

    consumed_keys = ("name", "enabled", "description", "phase", "timing", "trigger_condition")
    return {
        "name": name,
        "enabled": enabled,
        "description": _scalar(definition.get("description")),
        "phase": phase.replace("_phase", ""),
        "timing": timing,
        "trigger_condition": _scalar(definition.get("trigger_condition")),
        "raw_definition": {
            k: v for k, v in definition.items() if k not in consumed_keys
        },
    }
def _role_count(value: Any) -> int:
    """Coerce a role count to an int >= 1; unparsable values count as 1."""
    try:
        return max(1, int(value))
    except (TypeError, ValueError, OverflowError):
        return 1


def _normalize_roles(game: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Build the ``[{"name", "count"}, ...]`` role list from ``game["roles"]``."""
    roles: List[Dict[str, Any]] = []
    if "roles" in game:
        flat: List[Any] = []
        for entry in _as_list(game["roles"]):
            if isinstance(entry, list):
                flat.extend(entry)
            else:
                flat.append(entry)
        for entry in flat:
            spec = _as_dict(entry)
            role_node = spec.get("role")
            if isinstance(role_node, dict):
                # {"role": {"_": ["Name", count]}}
                positional = role_node.get("_")
                if isinstance(positional, list) and positional:
                    count = _role_count(positional[1]) if len(positional) >= 2 else 1
                    roles.append({"name": _scalar(positional[0]) or "Player", "count": count})
                    continue
            if isinstance(entry, str) and entry:
                # A role written as a bare string keeps its own name.
                name = entry
            else:
                name = _scalar(spec.get("name") or spec.get("role") or "Player")
            roles.append({"name": name, "count": _role_count(spec.get("count", 1))})
    return roles or [{"name": "Player", "count": game.get("players") or 0}]


def _normalize_turn_order(game: Dict[str, Any]) -> List[str]:
    """Flatten ``game["turn_order"]`` into a list of names (possibly empty)."""
    flat: List[Any] = []
    for elem in _as_list(game.get("turn_order")):
        if isinstance(elem, list):
            flat.extend(elem)
        else:
            flat.append(elem)
    if not flat:
        return []
    first = flat[0]
    if isinstance(first, dict) and len(first) == 1:
        # {"Landlord": {"_": [...]}}: the key leads, "_" holds the rest.
        head, rest = next(iter(first.items()))
        sequence = [head]
        if isinstance(rest, dict) and isinstance(rest.get("_"), list):
            sequence.extend(rest["_"])
        return [_scalar(x) for x in sequence]
    return [_scalar(x) for x in flat]


def _normalize_phases(game: Dict[str, Any]) -> List[str]:
    """List the phase names declared under ``game["phases"]``.

    Entries that resolve to a scalar are kept verbatim; single-key dicts
    whose key ends in ``"_phase"`` contribute that key with ``"_phase"``
    removed. Other entries are ignored.

    NOTE(review): scalar entries keep a ``_phase`` suffix while dict keys
    lose it - confirm downstream consumers expect both spellings.
    """
    names: List[str] = []
    for group in _as_list(game.get("phases")):
        members = group if isinstance(group, list) else [group]
        for member in members:
            label = _scalar(member)
            if label:
                names.append(label)
            elif isinstance(member, dict) and len(member) == 1:
                key = next(iter(member))
                if isinstance(key, str) and key.endswith("_phase"):
                    names.append(key.replace("_phase", ""))
    return names


def _normalize_zones(zones_info: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Turn the ``setup.zones`` mapping into a list of zone definitions."""
    zone_defs: List[Dict[str, Any]] = []
    for zone_type in ("hand", "field", "discard_pile", "main_deck"):
        if zone_type in zones_info:
            zone_defs.append({"type": zone_type, **_as_dict(zones_info.get(zone_type))})

    if "special_deck" in zones_info:
        special = zones_info.get("special_deck")
        if isinstance(special, dict):
            # {"DeckName": {details}} or {"DeckName": initial_cards}
            for name, details in special.items():
                if isinstance(details, dict):
                    zone_defs.append({"type": "special_deck", "name": name, **details})
                else:
                    zone_defs.append({"type": "special_deck", "name": name, "initial_cards": details})
        elif isinstance(special, list):
            for item in special:
                if isinstance(item, dict) and len(item) == 1:
                    name, details = next(iter(item.items()))
                    zone_defs.append({"type": "special_deck", "name": name, **_as_dict(details)})
    return zone_defs


def _normalize_custom_combinations(combinations_info: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Extract ``{"name", "spec"}`` entries from ``custom_combination``."""
    custom: List[Dict[str, Any]] = []
    for raw in _as_list(combinations_info.get("custom_combination")):
        entry = _as_dict(raw)
        positional = entry.get("_")
        if isinstance(positional, list) and positional:
            name_source, name = "_", _scalar(positional[0])
        elif "name" in entry:
            name_source, name = "name", _scalar(entry.get("name"))
        else:
            continue
        spec = {k: v for k, v in entry.items() if k != name_source}
        custom.append({"name": name, "spec": spec})
    return custom


def _extract_actions(game: Dict[str, Any]) -> Any:
    """Return a deep copy of the game's actions, or None when there are none.

    ``game["actions"]`` wins; otherwise the first dict entry of
    ``game["setup"]`` that holds an ``"actions"`` key is used.
    """
    if "actions" in game:
        return deepcopy(game["actions"])
    if "setup" in game:
        for item in _as_list(game["setup"]):
            if isinstance(item, dict) and "actions" in item:
                return deepcopy(item["actions"])
    return None


def normalize_ir(parsed: Dict[str, Any], gdl_text: str = None) -> Dict[str, Any]:
    """Normalise a parsed GDL object into the intermediate representation.

    Args:
        parsed: Parsed GDL object; the game node is ``parsed["game"]`` when
            present and truthy, otherwise ``parsed`` itself.
        gdl_text: Optional raw GDL text. When given, mechanics are first
            collected straight from the text; if that fails or finds
            nothing, they are collected from the parsed object instead.

    Returns:
        A dict that always contains ``game`` (``name``, ``players_count``,
        ``roles`` and, when a deck is declared, ``deck``), ``phases`` and
        ``special_mechanics``. ``turns``, ``zones``, ``card_relations``,
        ``setup``, ``combinations``, ``actions`` and the pass-through keys
        ``visibility``, ``invariants``, ``scoring``, ``extensions`` are added
        only when the input provides them.
    """
    p = _as_dict(parsed)
    game = _as_dict(p.get("game") or p)
    out: Dict[str, Any] = {"game": {}}

    # ---- name / players / roles ----
    out["game"]["name"] = _scalar(game.get("name") or game.get("_") or "") or "UnnamedGame"
    out["game"]["players_count"] = game.get("players", 0)  # raw value, consumed by mapper_v2
    out["game"]["roles"] = _normalize_roles(game)

    # ---- turns.order from turn_order ----
    order = _normalize_turn_order(game)
    if order:
        out["turns"] = {"order": order}

    # ---- phases ----
    out["phases"] = _normalize_phases(game)

    # ---- deck info ----
    deck_info = _as_dict(game.get("deck"))
    if deck_info:
        out["game"]["deck"] = {
            "type": _scalar(deck_info.get("_") or deck_info.get("type") or "Standard54"),
            "shuffling": _scalar(deck_info.get("shuffling")),
            "deal_pattern": _scalar(deck_info.get("deal_pattern")),
        }

    # ---- setup details ----
    setup_info = _as_dict(game.get("setup"))
    if setup_info:
        zones_info = _as_dict(setup_info.get("zones"))
        card_relations_info = _as_dict(setup_info.get("card_relations"))
        deal_count = setup_info.get("deal")
        if zones_info:
            out["zones"] = _normalize_zones(zones_info)
        if card_relations_info:
            out["card_relations"] = card_relations_info
        if deal_count is not None:
            out["setup"] = {"deal": deal_count}

    # ---- combinations ----
    combinations_info = _as_dict(game.get("combinations"))
    if combinations_info:
        out["combinations"] = {"custom": _normalize_custom_combinations(combinations_info)}

    # ---- mechanics ----
    mechanics: List[Dict[str, Any]] = []
    if gdl_text:
        try:
            mechanics = _collect_mechanics_from_raw_sexpr(gdl_text)
        except Exception:
            # Text-based extraction is best effort; fall back below.
            mechanics = []
    if not mechanics:
        mechanics = _collect_mechanics({"game": game})
    out["special_mechanics"] = mechanics

    # ---- actions (possibly nested in setup) ----
    actions = _extract_actions(game)
    if actions:
        out["actions"] = actions

    # ---- keep raw sections for the mapper ----
    for key in ("visibility", "invariants", "scoring", "extensions"):
        if key in p:
            out[key] = deepcopy(p[key])
        elif key in game:
            out[key] = deepcopy(game[key])
    return out