AIBRUH commited on
Commit
ac8abe2
·
verified ·
1 Parent(s): 12ed199

Upload folder using huggingface_hub

Browse files
Files changed (4) hide show
  1. router/__init__.py +0 -0
  2. router/agents.py +344 -0
  3. router/main.py +504 -0
  4. router/metrics.py +150 -0
router/__init__.py ADDED
File without changes
router/agents.py ADDED
@@ -0,0 +1,344 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """EDEN OS V2 — Grok-Powered Intelligent Pipeline Agents.
2
+
3
+ Four specialized agents + AgentManager coordinator.
4
+ Primary: xAI Grok API for fast intelligent pipeline switching.
5
+ Fallback: Static SIZE_ORDER routing with zero API calls.
6
+
7
+ All agents are optional — if XAI_API_KEY not set, falls back to static routing.
8
+ Uses Grok-4-fast (non-reasoning) for sub-second decisions with 2s timeout, 10s caching.
9
+ """
10
+
11
+ import asyncio
12
+ import json
13
+ import logging
14
+ import os
15
+ import time
16
+
17
+ from openai import AsyncOpenAI
18
+
19
+ from .metrics import MetricsStore, SIZE_ORDER, PIPELINE_NAMES, PIPELINE_SIZES_GB
20
+
21
+ logger = logging.getLogger("eden.agents")
22
+
23
+ _xai_key = os.environ.get("XAI_API_KEY", "")
24
+ AGENTS_ENABLED = bool(_xai_key)
25
+
26
+
27
class BaseAgent:
    """Base class for all Grok-powered agents.

    Provides a lazily-created AsyncOpenAI client pointed at the xAI API,
    a small per-agent TTL cache, and helpers to query Grok and parse its
    JSON responses.  When XAI_API_KEY is unset, ``enabled`` is False and
    every helper degrades gracefully so subclasses fall back to static logic.
    """

    def __init__(self, name: str):
        self.name = name
        self.enabled = AGENTS_ENABLED
        self._client: AsyncOpenAI | None = None
        # key -> (timestamp, value); entries expire after _cache_ttl seconds.
        # Bug fix: the annotation previously used the builtin function `any`
        # as a type — `object` is the correct "anything" annotation here.
        self._cache: dict[str, tuple[float, object]] = {}
        self._cache_ttl = 10.0  # seconds

    def _get_client(self) -> AsyncOpenAI | None:
        """Lazily construct the xAI client (None when agents are disabled)."""
        if self._client is None and self.enabled:
            self._client = AsyncOpenAI(
                api_key=_xai_key,
                base_url="https://api.x.ai/v1",
            )
        return self._client

    def _get_cached(self, key: str):
        """Return the cached value for *key*, or None if absent or expired."""
        if key in self._cache:
            ts, val = self._cache[key]
            if time.time() - ts < self._cache_ttl:
                return val
        return None

    def _set_cached(self, key: str, val):
        """Store *val* under *key* with the current timestamp."""
        self._cache[key] = (time.time(), val)

    async def _ask_grok(self, system: str, prompt: str, max_tokens: int = 200) -> str:
        """Send a message to Grok-4-fast with 2s timeout. Returns empty string on failure."""
        client = self._get_client()
        if not client:
            return ""
        try:
            resp = await asyncio.wait_for(
                client.chat.completions.create(
                    model="grok-4-fast-non-reasoning",
                    messages=[
                        {"role": "system", "content": system},
                        {"role": "user", "content": prompt},
                    ],
                    max_tokens=max_tokens,
                    temperature=0.0,
                ),
                timeout=2.0,
            )
            # Bug fix: message.content may be None on an empty completion;
            # coalesce so the declared `-> str` contract always holds.
            return resp.choices[0].message.content or ""
        except asyncio.TimeoutError:
            logger.warning(f"Agent {self.name}: Grok timeout (2s)")
            return ""
        except Exception as e:
            logger.warning(f"Agent {self.name}: Grok error: {e}")
            return ""

    def _parse_json(self, text: str) -> dict | list | None:
        """Safely parse JSON from Grok response, handling markdown fences."""
        if not text:
            return None
        # Strip markdown code fences if present
        cleaned = text.strip()
        if cleaned.startswith("```"):
            lines = cleaned.split("\n")
            lines = [l for l in lines if not l.strip().startswith("```")]
            cleaned = "\n".join(lines).strip()
        try:
            return json.loads(cleaned)
        except (json.JSONDecodeError, ValueError):
            return None
95
+
96
+
97
class PipelineSpeedAgent(BaseAgent):
    """Monitors response times, dynamically reranks pipelines for fastest switching."""

    def __init__(self):
        super().__init__("speed")

    async def evaluate(self, metrics_store: MetricsStore) -> list[int]:
        """Return recommended pipeline order based on speed + success rate.

        Falls back to the static SIZE_ORDER when agents are disabled, when
        fewer than 5 total calls have been recorded, or when Grok returns an
        invalid order.  A valid Grok answer is cached for the base-class TTL.
        """
        cached = self._get_cached("order")
        if cached is not None:
            return cached

        if not self.enabled:
            return list(SIZE_ORDER)

        all_metrics = metrics_store.get_all_metrics()

        # Don't call Grok if we have insufficient data
        total_calls = sum(all_metrics[pid].total_calls for pid in range(5))
        if total_calls < 5:
            return list(SIZE_ORDER)

        summary_lines = []
        for pid in SIZE_ORDER:
            m = all_metrics[pid]
            summary_lines.append(
                f"P{pid} ({m.name}): size={m.model_size_gb}GB, "
                f"avg_response={m.avg_response_time_ms:.0f}ms, "
                f"success_rate={m.success_rate:.1%}, "
                f"avg_motion={m.avg_motion_score:.3f}, "
                f"calls={m.total_calls}"
            )

        result = await self._ask_grok(
            system="You are an AI pipeline optimizer. Return ONLY a JSON array of integer pipeline IDs. No explanation.",
            prompt=(
                "Given these pipeline performance metrics:\n"
                + "\n".join(summary_lines)
                + "\n\nReturn the optimal pipeline order as a JSON array. "
                "Prioritize: 1) fastest response time, 2) highest success rate, "
                "3) smallest model size as tiebreaker."
            ),
        )

        parsed = self._parse_json(result)
        # Bug fix: accept only a true permutation of the known pipeline IDs.
        # The previous length/type check would accept duplicates such as
        # [0, 0, 0, 0, 0], silently dropping pipelines from the failover order.
        if (
            isinstance(parsed, list)
            and all(isinstance(x, int) for x in parsed)
            and sorted(parsed) == sorted(SIZE_ORDER)
        ):
            self._set_cached("order", parsed)
            logger.info(f"Speed agent recommended order: {parsed}")
            return parsed

        return list(SIZE_ORDER)
