# govon-runtime / scripts / verify_e2e_tool_calling.py
# Author: umyunsang
# Commit 201e800 (verified): feat: E2E ๋ฒ•๋ฅ  4์นดํ…Œ๊ณ ๋ฆฌ ์‹œ๋‚˜๋ฆฌ์˜ค ์ถ”๊ฐ€ (๋ฏผ์‚ฌ/ํ˜•์‚ฌ/์ง€์‹์žฌ์‚ฐ/ํŒ๋ก€)
#!/usr/bin/env python3
"""GovOn Native Tool Calling + AdapterRegistry E2E ๊ฒ€์ฆ ์Šคํฌ๋ฆฝํŠธ.
HuggingFace Space์— ๋ฐฐํฌ๋œ govon-runtime ์„œ๋ฒ„์— ๋Œ€ํ•ด
์—์ด์ „ํŠธ ํŒŒ์ดํ”„๋ผ์ธ(ํ”Œ๋ž˜๋„ˆ โ†’ ๋„๊ตฌ ์‹คํ–‰ โ†’ ์–ด๋Œ‘ํ„ฐ ์ „ํ™˜)์„ ๊ฒ€์ฆํ•œ๋‹ค.
์‚ฌ์šฉ๋ฒ•:
GOVON_RUNTIME_URL=https://<space-url>.hf.space python3 scripts/verify_e2e_tool_calling.py
GOVON_RUNTIME_URL=https://<space-url>.hf.space API_KEY=<key> python3 scripts/verify_e2e_tool_calling.py
5-Phase ๊ฒ€์ฆ (16 ์‹œ๋‚˜๋ฆฌ์˜ค):
Phase 1: Infrastructure (hard gate)
1. Health & Profile
2. Base Model Generation
3. Adapter Registry
Phase 2: Agent Pipeline Core
4. Planner Produces Valid Plan
5. Civil LoRA Draft Response
6. Legal LoRA Evidence Augmentation (depends on 5)
6a. Legal LoRA โ€” ๋ฏผ์‚ฌ๋ฒ• (Civil Law)
6b. Legal LoRA โ€” ํ˜•์‚ฌ๋ฒ• (Criminal Law)
6c. Legal LoRA โ€” ์ง€์‹์žฌ์‚ฐ๊ถŒ (IP)
6d. Legal LoRA โ€” ํŒ๋ก€ ํ•ด์„ (Precedent)
7. Task Type Classification
Phase 3: data.go.kr API Tools (soft gate)
8. External API Tool Invocation (4 sub-cases)
Phase 4: Adapter Dynamics
9. Sequential Adapter Switching
10. LoRA ID Consistency
Phase 5: Robustness
11. Empty Query Handling
12. Reject Flow Completeness
13. Concurrent Request Isolation
"""
# stdlib
import asyncio
import json
import logging
import os
import re
import sys
import time
from typing import Any, Optional
from uuid import uuid4
# --- Runtime configuration (overridable via environment variables) ---
BASE_URL = os.environ.get("GOVON_RUNTIME_URL", "http://localhost:7860").rstrip("/")  # target server, trailing slash removed
API_KEY = os.environ.get("API_KEY")  # optional; attached as X-API-Key header when set
TIMEOUT = 300  # maximum wait time per scenario (seconds)
BASE_MODEL = "LGAI-EXAONE/EXAONE-4.0-32B-AWQ"  # model id sent to /v1/completions
_TIMESTAMP = time.strftime("%Y%m%d_%H%M%S")  # run timestamp shared by both output files
RESULTS_PATH = f"verify_e2e_tool_calling_{_TIMESTAMP}.json"  # JSON results file
LOG_PATH = f"verify_e2e_tool_calling_{_TIMESTAMP}.log"  # mirrored log file
# Whitelist of tool names the planner is allowed to schedule; any tool outside
# this set makes the plan invalid (checked in Scenario 4).
VALID_TOOLS = frozenset(
    {
        "rag_search",
        "api_lookup",
        "draft_civil_response",
        "append_evidence",
        "issue_detector",
        "stats_lookup",
        "keyword_analyzer",
        "demographics_lookup",
    }
)
# Regex patterns that signal legal-evidence content in a Korean answer:
# statute article/clause references plus law/decree/ordinance/precedent keywords.
LEGAL_PATTERNS = [
    r"์ œ\s*\d+\s*์กฐ",  # "Article N"
    r"์ œ\s*\d+\s*ํ•ญ",  # "Paragraph N"
    r"๋ฒ•๋ฅ ",
    r"์‹œํ–‰๋ น",
    r"์กฐ๋ก€",
    r"ํŒ๋ก€",
    r"๋Œ€๋ฒ•์›",
    r"๋ฒ•",
    r"๋ น",
    r"๊ทœ์ •",
]
# ---------------------------------------------------------------------------
# Logging setup: mirror every record to the terminal and to a log file
# ---------------------------------------------------------------------------
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler(LOG_PATH, encoding="utf-8"),
    ],
)
logger = logging.getLogger(__name__)
logger.info(f"๋กœ๊ทธ ํŒŒ์ผ: {LOG_PATH}")
logger.info(f"๊ฒฐ๊ณผ ํŒŒ์ผ: {RESULTS_PATH}")
# Accumulated per-scenario result entries (appended by _record).
_results: list[dict] = []
def _save_intermediate_results() -> None:
    """Persist the accumulated results to the JSON file after every scenario.

    Writes to a temporary file first and then atomically swaps it in via
    ``os.replace`` so a crash mid-write never leaves a truncated JSON file.
    Any failure is logged as a warning and never interrupts the run.
    """
    output = {
        "meta": {
            # BUGFIX: the original used `_run_id if "_run_id" in dir() else ""`,
            # but dir() with no arguments inside a function lists *local* names
            # only, so the module-level _run_id was never found and run_id was
            # always "". globals() is the correct namespace to probe.
            "run_id": globals().get("_run_id", ""),
            "timestamp_utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            "target_url": BASE_URL,
            "log_file": LOG_PATH,
            "status": "in_progress",
        },
        "summary": {
            "total": len(_results),
            "passed": sum(1 for r in _results if r["status"] == "passed"),
            "failed": sum(1 for r in _results if r["status"] == "failed"),
            "skipped": sum(1 for r in _results if r["status"] == "skipped"),
        },
        "scenarios": _results,
    }
    tmp_path = f"{RESULTS_PATH}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(output, f, ensure_ascii=False, indent=2)
        os.replace(tmp_path, RESULTS_PATH)
    except Exception as exc:
        # Best effort: a failed intermediate save must not abort verification.
        logger.warning("์ค‘๊ฐ„ ๊ฒฐ๊ณผ ์ €์žฅ ์‹คํŒจ: %s", exc)
# Every tool name observed in any plan across the whole run (for reporting).
_observed_tools: set[str] = set()
# Unique id for this verification run (recorded in the results meta block).
_run_id = uuid4().hex
# ---------------------------------------------------------------------------
# HTTP client layer (httpx preferred, urllib fallback)
# ---------------------------------------------------------------------------
try:
    import httpx

    _HTTP_BACKEND = "httpx"

    def _build_headers() -> dict:
        # Common JSON headers; the API key header is attached only when set.
        h = {"Content-Type": "application/json", "Accept": "application/json"}
        if API_KEY:
            h["X-API-Key"] = API_KEY
        return h

    async def http_get(path: str, timeout: float = TIMEOUT) -> tuple[int, dict]:
        """GET BASE_URL+path; return (status_code, parsed JSON body)."""
        url = BASE_URL + path
        async with httpx.AsyncClient(timeout=timeout) as client:
            resp = await client.get(url, headers=_build_headers())
            try:
                return resp.status_code, resp.json()
            except Exception:
                # Non-JSON body: return a short raw preview instead of failing.
                return resp.status_code, {"_raw": resp.text[:200]}

    async def http_post(path: str, body: dict, timeout: float = TIMEOUT) -> tuple[int, dict]:
        """POST a JSON body; return (status_code, parsed JSON response)."""
        url = BASE_URL + path
        async with httpx.AsyncClient(timeout=timeout) as client:
            resp = await client.post(url, json=body, headers=_build_headers())
            try:
                return resp.status_code, resp.json()
            except Exception:
                return resp.status_code, {"_raw": resp.text[:200]}

    async def http_post_sse(
        path: str, body: dict, timeout: float = TIMEOUT
    ) -> tuple[int, list[dict]]:
        """SSE streaming POST. Collect chunks and return the parsed event list."""
        url = BASE_URL + path
        h = _build_headers()
        h["Accept"] = "text/event-stream"
        events: list[dict] = []
        status_code = 0
        async with httpx.AsyncClient(timeout=timeout) as client:
            async with client.stream("POST", url, json=body, headers=h) as resp:
                status_code = resp.status_code
                async for line in resp.aiter_lines():
                    line = line.strip()
                    # Only "data:" lines carry events; skip comments/heartbeats.
                    if not line.startswith("data:"):
                        continue
                    payload = line[len("data:") :].strip()
                    if not payload:
                        continue
                    try:
                        events.append(json.loads(payload))
                    except json.JSONDecodeError:
                        # Keep unparseable payloads for diagnostics.
                        events.append({"_raw": payload})
        return status_code, events

    async def http_get_raw(url: str, timeout: float = 10) -> tuple[int, str]:
        """Raw GET for external connectivity checks."""
        async with httpx.AsyncClient(timeout=timeout) as client:
            resp = await client.get(url)
            return resp.status_code, resp.text[:200]

