Spaces:
Paused
Paused
| """구조화 로그 시스템 — JSON Lines + Rich 터미널 동시 출력. | |
| 로그 레벨: | |
| FLOW — 파이프라인 노드 전이 (session_load -> planner -> ...) | |
| METRIC — 레이턴시, 토큰 수, 메모리 사용량 | |
| ASSERT — 시나리오 검증 결과 (PASS/FAIL/WARN) | |
| DEBUG — HTTP 요청/응답 상세 | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import sys | |
| import time | |
| from dataclasses import asdict, dataclass, field | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional | |
| try: | |
| from rich.console import Console | |
| from rich.live import Live | |
| from rich.table import Table | |
| from rich.text import Text | |
| RICH_AVAILABLE = True | |
| except ImportError: | |
| RICH_AVAILABLE = False | |
| class LogEntry: | |
| """단일 로그 엔트리.""" | |
| timestamp: str | |
| level: str # FLOW | METRIC | ASSERT | INFO | WARN | ERROR | DEBUG | |
| phase: int | |
| scenario_id: int | |
| message: str | |
| data: Dict[str, Any] = field(default_factory=dict) | |
| def to_json(self) -> str: | |
| return json.dumps(asdict(self), ensure_ascii=False) | |
| class E2ELogger: | |
| """듀얼 출력 로거: JSON Lines 파일 + 터미널.""" | |
| def __init__(self, log_path: str, verbose: bool = True) -> None: | |
| self._log_path = Path(log_path) | |
| self._verbose = verbose | |
| self._entries: List[LogEntry] = [] | |
| self._file = open(self._log_path, "w", encoding="utf-8") # noqa: SIM115 | |
| self._console = Console(stderr=True) if RICH_AVAILABLE else None | |
| self._phase = 0 | |
| self._scenario_id = 0 | |
| def set_context(self, phase: int = 0, scenario_id: int = 0) -> None: | |
| self._phase = phase | |
| self._scenario_id = scenario_id | |
| def _write(self, entry: LogEntry) -> None: | |
| self._entries.append(entry) | |
| line = entry.to_json() | |
| self._file.write(line + "\n") | |
| self._file.flush() | |
| if self._verbose: | |
| self._print_terminal(entry) | |
| def _print_terminal(self, entry: LogEntry) -> None: | |
| level = entry.level | |
| tag_map = { | |
| "FLOW": "[cyan][FLOW][/cyan]" if RICH_AVAILABLE else "[FLOW]", | |
| "METRIC": "[blue][METRIC][/blue]" if RICH_AVAILABLE else "[METRIC]", | |
| "ASSERT": "[green][ASSERT][/green]" if RICH_AVAILABLE else "[ASSERT]", | |
| "INFO": "[white][INFO][/white]" if RICH_AVAILABLE else "[INFO]", | |
| "WARN": "[yellow][WARN][/yellow]" if RICH_AVAILABLE else "[WARN]", | |
| "ERROR": "[red][ERROR][/red]" if RICH_AVAILABLE else "[ERROR]", | |
| "DEBUG": "[dim][DEBUG][/dim]" if RICH_AVAILABLE else "[DEBUG]", | |
| } | |
| tag = tag_map.get(level, f"[{level}]") | |
| if self._console and RICH_AVAILABLE: | |
| self._console.print(f"{entry.timestamp} {tag} {entry.message}") | |
| else: | |
| plain_tag = f"[{level}]" | |
| print(f"{entry.timestamp} {plain_tag} {entry.message}", file=sys.stderr) | |
| def _now(self) -> str: | |
| return time.strftime("%Y-%m-%d %H:%M:%S") | |
| def flow(self, message: str, **data: Any) -> None: | |
| self._write(LogEntry(self._now(), "FLOW", self._phase, self._scenario_id, message, data)) | |
| def metric(self, message: str, **data: Any) -> None: | |
| self._write(LogEntry(self._now(), "METRIC", self._phase, self._scenario_id, message, data)) | |
| def assertion(self, message: str, **data: Any) -> None: | |
| self._write(LogEntry(self._now(), "ASSERT", self._phase, self._scenario_id, message, data)) | |
| def info(self, message: str, **data: Any) -> None: | |
| self._write(LogEntry(self._now(), "INFO", self._phase, self._scenario_id, message, data)) | |
| def warn(self, message: str, **data: Any) -> None: | |
| self._write(LogEntry(self._now(), "WARN", self._phase, self._scenario_id, message, data)) | |
| def error(self, message: str, **data: Any) -> None: | |
| self._write(LogEntry(self._now(), "ERROR", self._phase, self._scenario_id, message, data)) | |
| def debug(self, message: str, **data: Any) -> None: | |
| if self._verbose: | |
| self._write( | |
| LogEntry(self._now(), "DEBUG", self._phase, self._scenario_id, message, data) | |
| ) | |
| def scenario_result( | |
| self, | |
| scenario_id: int, | |
| name: str, | |
| phase: int, | |
| status: str, | |
| elapsed: float, | |
| attempts: int = 1, | |
| assertions: Optional[List[str]] = None, | |
| warnings: Optional[List[str]] = None, | |
| error: Optional[str] = None, | |
| detail: Optional[Any] = None, | |
| ) -> dict: | |
| """시나리오 결과를 로그에 기록하고 결과 dict를 반환한다.""" | |
| tag = {"passed": "PASS", "failed": "FAIL", "skipped": "SKIP"}.get(status, "????") | |
| msg = f"[{tag}] Scenario {scenario_id}: {name} ({elapsed:.2f}s)" | |
| if status == "passed": | |
| self.assertion(msg) | |
| elif status == "skipped": | |
| self.warn(f"{msg} -- {error or 'skipped'}") | |
| else: | |
| self.error(f"{msg} -- {error}") | |
| if warnings: | |
| for w in warnings: | |
| self.warn(f" [WARN] {w}") | |
| entry = { | |
| "id": scenario_id, | |
| "name": name, | |
| "phase": phase, | |
| "status": status, | |
| "attempts": attempts, | |
| "elapsed_s": round(elapsed, 3), | |
| "assertions": assertions or [], | |
| "warnings": warnings or [], | |
| "error": error, | |
| "detail": detail, | |
| } | |
| return entry | |
| def close(self) -> None: | |
| self._file.close() | |
| def entries(self) -> List[LogEntry]: | |
| return list(self._entries) | |