148
+
149
+
150
class QualityIntelligenceAgent(BaseAgent):
    """Analyzes motion score trends, predicts best pipeline for current conditions."""

    def __init__(self):
        super().__init__("quality")

    async def evaluate(self, metrics_store: MetricsStore) -> dict:
        """Return best pipeline recommendation.

        Result shape: {"best_pipeline": int, "confidence": float, "reason": str}.
        A successful Grok answer is cached for the base-class TTL (10s).
        """
        cached = self._get_cached("best")
        if cached is not None:
            return cached

        all_metrics = metrics_store.get_all_metrics()

        # Static fallback: pick the pipeline with the best observed motion
        # (pipelines with zero calls score 0.0 so they are never preferred).
        if not self.enabled:
            best_pid = max(
                SIZE_ORDER,
                key=lambda pid: all_metrics[pid].avg_motion_score if all_metrics[pid].total_calls > 0 else 0.0,
            )
            return {"best_pipeline": best_pid, "confidence": 0.5, "reason": "static_fallback"}

        # Too little data for Grok to reason about — default to the smallest pipeline.
        total_calls = sum(all_metrics[pid].total_calls for pid in range(5))
        if total_calls < 5:
            return {"best_pipeline": SIZE_ORDER[0], "confidence": 0.3, "reason": "insufficient_data"}

        summary_lines = []
        for pid in range(5):
            m = all_metrics[pid]
            summary_lines.append(
                f"P{pid} ({m.name}): avg_motion={m.avg_motion_score:.3f}, "
                f"trend={m.trend}, calls={m.total_calls}, "
                f"success_rate={m.success_rate:.1%}"
            )

        result = await self._ask_grok(
            system="You are an animation quality analyst. Return ONLY JSON, no explanation.",
            prompt=(
                "Pipeline motion quality metrics:\n"
                + "\n".join(summary_lines)
                + '\n\nWhich pipeline produces the best animation? '
                'Return: {"best_pipeline": <int>, "confidence": <0-1>, "reason": "<brief>"}'
            ),
        )

        parsed = self._parse_json(result)
        if isinstance(parsed, dict) and "best_pipeline" in parsed:
            self._set_cached("best", parsed)
            logger.info(f"Quality agent: best=P{parsed['best_pipeline']}, reason={parsed.get('reason')}")
            return parsed

        # Grok failed or returned malformed JSON — conservative default.
        return {"best_pipeline": SIZE_ORDER[0], "confidence": 0.3, "reason": "agent_error"}
201
+
202
+
203
class FailoverDecisionAgent(BaseAgent):
    """Makes intelligent failover decisions instead of hard thresholds."""

    def __init__(self):
        super().__init__("failover")

    async def evaluate(
        self,
        motion_score: float,
        pipeline_id: int,
        consecutive_bad: int,
        metrics_store: MetricsStore,
    ) -> dict:
        """Decide whether to failover — smarter than a dumb threshold.

        Returns {"should_failover": bool, "reason": str}.  Falls back to the
        plain motion_score < 0.05 threshold whenever Grok is unavailable or
        answers with malformed JSON.  Results are deliberately not cached —
        each check should use live data.
        """
        # Hard failover: always trigger if completely static for 2+ checks
        if motion_score == 0.0 and consecutive_bad >= 2:
            return {"should_failover": True, "reason": "completely_static"}

        if not self.enabled:
            return {"should_failover": motion_score < 0.05, "reason": "threshold_fallback"}

        m = metrics_store.get_metrics(pipeline_id)
        if not m:
            return {"should_failover": motion_score < 0.05, "reason": "no_metrics"}

        result = await self._ask_grok(
            system="You are a real-time systems reliability engineer. Minimize unnecessary failovers. Return ONLY JSON.",
            prompt=(
                f"Pipeline P{pipeline_id} ({PIPELINE_NAMES.get(pipeline_id, '?')}) status:\n"
                f"- Current motion score: {motion_score:.3f} (threshold: 0.05)\n"
                f"- Consecutive bad checks: {consecutive_bad}\n"
                f"- Average motion score: {m.avg_motion_score:.3f}\n"
                f"- Trend: {m.trend}\n"
                f"- Success rate: {m.success_rate:.1%}\n"
                f"- Total calls: {m.total_calls}\n\n"
                "Should we failover? Consider: is it warming up? Is the trend improving? "
                "Is unnecessary switching costly?\n"
                'Return: {"should_failover": true/false, "reason": "<brief>"}'
            ),
        )

        parsed = self._parse_json(result)
        if isinstance(parsed, dict) and "should_failover" in parsed:
            logger.info(
                f"Failover agent: P{pipeline_id} motion={motion_score:.3f} → "
                f"failover={parsed['should_failover']}, reason={parsed.get('reason')}"
            )
            return parsed

        # Fallback to threshold
        return {"should_failover": motion_score < 0.05, "reason": "agent_fallback"}