except ImportError:
    import urllib.error
    import urllib.request

    _HTTP_BACKEND = "urllib"

    def _build_headers() -> dict:
        h = {"Content-Type": "application/json", "Accept": "application/json"}
        if API_KEY:
            h["X-API-Key"] = API_KEY
        return h

    async def http_get(path: str, timeout: float = TIMEOUT) -> tuple[int, dict]:
        # NOTE: blocking I/O behind an async signature — acceptable for this
        # sequential fallback path, but it will stall the event loop.
        url = BASE_URL + path
        req = urllib.request.Request(url, headers=_build_headers(), method="GET")
        try:
            with urllib.request.urlopen(req, timeout=timeout) as r:
                return r.status, json.loads(r.read().decode())
        except urllib.error.HTTPError as e:
            # Error body is discarded; caller only sees the status code.
            return e.code, {}

    async def http_post(path: str, body: dict, timeout: float = TIMEOUT) -> tuple[int, dict]:
        url = BASE_URL + path
        data = json.dumps(body).encode()
        req = urllib.request.Request(url, data=data, headers=_build_headers(), method="POST")
        try:
            with urllib.request.urlopen(req, timeout=timeout) as r:
                return r.status, json.loads(r.read().decode())
        except urllib.error.HTTPError as e:
            return e.code, {}

    async def http_post_sse(
        path: str, body: dict, timeout: float = TIMEOUT
    ) -> tuple[int, list[dict]]:
        """urllib fallback: read the SSE stream synchronously."""
        url = BASE_URL + path
        data = json.dumps(body).encode()
        h = _build_headers()
        h["Accept"] = "text/event-stream"
        req = urllib.request.Request(url, data=data, headers=h, method="POST")
        events: list[dict] = []
        status_code = 0
        try:
            with urllib.request.urlopen(req, timeout=timeout) as r:
                status_code = r.status
                for raw_line in r:
                    line = raw_line.decode("utf-8", errors="replace").strip()
                    if not line.startswith("data:"):
                        continue
                    payload = line[len("data:") :].strip()
                    if not payload:
                        continue
                    try:
                        events.append(json.loads(payload))
                    except json.JSONDecodeError:
                        events.append({"_raw": payload})
        except urllib.error.HTTPError as e:
            status_code = e.code
        return status_code, events

    async def http_get_raw(url: str, timeout: float = 10) -> tuple[int, str]:
        """Raw GET for external connectivity checks."""
        req = urllib.request.Request(url, method="GET")
        try:
            with urllib.request.urlopen(req, timeout=timeout) as r:
                return r.status, r.read().decode()[:200]
        except urllib.error.HTTPError as e:
            return e.code, ""
        except Exception:
            # Connectivity probes are best-effort: any failure maps to status 0.
            return 0, ""
# ---------------------------------------------------------------------------
# ๊ฒฐ๊ณผ ๊ธฐ๋ก / ์ถœ๋ ฅ ํ—ฌํผ
# ---------------------------------------------------------------------------
def _record(
    scenario_num: int,
    name: str,
    phase: int,
    status: str,
    elapsed: float,
    attempts: int = 1,
    assertions: Optional[list[str]] = None,
    warnings: Optional[list[str]] = None,
    error: Optional[str] = None,
    detail: Optional[Any] = None,
) -> dict:
    """Log one scenario outcome, append it to the global results, persist, and return it."""
    status_tags = {"passed": "[PASS]", "failed": "[FAIL]", "skipped": "[SKIP]"}
    tag = status_tags.get(status, "[????]")
    suffix = f"({elapsed:.2f}s)"
    # Pick the log level and message shape by outcome.
    if status == "passed":
        logger.info(f"{tag} Scenario {scenario_num}: {name} {suffix}")
    elif status == "skipped":
        logger.warning(f"{tag} Scenario {scenario_num}: {name} โ€” {error or 'skipped'} {suffix}")
    else:
        logger.error(f"{tag} Scenario {scenario_num}: {name} โ€” {error} {suffix}")
    for warn_msg in warnings or []:
        logger.warning(f" [WARN] {warn_msg}")
    entry = {
        "id": scenario_num,
        "name": name,
        "phase": phase,
        "status": status,
        "attempts": attempts,
        "elapsed_s": round(elapsed, 3),
        "assertions": assertions or [],
        "warnings": warnings or [],
        "error": error,
        "detail": detail,
    }
    _results.append(entry)
    _save_intermediate_results()  # keep the JSON file current after every scenario
    return entry
def _session_id(scenario_num: int) -> str:
return f"e2e-{scenario_num}-{uuid4().hex[:8]}"
def _extract_text_from_events(events: list[dict]) -> str:
"""SSE ์ด๋ฒคํŠธ ๋ชฉ๋ก์—์„œ ์ตœ์ข… ํ…์ŠคํŠธ๋ฅผ ์ถ”์ถœํ•œ๋‹ค."""
for ev in reversed(events):
if ev.get("node") == "synthesis" and ev.get("final_text"):
return ev["final_text"]
for ev in reversed(events):
if ev.get("finished") and ev.get("text"):
return ev["text"]
chunks = [ev.get("text", "") or ev.get("final_text", "") for ev in events]
return "".join(c for c in chunks if c)
def _contains_legal_keyword(text: str) -> bool:
    """Return True when *text* matches at least one regex in LEGAL_PATTERNS."""
    for pattern in LEGAL_PATTERNS:
        if re.search(pattern, text):
            return True
    return False
