File size: 35,646 Bytes
bc46b62 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 | # -*- coding: utf-8 -*-
"""
mapper_v2.py — 将 normalizer_v2 输出映射为 v0.95 IR(schema 友好;兼容 UNO/七夕)
"""
from __future__ import annotations
from typing import Any, Dict, List
from copy import deepcopy
import re
REQUIRED = ["meta","players","cards","zones","visibility","combinations","comparison","actions","phases","turns","ending","scoring"]
ZONE_RE = re.compile(r"^(hand:[A-Za-z0-9_:-]+|field|main_deck|discard_pile|reserve_zone|public_pool|item_deck:[A-Za-z0-9_:-]+|history_deck)$")
def _as_dict(x): return x if isinstance(x,dict) else {}
def _as_list(x): return x if isinstance(x,list) else ([] if x is None else [x])
def _scalar(x):
if isinstance(x,str): return x
if isinstance(x,(int,float)): return str(x)
if isinstance(x,dict):
if "_" in x and isinstance(x["_"], list) and x["_"]: return _scalar(x["_"][0])
if "name" in x and isinstance(x["name"], str): return x["name"]
if len(x)==1:
k,v = next(iter(x.items()))
if v in (None,{}) and isinstance(k,str): return k
if isinstance(x,list) and x: return _scalar(x[0])
return ""
def _uniquify(seq: List[str]) -> List[str]:
seen={}; out=[]
for s in map(str, seq):
if s not in seen:
seen[s]=0; out.append(s)
else:
seen[s]+=1; out.append(f"{s}_{seen[s]}")
return out
def _phase_to_schema(s: str) -> str:
# Mapping GDL phase names to schema phase names
mp={
"deal":"setup", # GDL 'deal' often maps to setup for initialization tasks, but can also be a separate 'deal' phase
"setup":"setup",
"bid":"bidding",
"bidding":"bidding",
"double":"doubling",
"doubling":"doubling",
"initiative":"setup", # Map GDL's initiative_phase to schema's setup
"grouping":"grouping",
"play":"playing",
"playing":"playing",
"playing_phase":"play", # Map GDL's playing_phase to schema's play
"settle":"settlement",
"settlement":"settlement",
"settlement_phase":"settlement"
}
return mp.get(s, s)
def _derive_players(nz: Dict[str,Any]) -> Dict[str,Any]:
roles=_as_list(_as_dict(nz.get("game")).get("roles"))
order=_as_dict(nz.get("turns")).get("order") or []
ids = _uniquify([_scalar(x) for x in order]) if order else []
if not ids and roles:
tmp=[]
for r in roles:
rd=_as_dict(r)
name=_scalar(rd.get("name") or rd.get("role") or "Player")
cnt=int(rd.get("count",1) or 1)
tmp += [name]*max(1,cnt)
ids=_uniquify(tmp)
if not ids:
ids=["P0","P1","P2"]
# --- Modification: Create player instances based on roles and counts, and store in roles.players ---
# Schema v0.95 expects count and roles list with name, count, and optionally players.
out_roles = []
current_id_idx = 0
for r in roles:
rd = _as_dict(r)
role_name = _scalar(rd.get("name") or "Player")
role_count = int(rd.get("count", 1) or 1)
# Generate unique player IDs for this role
role_players = []
for i in range(role_count):
if current_id_idx < len(ids):
player_id = ids[current_id_idx]
role_players.append(player_id)
current_id_idx += 1
else:
# Fallback if ids list is shorter than expected
player_id = f"{role_name}_{i}" if i > 0 else role_name
role_players.append(player_id)
# Add the role definition (name, count, players) to the output roles list
out_roles.append({"name": role_name, "count": role_count, "players": role_players})
# The actual player IDs are used for turns.order and zone generation
return {"count": len(ids), "roles": out_roles, "_player_ids": ids} # Removed _role_definitions as not needed for schema
def _expand_zones(zs: List[Dict[str, Any]], pids: List[str]) -> List[str]:
out=[]
zone_types_map = {
"hand": "hand:{pid}",
"field": "field",
"discard_pile": "discard_pile",
"main_deck": "main_deck",
"special_deck": "item_deck:{name}" # Map special_deck to item_deck:<name>
}
for z_def in zs or []:
z_type = z_def.get("type")
if z_type in zone_types_map:
template = zone_types_map[z_type]
if z_type == "hand":
# Expand hand for each player ID
for pid in pids:
out.append(template.format(pid=pid))
elif z_type == "special_deck":
# Use the name from the definition
name = z_def.get("name", "Unknown")
out.append(template.format(name=name))
else:
# Use the template directly for other types
out.append(template)
# Add common base zones if not already present
base = ["field","discard_pile","main_deck"]
for b in base:
if b not in out: out.append(b)
# Remove duplicates while preserving order
seen=set(); uniq=[]
for z in out:
if z not in seen:
seen.add(z); uniq.append(z)
return uniq
def _expand_vis_hand_star(vis: Dict[str,Any], pids: List[str]) -> Dict[str,Any]:
vis=_as_dict(vis); vis.setdefault("defaults", {}); by=_as_dict(vis.get("by_zone"))
if "hand:*" in by:
cfg=by.pop("hand:*")
for pid in pids: by[f"hand:{pid}"]=cfg
vis["by_zone"]=by; return vis
def _ensure_cards(nz: Dict[str,Any]) -> Dict[str,Any]:
cards=_as_dict(nz.get("cards"))
ranks=_as_list(cards.get("ranks")); suits=_as_list(cards.get("suits"))
if not ranks or not suits:
# --- Modification: Derive from card_relations if available ---
card_rels = _as_dict(nz.get("card_relations"))
if card_rels:
# Derive from card_values
card_vals = _as_list(card_rels.get("card_values"))
# Map common symbols to integers
symbol_map = {
"J": 11, "Q": 12, "K": 13, "A": 14, "2": 15
}
ranks = []
for val in card_vals:
if isinstance(val, int):
ranks.append(val)
elif isinstance(val, str) and val in symbol_map:
ranks.append(symbol_map[val])
else:
ranks.append(_scalar(val)) # Fallback
# Derive from suit_relations
suit_rels = _as_dict(card_rels.get("suit_relations"))
suit_order = _as_list(suit_rels.get("order"))
suits = [s for s in suit_order if isinstance(s, str)] or ["Spade","Heart","Club","Diamond"]
# Determine jokers and suitless_ranks
jokers = {}
suitless_ranks = []
# Standard54 implies 2 jokers
if "2" in card_vals or 15 in ranks: # Assuming '2' is always high and joker-like
# Standard54 has small joker 16, big joker 17
# In our mapping, if 2 is last, it might be 15. Let's check.
# Actually, Standard54 usually has fixed values for jokers regardless of card_values order.
# Let's assume standard mapping for Standard54 deck type.
deck_info = _as_dict(_as_dict(nz.get("game")).get("deck"))
if deck_info.get("type") == "Standard54":
jokers = {"small": 16, "big": 17}
suitless_ranks = [16, 17]
# Add jokers to ranks if not already present
if 16 not in ranks: ranks.append(16)
if 17 not in ranks: ranks.append(17)
# --- Modification: Ensure suitless_ranks, copies_per_deck, num_decks are included ---
cards = {
"ranks": sorted(ranks),
"suits": suits,
"jokers": jokers,
"suitless_ranks": suitless_ranks,
"copies_per_deck": 1, # Assuming 1 copy for Standard54
"num_decks": 1 # Assuming 1 deck
}
else:
# Fallback to standard 54
ranks=list(range(3,18)) # 3..A(14), 小王16, 大王17
suits=["Spade","Heart","Club","Diamond"]
cards={"ranks":ranks, "suits":suits, "jokers":{"small":16,"big":17}}
else:
# If ranks and suits were already provided, ensure the missing fields are added if possible
# This path might be taken if the initial cards dict had some info but not all.
# We can infer suitless_ranks from jokers, and assume defaults for copies/num_decks if not present.
if "jokers" in cards and "suitless_ranks" not in cards:
jokers = cards.get("jokers", {})
small_j = jokers.get("small")
big_j = jokers.get("big")
suitless_ranks = [r for r in [small_j, big_j] if r is not None]
cards["suitless_ranks"] = suitless_ranks
if "copies_per_deck" not in cards:
cards["copies_per_deck"] = 1
if "num_decks" not in cards:
cards["num_decks"] = 1
return cards
def _ensure_combinations(nz: Dict[str,Any]) -> Dict[str,Any]:
cmb=_as_dict(nz.get("combinations"))
out={"single":{}, "pair":{}, "triple":{}, "straight":{}, "pairs_chain":{}, "airplane":{},
"triple_with_single":{}, "triple_with_pair":{}, "four_with_twoSingles":{}, "four_with_twoPairs":{}, "bomb":{}, "rocket":{}, "custom":[]}
# --- Modification: Handle combinations from GDL ---
# Get raw combinations from normalizer
raw_combinations = _as_dict(nz.get("combinations", {}))
# Handle straight and bomb with parameters
if "straight" in nz.get("game", {}).get("combinations", {}):
# Look for the argument, e.g., (straight 5)
gdl_combinations = _as_dict(nz.get("game", {}).get("combinations", {}))
if isinstance(gdl_combinations.get("straight"), (int, list)):
val = gdl_combinations.get("straight")
if isinstance(val, list) and val:
val = val[0] # Get first argument if list
out["straight"] = {"min_len": int(val)}
else:
out["straight"] = {} # Fallback if no arg found in GDL
if "bomb" in nz.get("game", {}).get("combinations", {}):
gdl_combinations = _as_dict(nz.get("game", {}).get("combinations", {}))
if isinstance(gdl_combinations.get("bomb"), (int, list)):
val = gdl_combinations.get("bomb")
if isinstance(val, list) and val:
val = val[0]
out["bomb"] = {"len": int(val)}
else:
out["bomb"] = {} # Fallback if no arg found in GDL
# Handle custom combinations from normalizer
if "custom" in raw_combinations and isinstance(raw_combinations["custom"], list):
out["custom"] = raw_combinations["custom"]
else:
# Fallback if normalizer didn't capture them correctly
out["custom"] = []
# Merge with any explicitly set values in the input cmb dict (e.g., from extensions)
for k in list(out.keys()):
if k in cmb and isinstance(cmb[k], dict):
out[k].update(cmb[k]) # Update with input values if present
return out
def _ensure_actions(nz: Dict[str,Any], pids: List[str] = None) -> Dict[str,Any]:
actions=_as_dict(nz.get("actions"))
pids = pids or []
# 解析 play 动作
play_actions = []
if "play" in actions:
play_def = actions["play"]
if isinstance(play_def, list):
# 解析器把 play 分解成了多个字典,需要合并成一个完整的 play action
play_action = _merge_play_actions(play_def, nz)
if play_action:
# 修复transfer_path中的zone ID
if "transfer_path" in play_action:
play_action["transfer_path"] = _fix_zone_ids_in_transfer_path(
play_action["transfer_path"], pids)
play_actions.append(play_action)
elif isinstance(play_def, dict):
# 单个 play 定义
play_action = _parse_play_action(play_def)
if "transfer_path" in play_action:
play_action["transfer_path"] = _fix_zone_ids_in_transfer_path(
play_action["transfer_path"], pids)
play_actions.append(play_action)
# 解析 special 动作
special_actions = []
if "special" in actions:
special_def = actions["special"]
if isinstance(special_def, list):
special_actions = _merge_special_actions(special_def)
elif isinstance(special_def, dict):
special_actions.append(_parse_special_action(special_def))
# 修复special动作中的transfer_path
for action in special_actions:
if "transfer_path" in action:
action["transfer_path"] = _fix_zone_ids_in_transfer_path(
action["transfer_path"], pids)
# 构建输出
output = {
"play": play_actions,
"pass": {"transfer_path": {"from": "field", "to": "field"}},
"cleanup_trick": [{"from": "field", "to": "discard_pile", "count": "all"}],
"other": {}
}
# 如果有 special 动作,添加到 other 中
if special_actions:
# 将special动作转换为Transfer格式
special_transfers = []
for action in special_actions:
if "transfer_path" in action:
transfer = action["transfer_path"].copy()
if "visibility_change" in action:
transfer["visibility_change"] = action["visibility_change"]
special_transfers.append(transfer)
if special_transfers:
output["other"]["special"] = special_transfers
return output
def _merge_play_actions(play_defs: List[Dict[str, Any]], gdl_actions: Dict[str, Any] = None) -> Dict[str, Any]:
"""合并被解析器分解的 play 动作定义"""
merged = {}
raw_def = {}
# 首先尝试从play_defs中提取牌型列表
type_list = []
for play_def in play_defs:
if isinstance(play_def, dict) and "type" in play_def:
type_val = play_def["type"]
if isinstance(type_val, dict):
type_list = _extract_one_of_values(type_val)
break
if type_list:
# 过滤掉"one_of"和其他无效值
valid_types = [t for t in type_list if t not in ["one_of", "int?", "sequence?"]]
if valid_types:
# Schema期望type是单个字符串,不是数组,所以取第一个有效类型
merged["type"] = valid_types[0]
for play_def in play_defs:
if isinstance(play_def, dict):
# 检查是否是特定字段
if "type" in play_def and "type" not in merged:
type_val = play_def["type"]
if isinstance(type_val, dict):
if "one_of" in type_val:
# 处理 one_of 结构,提取所有牌型
type_list = _extract_one_of_values(type_val)
merged["type"] = type_list
else:
merged["type"] = _scalar(type_val)
else:
merged["type"] = _scalar(type_val)
elif "len" in play_def:
# 处理 len 字段,提取简单值
len_val = _extract_simple_value(play_def["len"])
if len_val != "int?":
merged["len"] = len_val
elif "core" in play_def:
# 处理 core 字段,提取简单值
core_val = _extract_simple_value(play_def["core"])
if core_val != "sequence?":
merged["core"] = core_val
elif "wings" in play_def:
# 处理 wings 字段,提取简单值
wings_val = _extract_simple_value(play_def["wings"])
if wings_val != "int?":
merged["wings"] = wings_val
elif "transfer_path" in play_def:
tp = play_def["transfer_path"]
if isinstance(tp, dict):
merged["transfer_path"] = tp
elif isinstance(tp, list):
merged["transfer_path"] = _parse_transfer_path_list(tp)
elif "visibility_change" in play_def:
vc = play_def["visibility_change"]
merged["visibility_change"] = _parse_visibility_change(vc)
else:
# 其他字段保存到 raw_definition
raw_def.update(play_def)
# 如果没有提取到type,设置默认值
if "type" not in merged:
merged["type"] = ["single", "pair", "triple", "straight", "bomb", "rocket"]
# 确保visibility_change有state字段
if "visibility_change" in merged:
vc = merged["visibility_change"]
if "state" not in vc:
vc["state"] = "visible"
# 保存原始定义
if raw_def:
merged["raw_definition"] = raw_def
return merged
def _extract_one_of_values(type_dict: Dict[str, Any]) -> List[str]:
"""从 one_of 结构中提取值列表"""
values = []
def extract_recursive(obj):
if isinstance(obj, dict):
if "_" in obj:
if isinstance(obj["_"], list):
values.extend(obj["_"])
for k, v in obj.items():
if k != "_":
# 如果key不是"_",那么key本身就是一个值
if isinstance(v, dict):
if len(v) == 0:
# 空字典,key就是值
values.append(k)
elif "_" in v:
# 有"_"字段,key是值,然后递归处理"_"
values.append(k)
extract_recursive(v)
else:
# 没有"_"字段,递归处理
extract_recursive(v)
else:
# v不是字典,k是值
values.append(k)
elif isinstance(obj, list):
for item in obj:
extract_recursive(item)
extract_recursive(type_dict)
return values
def _extract_play_types_from_gdl(gdl_actions: Dict[str, Any]) -> List[str]:
"""从GDL actions中提取play动作的牌型列表"""
if "play" not in gdl_actions:
return []
play_def = gdl_actions["play"]
if isinstance(play_def, list):
# 查找type字段
for item in play_def:
if isinstance(item, dict) and "type" in item:
type_val = item["type"]
if isinstance(type_val, dict) and "one_of" in type_val:
return _extract_one_of_values(type_val)
elif isinstance(play_def, dict) and "type" in play_def:
type_val = play_def["type"]
if isinstance(type_val, dict) and "one_of" in type_val:
return _extract_one_of_values(type_val)
# 如果从actions中没找到,尝试从game.actions中查找
game_actions = _as_dict(_as_dict(gdl_actions.get("game", {})).get("actions", {}))
if "play" in game_actions:
play_def = game_actions["play"]
if isinstance(play_def, list):
for item in play_def:
if isinstance(item, dict) and "type" in item:
type_val = item["type"]
if isinstance(type_val, dict) and "one_of" in type_val:
return _extract_one_of_values(type_val)
elif isinstance(play_def, dict) and "type" in play_def:
type_val = play_def["type"]
if isinstance(type_val, dict) and "one_of" in type_val:
return _extract_one_of_values(type_val)
return []
def _extract_simple_value(obj: Any) -> Any:
"""提取简单值,处理嵌套的 {"_": [value]} 结构"""
if isinstance(obj, dict):
if "_" in obj and isinstance(obj["_"], list) and len(obj["_"]) == 1:
return obj["_"][0]
elif len(obj) == 1:
key, value = next(iter(obj.items()))
if isinstance(value, dict) and len(value) == 0:
return key
return _extract_simple_value(value)
return obj
def _parse_visibility_change(vc: Dict[str, Any]) -> Dict[str, Any]:
"""解析visibility_change字段"""
result = {}
# 解析 to 字段
if "to" in vc:
to_val = vc["to"]
if isinstance(to_val, list):
# 提取所有受众
audiences = []
for item in to_val:
if isinstance(item, dict):
# 处理 {"owner": {"_": ["teammates", "enemies"]}} 结构
for key, value in item.items():
if isinstance(value, dict) and "_" in value:
audiences.extend(value["_"])
else:
audiences.append(key)
else:
audiences.append(_scalar(item))
result["to"] = audiences
else:
result["to"] = [_scalar(to_val)]
# 解析 state 字段
if "state" in vc:
state_val = _extract_simple_value(vc["state"])
if state_val != "visible":
result["state"] = state_val
# 解析 on_target 字段
if "on_target" in vc:
on_target_val = _extract_simple_value(vc["on_target"])
if on_target_val != "true":
result["on_target"] = on_target_val
return result
def _parse_play_action(play_def: Dict[str, Any]) -> Dict[str, Any]:
"""解析单个 play 动作定义"""
result = {}
# 解析 type
if "type" in play_def:
type_val = play_def["type"]
if isinstance(type_val, dict) and "one_of" in type_val:
result["type"] = _as_list(type_val["one_of"])
else:
result["type"] = _scalar(type_val)
# 解析其他字段
for key in ["len", "core", "wings"]:
if key in play_def:
result[key] = play_def[key]
# 解析 transfer_path
if "transfer_path" in play_def:
tp = play_def["transfer_path"]
if isinstance(tp, dict):
result["transfer_path"] = tp
elif isinstance(tp, list):
# 处理 from: hand to: field 格式
result["transfer_path"] = _parse_transfer_path_list(tp)
# 解析 visibility_change
if "visibility_change" in play_def:
result["visibility_change"] = play_def["visibility_change"]
# 保存原始定义用于复杂情况
result["raw_definition"] = {k: v for k, v in play_def.items()
if k not in ["type", "len", "core", "wings", "transfer_path", "visibility_change"]}
return result
def _parse_special_action(special_def: Dict[str, Any]) -> Dict[str, Any]:
"""解析 special 动作定义"""
result = {}
# 解析 name
if "name" in special_def:
result["name"] = _scalar(special_def["name"])
# 解析 params
if "params" in special_def:
result["params"] = special_def["params"]
# 解析 transfer_path
if "transfer_path" in special_def:
tp = special_def["transfer_path"]
if isinstance(tp, dict):
result["transfer_path"] = tp
elif isinstance(tp, list):
result["transfer_path"] = _parse_transfer_path_list(tp)
# 解析 visibility_change
if "visibility_change" in special_def:
result["visibility_change"] = _parse_visibility_change(special_def["visibility_change"])
# 保存原始定义
result["raw_definition"] = {k: v for k, v in special_def.items()
if k not in ["name", "params", "transfer_path", "visibility_change"]}
return result
def _merge_special_actions(special_defs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""合并被解析器分解的 special 动作定义"""
if not special_defs:
return []
# 按name分组,合并相同name的动作
grouped = {}
for special_def in special_defs:
if isinstance(special_def, dict):
name = _scalar(special_def.get("name", ""))
if name:
if name not in grouped:
grouped[name] = {}
# 合并字段
for key, value in special_def.items():
if key != "name":
grouped[name][key] = value
else:
# 没有name的,可能是参数或其他字段,需要与前面的动作合并
# 这里简化处理,直接作为独立动作
pass
# 转换为列表
result = []
for name, fields in grouped.items():
action = {"name": name}
action.update(fields)
result.append(_parse_special_action(action))
return result
def _parse_transfer_path_list(tp_list: List[Any]) -> Dict[str, str]:
"""解析 from: hand to: field 格式的 transfer_path"""
result = {}
i = 0
while i < len(tp_list) - 1:
if isinstance(tp_list[i], str) and tp_list[i].endswith(":"):
key = tp_list[i][:-1] # 去掉冒号
result[key] = _scalar(tp_list[i + 1])
i += 2
else:
i += 1
return result
def _fix_zone_ids_in_transfer_path(tp: Dict[str, str], pids: List[str]) -> Dict[str, str]:
"""修复transfer_path中的zone ID,将hand转换为具体的hand:PlayerID"""
if not isinstance(tp, dict):
return tp
result = tp.copy()
# 如果from是"hand",需要转换为具体的hand:PlayerID
if result.get("from") == "hand" and pids:
# 使用第一个玩家ID作为默认值
result["from"] = f"hand:{pids[0]}"
# 如果to是"hand",也需要转换
if result.get("to") == "hand" and pids:
result["to"] = f"hand:{pids[0]}"
return result
def _sanitize_mechanics(mechs: List[Dict[str,Any]]):
for m in mechs or []:
if not isinstance(m,dict): continue
# phase/timing/trigger_condition 缺省
m["phase"]=_phase_to_schema(_scalar(m.get("phase") or "playing"))
m.setdefault("timing", "during_action")
m.setdefault("enabled", True)
if str(m.get("timing") or "").strip() not in ("pre_action","post_action","during_action"):
m["timing"] = "during_action"
m.setdefault("trigger_condition", "always")
raw_def = m.get("raw_definition", {}).copy() # Work on a copy to avoid modifying original input
# --- Modification: Handle transfer_path for instantiation and Schema compliance ---
original_tp = raw_def.get("transfer_path")
if original_tp and isinstance(original_tp, dict):
from_val = original_tp.get("from")
to_val = original_tp.get("to")
# Check if 'from' or 'to' are templates (e.g., 'hand', 'hand:*') or invalid zone names
# If they are, keep the original transfer_path in raw_definition and do NOT create a top-level transfer_path
def _is_template(z):
return isinstance(z, str) and z in ("hand", "hand:*")
def _is_valid_zone(z):
return isinstance(z, str) and bool(ZONE_RE.match(z))
# If from or to is a template, or if from/to are not valid zones according to the schema's ZoneID pattern,
# then the transfer_path cannot be placed at the top level.
if _is_template(from_val) or _is_template(to_val) or not _is_valid_zone(from_val) or not _is_valid_zone(to_val):
# The raw_def already contains the original transfer_path, which is correct.
# Do not create a top-level transfer_path.
# Ensure raw_definition exists and contains the original
# --- Modification: Add note for instantiation ---
m["raw_definition"] = {"transfer_path_note": {"zone_style": "needs_instantiation"}}
# Remove transfer_path from top level if it was accidentally added (shouldn't happen here)
m.pop("transfer_path", None)
else:
# If 'from' and 'to' are concrete and valid zones, move transfer_path to top level
m["transfer_path"] = {"from": from_val, "to": to_val}
# Remove transfer_path from raw_definition if it's valid and concrete
raw_def.pop("transfer_path", None)
if raw_def: # Only keep raw_definition if it has other keys
m["raw_definition"] = raw_def
else:
m["raw_definition"] = {}
else:
# If no original transfer_path, or it's not a dict, just keep raw_def as is
m["raw_definition"] = raw_def
# 清掉 None 值,避免 schema 报错
for k in ("min_players","max_players","usage_limit","visibility_change"):
if m.get(k, "__absent__") is None:
m.pop(k, None)
def _ensure_scoring(ir: Dict[str,Any]) -> Dict[str,Any]:
sc=_as_dict(ir.get("scoring"))
base=sc.get("base")
if not isinstance(base,int) or base<0: sc["base"]=1
return sc
def map_to_v095(nz: Dict[str,Any], gdl_text: str = None) -> Dict[str,Any]:
# --- Modification: Dynamically set description based on game name ---
game_name = _scalar(_as_dict(nz.get("game")).get("name") or "UnnamedGame")
if "UNO" in game_name:
description = "由标准54张牌叠加UNO特殊牌机制的变体。"
elif "奇袭" in game_name:
description = "在传统斗地主基础上加入了【援手】和【换位】两个新机制,增加了农民间的协作和角色变换的可能性"
else:
description = f"由标准54张牌叠加特殊牌机制的变体。" # Generic fallback
# meta
meta={"name": game_name, "version":"v0.95", "origin": "Variant", "seeded": True, "description": description}
# players
players=_derive_players(nz); pids=players.pop("_player_ids", []) # Removed role_defs as not needed
# cards
cards=_ensure_cards(nz)
# zones - Modified to handle zone definitions from setup
zone_defs = _as_list(nz.get("zones", []))
zones=_expand_zones(zone_defs, pids)
# visibility
visibility=_expand_vis_hand_star(_as_dict(nz.get("visibility")), pids)
# combinations
combinations=_ensure_combinations(nz)
# comparison
comparison=_as_dict(nz.get("comparison")) or {"same_type": True, "same_len": True, "bomb_beats_all": True, "rocket_top": True, "tiebreaker": "none"}
# actions
actions=_ensure_actions(nz, pids)
# phases - Modified to handle GDL phase names and map to schema
ph=_as_list(nz.get("phases"))
allowed={"setup","deal","bid","double","initiative","play","settle","grouping"}
# Apply mapping logic using _phase_to_schema
cleaned=[]
for x in ph:
s=str(x).strip()
mapped_s = _phase_to_schema(s)
if mapped_s in allowed and mapped_s not in cleaned:
cleaned.append(mapped_s)
# Ensure 'setup' and 'settle' are present, with 'setup' first and 'settle' last
if "setup" not in cleaned: cleaned=["setup"]+[t for t in cleaned if t!="setup"]
if "settle" not in cleaned: cleaned=[t for t in cleaned if t!="settle"]+["settle"]
# Add 'deal' phase if not present, typically after 'setup'
if "deal" not in cleaned:
setup_idx = -1
for i, p in enumerate(cleaned):
if p == "setup":
setup_idx = i
break
if setup_idx != -1:
cleaned.insert(setup_idx + 1, "deal") # Insert 'deal' after 'setup'
else:
cleaned = ["deal"] + cleaned # If 'setup' is missing, add 'deal' at the beginning
# Add 'play' phase if not present, typically after 'deal'
if "play" not in cleaned:
deal_idx = -1
for i, p in enumerate(cleaned):
if p == "deal":
deal_idx = i
break
if deal_idx != -1:
cleaned.insert(deal_idx + 1, "play") # Insert 'play' after 'deal'
else:
# If 'deal' is also missing, try 'setup'
setup_idx = -1
for i, p in enumerate(cleaned):
if p == "setup":
setup_idx = i
break
if setup_idx != -1:
cleaned.insert(setup_idx + 1, "play") # Insert 'play' after 'setup'
else:
cleaned = ["play"] + cleaned # If neither 'setup' nor 'deal' is present, add 'play' at the beginning
# turns
turns=_as_dict(nz.get("turns")) or {}
# Use the uniquely generated player IDs from _derive_players for order
# If turns.order is empty or not provided, use pids
order=turns.get("order") or pids
# If turns.order was provided but is the original ["Landlord", "Peasant", "Peasant", "Peasant"],
# it should be replaced by the unique pids generated in _derive_players.
# The pids are ["Landlord", "Peasant", "Peasant_1", "Peasant_2"] which is the desired outcome.
# So, we prioritize the pids (which are unique) over the potentially non-unique turns.order.
# However, if turns.order is explicitly set *after* uniquification by normalizer, we should respect it.
# The safest way is to always use pids if they are available and represent the actual unique player turn order.
# Let's assume pids represents the final desired turn order.
# Use the unique player IDs generated by _derive_players
turns["order"]=pids
if not turns.get("leader") and pids: turns["leader"]=pids[0] # Use first unique ID as leader
turns.setdefault("trick_end_on", "two_pass")
# next_leader
ext=_as_dict(nz.get("extensions")); te=turns.get("trick_end_on","two_pass")
if te=="custom":
t_ext=_as_dict(ext.get("trick_end_on"))
next_leader = "last_non_pass" if t_ext.get("mode")=="consecutive_passes" else "fixed"
else:
next_leader = "last_non_pass"
turns.setdefault("next_leader", next_leader)
# ending & scoring
ending=_as_dict(nz.get("ending")) or {"when":"any_hand_empty"}
scoring=_ensure_scoring(nz)
# mechanics
mechs=_as_list(nz.get("special_mechanics")); _sanitize_mechanics(mechs)
# extensions - Add card mappings
extensions=_as_dict(nz.get("extensions"))
extensions["cards_mapping"] = {
"rank_name_to_int": {
"J": 11,
"Q": 12,
"K": 13,
"A": 14,
"2": 15
},
"suit_name_to_carrier": {
"Spade": "Spade",
"Heart": "Heart",
"Club": "Club",
"Diamond": "Diamond"
}
}
ir = {
"meta": meta,
"players": players,
"cards": cards,
"zones": zones,
"visibility": visibility,
"combinations": combinations,
"comparison": comparison,
"actions": actions,
"phases": cleaned,
"turns": turns,
"ending": ending,
"scoring": scoring,
"special_mechanics": mechs,
"extensions": extensions
}
missing=[k for k in REQUIRED if k not in ir]
if missing: raise ValueError(f"Missing required keys after mapping: {missing}")
return ir
|