254
+
255
+
256
class PreemptiveWarmupAgent(BaseAgent):
    """Predicts which pipeline will be needed next and recommends pre-warming."""

    def __init__(self):
        super().__init__("warmup")

    async def evaluate(self, current_pipeline_id: int, metrics_store: MetricsStore) -> dict:
        """Recommend which pipeline to pre-warm.

        Returns {"warmup_pipeline": int | None, "reason": str}.  The static
        fallback is the next pipeline in SIZE_ORDER (None at the end of it).
        """
        # Static default: the next-largest pipeline after the current one.
        idx = SIZE_ORDER.index(current_pipeline_id) if current_pipeline_id in SIZE_ORDER else 0
        next_pid = SIZE_ORDER[idx + 1] if idx + 1 < len(SIZE_ORDER) else None

        if not self.enabled:
            return {"warmup_pipeline": next_pid, "reason": "static_next_in_order"}

        m = metrics_store.get_metrics(current_pipeline_id)
        if not m:
            return {"warmup_pipeline": next_pid, "reason": "no_metrics"}

        result = await self._ask_grok(
            system="You are a predictive infrastructure manager. Return ONLY JSON.",
            prompt=(
                f"Current pipeline: P{current_pipeline_id} ({PIPELINE_NAMES.get(current_pipeline_id)})\n"
                f"- Trend: {m.trend}, Success rate: {m.success_rate:.1%}, Avg motion: {m.avg_motion_score:.3f}\n"
                f"Available (by size): {[f'P{pid} ({PIPELINE_SIZES_GB[pid]}GB)' for pid in SIZE_ORDER]}\n"
                'Which to pre-warm? Return: {"warmup_pipeline": <int or null>, "reason": "<brief>"}'
            ),
        )

        parsed = self._parse_json(result)
        if isinstance(parsed, dict) and "warmup_pipeline" in parsed:
            logger.info(f"Warmup agent: recommend P{parsed['warmup_pipeline']}, reason={parsed.get('reason')}")
            return parsed

        return {"warmup_pipeline": next_pid, "reason": "agent_fallback"}
290
+
291
+
292
class AgentManager:
    """Facade coordinating the four Grok-backed pipeline agents.

    With an xAI key configured, decisions come from Grok-4-fast (fast and
    cheap); without one, every agent degrades to static SIZE_ORDER behavior
    with zero API calls.
    """

    def __init__(self, metrics_store: MetricsStore):
        self.metrics_store = metrics_store
        self.speed_agent = PipelineSpeedAgent()
        self.quality_agent = QualityIntelligenceAgent()
        self.failover_agent = FailoverDecisionAgent()
        self.warmup_agent = PreemptiveWarmupAgent()
        self.enabled = AGENTS_ENABLED
        logger.info(f"AgentManager initialized. Grok agents enabled: {self.enabled}")

    async def get_routing_order(self, force_strong: bool = False) -> list[int]:
        """Pipeline order to try; reversed SIZE_ORDER when forcing strongest."""
        if not force_strong:
            return await self.speed_agent.evaluate(self.metrics_store)
        return list(reversed(SIZE_ORDER))

    async def should_failover(self, motion_score: float, pipeline_id: int, consecutive_bad: int = 0) -> bool:
        """True when the failover agent (or the 0.05 threshold) says to switch."""
        decision = await self.failover_agent.evaluate(
            motion_score=motion_score,
            pipeline_id=pipeline_id,
            consecutive_bad=consecutive_bad,
            metrics_store=self.metrics_store,
        )
        threshold_default = motion_score < 0.05
        return decision.get("should_failover", threshold_default)

    async def recommend_warmup(self, current_pipeline_id: int) -> int | None:
        """Pipeline id to pre-warm next (None when nothing is recommended)."""
        recommendation = await self.warmup_agent.evaluate(current_pipeline_id, self.metrics_store)
        return recommendation.get("warmup_pipeline")

    async def get_quality_recommendation(self) -> dict:
        """Quality agent's best-pipeline verdict."""
        return await self.quality_agent.evaluate(self.metrics_store)

    def status(self) -> dict:
        """Introspection payload for the router's status endpoints."""
        per_agent = {
            "speed": self.speed_agent.enabled,
            "quality": self.quality_agent.enabled,
            "failover": self.failover_agent.enabled,
            "warmup": self.warmup_agent.enabled,
        }
        return {
            "agents_enabled": self.enabled,
            "engine": "grok-4-fast" if self.enabled else "static",
            "xai_key_set": bool(_xai_key),
            "agents": per_agent,
        }