# ---------------------------------------------------------------------------
# Agent call helper: _call_agent_with_approval()
# ---------------------------------------------------------------------------
async def _call_agent_with_approval(
    query: str,
    session_id: str,
    approve: bool = True,
    timeout: float = 180,
) -> tuple[bool, str, dict, Optional[str]]:
    """Call the agent via SSE streaming, parse up to awaiting_approval, then approve/reject.

    Falls back to the blocking REST endpoint (/v2/agent/run) when the SSE
    stream fails for any reason.

    Returns: (success, text, metadata_dict, error)
    metadata_dict keys: planned_tools, task_type, goal, reason, tool_results, adapter_mode, tool_args
    """
    body = {"query": query, "session_id": session_id, "use_rag": False}
    # Metadata accumulated from stream events / REST responses.
    meta: dict[str, Any] = {
        "planned_tools": [],
        "task_type": None,
        "goal": None,
        "reason": None,
        "tool_results": {},
        "adapter_mode": None,
        "tool_args": {},
    }
    logger.info("[Agent] ์š”์ฒญ: session=%s, query_len=%d", session_id, len(query))
    # --- Try SSE streaming first ---
    try:
        status_code, events = await http_post_sse("/v2/agent/stream", body, timeout=timeout)
        logger.info(f"[Agent] SSE ์‘๋‹ต: HTTP {status_code}, events={len(events)}")
        if status_code != 200:
            # Raising here routes us into the REST fallback below.
            raise RuntimeError(f"SSE HTTP {status_code}")
        # Log the node-by-node flow.
        for ev in events:
            node = ev.get("node", "?")
            st = ev.get("status", "?")
            logger.info(f" [SSE] node={node}, status={st}")
        # Search for the awaiting_approval or __interrupt__ event.
        awaiting = None
        for ev in events:
            if ev.get("status") == "awaiting_approval" or ev.get("node") == "__interrupt__":
                awaiting = ev
                break
        # Extract planned_tools from the planner node (nested approval_request first).
        # NOTE(review): `ev` below is the leaked loop variable from the search
        # above, so only ONE event is inspected — the awaiting event, or the
        # last event when none matched. With an empty event list this raises
        # NameError, which the outer except converts into the REST fallback.
        # Confirm whether this block was meant to scan every event.
        ev_approval = ev.get("approval_request", {})
        if not isinstance(ev_approval, dict):
            ev_approval = {}
        if ev_approval.get("planned_tools"):
            meta["planned_tools"] = ev_approval["planned_tools"]
        elif ev.get("planned_tools"):
            meta["planned_tools"] = ev["planned_tools"]
        if ev_approval.get("task_type"):
            meta["task_type"] = ev_approval["task_type"]
        elif ev.get("task_type"):
            meta["task_type"] = ev["task_type"]
        if ev_approval.get("goal"):
            meta["goal"] = ev_approval["goal"]
        elif ev.get("goal"):
            meta["goal"] = ev["goal"]
        if ev_approval.get("reason"):
            meta["reason"] = ev_approval["reason"]
        elif ev.get("reason"):
            meta["reason"] = ev["reason"]
        # adapter_mode, tool_args are always top-level
        if ev.get("adapter_mode"):
            meta["adapter_mode"] = ev["adapter_mode"]
        if ev.get("tool_args"):
            meta["tool_args"] = ev["tool_args"]
        if awaiting:
            # Extract metadata from the awaiting event (nested approval_request first).
            approval_req = awaiting.get("approval_request", {})
            if not isinstance(approval_req, dict):
                approval_req = {}
            if approval_req.get("planned_tools"):
                meta["planned_tools"] = approval_req["planned_tools"]
            elif awaiting.get("planned_tools"):
                meta["planned_tools"] = awaiting["planned_tools"]
            if approval_req.get("task_type"):
                meta["task_type"] = approval_req["task_type"]
            elif awaiting.get("task_type"):
                meta["task_type"] = awaiting["task_type"]
            if approval_req.get("goal"):
                meta["goal"] = approval_req["goal"]
            elif awaiting.get("goal"):
                meta["goal"] = awaiting["goal"]
            if approval_req.get("reason"):
                meta["reason"] = approval_req["reason"]
            elif awaiting.get("reason"):
                meta["reason"] = awaiting["reason"]
            # adapter_mode, tool_args are always top-level
            if awaiting.get("adapter_mode"):
                meta["adapter_mode"] = awaiting["adapter_mode"]
            if awaiting.get("tool_args"):
                meta["tool_args"] = awaiting["tool_args"]
            thread_id = awaiting.get("thread_id") or session_id
            logger.info(" [Approval] planned_tools=%s", meta["planned_tools"])
            tool_arg_keys = (
                sorted(meta["tool_args"].keys())
                if isinstance(meta["tool_args"], dict)
                else str(type(meta["tool_args"]).__name__)
            )
            logger.info(
                " [Approval] adapter_mode=%s, tool_arg_keys=%s",
                meta["adapter_mode"],
                tool_arg_keys,
            )
            logger.info(
                f" [Approval] {'์Šน์ธ' if approve else '๊ฑฐ์ ˆ'} ์š”์ฒญ โ†’ thread_id={thread_id}"
            )
            # approve/reject
            approve_code, approve_resp = await http_post(
                f"/v2/agent/approve?thread_id={thread_id}&approved={'true' if approve else 'false'}",
                {},
                timeout=timeout,
            )
            if approve_code != 200:
                return False, "", meta, f"approve HTTP {approve_code}: {approve_resp}"
            # Extract the final text and tool results from the approve response.
            logger.info(f" [Approve] HTTP {approve_code}, status={approve_resp.get('status')}")
            final_text = approve_resp.get("text", "") or approve_resp.get("final_text", "") or ""
            if approve_resp.get("tool_results"):
                meta["tool_results"] = approve_resp["tool_results"]
            if approve_resp.get("adapter_mode"):
                meta["adapter_mode"] = approve_resp["adapter_mode"]
            if approve_resp.get("status") == "rejected":
                # A deliberate reject still counts as a successful flow.
                return True, final_text, meta, None
            if final_text:
                return True, final_text, meta, None
            return False, "", meta, f"approve 200 but text ์—†์Œ: {approve_resp}"
        # No awaiting event but a final text exists (auto-approve mode).
        text = _extract_text_from_events(events)
        # Collect extra metadata from the events (nested approval_request first).
        for ev in events:
            fallback_req = ev.get("approval_request", {})
            if not isinstance(fallback_req, dict):
                fallback_req = {}
            if not meta["planned_tools"]:
                if fallback_req.get("planned_tools"):
                    meta["planned_tools"] = fallback_req["planned_tools"]
                elif ev.get("planned_tools"):
                    meta["planned_tools"] = ev["planned_tools"]
            if not meta.get("task_type"):
                if fallback_req.get("task_type"):
                    meta["task_type"] = fallback_req["task_type"]
                elif ev.get("task_type"):
                    meta["task_type"] = ev["task_type"]
            if not meta.get("goal"):
                if fallback_req.get("goal"):
                    meta["goal"] = fallback_req["goal"]
                elif ev.get("goal"):
                    meta["goal"] = ev["goal"]
            if not meta.get("reason"):
                if fallback_req.get("reason"):
                    meta["reason"] = fallback_req["reason"]
                elif ev.get("reason"):
                    meta["reason"] = ev["reason"]
            if ev.get("tool_results") and not meta["tool_results"]:
                meta["tool_results"] = ev["tool_results"]
            # adapter_mode, tool_args are always top-level
            if ev.get("adapter_mode") and not meta["adapter_mode"]:
                meta["adapter_mode"] = ev["adapter_mode"]
            if ev.get("tool_args") and not meta["tool_args"]:
                meta["tool_args"] = ev["tool_args"]
        if text:
            return True, text, meta, None
        # Check for an explicit error event.
        for ev in events:
            if ev.get("status") == "error":
                return False, "", meta, ev.get("error", "unknown error")
        return False, "", meta, f"SSE ์ด๋ฒคํŠธ ์ˆ˜์‹ ํ–ˆ์œผ๋‚˜ text/awaiting ์—†์Œ (events={len(events)})"
    except Exception as sse_exc:
        logger.warning("SSE stream failed: %s โ€” falling back to REST", sse_exc)
    # --- REST fallback: /v2/agent/run ---
    try:
        status_code, resp = await http_post("/v2/agent/run", body, timeout=timeout)
        if status_code != 200:
            return False, "", meta, f"REST HTTP {status_code}: {resp}"
        if resp.get("planned_tools"):
            meta["planned_tools"] = resp["planned_tools"]
        if resp.get("task_type"):
            meta["task_type"] = resp["task_type"]
        if resp.get("adapter_mode"):
            meta["adapter_mode"] = resp["adapter_mode"]
        if resp.get("tool_args"):
            meta["tool_args"] = resp["tool_args"]
        if resp.get("status") == "awaiting_approval":
            thread_id = resp.get("thread_id") or session_id
            approve_code, approve_resp = await http_post(
                f"/v2/agent/approve?thread_id={thread_id}&approved={'true' if approve else 'false'}",
                {},
                timeout=timeout,
            )
            if approve_code != 200:
                return False, "", meta, f"approve HTTP {approve_code}"
            final_text = approve_resp.get("text", "") or approve_resp.get("final_text", "") or ""
            if approve_resp.get("tool_results"):
                meta["tool_results"] = approve_resp["tool_results"]
            if approve_resp.get("status") == "rejected":
                return True, final_text, meta, None
            if final_text:
                return True, final_text, meta, None
            return False, "", meta, "approve 200 but text ์—†์Œ"
        if resp.get("status") == "error":
            return False, "", meta, resp.get("error", "agent run error")
        text = resp.get("text", "") or resp.get("final_text", "")
        if resp.get("tool_results"):
            meta["tool_results"] = resp["tool_results"]
        if text:
            return True, text, meta, None
        return False, "", meta, f"text ์—†์Œ, status={resp.get('status')}"
    except Exception as exc:
        return False, "", meta, str(exc)
# ---------------------------------------------------------------------------
# Phase 1: Infrastructure (hard gate)
# ---------------------------------------------------------------------------
async def scenario1_health_profile() -> dict:
    """Scenario 1: Health & Profile (retry 3x with backoff).

    Requires HTTP 200 and status ok/healthy from /health; missing model or
    profile fields are soft warnings only.
    """
    backoffs = [5, 10, 20]  # sleep before retries 2 and 3
    attempts = 0
    last_error = ""
    for attempt_idx in range(3):
        attempts += 1
        t0 = time.monotonic()
        try:
            status_code, body = await http_get("/health", timeout=10)
            elapsed = time.monotonic() - t0
            assertions = []
            if status_code != 200:
                last_error = f"HTTP {status_code}"
                if attempt_idx < 2:
                    await asyncio.sleep(backoffs[attempt_idx])
                    continue
                # Retries exhausted: record the hard failure.
                return _record(
                    1,
                    "Health & Profile",
                    1,
                    "failed",
                    elapsed,
                    attempts,
                    assertions=["HTTP 200"],
                    error=last_error,
                    detail={"body": body},
                )
            assertions.append("HTTP 200: OK")
            srv_status = body.get("status", "")
            if srv_status not in ("ok", "healthy"):
                last_error = f"status={srv_status!r}, expected ok/healthy"
                if attempt_idx < 2:
                    await asyncio.sleep(backoffs[attempt_idx])
                    continue
                return _record(
                    1,
                    "Health & Profile",
                    1,
                    "failed",
                    elapsed,
                    attempts,
                    assertions=assertions,
                    error=last_error,
                    detail={"body": body},
                )
            assertions.append(f"status={srv_status}: OK")
            # model/profile are informational — warn instead of failing.
            warnings = []
            if "model" not in body:
                warnings.append("model field not found in /health")
            else:
                assertions.append(f"model={body['model']}: OK")
            if "profile" not in body:
                warnings.append("profile field not found in /health")
            else:
                assertions.append(f"profile={body['profile']}: OK")
            return _record(
                1,
                "Health & Profile",
                1,
                "passed",
                elapsed,
                attempts,
                assertions=assertions,
                warnings=warnings,
                detail={
                    "status": srv_status,
                    "model": body.get("model"),
                    "profile": body.get("profile"),
                },
            )
        except Exception as exc:
            last_error = str(exc)
            if attempt_idx < 2:
                await asyncio.sleep(backoffs[attempt_idx])
                continue
            return _record(
                1,
                "Health & Profile",
                1,
                "failed",
                time.monotonic() - t0,
                attempts,
                error=last_error,
            )
    # Defensive: unreachable in practice, every loop path returns or continues.
    return _record(1, "Health & Profile", 1, "failed", 0, attempts, error=last_error)
