umyunsang's picture
Upload folder using huggingface_hub
d2585c1 verified
"""구조화 로그 시스템 — JSON Lines + Rich 터미널 동시 출력.
로그 레벨:
FLOW — 파이프라인 노드 전이 (session_load -> planner -> ...)
METRIC — 레이턴시, 토큰 수, 메모리 사용량
ASSERT — 시나리오 검증 결과 (PASS/FAIL/WARN)
DEBUG — HTTP 요청/응답 상세
"""
from __future__ import annotations
import json
import sys
import time
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
try:
from rich.console import Console
from rich.live import Live
from rich.table import Table
from rich.text import Text
RICH_AVAILABLE = True
except ImportError:
RICH_AVAILABLE = False
@dataclass
class LogEntry:
    """A single structured log record, serializable as one JSON object."""

    timestamp: str
    level: str  # FLOW | METRIC | ASSERT | INFO | WARN | ERROR | DEBUG
    phase: int
    scenario_id: int
    message: str
    data: Dict[str, Any] = field(default_factory=dict)

    def to_json(self) -> str:
        """Return this entry as a single-line JSON string.

        Non-ASCII text is written as-is (``ensure_ascii=False``).  Values
        that ``json`` cannot encode natively (paths, exceptions, sets, ...)
        are deliberately written as ``str(value)`` instead of raising
        ``TypeError``: ``data`` comes from arbitrary keyword arguments, and
        a logging call should not abort the run it is recording.
        """
        return json.dumps(asdict(self), ensure_ascii=False, default=str)