router/main.py ADDED
@@ -0,0 +1,504 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """EDEN OS V2 — Intelligent Pipeline Router.
2
+
3
+ Dual-Track Architecture:
4
+ Track 1 (Main Pipeline): Smallest/fastest pipeline first (P4 → P0 → P2)
5
+ Track 2 (Backup Router): Escalates through remaining (P3 → P1)
6
+
7
+ 5 Auto-Routing Features (preserved):
8
+ Feature 1: Pre-Warm Greeting Sequence
9
+ Feature 2: Dedicated Eve-Greeting Sub-Pipeline (force strongest)
10
+ Feature 3: Auto-Retry with Immediate Failover (up to 3 retries)
11
+ Feature 4: Heartbeat Pre-Check (quality gate before client delivery)
12
+ Feature 5: Client-Side Animation Fallback flag + server push
13
+
14
+ Intelligence Layer:
15
+ - 4 Grok-powered agents for speed, quality, failover, and warmup decisions
16
+ - Graceful fallback to static SIZE_ORDER if agents unavailable
17
+
18
+ RunPod Cost Management:
19
+ - Idle sleep timer (auto-stop pod after inactivity; default 10 minutes, configurable)
20
+ """
21
+
22
+ import asyncio
23
+ import base64
24
+ import logging
25
+ import os
26
+ import time
27
+ from enum import Enum
28
+
29
+ import httpx
30
+ import numpy as np
31
+ from fastapi import FastAPI
32
+ from pydantic import BaseModel
33
+ from pydantic_settings import BaseSettings
34
+
35
+ from .metrics import MetricsStore, SIZE_ORDER, PIPELINE_SIZES_GB, PIPELINE_NAMES
36
+ from .agents import AgentManager
37
+
38
+ logger = logging.getLogger("eden.router")
39
+
40
+
41
class RouterSettings(BaseSettings):
    """Router configuration, loaded from the environment / .env file."""

    # Per-pipeline service URLs (compose-style service hostnames by default).
    p0_url: str = "http://pipeline0:8010"
    p1_url: str = "http://pipeline1:8011"
    p2_url: str = "http://pipeline2:8012"
    p3_url: str = "http://pipeline3:8013"
    p4_url: str = "http://pipeline4:8014"
    models_dir: str = "/models"
    # xAI key; agents.py reads XAI_API_KEY from the environment directly.
    xai_api_key: str = ""
    runpod_idle_timeout_s: float = 600.0  # 10 minutes
    # RunPod credentials for the auto-stop cost saver (both must be set).
    runpod_api_key: str = ""
    runpod_pod_id: str = ""
    model_config = {"env_file": ".env", "extra": "ignore"}
53
+
54
+
55
+ cfg = RouterSettings()
56
+
57
+ app = FastAPI(title="EDEN Router", version="2.1.0")
58
+
59
+
60
+ # ── Pipeline Status ──────────────────────────────────────────────────────────
61
class PipelineStatus(str, Enum):
    """Lifecycle states for a pipeline service (str-valued for JSON output)."""

    COLD = "cold"        # not yet warmed up
    WARMING = "warming"  # warmup request in flight
    READY = "ready"      # last call/warmup succeeded
    BUSY = "busy"        # NOTE(review): never assigned in this module — confirm use
    FAILED = "failed"    # last call raised, or quality-gated out
67
+
68
+
69
class PipelineInfo:
    """Mutable runtime record for one animation pipeline service."""

    def __init__(self, pid: int, name: str, url: str, size_gb: float):
        self.pid = pid
        self.name = name
        self.url = url
        self.size_gb = size_gb
        self.status = PipelineStatus.COLD
        # Most recent motion score observed for this pipeline (0.0–1.0).
        self.last_motion_score: float = 0.0
        # Consecutive-ish failure counter; reset on success or stale-reset.
        self.fail_count: int = 0

    def to_dict(self):
        """JSON-friendly snapshot used by /health, /status and /failover."""
        return {
            "id": self.pid,
            "name": self.name,
            "status": self.status.value,
            "size_gb": self.size_gb,
            "last_motion_score": self.last_motion_score,
            "fail_count": self.fail_count,
        }
88
+
89
+
90
# ── Pipeline Registry (ordered by SIZE: smallest → largest) ──────────────────
# Pipeline id -> service URL, taken from settings.
_pipeline_urls = {
    0: cfg.p0_url,
    1: cfg.p1_url,
    2: cfg.p2_url,
    3: cfg.p3_url,
    4: cfg.p4_url,
}

# One PipelineInfo per pipeline id (0-4), keyed by id for direct lookup.
pipelines_by_id: dict[int, PipelineInfo] = {
    pid: PipelineInfo(
        pid=pid,
        name=PIPELINE_NAMES[pid],
        url=_pipeline_urls[pid],
        size_gb=PIPELINE_SIZES_GB[pid],
    )
    for pid in range(5)
}

# Canonical order: smallest → largest (for iteration and failover)
pipelines: list[PipelineInfo] = [pipelines_by_id[pid] for pid in SIZE_ORDER]
111
+
112
+
113
+ # ── Intelligence Layer ───────────────────────────────────────────────────────
114
+ metrics_store = MetricsStore()
115
+ agent_manager = AgentManager(metrics_store)
116
+
117
+
118
+ # ── RunPod Idle Sleep Timer ──────────────────────────────────────────────────
119
# Wall-clock timestamp of the most recent inbound request.
_last_activity: float = time.time()
# Handle to the background idle-monitor task (started in startup()).
_sleep_task: asyncio.Task | None = None


def _touch_activity():
    """Reset the idle timer on any request."""
    global _last_activity
    _last_activity = time.time()
127
+
128
+
129
async def _idle_sleep_loop():
    """Monitor for inactivity and stop the RunPod pod once idle too long.

    Polls every 30s and triggers after cfg.runpod_idle_timeout_s of
    inactivity (default 600s).  The loop exits after the first stop
    request, so it will not fire again without a router restart.
    """
    while True:
        await asyncio.sleep(30)  # check every 30s
        idle_seconds = time.time() - _last_activity
        if idle_seconds >= cfg.runpod_idle_timeout_s:
            logger.warning(
                f"RunPod idle for {idle_seconds:.0f}s (limit={cfg.runpod_idle_timeout_s:.0f}s). "
                "Requesting pod stop..."
            )
            await _stop_runpod_pod()
            break  # stop the loop after requesting shutdown
141
+
142
+
143
async def _stop_runpod_pod():
    """Stop the RunPod pod via API to save costs.

    Best-effort: missing credentials or HTTP/transport errors are logged
    and otherwise ignored.
    """
    if not cfg.runpod_api_key or not cfg.runpod_pod_id:
        logger.info("RunPod auto-sleep: no API key or pod ID configured, skipping")
        return
    try:
        async with httpx.AsyncClient(timeout=10.0) as client:
            resp = await client.post(
                f"https://api.runpod.io/v2/{cfg.runpod_pod_id}/stop",
                headers={"Authorization": f"Bearer {cfg.runpod_api_key}"},
            )
            logger.info(f"RunPod pod stop requested: {resp.status_code}")
    except Exception as e:
        logger.error(f"Failed to stop RunPod pod: {e}")