async def scenario2_base_model_generation() -> dict:
    """Scenario 2: Base Model Generation (retry 2x).

    Tries the OpenAI-compatible /v1/completions endpoint first; when it fails
    or returns empty text, falls back to the legacy /v1/generate endpoint.
    """
    body_completions = {
        "model": BASE_MODEL,
        "prompt": "๋Œ€ํ•œ๋ฏผ๊ตญ์˜ ์ˆ˜๋„๋Š”",
        "max_tokens": 32,
        "temperature": 0.0,  # deterministic output for a stable check
    }
    last_error = ""
    attempts = 0
    for attempt_idx in range(2):
        attempts += 1
        t0 = time.monotonic()
        try:
            status_code, resp = await http_post("/v1/completions", body_completions, timeout=60)
            elapsed = time.monotonic() - t0
            if status_code == 200:
                choices = resp.get("choices", [])
                if choices and choices[0].get("text") is not None:
                    text = choices[0]["text"]
                    if text.strip():
                        return _record(
                            2,
                            "Base Model Generation",
                            1,
                            "passed",
                            elapsed,
                            attempts,
                            assertions=["HTTP 200", "non-empty text"],
                            detail={"endpoint": "/v1/completions", "text_preview": text[:100]},
                        )
            # fallback /v1/generate
            body_legacy = {
                "prompt": "๋Œ€ํ•œ๋ฏผ๊ตญ์˜ ์ˆ˜๋„๋Š”",
                "max_tokens": 32,
                "temperature": 0.0,
                "use_rag": False,
            }
            status_code2, resp2 = await http_post("/v1/generate", body_legacy, timeout=60)
            elapsed2 = time.monotonic() - t0
            if status_code2 == 200 and resp2.get("text", "").strip():
                return _record(
                    2,
                    "Base Model Generation",
                    1,
                    "passed",
                    elapsed2,
                    attempts,
                    assertions=["HTTP 200 (fallback)", "non-empty text"],
                    detail={"endpoint": "/v1/generate", "text_preview": resp2["text"][:100]},
                )
            last_error = f"/v1/completions HTTP {status_code}, /v1/generate HTTP {status_code2}"
        except Exception as exc:
            last_error = str(exc)
    # Both attempts exhausted without a usable response.
    return _record(
        2, "Base Model Generation", 1, "failed", time.monotonic() - t0, attempts, error=last_error
    )
async def scenario3_adapter_registry() -> dict:
    """Scenario 3: Adapter Registry via /v1/models.

    A missing endpoint or missing civil/legal adapters are treated as soft
    warnings (endpoint exposure depends on vLLM configuration); only a
    malformed response or a transport error fails the scenario.
    """
    t0 = time.monotonic()
    try:
        status_code, resp = await http_get("/v1/models", timeout=10)
        elapsed = time.monotonic() - t0
        assertions = []
        warnings = []
        if status_code != 200:
            # Deliberately "passed" with a warning: the endpoint may be hidden.
            return _record(
                3,
                "Adapter Registry",
                1,
                "passed",
                elapsed,
                assertions=[],
                warnings=[
                    f"/v1/models HTTP {status_code} โ€” ์—”๋“œํฌ์ธํŠธ ๋ฏธ๋…ธ์ถœ (vLLM ์„ค์ •์— ๋”ฐ๋ผ ์ •์ƒ)"
                ],
                detail={"resp": resp},
            )
        assertions.append("HTTP 200: OK")
        data = resp.get("data", [])
        if not isinstance(data, list):
            return _record(
                3,
                "Adapter Registry",
                1,
                "failed",
                elapsed,
                assertions=assertions,
                error="data array missing or invalid",
                detail={"resp": resp},
            )
        assertions.append(f"data array: {len(data)} models")
        model_ids = [m.get("id", "") for m in data]
        # Substring match: adapter ids are expected to contain civil/legal.
        civil_found = any("civil" in mid for mid in model_ids)
        legal_found = any("legal" in mid for mid in model_ids)
        if not civil_found:
            warnings.append("civil adapter not detected in /v1/models (WARN, not FAIL)")
        else:
            assertions.append("civil adapter detected")
        if not legal_found:
            warnings.append("legal adapter not detected in /v1/models (WARN, not FAIL)")
        else:
            assertions.append("legal adapter detected")
        return _record(
            3,
            "Adapter Registry",
            1,
            "passed",
            elapsed,
            assertions=assertions,
            warnings=warnings,
            detail={"model_ids": model_ids, "civil_found": civil_found, "legal_found": legal_found},
        )
    except Exception as exc:
        return _record(3, "Adapter Registry", 1, "failed", time.monotonic() - t0, error=str(exc))
# ---------------------------------------------------------------------------
# Phase 2: Agent Pipeline Core
# ---------------------------------------------------------------------------
# Session shared between Scenario 5 and 6 (Scenario 6 continues 5's conversation)
_scenario5_session_id: Optional[str] = None
# Whether Scenario 5 passed; Scenario 6 is dependency-skipped when it did not.
_scenario5_passed: bool = False
async def scenario4_planner_valid_plan() -> dict:
    """Scenario 4: Planner Produces Valid Plan (retry 2x).

    Requires a non-empty planned_tools list with every tool contained in the
    VALID_TOOLS whitelist.
    """
    query = "์„œ์šธ์‹œ ๋„๋กœ ํŒŒ์† ๋ฏผ์›์— ๋Œ€ํ•œ ๋‹ต๋ณ€ ์ดˆ์•ˆ์„ ์ž‘์„ฑํ•ด์ฃผ์„ธ์š”"
    last_error = ""
    attempts = 0
    for attempt_idx in range(2):
        attempts += 1
        t0 = time.monotonic()
        try:
            sid = _session_id(4)
            ok, text, meta, err = await _call_agent_with_approval(
                query, sid, approve=True, timeout=120
            )
            elapsed = time.monotonic() - t0
            planned = meta.get("planned_tools", [])
            if planned:
                _observed_tools.update(planned)  # track all observed tools for reporting
            assertions = []
            if not planned:
                last_error = err or "planned_tools ๋น„์–ด์žˆ์Œ"
                if attempt_idx < 1:
                    continue
                return _record(
                    4,
                    "Planner Produces Valid Plan",
                    2,
                    "failed",
                    elapsed,
                    attempts,
                    assertions=["planned_tools non-empty"],
                    error=last_error,
                    detail={"meta": meta},
                )
            assertions.append(f"planned_tools: {planned}")
            invalid = [t for t in planned if t not in VALID_TOOLS]
            if invalid:
                last_error = f"invalid tools: {invalid}"
                if attempt_idx < 1:
                    continue
                return _record(
                    4,
                    "Planner Produces Valid Plan",
                    2,
                    "failed",
                    elapsed,
                    attempts,
                    assertions=assertions,
                    error=last_error,
                    detail={"invalid_tools": invalid, "valid": list(VALID_TOOLS)},
                )
            assertions.append("all tools in VALID_TOOLS whitelist")
            return _record(
                4,
                "Planner Produces Valid Plan",
                2,
                "passed",
                elapsed,
                attempts,
                assertions=assertions,
                detail={"planned_tools": planned, "meta": meta},
            )
        except Exception as exc:
            last_error = str(exc)
    return _record(4, "Planner Produces Valid Plan", 2, "failed", 0, attempts, error=last_error)
async def scenario5_civil_lora_draft() -> dict:
    """Scenario 5: Civil LoRA Draft Response (retry 2x).

    Hard requirement: answer text of at least 50 characters. planned_tools
    containing draft_civil_response and task_type == draft_response are soft
    checks that only produce warnings. On success the session id is shared
    with Scenario 6 via module globals.
    """
    global _scenario5_session_id, _scenario5_passed
    query = "์•„ํŒŒํŠธ ์ธต๊ฐ„์†Œ์Œ ๋ฏผ์›์— ๋Œ€ํ•œ ๋‹ต๋ณ€์„ ์ž‘์„ฑํ•ด์ฃผ์„ธ์š”"
    last_error = ""
    attempts = 0
    for attempt_idx in range(2):
        attempts += 1
        t0 = time.monotonic()
        try:
            sid = _session_id(5)
            ok, text, meta, err = await _call_agent_with_approval(
                query, sid, approve=True, timeout=180
            )
            elapsed = time.monotonic() - t0
            planned = meta.get("planned_tools", [])
            if planned:
                _observed_tools.update(planned)
            assertions = []
            if not ok:
                last_error = err or "agent call failed"
                if attempt_idx < 1:
                    continue
                return _record(
                    5,
                    "Civil LoRA Draft Response",
                    2,
                    "failed",
                    elapsed,
                    attempts,
                    assertions=assertions,
                    error=last_error,
                    detail={"meta": meta},
                )
            has_draft = "draft_civil_response" in planned
            if has_draft:
                assertions.append("draft_civil_response in planned_tools")
            else:
                assertions.append(f"draft_civil_response NOT in planned_tools ({planned})")
            if len(text) >= 50:
                assertions.append(f"text length {len(text)} >= 50")
            else:
                assertions.append(f"text length {len(text)} < 50 (FAIL)")
            task_type = meta.get("task_type")
            if task_type == "draft_response":
                assertions.append("task_type=draft_response")
            else:
                assertions.append(f"task_type={task_type} (expected draft_response)")
            # Core check: text >= 50 chars means PASS
            # (planned_tools and task_type are soft checks).
            passed = len(text) >= 50
            if passed:
                # Share the session with Scenario 6, which continues this conversation.
                _scenario5_session_id = sid
                _scenario5_passed = True
            warnings = []
            if not has_draft:
                warnings.append("draft_civil_response not in planned_tools")
            if task_type != "draft_response":
                warnings.append(f"task_type={task_type}, expected draft_response")
            if passed:
                return _record(
                    5,
                    "Civil LoRA Draft Response",
                    2,
                    "passed",
                    elapsed,
                    attempts,
                    assertions=assertions,
                    warnings=warnings,
                    detail={"text_preview": text[:200], "meta": meta},
                )
            last_error = "text < 50 chars"
            if attempt_idx < 1:
                continue
            return _record(
                5,
                "Civil LoRA Draft Response",
                2,
                "failed",
                elapsed,
                attempts,
                assertions=assertions,
                warnings=warnings,
                error=last_error,
                detail={"text_preview": text[:200], "meta": meta},
            )
        except Exception as exc:
            last_error = str(exc)
    return _record(5, "Civil LoRA Draft Response", 2, "failed", 0, attempts, error=last_error)
