# govon-runtime/scripts/e2e_gpu_test/flow_tracker.py
# Uploaded by umyunsang using huggingface_hub (commit d2585c1, verified).
"""ํŒŒ์ดํ”„๋ผ์ธ ํ๋ฆ„ ์ถ”์  ๋ฐ ๊ฒ€์ฆ.
SSE ์ด๋ฒคํŠธ ์ŠคํŠธ๋ฆผ์—์„œ ๋…ธ๋“œ ์ „์ด๋ฅผ ์ถ”์ถœํ•˜๊ณ ,
๊ธฐ๋Œ€ ํ๋ฆ„๊ณผ ๋น„๊ตํ•˜์—ฌ ํŒŒ์ดํ”„๋ผ์ธ ๋ฌด๊ฒฐ์„ฑ์„ ๊ฒ€์ฆํ•œ๋‹ค.
"""
from __future__ import annotations
import statistics
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
from .config import EXPECTED_APPROVED_FLOW, EXPECTED_REJECTED_FLOW
@dataclass
class NodeTransition:
    """One recorded status change of a single pipeline node.

    This is a plain record: no field is validated on construction.
    """

    # Name of the node the event referred to.
    node: str
    # Normalized status: "started" | "completed" | "error".
    status: str
    # When the transition was observed (the tracker in this module stores
    # seconds elapsed since it was created or reset).
    timestamp: float
    # Duration in milliseconds; stays 0.0 unless a value is supplied.
    latency_ms: float = 0.0
    # Free-form payload; every instance gets its own fresh dict by default.
    detail: Dict[str, Any] = field(default_factory=dict)
class PipelineFlowTracker:
    """Extract and record node transitions from SSE event dicts.

    Times come from ``time.monotonic()``. Transition timestamps are stored in
    seconds relative to the moment the tracker was created or last reset.
    """

    # Incoming status values, grouped by the normalized status they map to.
    _START_STATUSES = ("started", "running")
    _DONE_STATUSES = ("completed", "done", "awaiting_approval")

    def __init__(self) -> None:
        self._transitions: List[NodeTransition] = []
        self._start_time: float = time.monotonic()
        # node name -> monotonic time of its most recent start event
        self._node_starts: Dict[str, float] = {}

    def reset(self) -> None:
        """Drop all recorded transitions and restart the relative clock."""
        self._transitions = []
        self._start_time = time.monotonic()
        self._node_starts = {}

    def track_event(self, event: Dict[str, Any]) -> None:
        """Parse one SSE event and record the node transition it describes.

        Events without a truthy ``"node"`` value, or with a ``"status"`` that
        is not one of the recognized values, are ignored.

        Args:
            event: Event payload; only the ``"node"`` and ``"status"`` keys
                are read. A shallow copy is kept as the transition detail for
                terminal ("completed"/"error") transitions.
        """
        node = event.get("node", "")
        status = event.get("status", "")
        now = time.monotonic()
        if not node:
            return

        elapsed = now - self._start_time
        if status in self._START_STATUSES:
            self._node_starts[node] = now
            self._transitions.append(
                NodeTransition(node=node, status="started", timestamp=elapsed)
            )
        elif status in self._DONE_STATUSES or status == "error":
            # Both terminal outcomes consume the pending start. Leaving it in
            # place after an error would let a later completion of the same
            # node be measured against a stale start time.
            # With no pending start the latency is 0.
            started_at = self._node_starts.pop(node, now)
            self._transitions.append(
                NodeTransition(
                    node=node,
                    status="error" if status == "error" else "completed",
                    timestamp=elapsed,
                    latency_ms=(now - started_at) * 1000,
                    # Copy so that later mutation of the caller's dict does
                    # not rewrite what was recorded.
                    detail=dict(event),
                )
            )

    @property
    def node_sequence(self) -> List[str]:
        """Observed node names in first-seen order, duplicates removed."""
        return list(dict.fromkeys(t.node for t in self._transitions))

    @property
    def transitions(self) -> "List[NodeTransition]":
        """A copy of the recorded transitions, in arrival order."""
        return list(self._transitions)

    def to_text(self) -> str:
        """Render the node flow as text.

        Example: "session_load(2ms) -> planner(1.2s) -> approval_wait -> ..."

        "started" transitions are skipped because the node is rendered when
        its terminal transition arrives. A completed transition with a
        positive latency is shown with that latency (seconds from 1000 ms up,
        milliseconds below); every other transition is shown as the bare
        node name.
        """
        parts = []
        for t in self._transitions:
            if t.status == "started":
                continue  # rendered by the matching terminal transition
            if t.status == "completed" and t.latency_ms > 0:
                if t.latency_ms >= 1000:
                    parts.append(f"{t.node}({t.latency_ms / 1000:.1f}s)")
                else:
                    parts.append(f"{t.node}({t.latency_ms:.0f}ms)")
            else:
                parts.append(t.node)
        return " -> ".join(parts) if parts else "(no transitions)"