157
+
158
+
159
+ # ── Request Models ───────────────────────────────────────────────────────────
160
class AnimateRequest(BaseModel):
    """Inbound animation request."""

    audio_b64: str                            # base64-encoded driving audio
    reference_image: str = "eve-NATURAL.png"  # reference face image filename
    force_strong: bool = False                # Feature 2: route to strongest pipeline first
    request_id: str = ""                      # caller-supplied correlation id


class EvaluateFailoverRequest(BaseModel):
    """Watchdog payload for an agent-enhanced failover decision."""

    pipeline_id: int
    motion_score: float
    consecutive_bad: int = 0  # consecutive below-threshold checks
    frame_count: int = 0      # informational; not forwarded to the decision
172
+
173
+
174
+ # ── Motion Scoring ───────────────────────────────────────────────────────────
175
def compute_motion_score(frames_b64: list[str]) -> float:
    """Compute motion score from base64-encoded frames. Returns 0.0–1.0.

    Decodes up to the first 15 frames, measures the mean absolute byte-level
    difference between consecutive frames, and normalizes by 25.0 into [0, 1].

    Returns 0.0 for fewer than two frames, and a neutral 0.5 if decoding or
    scoring raises (so a scoring bug never hard-fails the request).
    """
    if len(frames_b64) < 2:
        return 0.0
    try:
        # Decode only a bounded prefix to keep scoring cheap on long clips.
        decoded = []
        for f in frames_b64[:15]:
            raw = base64.b64decode(f)
            arr = np.frombuffer(raw, dtype=np.uint8)
            decoded.append(arr)

        diffs = []
        for i in range(1, len(decoded)):
            min_len = min(len(decoded[i - 1]), len(decoded[i]))
            if min_len == 0:
                # Bug fix: an empty frame used to feed empty arrays into
                # np.mean, yielding NaN (and a RuntimeWarning).  An empty
                # frame carries no motion information — skip the pair.
                continue
            diff = np.mean(np.abs(decoded[i][:min_len].astype(float) - decoded[i - 1][:min_len].astype(float)))
            diffs.append(diff)

        avg_diff = np.mean(diffs) if diffs else 0.0
        return float(min(1.0, avg_diff / 25.0))
    except Exception as e:
        logger.warning(f"Motion score computation failed: {e}")
        return 0.5
197
+
198
+
199
# ── Dual-Track Architecture ─────────────────────────────────────────────────
def _reset_stale_failures():
    """Reset pipelines that were marked failed during startup or transiently.
    This allows recovery when a pipeline comes online after the router starts.

    Pipelines with fail_count >= 10 are treated as persistently broken and
    stay FAILED.
    """
    for p in pipelines:
        if p.status == PipelineStatus.FAILED and p.fail_count < 10:
            p.status = PipelineStatus.COLD
            p.fail_count = 0
            logger.info(f"Reset stale failure for {p.name}")
208
+
209
+
210
class MainPipeline:
    """Track 1: Always tries the smallest/fastest available pipeline."""

    @staticmethod
    def get_primary(agent_order: list[int] | None = None) -> PipelineInfo | None:
        """Return the first non-FAILED pipeline in *agent_order* (or SIZE_ORDER).

        If every pipeline is FAILED, stale failures are reset once and the
        first pipeline in order is returned as a recovery attempt; None only
        when the registry itself is empty for the given order.
        """
        order = agent_order or SIZE_ORDER
        for pid in order:
            p = pipelines_by_id.get(pid)
            if p and p.status != PipelineStatus.FAILED:
                return p
        # If all failed, reset and try again (recovery from startup failures)
        _reset_stale_failures()
        for pid in order:
            p = pipelines_by_id.get(pid)
            if p:
                return p
        return None
227
+
228
+
229
class BackupRouter:
    """Track 2: Escalates through remaining pipelines on failure."""

    @staticmethod
    def get_escalation_order(exclude_pid: int, agent_order: list[int] | None = None) -> list[PipelineInfo]:
        """Return non-FAILED pipelines in order, excluding *exclude_pid*.

        If no healthy backup exists, stale failures are reset once and all
        remaining pipelines (still excluding *exclude_pid*) are returned.
        """
        order = agent_order or SIZE_ORDER
        result = []
        for pid in order:
            if pid == exclude_pid:
                continue
            p = pipelines_by_id.get(pid)
            if p and p.status != PipelineStatus.FAILED:
                result.append(p)
        # If empty, reset failures and try again
        if not result:
            _reset_stale_failures()
            for pid in order:
                if pid == exclude_pid:
                    continue
                p = pipelines_by_id.get(pid)
                if p:
                    result.append(p)
        return result
252
+
253
+
254
+ # ── Pipeline Communication ───────────────────────────────────────────────────
255
async def _call_pipeline(pipeline: PipelineInfo, request: AnimateRequest) -> dict | None:
    """Call a single pipeline and return result or None on failure.

    Side effects: records response time / motion score in the metrics store
    and updates the pipeline's status and fail_count.  A non-200 response
    counts as a failure but does NOT mark the pipeline FAILED — only a
    transport-level exception (timeout, connection error) does.
    """
    t0 = time.time()
    try:
        # 90s covers the heavier models' generation time.
        async with httpx.AsyncClient(timeout=90.0) as client:
            resp = await client.post(
                f"{pipeline.url}/animate",
                json={
                    "audio_b64": request.audio_b64,
                    "reference_image": request.reference_image,
                    "request_id": request.request_id,
                },
            )
            elapsed_ms = (time.time() - t0) * 1000
            if resp.status_code == 200:
                result = resp.json()
                pipeline.status = PipelineStatus.READY
                pipeline.fail_count = 0
                # Record success metrics
                motion = compute_motion_score(result.get("frames", []))
                metrics_store.record_call(pipeline.pid, elapsed_ms, motion, success=True)
                return result
            else:
                pipeline.fail_count += 1
                metrics_store.record_call(pipeline.pid, elapsed_ms, 0.0, success=False)
                logger.warning(f"Pipeline {pipeline.name} returned {resp.status_code}")
                return None
    except Exception as e:
        elapsed_ms = (time.time() - t0) * 1000
        pipeline.fail_count += 1
        pipeline.status = PipelineStatus.FAILED
        metrics_store.record_call(pipeline.pid, elapsed_ms, 0.0, success=False)
        logger.error(f"Pipeline {pipeline.name} call failed: {e}")
        return None