async def scenario6_legal_lora_evidence() -> dict:
    """Scenario 6: Legal LoRA Evidence Augmentation (depends on Scenario 5).

    Continues the Scenario 5 session and requires the response to contain at
    least one LEGAL_PATTERNS match; append_evidence in planned_tools is a
    soft check. Skipped entirely when Scenario 5 did not pass.
    """
    if not _scenario5_passed:
        return _record(
            6,
            "Legal LoRA Evidence Augmentation",
            2,
            "skipped",
            0,
            error="Scenario 5 failed โ€” dependency skip",
        )
    query = "์œ„ ๋‹ต๋ณ€์— ๊ด€๋ จ ๋ฒ•๋ น๊ณผ ํŒ๋ก€ ๊ทผ๊ฑฐ๋ฅผ ์ถ”๊ฐ€ํ•ด์ฃผ์„ธ์š”"
    last_error = ""
    attempts = 0
    for attempt_idx in range(2):
        attempts += 1
        t0 = time.monotonic()
        try:
            # Reuse Scenario 5's session so the agent sees the prior draft.
            ok, text, meta, err = await _call_agent_with_approval(
                query, _scenario5_session_id, approve=True, timeout=180
            )
            elapsed = time.monotonic() - t0
            planned = meta.get("planned_tools", [])
            if planned:
                _observed_tools.update(planned)
            assertions = []
            if not ok:
                last_error = err or "agent call failed"
                if attempt_idx < 1:
                    continue
                return _record(
                    6,
                    "Legal LoRA Evidence Augmentation",
                    2,
                    "failed",
                    elapsed,
                    attempts,
                    assertions=assertions,
                    error=last_error,
                    detail={"meta": meta},
                )
            has_evidence = "append_evidence" in planned
            if has_evidence:
                assertions.append("append_evidence in planned_tools")
            else:
                assertions.append(f"append_evidence NOT in planned_tools ({planned})")
            has_legal = _contains_legal_keyword(text)
            matched = [p for p in LEGAL_PATTERNS if re.search(p, text)]
            if has_legal:
                assertions.append(f"legal patterns found: {matched[:3]}")
            else:
                assertions.append("no legal patterns found (FAIL)")
            warnings = []
            if not has_evidence:
                warnings.append("append_evidence not in planned_tools")
            if has_legal:
                return _record(
                    6,
                    "Legal LoRA Evidence Augmentation",
                    2,
                    "passed",
                    elapsed,
                    attempts,
                    assertions=assertions,
                    warnings=warnings,
                    detail={"text_preview": text[:300], "matched_patterns": matched, "meta": meta},
                )
            last_error = "legal pattern not found in response"
            if attempt_idx < 1:
                continue
            return _record(
                6,
                "Legal LoRA Evidence Augmentation",
                2,
                "failed",
                elapsed,
                attempts,
                assertions=assertions,
                warnings=warnings,
                error=last_error,
                detail={"text_preview": text[:300], "meta": meta},
            )
        except Exception as exc:
            last_error = str(exc)
    return _record(
        6, "Legal LoRA Evidence Augmentation", 2, "failed", 0, attempts, error=last_error
    )
# ---------------------------------------------------------------------------
# Per-category Legal LoRA regex patterns (used by scenarios 6a-6d)
# ---------------------------------------------------------------------------
# Civil law: Civil Code, article references, lease/contract/damages terms.
CIVIL_LAW_PATTERNS = [
    r"๋ฏผ๋ฒ•",
    r"์ œ\s*\d+\s*์กฐ",
    r"์ž„๋Œ€์ฐจ",
    r"๊ณ„์•ฝ",
    r"์†ํ•ด๋ฐฐ์ƒ",
    r"์ฑ„๊ถŒ",
    r"์ฑ„๋ฌด",
]
# Criminal law: Criminal Code, punishment/fine/imprisonment terms.
CRIMINAL_LAW_PATTERNS = [
    r"ํ˜•๋ฒ•",
    r"ํ˜•์‚ฌ",
    r"์ฒ˜๋ฒŒ",
    r"๋ฒŒ๊ธˆ",
    r"์ง•์—ญ",
    r"๋ณดํ˜ธ๋ฒ•",
    r"์ œ\s*\d+\s*์กฐ",
]
# Intellectual property: trademark/patent/copyright/infringement terms.
IP_PATTERNS = [
    r"์ƒํ‘œ๋ฒ•",
    r"ํŠนํ—ˆ๋ฒ•",
    r"์ €์ž‘๊ถŒ",
    r"์ง€์‹์žฌ์‚ฐ",
    r"์ œ\s*\d+\s*์กฐ",
    r"์นจํ•ด",
]
# Precedent: Supreme Court, ruling/sentencing terms, case-number shape.
PRECEDENT_PATTERNS = [
    r"๋Œ€๋ฒ•์›",
    r"ํŒ๋ก€",
    r"ํŒ๊ฒฐ",
    r"์„ ๊ณ ",
    r"\d{4}\s*[๋‹ค๋‚˜]\s*\d+",  # case-number pattern, e.g. "2020๋‹ค12345"
]
async def _legal_category_scenario(
    scenario_id: int,
    name: str,
    civil_query: str,
    legal_followup: str,
    patterns: list[str],
) -> dict:
    """Shared flow for the per-category Legal LoRA scenarios (6a-6d).

    Step 1: issue the civil draft request first (creates session context).
    Step 2: issue the legal-evidence follow-up request in the same session.

    Hard requirement: follow-up text longer than 30 characters. Pattern
    matches and append_evidence presence are soft (assertion/warning only).
    """
    t0 = time.monotonic()
    session_id = _session_id(scenario_id)
    try:
        # Step 1: civil draft (priming request that creates session context)
        ok_civil, _, _, err_civil = await _call_agent_with_approval(
            query=civil_query,
            session_id=session_id,
        )
        if not ok_civil:
            elapsed = time.monotonic() - t0
            return _record(
                scenario_id,
                name,
                2,
                "failed",
                elapsed,
                error=f"civil ์„ ํ–‰ ์‹คํŒจ: {err_civil}",
            )
        # Step 2: legal follow-up (evidence augmentation)
        ok, text, meta, err = await _call_agent_with_approval(
            query=legal_followup,
            session_id=session_id,
        )
        elapsed = time.monotonic() - t0
        if not ok:
            return _record(
                scenario_id,
                name,
                2,
                "failed",
                elapsed,
                error=err,
                detail={"meta": meta},
            )
        # Match the category-specific statute/precedent patterns.
        matched = [p for p in patterns if re.search(p, text)]
        has_legal = len(matched) > 0
        assertions: list[str] = []
        warnings: list[str] = []
        planned = meta.get("planned_tools", [])
        if planned:
            _observed_tools.update(planned)
        if "append_evidence" in planned:
            assertions.append("append_evidence in planned_tools")
        else:
            warnings.append("append_evidence not in planned_tools")
        if has_legal:
            assertions.append(f"๋ฒ•๋ น ํŒจํ„ด ๋ฐœ๊ฒฌ: {matched[:3]}")
        else:
            warnings.append("๋ฒ•๋ น ํŒจํ„ด ๋ฏธ๋ฐœ๊ฒฌ")
        # Core check: any non-trivial answer text passes; patterns stay soft.
        passed = bool(text and len(text) > 30)
        return _record(
            scenario_id,
            name,
            2,
            "passed" if passed else "failed",
            elapsed,
            assertions=assertions,
            warnings=warnings,
            error=None if passed else "์‘๋‹ต ํ…์ŠคํŠธ ๋ถ€์กฑ",
            detail={
                "text_preview": text[:200] if text else "",
                "matched_patterns": matched,
                "meta": meta,
            },
        )
    except Exception as exc:
        return _record(
            scenario_id,
            name,
            2,
            "failed",
            time.monotonic() - t0,
            error=str(exc),
        )
async def scenario6a_legal_civil_law() -> dict:
    """Scenario 6a: Legal LoRA — civil-law query (runs as scenario id 61)."""
    call_args = dict(
        scenario_id=61,
        name="Legal LoRA โ€” ๋ฏผ์‚ฌ๋ฒ• (Civil Law)",
        civil_query="์ž„๋Œ€์ฐจ ๊ณ„์•ฝ์—์„œ ์ž„๋Œ€์ธ์˜ ์ˆ˜์„ ์˜๋ฌด ๋ฒ”์œ„์™€ ์ž„์ฐจ์ธ์˜ ๊ถŒ๋ฆฌ์— ๋Œ€ํ•ด ๋‹ต๋ณ€์„ ์ž‘์„ฑํ•ด์ฃผ์„ธ์š”",
        legal_followup="์œ„ ๋‹ต๋ณ€์— ๊ด€๋ จ ๋ฒ•๋ น ์กฐํ•ญ์„ ์ธ์šฉํ•˜์—ฌ ๋ฒ•์  ๊ทผ๊ฑฐ๋ฅผ ๋ณด๊ฐ•ํ•ด์ฃผ์„ธ์š”",
        patterns=CIVIL_LAW_PATTERNS,
    )
    return await _legal_category_scenario(**call_args)