class FlowValidator:
    """Compare an observed node sequence with an expected flow."""

    @staticmethod
    def validate_approved_flow(actual_nodes: List[str]) -> tuple[bool, List[str]]:
        """Validate the approval path. Returns ``(valid, issues)``."""
        return FlowValidator._validate(actual_nodes, EXPECTED_APPROVED_FLOW)

    @staticmethod
    def validate_rejected_flow(actual_nodes: List[str]) -> tuple[bool, List[str]]:
        """Validate the rejection path. Returns ``(valid, issues)``."""
        return FlowValidator._validate(actual_nodes, EXPECTED_REJECTED_FLOW)

    @staticmethod
    def _validate(actual: List[str], expected: List[str]) -> tuple[bool, List[str]]:
        """Check ``actual`` against ``expected``.

        Two independent checks are made:

        * ``expected`` must appear in ``actual`` in order (other nodes may be
          interleaved). If not, the unmatched tail of ``expected`` is
          reported as missing.
        * every node of ``actual`` must occur in ``expected``; the name
          ``"__interrupt__"`` is exempt from this check.

        Returns:
            ``(valid, issues)`` where ``valid`` is True exactly when
            ``issues`` is empty.
        """
        issues: List[str] = []

        # In-order containment: the iterator is shared across iterations, so
        # each expected node is searched for only after the previous match.
        remaining = iter(actual)
        matched = 0
        for wanted in expected:
            if not any(node == wanted for node in remaining):
                break
            matched += 1
        if matched < len(expected):
            missing = expected[matched:]
            issues.append(f"누락된 노드: {missing}")  # "missing nodes"

        # Nodes that are not part of the expected flow at all.
        known = set(expected)
        unexpected = [n for n in actual if n not in known and n != "__interrupt__"]
        if unexpected:
            issues.append(f"예상 외 노드: {unexpected}")  # "unexpected nodes"

        return not issues, issues
class LatencyAggregator:
    """Collect per-node latency samples (milliseconds) and summarize them."""

    def __init__(self) -> None:
        # node name -> latency samples in the order they were recorded
        self._latencies: Dict[str, List[float]] = {}

    def record(self, node: str, latency_ms: float) -> None:
        """Add one latency sample for ``node``."""
        self._latencies.setdefault(node, []).append(latency_ms)

    def record_from_tracker(self, tracker: "PipelineFlowTracker") -> None:
        """Collect latencies from a tracker's transitions.

        Only transitions whose status is "completed" and whose latency is
        strictly positive are recorded.
        """
        for t in tracker.transitions:
            if t.status == "completed" and t.latency_ms > 0:
                self.record(t.node, t.latency_ms)

    def stats(self, node: str) -> Dict[str, float]:
        """Return count/min/max/avg/p95 for ``node``.

        Values other than ``count`` are rounded to two decimals. A node with
        no samples yields all zeros. ``p95`` uses the nearest-rank method:
        the sample at rank ``ceil(0.95 * n)`` of the sorted samples.
        """
        values = self._latencies.get(node, [])
        if not values:
            return {"count": 0, "min": 0, "max": 0, "avg": 0, "p95": 0}

        ordered = sorted(values)
        # ceil(0.95 * n) in integer arithmetic, avoiding float rounding.
        # A floor here would under-report (e.g. return the minimum for n == 2).
        p95_rank = (95 * len(ordered) + 99) // 100
        return {
            "count": len(ordered),
            "min": round(ordered[0], 2),
            "max": round(ordered[-1], 2),
            "avg": round(statistics.mean(values), 2),
            "p95": round(ordered[p95_rank - 1], 2),
        }

    def all_stats(self) -> Dict[str, Dict[str, float]]:
        """Return ``stats()`` for every recorded node, keyed in sorted order."""
        return {node: self.stats(node) for node in sorted(self._latencies)}

    def summary_text(self) -> str:
        """Return the latency summary as text, one line per node."""
        lines = [
            f" {node}: avg={s['avg']:.0f}ms p95={s['p95']:.0f}ms "
            f"min={s['min']:.0f}ms max={s['max']:.0f}ms (n={s['count']})"
            for node, s in self.all_stats().items()
        ]
        return "\n".join(lines) if lines else " (no latency data)"