289
+
290
+
291
# ── Feature 1: Pre-Warm ─────────────────────────────────────────────────────
# pipeline id -> motion score from the most recent warmup run
prewarm_scores: dict[int, float] = {}


async def _prewarm_pipeline(pipeline: PipelineInfo):
    """Run a silent test animation to warm up the pipeline.

    On success the pipeline becomes READY and its load time is recorded.
    On any failure it returns to COLD (not FAILED), so routing will still
    try it later.
    """
    t0 = time.time()
    try:
        pipeline.status = PipelineStatus.WARMING
        async with httpx.AsyncClient(timeout=30.0) as client:
            resp = await client.post(
                f"{pipeline.url}/warmup",
                json={"reference_image": "eve-NATURAL.png"},
            )
            load_ms = (time.time() - t0) * 1000
            if resp.status_code == 200:
                data = resp.json()
                score = data.get("motion_score", 0.0)
                prewarm_scores[pipeline.pid] = score
                pipeline.status = PipelineStatus.READY
                pipeline.last_motion_score = score
                metrics_store.record_load_time(pipeline.pid, load_ms)
                logger.info(f"Pre-warm {pipeline.name} ({pipeline.size_gb}GB): score={score:.2f}, load={load_ms:.0f}ms")
            else:
                pipeline.status = PipelineStatus.COLD
    except Exception as e:
        logger.warning(f"Pre-warm {pipeline.name} failed: {e}")
        pipeline.status = PipelineStatus.COLD
319
+
320
+
321
+ # ── Main Animation Endpoint ─────────────────────────────────────────────────
322
+ @app.post("/animate")
323
+ async def animate(request: AnimateRequest):
324
+ """Route animation through dual-track architecture with agent intelligence."""
325
+ _touch_activity()
326
+ t0 = time.time()
327
+
328
+ # Get agent-recommended pipeline order
329
+ agent_order = await agent_manager.get_routing_order(force_strong=request.force_strong)
330
+ logger.info(f"Routing order: {[PIPELINE_NAMES[pid] for pid in agent_order]} (force_strong={request.force_strong})")
331
+
332
+ # Track 1: Try primary pipeline (smallest/fastest)
333
+ primary = MainPipeline.get_primary(agent_order)
334
+ if not primary:
335
+ return {"frames": [], "pipeline_used": "none", "error": "No pipelines available"}
336
+
337
+ logger.info(f"Track 1 — Primary: {primary.name} ({primary.size_gb}GB)")
338
+ result = await _call_pipeline(primary, request)
339
+
340
+ if result:
341
+ frames = result.get("frames", [])
342
+ if frames:
343
+ motion_score = compute_motion_score(frames)
344
+ primary.last_motion_score = motion_score
345
+
346
+ # Feature 4: Heartbeat Pre-Check
347
+ should_fail = await agent_manager.should_failover(motion_score, primary.pid)
348
+ if not should_fail:
349
+ elapsed = time.time() - t0
350
+ logger.info(f"Track 1 SUCCESS: {primary.name}, motion={motion_score:.3f}")
351
+
352
+ # Fire-and-forget: agent recommends next warmup
353
+ asyncio.create_task(_agent_warmup(primary.pid))
354
+
355
+ return {
356
+ "frames": frames,
357
+ "pipeline_used": primary.name,
358
+ "pipeline_id": primary.pid,
359
+ "motion_score": round(motion_score, 3),
360
+ "attempt": 1,
361
+ "elapsed_s": round(elapsed, 2),
362
+ "track": "main",
363
+ "force_strong_pipeline": request.force_strong,
364
+ }
365
+ else:
366
+ logger.warning(f"Track 1 QUALITY FAIL: {primary.name} motion={motion_score:.3f}, escalating to Track 2")
367
+ primary.status = PipelineStatus.FAILED
368
+
369
+ # Track 2: Backup Router — escalate through remaining pipelines
370
+ backups = BackupRouter.get_escalation_order(primary.pid, agent_order)
371
+ logger.info(f"Track 2 — Backups: {[p.name for p in backups]}")
372
+
373
+ for attempt, backup in enumerate(backups[:3]): # Feature 3: up to 3 retries
374
+ logger.info(f"Track 2 attempt {attempt + 1}: trying {backup.name} ({backup.size_gb}GB)")
375
+
376
+ result = await _call_pipeline(backup, request)
377
+ if result is None:
378
+ continue
379
+
380
+ frames = result.get("frames", [])
381
+ if not frames:
382
+ continue
383
+
384
+ motion_score = compute_motion_score(frames)
385
+ backup.last_motion_score = motion_score
386
+
387
+ should_fail = await agent_manager.should_failover(motion_score, backup.pid)
388
+ if should_fail:
389
+ logger.warning(f"Track 2: {backup.name} motion={motion_score:.3f}, trying next...")
390
+ backup.status = PipelineStatus.FAILED
391
+ continue
392
+
393
+ elapsed = time.time() - t0
394
+ logger.info(f"Track 2 SUCCESS: {backup.name}, motion={motion_score:.3f}")
395
+ return {
396
+ "frames": frames,
397
+ "pipeline_used": backup.name,
398
+ "pipeline_id": backup.pid,
399
+ "motion_score": round(motion_score, 3),
400
+ "attempt": attempt + 2,
401
+ "elapsed_s": round(elapsed, 2),
402
+ "track": "backup",
403
+ "force_strong_pipeline": request.force_strong,
404
+ }
405
+
406
+ # All failed
407
+ elapsed = time.time() - t0
408
+ return {
409
+ "frames": [],
410
+ "pipeline_used": "none",
411
+ "error": "All pipelines failed or produced static frames",
412
+ "elapsed_s": round(elapsed, 2),
413
+ "force_strong_pipeline": True, # Feature 5: CSS fallback
414
+ }
415
+
416
+
417
async def _agent_warmup(current_pid: int):
    """Fire-and-forget: ask agent which pipeline to pre-warm next.

    Only pre-warms a pipeline that is currently COLD; all errors are logged
    and swallowed since this runs as a background task.
    """
    try:
        warmup_pid = await agent_manager.recommend_warmup(current_pid)
        if warmup_pid is not None and warmup_pid in pipelines_by_id:
            p = pipelines_by_id[warmup_pid]
            if p.status == PipelineStatus.COLD:
                logger.info(f"Agent warmup: pre-warming {p.name}")
                await _prewarm_pipeline(p)
    except Exception as e:
        logger.warning(f"Agent warmup failed: {e}")