async def scenario6b_legal_criminal_law() -> dict:
    """Scenario 6b: exercise the Legal LoRA flow on a criminal-law question."""
    # Draft first, then request statutory citations as the legal follow-up.
    params = dict(
        scenario_id=62,
        name="Legal LoRA โ€” ํ˜•์‚ฌ๋ฒ• (Criminal Law)",
        civil_query="๊ฐœ์ธ์ •๋ณด๋ณดํ˜ธ๋ฒ• ์œ„๋ฐ˜ ์‹œ ํ˜•์‚ฌ์ฒ˜๋ฒŒ ๊ธฐ์ค€๊ณผ ๊ด€๋ จ ๋ฒ•๋ฅ  ์กฐํ•ญ์— ๋Œ€ํ•ด ๋‹ต๋ณ€์„ ์ž‘์„ฑํ•ด์ฃผ์„ธ์š”",
        legal_followup="์œ„ ๋‹ต๋ณ€์— ๊ด€๋ จ ๋ฒ•๋ น ์กฐํ•ญ์„ ์ธ์šฉํ•˜์—ฌ ๋ฒ•์  ๊ทผ๊ฑฐ๋ฅผ ๋ณด๊ฐ•ํ•ด์ฃผ์„ธ์š”",
        patterns=CRIMINAL_LAW_PATTERNS,
    )
    return await _legal_category_scenario(**params)
async def scenario6c_legal_ip() -> dict:
    """Scenario 6c: exercise the Legal LoRA flow on an IP (trademark) question."""
    # Draft first, then request trademark-statute citations as reinforcement.
    params = dict(
        scenario_id=63,
        name="Legal LoRA โ€” ์ง€์‹์žฌ์‚ฐ๊ถŒ (IP)",
        civil_query="์ƒํ‘œ๊ถŒ ์นจํ•ด ํŒ๋‹จ ๊ธฐ์ค€๊ณผ ๊ตฌ์ œ ๋ฐฉ๋ฒ•์— ๋Œ€ํ•ด ๋‹ต๋ณ€์„ ์ž‘์„ฑํ•ด์ฃผ์„ธ์š”",
        legal_followup="์œ„ ๋‹ต๋ณ€์— ์ƒํ‘œ๋ฒ• ์กฐํ•ญ์„ ์ธ์šฉํ•˜์—ฌ ๋ฒ•์  ๊ทผ๊ฑฐ๋ฅผ ๋ณด๊ฐ•ํ•ด์ฃผ์„ธ์š”",
        patterns=IP_PATTERNS,
    )
    return await _legal_category_scenario(**params)
async def scenario6d_legal_precedent() -> dict:
    """Scenario 6d: exercise the Legal LoRA flow on a precedent-interpretation question."""
    # Draft first, then request Supreme Court precedent citations on top.
    params = dict(
        scenario_id=64,
        name="Legal LoRA โ€” ํŒ๋ก€ ํ•ด์„ (Precedent)",
        civil_query="๊ทผ๋กœ๊ณ„์•ฝ ํ•ด์ง€ ์‹œ ๋ถ€๋‹นํ•ด๊ณ  ์—ฌ๋ถ€๋ฅผ ํŒ๋‹จํ•˜๋Š” ๊ธฐ์ค€์— ๋Œ€ํ•ด ๋‹ต๋ณ€์„ ์ž‘์„ฑํ•ด์ฃผ์„ธ์š”",
        legal_followup="์œ„ ๋‹ต๋ณ€์— ๋Œ€๋ฒ•์› ํŒ๋ก€์˜ ๊ธฐ์ค€์„ ์ธ์šฉํ•˜์—ฌ ๋ฒ•์  ๊ทผ๊ฑฐ๋ฅผ ๋ณด๊ฐ•ํ•ด์ฃผ์„ธ์š”",
        patterns=PRECEDENT_PATTERNS,
    )
    return await _legal_category_scenario(**params)
async def scenario7_task_type_classification() -> dict:
    """Scenario 7: Task Type Classification (at least 2/3 correct).

    Sends three queries whose intended task type is known and checks that
    the planner's reported task_type matches one of the accepted values.
    """
    test_cases = [
        ("๋ฏผ์› ๋‹ต๋ณ€ ์ดˆ์•ˆ์„ ์ž‘์„ฑํ•ด์ค˜", {"draft_response"}),
        ("๊ด€๋ จ ํ†ต๊ณ„ ๋ฐ์ดํ„ฐ๋ฅผ ์กฐํšŒํ•ด์ค˜", {"stats_query", "lookup_stats"}),
        ("์ด ๋ฏผ์›์˜ ๊ทผ๊ฑฐ๋ฅผ ๋ณด๊ฐ•ํ•ด์ค˜", {"append_evidence"}),
    ]
    t0 = time.monotonic()
    correct = 0
    sub_results: list[dict] = []
    for query, expected_types in test_cases:
        # Shared prefix of every per-case report entry.
        entry: dict = {"query": query[:30], "expected": list(expected_types)}
        try:
            sid = _session_id(7)
            ok, text, meta, err = await _call_agent_with_approval(
                query, sid, approve=True, timeout=180
            )
            planned = meta.get("planned_tools", [])
            if planned:
                _observed_tools.update(planned)
            actual = meta.get("task_type")
            # A falsy task_type (None/"") never counts as a match.
            hit = actual in expected_types if actual else False
            if hit:
                correct += 1
            entry.update({"actual": actual, "matched": hit, "ok": ok, "error": err})
        except Exception as exc:
            entry.update({"actual": None, "matched": False, "error": str(exc)})
        sub_results.append(entry)
    elapsed = time.monotonic() - t0
    assertions = [f"{correct}/3 task types correct (need >= 2)"]
    if correct >= 2:
        return _record(
            7,
            "Task Type Classification",
            2,
            "passed",
            elapsed,
            assertions=assertions,
            detail={"sub_results": sub_results, "correct": correct},
        )
    return _record(
        7,
        "Task Type Classification",
        2,
        "failed",
        elapsed,
        assertions=assertions,
        error=f"only {correct}/3 correct (need >= 2)",
        detail={"sub_results": sub_results},
    )
# ---------------------------------------------------------------------------
# Phase 3: data.go.kr API Tools (soft gate)
# ---------------------------------------------------------------------------
# Cached result of the data.go.kr reachability preflight (set by the check below).
_datago_available: bool = False


async def _check_datago_connectivity() -> bool:
    """Preflight: probe data.go.kr and cache reachability in ``_datago_available``."""
    global _datago_available
    try:
        status, _ = await http_get_raw("https://www.data.go.kr", timeout=10)
    except Exception:
        # Any network error counts as "unreachable"; Phase 3 will be skipped.
        _datago_available = False
    else:
        # Redirects and a 403 (bot-blocking) still prove the host is up.
        _datago_available = status in (200, 301, 302, 403)
    return _datago_available
async def scenario8_external_api_tools() -> dict:
    """Scenario 8: External API Tool Invocation (4 sub-cases, accept 3/4).

    Skipped entirely when the data.go.kr preflight failed. Each sub-case
    passes when its expected tool appears in the planner's tool list; a
    sub-case is retried once, but only on exceptions (a completed response
    that merely failed the check is not retried).
    """
    if not _datago_available:
        return _record(
            8,
            "External API Tool Invocation",
            3,
            "skipped",
            0,
            error="data.go.kr unreachable โ€” Phase 3 skipped",
        )
    sub_cases = [
        ("8a", "์ตœ๊ทผ ๋„๋กœ ๊ด€๋ จ ๋ฏผ์› ์ด์Šˆ๋ฅผ ๋ถ„์„ํ•ด์ค˜", "issue_detector"),
        ("8b", "์„œ์šธ์‹œ ๋ฏผ์› ํ†ต๊ณ„๋ฅผ ์กฐํšŒํ•ด์ค˜", "stats_lookup"),
        ("8c", "๋„๋กœ ๊ด€๋ จ ํ‚ค์›Œ๋“œ ํŠธ๋ Œ๋“œ๋ฅผ ๋ถ„์„ํ•ด์ค˜", "keyword_analyzer"),
        ("8d", "์„œ์šธ์‹œ ๊ฐ•๋‚จ๊ตฌ ๋ฏผ์› ์ธ๊ตฌํ†ต๊ณ„๋ฅผ ์กฐํšŒํ•ด์ค˜", "demographics_lookup"),
    ]
    t0 = time.monotonic()
    sub_passed = 0
    sub_results: list[dict] = []
    for label, query, expected_tool in sub_cases:
        for attempt in (1, 2):  # at most one retry per sub-case
            try:
                sid = _session_id(8)
                ok, text, meta, err = await _call_agent_with_approval(
                    query, sid, approve=True, timeout=180
                )
                planned = meta.get("planned_tools", [])
                if planned:
                    _observed_tools.update(planned)
                hit = expected_tool in planned
                if hit:
                    sub_passed += 1
                sub_results.append(
                    {
                        "label": label,
                        "expected_tool": expected_tool,
                        "tool_in_plan": hit,
                        "tool_in_results": expected_tool in meta.get("tool_results", {}),
                        "planned_tools": planned,
                        "passed": hit,
                        "attempt": attempt,
                        "error": err,
                    }
                )
                break  # got a response โ€” no retry regardless of pass/fail
            except Exception as exc:
                if attempt == 2:
                    sub_results.append(
                        {
                            "label": label,
                            "expected_tool": expected_tool,
                            "passed": False,
                            "error": str(exc),
                            "attempt": attempt,
                        }
                    )
    elapsed = time.monotonic() - t0
    assertions = [f"{sub_passed}/4 sub-cases passed (need >= 3)"]
    if sub_passed >= 3:
        return _record(
            8,
            "External API Tool Invocation",
            3,
            "passed",
            elapsed,
            assertions=assertions,
            detail={"sub_results": sub_results},
        )
    return _record(
        8,
        "External API Tool Invocation",
        3,
        "failed",
        elapsed,
        assertions=assertions,
        error=f"only {sub_passed}/4 passed (need >= 3)",
        detail={"sub_results": sub_results},
    )