class E2ELogger:
    """Dual-output logger: a JSON Lines file plus the terminal (stderr).

    Every entry is kept in an in-memory list and written (and flushed) to
    the log file as one JSON object per line.  When ``verbose`` is true the
    entry is also echoed to stderr, through Rich when it is installed.

    The logger can be used as a context manager so that the log file is
    closed when the ``with`` block exits.
    """

    # Rich style used to colour the terminal tag of each known level.
    _LEVEL_STYLES = {
        "FLOW": "cyan",
        "METRIC": "blue",
        "ASSERT": "green",
        "INFO": "white",
        "WARN": "yellow",
        "ERROR": "red",
        "DEBUG": "dim",
    }

    # Short tag shown in the scenario summary line for each known status.
    _STATUS_TAGS = {"passed": "PASS", "failed": "FAIL", "skipped": "SKIP"}

    def __init__(self, log_path: str, verbose: bool = True) -> None:
        """Open (and truncate) *log_path* and set up terminal output.

        Args:
            log_path: Destination of the JSON Lines log.  Missing parent
                directories are created.
            verbose: When true, echo entries to stderr and record DEBUG
                entries; when false, DEBUG entries are dropped entirely.
        """
        self._log_path = Path(log_path)
        self._verbose = verbose
        self._entries: List[LogEntry] = []
        self._phase = 0
        self._scenario_id = 0
        self._console = Console(stderr=True) if RICH_AVAILABLE else None
        # Open the file last so a failure above cannot leak the handle.
        self._log_path.parent.mkdir(parents=True, exist_ok=True)
        self._file = open(self._log_path, "w", encoding="utf-8")  # noqa: SIM115

    def __enter__(self) -> E2ELogger:
        """Return the logger itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type: Any, exc: Any, tb: Any) -> None:
        """Close the log file; exceptions from the block are not suppressed."""
        self.close()

    def set_context(self, phase: int = 0, scenario_id: int = 0) -> None:
        """Set the phase and scenario id stamped on subsequent entries."""
        self._phase = phase
        self._scenario_id = scenario_id

    def _write(self, entry: LogEntry) -> None:
        """Record *entry* in memory, append it to the file, and echo it."""
        self._entries.append(entry)
        self._file.write(entry.to_json() + "\n")
        # Flush per entry so the file is current even if the run dies.
        self._file.flush()
        if self._verbose:
            self._print_terminal(entry)

    def _print_terminal(self, entry: LogEntry) -> None:
        """Print *entry* to stderr, coloured via Rich when available."""
        level = entry.level
        if self._console is None:
            print(f"{entry.timestamp} [{level}] {entry.message}", file=sys.stderr)
            return

        # Imported here because rich is an optional dependency; this branch
        # is only reached when the module-level rich import succeeded.
        from rich.markup import escape

        # Escape caller-supplied text: Console.print parses markup, so a
        # message containing e.g. "[/INST]" would otherwise be read as a
        # closing tag and raise MarkupError instead of being displayed.
        tag = escape(f"[{level}]")
        style = self._LEVEL_STYLES.get(level)
        if style is not None:
            tag = f"[{style}]{tag}[/{style}]"
        self._console.print(f"{entry.timestamp} {tag} {escape(entry.message)}")

    def _now(self) -> str:
        """Return the current local time as ``YYYY-MM-DD HH:MM:SS``."""
        return time.strftime("%Y-%m-%d %H:%M:%S")

    def _emit(self, level: str, message: str, data: Dict[str, Any]) -> None:
        """Build an entry for *level* with the current context and write it."""
        self._write(
            LogEntry(self._now(), level, self._phase, self._scenario_id, message, data)
        )

    def flow(self, message: str, **data: Any) -> None:
        """Log a FLOW entry; keyword arguments become the entry's data."""
        self._emit("FLOW", message, data)

    def metric(self, message: str, **data: Any) -> None:
        """Log a METRIC entry; keyword arguments become the entry's data."""
        self._emit("METRIC", message, data)

    def assertion(self, message: str, **data: Any) -> None:
        """Log an ASSERT entry; keyword arguments become the entry's data."""
        self._emit("ASSERT", message, data)

    def info(self, message: str, **data: Any) -> None:
        """Log an INFO entry; keyword arguments become the entry's data."""
        self._emit("INFO", message, data)

    def warn(self, message: str, **data: Any) -> None:
        """Log a WARN entry; keyword arguments become the entry's data."""
        self._emit("WARN", message, data)

    def error(self, message: str, **data: Any) -> None:
        """Log an ERROR entry; keyword arguments become the entry's data."""
        self._emit("ERROR", message, data)

    def debug(self, message: str, **data: Any) -> None:
        """Log a DEBUG entry, but only when the logger is verbose."""
        if self._verbose:
            self._emit("DEBUG", message, data)

    def scenario_result(
        self,
        scenario_id: int,
        name: str,
        phase: int,
        status: str,
        elapsed: float,
        attempts: int = 1,
        assertions: Optional[List[str]] = None,
        warnings: Optional[List[str]] = None,
        error: Optional[str] = None,
        detail: Optional[Any] = None,
    ) -> Dict[str, Any]:
        """Log a scenario's outcome and return it as a result dict.

        ``"passed"`` is logged at ASSERT level and ``"skipped"`` at WARN
        level; any other status is logged at ERROR level.  Each item of
        *warnings* is additionally logged as its own WARN entry.

        Returns:
            A dict with keys ``id``, ``name``, ``phase``, ``status``,
            ``attempts``, ``elapsed_s`` (rounded to 3 decimals),
            ``assertions``, ``warnings``, ``error`` and ``detail``.
        """
        tag = self._STATUS_TAGS.get(status, "????")
        msg = f"[{tag}] Scenario {scenario_id}: {name} ({elapsed:.2f}s)"
        if status == "passed":
            self.assertion(msg)
        elif status == "skipped":
            self.warn(f"{msg} -- {error or 'skipped'}")
        else:
            self.error(f"{msg} -- {error}")

        for warning in warnings or []:
            self.warn(f" [WARN] {warning}")

        return {
            "id": scenario_id,
            "name": name,
            "phase": phase,
            "status": status,
            "attempts": attempts,
            "elapsed_s": round(elapsed, 3),
            "assertions": assertions or [],
            "warnings": warnings or [],
            "error": error,
            "detail": detail,
        }

    def close(self) -> None:
        """Close the log file; calling this more than once is harmless."""
        self._file.close()

    @property
    def entries(self) -> List[LogEntry]:
        """Return a copy of all entries written so far."""
        return list(self._entries)