428
+
429
+
430
+ # ── Failover Endpoints ──────────────────────────────────────────────────────
431
+ @app.post("/failover")
432
+ async def failover(pipeline_id: int):
433
+ """Watchdog-triggered failover: mark pipeline as failed."""
434
+ _touch_activity()
435
+ p = pipelines_by_id.get(pipeline_id)
436
+ if p:
437
+ p.status = PipelineStatus.FAILED
438
+ p.fail_count += 1
439
+ logger.warning(f"Failover: {p.name} marked FAILED (count={p.fail_count})")
440
+ return {"status": "ok", "pipelines": [p.to_dict() for p in pipelines]}
441
+
442
+
443
+ @app.post("/evaluate-failover")
444
+ async def evaluate_failover(request: EvaluateFailoverRequest):
445
+ """Agent-enhanced failover decision (called by watchdog)."""
446
+ _touch_activity()
447
+ should = await agent_manager.should_failover(
448
+ motion_score=request.motion_score,
449
+ pipeline_id=request.pipeline_id,
450
+ consecutive_bad=request.consecutive_bad,
451
+ )
452
+ return {"should_failover": should}
453
+
454
+
455
+ # ── Status & Metrics Endpoints ───────────────────────────────────────────────
456
+ @app.get("/health")
457
+ async def health():
458
+ return {
459
+ "status": "ok",
460
+ "version": "2.1.0",
461
+ "pipelines": [p.to_dict() for p in pipelines],
462
+ "pipeline_order": [PIPELINE_NAMES[pid] for pid in SIZE_ORDER],
463
+ "prewarm_scores": prewarm_scores,
464
+ "agents": agent_manager.status(),
465
+ "idle_timeout_s": cfg.runpod_idle_timeout_s,
466
+ }
467
+
468
+
469
+ @app.get("/status")
470
+ async def status():
471
+ return {
472
+ "pipelines": [p.to_dict() for p in pipelines],
473
+ "pipeline_order": SIZE_ORDER,
474
+ "agents": agent_manager.status(),
475
+ }
476
+
477
+
478
+ @app.get("/metrics")
479
+ async def metrics():
480
+ """Pipeline performance metrics."""
481
+ return metrics_store.get_summary()
482
+
483
+
484
+ # ── Startup ─────────────────────────────────────────────────────────────────
485
+ @app.on_event("startup")
486
+ async def startup():
487
+ global _sleep_task
488
+ logger.info("=" * 60)
489
+ logger.info("EDEN Router v2.1.0 starting...")
490
+ logger.info(f" Pipeline order (size): {[f'{PIPELINE_NAMES[pid]} ({PIPELINE_SIZES_GB[pid]}GB)' for pid in SIZE_ORDER]}")
491
+ logger.info(f" Agents enabled: {agent_manager.enabled}")
492
+ logger.info(f" RunPod idle timeout: {cfg.runpod_idle_timeout_s}s")
493
+ logger.info("=" * 60)
494
+
495
+ # Feature 1: Pre-Warm in SIZE_ORDER (smallest first = fastest to warm)
496
+ logger.info("Pre-warming pipelines (smallest first)...")
497
+ for pid in SIZE_ORDER:
498
+ p = pipelines_by_id[pid]
499
+ await _prewarm_pipeline(p)
500
+ logger.info(f"Pre-warm complete. Scores: {prewarm_scores}")
501
+
502
+ # Start RunPod idle sleep timer
503
+ _sleep_task = asyncio.create_task(_idle_sleep_loop())
504
+ logger.info("RunPod idle sleep timer started (5 min)")
router/metrics.py ADDED
@@ -0,0 +1,150 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
"""EDEN OS V2 — Pipeline Performance Metrics.

Tracks per-pipeline response times, motion scores, success rates,
and load times in a thread-safe sliding window.
"""

import threading
import time
from collections import deque
from dataclasses import dataclass, field


# Pipeline sizes in GB — determines failover order (smallest = fastest to load)
PIPELINE_SIZES_GB: dict[int, float] = {
    4: 1.26,   # LiveAvatar FP8
    0: 6.37,   # MuseTalk
    2: 6.45,   # Ditto
    3: 18.49,  # StableAvatar
    1: 85.02,  # InfiniteTalk
}