# ---------------------------------------------------------------------------
# Phase 4: Adapter Dynamics
# ---------------------------------------------------------------------------
async def scenario9_sequential_adapter_switching() -> dict:
    """Scenario 9: Sequential Adapter Switching (3 iterations, 3 requests each).

    Each iteration runs a civil -> legal -> civil request sequence inside a
    single session, which should force the runtime to switch LoRA adapters
    back and forth. A failed step aborts the rest of that iteration (the
    follow-up steps depend on the earlier response being in the session).

    Fix/cleanup: the three near-identical request stanzas are deduplicated
    into a labeled step table; error labels and control flow match the
    original behavior exactly.
    """
    # (label used in error messages, query) for the 3-step adapter dance.
    steps = [
        ("civil-1", "์ฃผ์ฐจ ์œ„๋ฐ˜ ๊ณผํƒœ๋ฃŒ ์ด์˜์‹ ์ฒญ ๋ฏผ์› ๋‹ต๋ณ€์„ ์ž‘์„ฑํ•ด์ค˜"),
        ("legal", "์œ„ ๋‹ต๋ณ€์— ๊ด€๋ จ ๋ฒ•๋ น ๊ทผ๊ฑฐ๋ฅผ ์ถ”๊ฐ€ํ•ด์ค˜"),
        ("civil-2", "์ถ”๊ฐ€ ๋ฏผ์› ๋‹ต๋ณ€ ์ดˆ์•ˆ์„ ์ž‘์„ฑํ•ด์ค˜"),
    ]
    t0 = time.monotonic()
    errors: list[str] = []
    total_requests = 0
    for i in range(1, 4):
        sid = _session_id(9)  # fresh session per iteration
        for label, query in steps:
            ok, text, meta, err = await _call_agent_with_approval(
                query, sid, approve=True, timeout=180
            )
            total_requests += 1
            if meta.get("planned_tools"):
                _observed_tools.update(meta["planned_tools"])
            if not ok or not text.strip():
                errors.append(f"iter {i} {label}: {err or '๋นˆ ์‘๋‹ต'}")
                break  # skip the remaining steps of this iteration
    elapsed = time.monotonic() - t0
    assertions = [f"{total_requests} requests completed", f"{len(errors)} errors"]
    if errors:
        return _record(
            9,
            "Sequential Adapter Switching",
            4,
            "failed",
            elapsed,
            assertions=assertions,
            error="; ".join(errors[:3]),
            detail={"iterations": 3, "total_requests": total_requests, "errors": errors},
        )
    return _record(
        9,
        "Sequential Adapter Switching",
        4,
        "passed",
        elapsed,
        assertions=assertions,
        detail={"iterations": 3, "total_requests": total_requests, "all_passed": True},
    )
async def scenario10_lora_id_consistency() -> dict:
    """Scenario 10: LoRA ID Consistency (informational, always PASS).

    Lists /v1/models twice (scenario 9 has already exercised adapter
    switching by this point) and warns if the adapter set changed.
    """
    t0 = time.monotonic()

    async def _list_model_ids() -> list[str]:
        # One /v1/models snapshot, reduced to the model id strings.
        _, resp = await http_get("/v1/models", timeout=10)
        return [m.get("id", "") for m in resp.get("data", [])]

    try:
        models_before = await _list_model_ids()
        models_after = await _list_model_ids()
        elapsed = time.monotonic() - t0
        stable = set(models_before) == set(models_after)
        assertions = [
            f"before: {len(models_before)} models",
            f"after: {len(models_after)} models",
            f"stable: {stable}",
        ]
        warnings = [] if stable else ["adapter list changed between checks"]
        return _record(
            10,
            "LoRA ID Consistency",
            4,
            "passed",
            elapsed,
            assertions=assertions,
            warnings=warnings,
            detail={"models_before": models_before, "models_after": models_after, "stable": stable},
        )
    except Exception as exc:
        # Informational scenario: failures downgrade to a warning, never a FAIL.
        return _record(
            10,
            "LoRA ID Consistency",
            4,
            "passed",
            time.monotonic() - t0,
            assertions=["informational check"],
            warnings=[f"could not verify: {exc}"],
        )
# ---------------------------------------------------------------------------
# Phase 5: Robustness
# ---------------------------------------------------------------------------
async def scenario11_empty_query() -> dict:
    """Scenario 11: Empty Query Handling (expect 422, NOT 500).

    Posts an empty query to both the REST and SSE endpoints; a validation
    status (400/422) on either endpoint passes, any 500 fails. Retried once.
    """
    t0 = time.monotonic()
    assertions: list[str] = []
    last_error = ""
    for attempt in (1, 2):
        try:
            # REST endpoint
            code_rest, body_rest = await http_post("/v2/agent/run", {"query": ""}, timeout=10)
            assertions.append(f"/v2/agent/run empty query: HTTP {code_rest}")
            # SSE endpoint
            code_sse, sse_events = await http_post_sse(
                "/v2/agent/stream", {"query": ""}, timeout=10
            )
            assertions.append(f"/v2/agent/stream empty query: HTTP {code_sse}")
            elapsed = time.monotonic() - t0
            # 422 (Pydantic validation) or 400 (Bad Request) are acceptable; 500 is not.
            rest_ok = code_rest in (400, 422)
            sse_ok = code_sse in (400, 422)
            saw_500 = code_rest == 500 or code_sse == 500
            if not saw_500 and (rest_ok or sse_ok):
                return _record(
                    11,
                    "Empty Query Handling",
                    5,
                    "passed",
                    elapsed,
                    attempt,
                    assertions=assertions,
                    detail={"rest_code": code_rest, "sse_code": code_sse},
                )
            if saw_500:
                last_error = f"got 500 (rest={code_rest}, sse={code_sse})"
            else:
                last_error = f"unexpected codes: rest={code_rest}, sse={code_sse}"
            if attempt == 1:
                continue  # one retry before recording failure
            return _record(
                11,
                "Empty Query Handling",
                5,
                "failed",
                elapsed,
                attempt,
                assertions=assertions,
                error=last_error,
                detail={"rest_code": code_rest, "sse_code": code_sse},
            )
        except Exception as exc:
            last_error = str(exc)
    # Both attempts raised: record the last exception as the failure cause.
    return _record(
        11, "Empty Query Handling", 5, "failed", time.monotonic() - t0, 2, error=last_error
    )
async def scenario12_reject_flow() -> dict:
    """Scenario 12: Reject Flow Completeness.

    Runs the agent with approve=False and checks the rejection path
    completes: no tool results should remain after a reject, and the
    response should come back quickly. Retried once on failure.
    """
    last_error = ""
    for attempt in (1, 2):
        t0 = time.monotonic()
        try:
            sid = _session_id(12)
            ok, text, meta, err = await _call_agent_with_approval(
                "๋ฏผ์› ๋‹ต๋ณ€์„ ์ž‘์„ฑํ•ด์ฃผ์„ธ์š”", sid, approve=False, timeout=30
            )
            elapsed = time.monotonic() - t0
            assertions: list[str] = []
            # After a reject the tool_results map should be empty.
            tool_results = meta.get("tool_results", {})
            if ok:
                assertions.append("reject flow completed")
                if not tool_results:
                    assertions.append("tool_results empty after reject")
                else:
                    assertions.append(f"tool_results NOT empty: {list(tool_results.keys())}")
                # Informational latency note (does not gate pass/fail).
                if elapsed < 5:
                    assertions.append(f"response < 5s ({elapsed:.1f}s)")
                else:
                    assertions.append(f"response >= 5s ({elapsed:.1f}s)")
                return _record(
                    12,
                    "Reject Flow Completeness",
                    5,
                    "passed",
                    elapsed,
                    attempt,
                    assertions=assertions,
                    detail={"text_preview": text[:100], "tool_results": tool_results, "meta": meta},
                )
            last_error = err or "reject flow failed"
            if attempt == 1:
                continue  # one retry before recording failure
            return _record(
                12,
                "Reject Flow Completeness",
                5,
                "failed",
                elapsed,
                attempt,
                assertions=assertions,
                error=last_error,
                detail={"meta": meta},
            )
        except Exception as exc:
            last_error = str(exc)
    # Both attempts raised: record the last exception as the failure cause.
    return _record(
        12, "Reject Flow Completeness", 5, "failed", time.monotonic() - t0, 2, error=last_error
    )
