Spaces:
Paused
Paused
| """ํ์ดํ๋ผ์ธ ํ๋ฆ ์ถ์ ๋ฐ ๊ฒ์ฆ. | |
| SSE ์ด๋ฒคํธ ์คํธ๋ฆผ์์ ๋ ธ๋ ์ ์ด๋ฅผ ์ถ์ถํ๊ณ , | |
| ๊ธฐ๋ ํ๋ฆ๊ณผ ๋น๊ตํ์ฌ ํ์ดํ๋ผ์ธ ๋ฌด๊ฒฐ์ฑ์ ๊ฒ์ฆํ๋ค. | |
| """ | |
| from __future__ import annotations | |
| import statistics | |
| import time | |
| from dataclasses import dataclass, field | |
| from typing import Any, Dict, List, Optional | |
| from .config import EXPECTED_APPROVED_FLOW, EXPECTED_REJECTED_FLOW | |
| class NodeTransition: | |
| """๋ ธ๋ ์ ์ด ๊ธฐ๋ก.""" | |
| node: str | |
| status: str # "started" | "completed" | "error" | |
| timestamp: float | |
| latency_ms: float = 0.0 | |
| detail: Dict[str, Any] = field(default_factory=dict) | |
| class PipelineFlowTracker: | |
| """SSE ์ด๋ฒคํธ์์ ๋ ธ๋ ์ ์ด๋ฅผ ์ถ์ถํ๊ณ ์ถ์ ํ๋ค.""" | |
| def __init__(self) -> None: | |
| self._transitions: List[NodeTransition] = [] | |
| self._start_time: float = time.monotonic() | |
| self._node_starts: Dict[str, float] = {} | |
| def reset(self) -> None: | |
| self._transitions = [] | |
| self._start_time = time.monotonic() | |
| self._node_starts = {} | |
| def track_event(self, event: Dict[str, Any]) -> None: | |
| """SSE ์ด๋ฒคํธ๋ฅผ ํ์ฑํ์ฌ ๋ ธ๋ ์ ์ด๋ฅผ ๊ธฐ๋กํ๋ค.""" | |
| node = event.get("node", "") | |
| status = event.get("status", "") | |
| now = time.monotonic() | |
| if not node: | |
| return | |
| if status in ("started", "running"): | |
| self._node_starts[node] = now | |
| self._transitions.append( | |
| NodeTransition( | |
| node=node, | |
| status="started", | |
| timestamp=now - self._start_time, | |
| ) | |
| ) | |
| elif status in ("completed", "done", "awaiting_approval"): | |
| start = self._node_starts.pop(node, now) | |
| latency_ms = (now - start) * 1000 | |
| self._transitions.append( | |
| NodeTransition( | |
| node=node, | |
| status="completed", | |
| timestamp=now - self._start_time, | |
| latency_ms=latency_ms, | |
| detail=event, | |
| ) | |
| ) | |
| elif status == "error": | |
| self._transitions.append( | |
| NodeTransition( | |
| node=node, | |
| status="error", | |
| timestamp=now - self._start_time, | |
| detail=event, | |
| ) | |
| ) | |
| def node_sequence(self) -> List[str]: | |
| """๊ด์ธก๋ ๋ ธ๋ ์์ (์ค๋ณต ์ ๊ฑฐ, ์์ ์ ์ง).""" | |
| seen = set() | |
| result = [] | |
| for t in self._transitions: | |
| if t.node not in seen: | |
| seen.add(t.node) | |
| result.append(t.node) | |
| return result | |
| def transitions(self) -> List[NodeTransition]: | |
| return list(self._transitions) | |
| def to_text(self) -> str: | |
| """๋ ธ๋ ํ๋ฆ์ ํ ์คํธ๋ก ํํํ๋ค. | |
| ์: "session_load(2ms) -> planner(1.2s) -> approval_wait -> ..." | |
| """ | |
| parts = [] | |
| for t in self._transitions: | |
| if t.status == "completed" and t.latency_ms > 0: | |
| if t.latency_ms >= 1000: | |
| parts.append(f"{t.node}({t.latency_ms / 1000:.1f}s)") | |
| else: | |
| parts.append(f"{t.node}({t.latency_ms:.0f}ms)") | |
| elif t.status == "started": | |
| continue # completed์์ ์ฒ๋ฆฌ | |
| else: | |
| parts.append(t.node) | |
| return " -> ".join(parts) if parts else "(no transitions)" | |
| class FlowValidator: | |
| """๊ธฐ๋ ํ๋ฆ๊ณผ ์ค์ ํ๋ฆ์ ๋น๊ตํ๋ค.""" | |
| def validate_approved_flow(actual_nodes: List[str]) -> tuple[bool, List[str]]: | |
| """์น์ธ ๊ฒฝ๋ก ๊ฒ์ฆ. (valid, issues) ๋ฐํ.""" | |
| return FlowValidator._validate(actual_nodes, EXPECTED_APPROVED_FLOW) | |
| def validate_rejected_flow(actual_nodes: List[str]) -> tuple[bool, List[str]]: | |
| """๊ฑฐ์ ๊ฒฝ๋ก ๊ฒ์ฆ.""" | |
| return FlowValidator._validate(actual_nodes, EXPECTED_REJECTED_FLOW) | |
| def _validate(actual: List[str], expected: List[str]) -> tuple[bool, List[str]]: | |
| issues: List[str] = [] | |
| # ๊ธฐ๋ ๋ ธ๋๊ฐ ์ค์ ์ ์์๋๋ก ํฌํจ๋์ด ์๋์ง ํ์ธ | |
| expected_idx = 0 | |
| for node in actual: | |
| if expected_idx < len(expected) and node == expected[expected_idx]: | |
| expected_idx += 1 | |
| if expected_idx < len(expected): | |
| missing = expected[expected_idx:] | |
| issues.append(f"๋๋ฝ๋ ๋ ธ๋: {missing}") | |
| # ๋น์ ์ ๋ ธ๋ ๊ฐ์ง | |
| unexpected = [n for n in actual if n not in expected and n != "__interrupt__"] | |
| if unexpected: | |
| issues.append(f"์์ ์ธ ๋ ธ๋: {unexpected}") | |
| return len(issues) == 0, issues | |
| class LatencyAggregator: | |
| """๋ ธ๋๋ณ ๋ ์ดํด์ ํต๊ณ๋ฅผ ์์งํ๋ค.""" | |
| def __init__(self) -> None: | |
| self._latencies: Dict[str, List[float]] = {} | |
| def record(self, node: str, latency_ms: float) -> None: | |
| self._latencies.setdefault(node, []).append(latency_ms) | |
| def record_from_tracker(self, tracker: PipelineFlowTracker) -> None: | |
| """FlowTracker์ ์ ์ด์์ ๋ ์ดํด์๋ฅผ ์์งํ๋ค.""" | |
| for t in tracker.transitions: | |
| if t.status == "completed" and t.latency_ms > 0: | |
| self.record(t.node, t.latency_ms) | |
| def stats(self, node: str) -> Dict[str, float]: | |
| """๋ ธ๋์ min/max/avg/p95/count๋ฅผ ๋ฐํํ๋ค.""" | |
| values = self._latencies.get(node, []) | |
| if not values: | |
| return {"count": 0, "min": 0, "max": 0, "avg": 0, "p95": 0} | |
| sorted_v = sorted(values) | |
| p95_idx = max(0, int(len(sorted_v) * 0.95) - 1) | |
| return { | |
| "count": len(values), | |
| "min": round(min(values), 2), | |
| "max": round(max(values), 2), | |
| "avg": round(statistics.mean(values), 2), | |
| "p95": round(sorted_v[p95_idx], 2), | |
| } | |
| def all_stats(self) -> Dict[str, Dict[str, float]]: | |
| return {node: self.stats(node) for node in sorted(self._latencies.keys())} | |
| def summary_text(self) -> str: | |
| """๋ ์ดํด์ ์์ฝ์ ํ ์คํธ๋ก ๋ฐํ.""" | |
| lines = [] | |
| for node, s in self.all_stats().items(): | |
| lines.append( | |
| f" {node}: avg={s['avg']:.0f}ms p95={s['p95']:.0f}ms " | |
| f"min={s['min']:.0f}ms max={s['max']:.0f}ms (n={s['count']})" | |
| ) | |
| return "\n".join(lines) if lines else " (no latency data)" | |