|
|
|
|
|
|
|
|
from __future__ import annotations |
|
|
|
|
|
from typing import Any, Dict |
|
|
import copy |
|
|
import traceback |
|
|
from .terminals import DSLTerminal |
|
|
|
|
|
class SimulationLayer: |
|
|
def simulate(self, match_result: Any, decomposition: Any, mined_knowledge: Dict[str, Any] = None) -> Dict[str, Any]: |
|
|
terminal = DSLTerminal() |
|
|
|
|
|
if not mined_knowledge: |
|
|
mined_knowledge = {} |
|
|
|
|
|
query_text = mined_knowledge.get("QUERY_TEXT") or getattr(decomposition, "text_part", "") or "" |
|
|
ops_list = mined_knowledge.get("OPS") or [ |
|
|
{"op": "EXTRACT_CLAIMS", "args": {}}, |
|
|
{"op": "REGISTER_DEFS", "args": {"source": "mined"}}, |
|
|
{"op": "LEXICAL_MATCH", "args": {}}, |
|
|
{"op": "VERIFY_WITH_RULESET", "args": {}}, |
|
|
{"op": "VERIFY_NECESSITY", "args": {}}, |
|
|
{"op": "SOLVE_CHOICE", "args": {}}, |
|
|
] |
|
|
|
|
|
|
|
|
mined_rules = mined_knowledge.get("MINED_RULES", []) |
|
|
mined_definitions = mined_knowledge.get("DEFINITIONS", {}) |
|
|
|
|
|
session_vars = { |
|
|
"choices": getattr(decomposition, "slots", {}).get("choices", {}), |
|
|
"frame_props": getattr(decomposition, "slots", {}).get("frame_props", []), |
|
|
"refute_mode": getattr(decomposition, "slots", {}).get("refute_mode", False), |
|
|
"need_exactness": getattr(decomposition, "slots", {}).get("need_exactness", False), |
|
|
"intent_candidates": getattr(decomposition, "slots", {}).get("intent_candidates", []), |
|
|
"entity": getattr(decomposition, "slots", {}).get("entity", ""), |
|
|
"mined_rules": mined_rules, |
|
|
"mined_definitions": mined_definitions, |
|
|
"query_text": query_text, |
|
|
"claims": [], |
|
|
"verified_map": [], |
|
|
"rule_hits": [], |
|
|
"refutation_hits": [], |
|
|
"evidence_count": 0, |
|
|
} |
|
|
|
|
|
steps_log = [] |
|
|
verified_results = [] |
|
|
evidence_count = 0 |
|
|
|
|
|
|
|
|
attainment = { |
|
|
"total_ops": [op.get("op", "UNKNOWN") for op in ops_list], |
|
|
"completed": [], |
|
|
"failed_at": None, |
|
|
"error": None, |
|
|
"traceback": None, |
|
|
"snapshots": {}, |
|
|
} |
|
|
|
|
|
print(f"⚙️ [SIM] OPS List: {[op.get('op') for op in ops_list]}") |
|
|
|
|
|
def snapshot_state(op_name: str, res: Dict[str, Any] | None): |
|
|
""" |
|
|
UIに出したい「直前までの状態」を最小限で保存 |
|
|
""" |
|
|
attainment["snapshots"][op_name] = { |
|
|
"op": op_name, |
|
|
"verdict": (res or {}).get("verdict"), |
|
|
"result": (res or {}).get("result"), |
|
|
"frame_props": list(session_vars.get("frame_props", [])), |
|
|
"choices": list((session_vars.get("choices") or {}).keys()), |
|
|
"registered_defs_count": len(session_vars.get("mined_definitions") or {}), |
|
|
"claims_count": len(session_vars.get("claims") or []), |
|
|
"rule_hits_count": len(session_vars.get("rule_hits") or []), |
|
|
"refutation_hits_count": len(session_vars.get("refutation_hits") or []), |
|
|
"evidence_count": evidence_count, |
|
|
} |
|
|
|
|
|
final_verdict = "insufficient_evidence" |
|
|
|
|
|
for op_data in ops_list: |
|
|
op_name = op_data.get("op", "UNKNOWN") |
|
|
print(f"⚙️ [SIM] Op: {op_name}") |
|
|
try: |
|
|
|
|
|
if op_name.upper() == "SOLVE_CHOICE": |
|
|
op_data.setdefault("args", {}) |
|
|
op_data["args"]["choices"] = session_vars["choices"] |
|
|
op_data["args"]["question"] = query_text |
|
|
|
|
|
session_vars["evidence_count"] = evidence_count |
|
|
|
|
|
res = terminal.execute_op(op_data, session_vars) |
|
|
if res is None: |
|
|
res = {"verdict": "rejected", "error": "None result"} |
|
|
|
|
|
|
|
|
if op_name.upper() in ["SOLVE_CHOICE", "GENERATE_EXPLANATION", "APPLY_SCHEMA"]: |
|
|
final_verdict = res.get("verdict", "insufficient_evidence") |
|
|
|
|
|
|
|
|
if "claims" in res: |
|
|
session_vars["claims"] = res["claims"] |
|
|
if "verified_map" in res: |
|
|
session_vars["verified_map"] = res["verified_map"] |
|
|
if "rule_hits" in res: |
|
|
session_vars["rule_hits"].extend(res["rule_hits"]) |
|
|
if "refutation_hits" in res: |
|
|
session_vars["refutation_hits"].extend(res["refutation_hits"]) |
|
|
|
|
|
|
|
|
verdict = res.get("verdict") |
|
|
result_txt = res.get("result", "") |
|
|
|
|
|
if verdict == "accepted": |
|
|
steps_log.append(f"✅ PASS: {op_name} -> {result_txt}") |
|
|
print(f" → Verdict: {verdict}") |
|
|
print(f" → Result: {result_txt}") |
|
|
verified_results.append(result_txt) |
|
|
|
|
|
if res.get("verified"): |
|
|
added = int(res.get("count", 1)) |
|
|
evidence_count += added |
|
|
steps_log.append(f" ⭐ Logic Evidence +{added} (Total: {evidence_count})") |
|
|
print(f" → Evidence added: +{added} (Total: {evidence_count})") |
|
|
|
|
|
elif verdict == "insufficient_evidence": |
|
|
reason = res.get('reason', result_txt) |
|
|
steps_log.append(f"⚠️ WAIT: {op_name} -> {reason}") |
|
|
print(f" → Verdict: {verdict}") |
|
|
print(f" → Reason: {reason}") |
|
|
|
|
|
else: |
|
|
reason = res.get('error') or res.get('reason') or "Unknown error" |
|
|
steps_log.append(f"❌ FAIL: {op_name} ({reason})") |
|
|
print(f" → Verdict: {verdict} (FAIL)") |
|
|
print(f" → Reason: {reason}") |
|
|
|
|
|
|
|
|
attainment["completed"].append(op_name) |
|
|
snapshot_state(op_name, res) |
|
|
|
|
|
except Exception as e: |
|
|
|
|
|
attainment["failed_at"] = op_name |
|
|
attainment["error"] = str(e) |
|
|
attainment["traceback"] = traceback.format_exc(limit=2) |
|
|
|
|
|
print(f"🔥 [SIM] ERROR at {op_name}: {e}") |
|
|
print(traceback.format_exc()) |
|
|
|
|
|
|
|
|
snapshot_state(op_name, None) |
|
|
|
|
|
|
|
|
steps_log.append(f"❌ FAIL: {op_name} (logic error: {e})") |
|
|
break |
|
|
|
|
|
content = { |
|
|
"definition": f"生産型推論シミュレーション完了 (Evidence: {evidence_count}).", |
|
|
"logic_check": steps_log, |
|
|
"conclusion": "\n".join(verified_results) if verified_results else "", |
|
|
"summary": f"AXIS-V13.3.x Verified {evidence_count} logic units.", |
|
|
|
|
|
"reasoning_attainment": attainment, |
|
|
|
|
|
"meta": { |
|
|
"proofs_count": len(session_vars.get("rule_hits", [])), |
|
|
"refutations_count": len(session_vars.get("refutation_hits", [])), |
|
|
"evidence_count": evidence_count, |
|
|
"final_verdict": final_verdict |
|
|
} |
|
|
} |
|
|
return content |