async def scenario13_concurrent_isolation() -> dict:
    """Scenario 13: Concurrent Request Isolation (3 simultaneous requests).

    Fires three agent requests in parallel, each with its own session id,
    and requires all three to succeed without cross-contamination.
    """
    t0 = time.monotonic()
    queries = [
        ("์ฃผ์ฐจ ์œ„๋ฐ˜ ๋ฏผ์› ๋‹ต๋ณ€ ์ดˆ์•ˆ์„ ์ž‘์„ฑํ•ด์ค˜", _session_id(13)),
        ("์†Œ์Œ ๋ฏผ์›์— ๋Œ€ํ•œ ๋‹ต๋ณ€์„ ์ž‘์„ฑํ•ด์ค˜", _session_id(13)),
        ("๋„๋กœ ํŒŒ์† ๋ฏผ์› ๋‹ต๋ณ€์„ ์ž‘์„ฑํ•ด์ค˜", _session_id(13)),
    ]

    async def _probe(query: str, sid: str) -> dict:
        # One isolated request; exceptions are folded into the result dict.
        try:
            ok, text, meta, err = await _call_agent_with_approval(
                query, sid, approve=True, timeout=300
            )
            if meta.get("planned_tools"):
                _observed_tools.update(meta["planned_tools"])
            return {
                "session_id": sid,
                "ok": ok,
                "text_len": len(text),
                "error": err,
                "query": query[:20],
            }
        except Exception as exc:
            return {
                "session_id": sid,
                "ok": False,
                "text_len": 0,
                "error": str(exc),
                "query": query[:20],
            }

    results = await asyncio.gather(
        *(_probe(q, s) for q, s in queries), return_exceptions=True
    )
    elapsed = time.monotonic() - t0
    # Collect per-request outcomes; gather-level exceptions become error entries.
    sub_results: list[dict] = []
    valid_count = 0
    for outcome in results:
        if isinstance(outcome, Exception):
            sub_results.append({"ok": False, "error": str(outcome)})
            continue
        sub_results.append(outcome)
        if outcome.get("ok"):
            valid_count += 1
    # Session-id cross-contamination check (each request used a unique session).
    session_ids = [sid for _, sid in queries]
    all_unique = len(set(session_ids)) == len(session_ids)
    assertions = [
        f"{valid_count}/3 concurrent requests succeeded",
        f"session_ids unique: {all_unique}",
    ]
    if valid_count == 3:
        return _record(
            13,
            "Concurrent Request Isolation",
            5,
            "passed",
            elapsed,
            assertions=assertions,
            detail={"sub_results": sub_results},
        )
    return _record(
        13,
        "Concurrent Request Isolation",
        5,
        "failed",
        elapsed,
        assertions=assertions,
        error=f"only {valid_count}/3 succeeded",
        detail={"sub_results": sub_results},
    )
# ---------------------------------------------------------------------------
# Cold Start ๋Œ€๊ธฐ
# ---------------------------------------------------------------------------
async def _wait_cold_start() -> float:
    """Wait for server cold start: up to 10 polls, 30s apart.

    Returns the total number of seconds spent waiting. If the server never
    reports healthy, logs a warning and returns anyway so the run proceeds.
    """
    waited = 0.0
    for poll in range(10):
        # Probe failures (timeouts, refused connections) just mean "not ready yet".
        try:
            status, payload = await http_get("/health", timeout=10)
        except Exception:
            status, payload = None, {}
        if status == 200 and payload.get("status") in ("ok", "healthy"):
            logger.info(f" ์„œ๋ฒ„ ์ค€๋น„ ์™„๋ฃŒ (๋Œ€๊ธฐ {waited:.0f}s)")
            return waited
        if poll < 9:
            logger.info(f" ์„œ๋ฒ„ ๋Œ€๊ธฐ ์ค‘... ({poll + 1}/10, 30s ํ›„ ์žฌ์‹œ๋„)")
            await asyncio.sleep(30)
            waited += 30
    logger.info(" [WARN] ์„œ๋ฒ„ ์ค€๋น„ ํ™•์ธ ์‹คํŒจ โ€” ๊ณ„์† ์ง„ํ–‰")
    return waited
# ---------------------------------------------------------------------------
# ๋ฉ”์ธ ๋Ÿฌ๋„ˆ
# ---------------------------------------------------------------------------
async def main() -> int:
    """Run every verification phase in order and return a process exit code.

    Phase 1 is a hard gate: if any infrastructure scenario fails, the run
    aborts immediately (results are still written). Later phases always run
    to completion and only influence the final summary.

    Returns:
        0 if no scenario failed, 1 otherwise.
    """
    logger.info("=" * 60)
    logger.info("GovOn E2E Tool Calling + AdapterRegistry ๊ฒ€์ฆ")
    logger.info("=" * 60)
    logger.info(f" ๋Œ€์ƒ ์„œ๋ฒ„: {BASE_URL}")
    logger.info(f" ์ธ์ฆ: {'API_KEY ์„ค์ •๋จ' if API_KEY else '๋ฏธ์„ค์ • (๋น„์ธ์ฆ)'}")
    logger.info(f" HTTP ๋ฐฑ์—”๋“œ: {_HTTP_BACKEND}")
    logger.info(f" ํƒ€์ž„์•„์›ƒ: {TIMEOUT}s / ์‹œ๋‚˜๋ฆฌ์˜ค")
    logger.info(f" run_id: {_run_id}")
    logger.info("-" * 60)
    # Wait out server cold start before the first scenario.
    logger.info("[Cold Start] ์„œ๋ฒ„ ์ค€๋น„ ํ™•์ธ ์ค‘...")
    cold_start_wait = await _wait_cold_start()
    # ===== Phase 1: Infrastructure (hard gate) =====
    logger.info("\n[Phase 1] Infrastructure (hard gate)")
    logger.info("-" * 40)
    phase1_scenarios = [
        scenario1_health_profile,
        scenario2_base_model_generation,
        scenario3_adapter_registry,
    ]
    phase1_failed = False
    for fn in phase1_scenarios:
        result = await fn()
        if result["status"] == "failed":
            phase1_failed = True
    if phase1_failed:
        # Hard gate: no point exercising the agent pipeline without infra.
        logger.info("\n" + "!" * 60)
        logger.info("ABORT: Infrastructure not ready โ€” Phase 1 failed")
        logger.info("!" * 60)
        _write_output(cold_start_wait)
        return 1
    # ===== Phase 2: Agent Pipeline Core =====
    logger.info("\n[Phase 2] Agent Pipeline Core")
    logger.info("-" * 40)
    phase2_scenarios = [
        scenario4_planner_valid_plan,
        scenario5_civil_lora_draft,
        scenario6_legal_lora_evidence,
        scenario7_task_type_classification,
    ]
    for fn in phase2_scenarios:
        await fn()
    # Legal LoRA per-category tests (4: civil law, criminal law, IP, precedent)
    legal_scenarios = [
        scenario6a_legal_civil_law,
        scenario6b_legal_criminal_law,
        scenario6c_legal_ip,
        scenario6d_legal_precedent,
    ]
    for legal_fn in legal_scenarios:
        await legal_fn()
    # ===== Phase 3: data.go.kr API Tools (soft gate) =====
    logger.info("\n[Phase 3] data.go.kr API Tools (soft gate)")
    logger.info("-" * 40)
    logger.info(" data.go.kr ์—ฐ๊ฒฐ ํ™•์ธ...")
    datago_ok = await _check_datago_connectivity()
    if datago_ok:
        logger.info(" data.go.kr ์—ฐ๊ฒฐ ๊ฐ€๋Šฅ")
    else:
        logger.info(" data.go.kr ์—ฐ๊ฒฐ ๋ถˆ๊ฐ€ โ€” Phase 3 ์Šคํ‚ต")
    # scenario8 records itself as "skipped" when data.go.kr is unreachable.
    await scenario8_external_api_tools()
    # ===== Phase 4: Adapter Dynamics =====
    logger.info("\n[Phase 4] Adapter Dynamics")
    logger.info("-" * 40)
    await scenario9_sequential_adapter_switching()
    await scenario10_lora_id_consistency()
    # ===== Phase 5: Robustness =====
    logger.info("\n[Phase 5] Robustness")
    logger.info("-" * 40)
    phase5_scenarios = [
        scenario11_empty_query,
        scenario12_reject_flow,
        scenario13_concurrent_isolation,
    ]
    for fn in phase5_scenarios:
        await fn()
    # ===== Summary =====
    logger.info("\n" + "=" * 60)
    passed = sum(1 for r in _results if r["status"] == "passed")
    failed = sum(1 for r in _results if r["status"] == "failed")
    skipped = sum(1 for r in _results if r["status"] == "skipped")
    total = len(_results)
    logger.info(f"๊ฒฐ๊ณผ: {passed}/{total} ํ†ต๊ณผ, {failed} ์‹คํŒจ, {skipped} ์Šคํ‚ต")
    tool_ratio = len(_observed_tools) / len(VALID_TOOLS) if VALID_TOOLS else 0
    logger.info(f"๋„๊ตฌ ์ปค๋ฒ„๋ฆฌ์ง€: {len(_observed_tools)}/{len(VALID_TOOLS)} ({tool_ratio:.0%})")
    if _observed_tools:
        logger.info(f" ๊ด€์ธก๋œ ๋„๊ตฌ: {sorted(_observed_tools)}")
    _write_output(cold_start_wait)
    return 0 if failed == 0 else 1
def _write_output(cold_start_wait: float) -> None:
    """Serialize the accumulated scenario results into the JSON report file."""
    from datetime import datetime, timezone

    # Tally per-status counts from the shared results list.
    counts = {
        status: sum(1 for r in _results if r["status"] == status)
        for status in ("passed", "failed", "skipped")
    }
    coverage = len(_observed_tools) / len(VALID_TOOLS) if VALID_TOOLS else 0
    report = {
        "meta": {
            "run_id": _run_id,
            "timestamp_utc": datetime.now(timezone.utc).isoformat(),
            "target_url": BASE_URL,
            "cold_start_wait_seconds": cold_start_wait,
        },
        "summary": {
            "total": len(_results),
            "passed": counts["passed"],
            "failed": counts["failed"],
            "skipped": counts["skipped"],
            "tool_coverage": {
                "observed": sorted(_observed_tools),
                "ratio": round(coverage, 2),
            },
        },
        "scenarios": _results,
        "server_url": BASE_URL,
        "http_backend": _HTTP_BACKEND,
    }
    with open(RESULTS_PATH, "w", encoding="utf-8") as f:
        json.dump(report, f, ensure_ascii=False, indent=2)
    logger.info(f"\n๊ฒฐ๊ณผ ์ €์žฅ: {RESULTS_PATH}")
if __name__ == "__main__":
    # Script entry point: propagate main()'s exit code to the shell.
    sys.exit(asyncio.run(main()))