# Canonical failover order: smallest → largest.
# Must be kept in sync with PIPELINE_SIZES_GB (ascending by size).
SIZE_ORDER: list[int] = [4, 0, 2, 3, 1]

# Human-readable name for each pipeline id.
PIPELINE_NAMES: dict[int, str] = {
    0: "musetalk",
    1: "infinitetalk",
    2: "ditto",
    3: "stableavatar",
    4: "liveavatar",
}
34
@dataclass
class PipelineMetrics:
    """Sliding-window performance statistics for a single pipeline.

    Response times and motion scores are kept in bounded deques (last 50
    samples), so all averages and trends reflect recent behavior only.
    """

    pipeline_id: int
    name: str = ""
    model_size_gb: float = 0.0
    response_times_ms: deque = field(default_factory=lambda: deque(maxlen=50))
    motion_scores: deque = field(default_factory=lambda: deque(maxlen=50))
    success_count: int = 0
    failure_count: int = 0
    total_calls: int = 0
    last_load_time_ms: float = 0.0
    estimated_memory_mb: float = 0.0
    last_updated: float = 0.0

    @property
    def avg_response_time_ms(self) -> float:
        """Mean of the windowed response times; 0.0 when no samples exist."""
        samples = self.response_times_ms
        return sum(samples) / len(samples) if samples else 0.0

    @property
    def success_rate(self) -> float:
        """Fraction of recorded calls that succeeded; 0.0 before any call."""
        return self.success_count / self.total_calls if self.total_calls else 0.0

    @property
    def avg_motion_score(self) -> float:
        """Mean of the windowed motion scores; 0.0 when no samples exist."""
        scores = self.motion_scores
        return sum(scores) / len(scores) if scores else 0.0

    @property
    def trend(self) -> str:
        """Classify recent quality: compare last 10 motion scores to the prior 10.

        Returns one of "improving", "degrading", "stable", or
        "insufficient_data" (fewer than 10 samples). With 10–19 samples the
        baseline window overlaps the recent one, which biases toward "stable".
        """
        history = list(self.motion_scores)
        if len(history) < 10:
            return "insufficient_data"
        window = history[-10:]
        baseline = history[-20:-10] if len(history) >= 20 else history[:10]
        delta = sum(window) / len(window) - sum(baseline) / len(baseline)
        if delta > 0.05:
            return "improving"
        if delta < -0.05:
            return "degrading"
        return "stable"

    def to_dict(self) -> dict:
        """JSON-serializable snapshot of this pipeline's metrics."""
        snapshot = {
            "pipeline_id": self.pipeline_id,
            "name": self.name,
            "model_size_gb": self.model_size_gb,
        }
        snapshot["avg_response_time_ms"] = round(self.avg_response_time_ms, 1)
        snapshot["avg_motion_score"] = round(self.avg_motion_score, 3)
        snapshot["success_rate"] = round(self.success_rate, 3)
        snapshot["success_count"] = self.success_count
        snapshot["failure_count"] = self.failure_count
        snapshot["total_calls"] = self.total_calls
        snapshot["last_load_time_ms"] = round(self.last_load_time_ms, 1)
        snapshot["trend"] = self.trend
        snapshot["last_updated"] = self.last_updated
        return snapshot
100
+
101
+
102
class MetricsStore:
    """Thread-safe metrics storage for all pipelines.

    A single lock guards the id → PipelineMetrics map. Note that
    ``get_metrics``/``get_all_metrics`` hand back live PipelineMetrics
    objects; callers should treat them as read-only snapshots.
    """

    def __init__(self):
        self._lock = threading.Lock()
        # One entry per known pipeline id (0–4), pre-populated so recording
        # never has to create entries on the fly.
        self._metrics: dict[int, PipelineMetrics] = {
            pid: PipelineMetrics(
                pipeline_id=pid,
                name=PIPELINE_NAMES.get(pid, f"pipeline{pid}"),
                model_size_gb=PIPELINE_SIZES_GB.get(pid, 0.0),
            )
            for pid in range(5)
        }

    def record_call(self, pid: int, response_time_ms: float, motion_score: float, success: bool):
        """Fold one call's outcome into pipeline *pid*'s sliding windows.

        Unknown pipeline ids are silently ignored.
        """
        with self._lock:
            entry = self._metrics.get(pid)
            if not entry:
                return
            entry.response_times_ms.append(response_time_ms)
            entry.motion_scores.append(motion_score)
            entry.total_calls += 1
            if success:
                entry.success_count += 1
            else:
                entry.failure_count += 1
            entry.last_updated = time.time()

    def record_load_time(self, pid: int, ms: float):
        """Record the latest model load time and refresh the memory estimate."""
        with self._lock:
            entry = self._metrics.get(pid)
            if entry:
                entry.last_load_time_ms = ms
                # Heuristic: assume ~60% of the on-disk model size ends up
                # resident in memory — TODO confirm against real usage.
                entry.estimated_memory_mb = PIPELINE_SIZES_GB.get(pid, 0) * 1024 * 0.6

    def get_metrics(self, pid: int) -> PipelineMetrics | None:
        """Return the metrics object for *pid*, or None for unknown ids."""
        with self._lock:
            return self._metrics.get(pid)

    def get_all_metrics(self) -> dict[int, PipelineMetrics]:
        """Return a shallow copy of the id → metrics mapping."""
        with self._lock:
            return dict(self._metrics)

    def get_summary(self) -> dict:
        """Aggregate view across all pipelines: per-pipeline dicts plus totals."""
        with self._lock:
            per_pipeline = {pid: m.to_dict() for pid, m in self._metrics.items()}
            grand_total = sum(m.total_calls for m in self._metrics.values())
            return {
                "pipelines": per_pipeline,
                "size_order": SIZE_ORDER,
                "total_calls": grand_total,
            }