""" Layer 2 – Metrics Engine & Difficulty Controller. Simulates realistic service metrics that evolve based on: • which task is active • what step we are on • whether any fixes have been applied (VFS state) """ from __future__ import annotations import random from typing import Dict, List, Optional, Tuple from app.models import ServiceMetrics, Alert, DifficultyState # --------------------------------------------------------------------------- # Per-task metric profiles # --------------------------------------------------------------------------- HEALTHY_METRICS: Dict[str, dict] = { "ad_ranking": dict( cpu_percent=12.0, memory_mb=256.0, error_rate=0.0, p99_latency_ms=45.0, request_queue=3, last_deploy="2026-04-23 01:00 UTC", status="healthy", ), "capi_pipeline": dict( cpu_percent=8.0, memory_mb=180.0, error_rate=0.0, p99_latency_ms=20.0, request_queue=0, last_deploy="2026-04-23 02:14 UTC", status="healthy", ), "whatsapp_sync": dict( cpu_percent=10.0, memory_mb=200.0, error_rate=0.0, p99_latency_ms=35.0, request_queue=5, last_deploy="2026-04-22 18:30 UTC", status="healthy", ), } def _jitter(val: float, pct: float = 0.05) -> float: return round(val * (1 + random.uniform(-pct, pct)), 2) class MetricsEngine: """Generates per-step system metrics driven by task state.""" def __init__(self): self._task_id: int = 0 self._fixed_services: set = set() # ------------------------------------------------------------------ # Lifecycle # ------------------------------------------------------------------ def reset(self, task_id: int) -> None: self._task_id = task_id self._fixed_services.clear() def mark_fixed(self, service: str) -> None: self._fixed_services.add(service) def mark_unfixed(self, service: str) -> None: self._fixed_services.discard(service) # ------------------------------------------------------------------ # Core metric generation # ------------------------------------------------------------------ def get_metrics(self, step: int) -> Dict[str, ServiceMetrics]: builders = { 1: self._task1_metrics, 2: self._task2_metrics, 3: self._task3_metrics, 4: self._task4_metrics, 5: self._task5_metrics, } fn = builders.get(self._task_id, self._all_healthy) return fn(step) def _all_healthy(self, step: int) -> Dict[str, ServiceMetrics]: return { svc: ServiceMetrics(**{k: _jitter(v) if isinstance(v, float) else v for k, v in metrics.items()}) for svc, metrics in HEALTHY_METRICS.items() } # ------------------------------------------------------------------ # Task 1 – ad_ranking crashes with AttributeError # ------------------------------------------------------------------ def _task1_metrics(self, step: int) -> Dict[str, ServiceMetrics]: fixed = "ad_ranking" in self._fixed_services return { "ad_ranking": ServiceMetrics( cpu_percent=_jitter(5.0 if fixed else 2.0), memory_mb=_jitter(256.0), error_rate=0.0 if fixed else _jitter(12.0), p99_latency_ms=_jitter(45.0 if fixed else 0.0), request_queue=3 if fixed else 0, last_deploy="2026-04-23 02:14 UTC", status="healthy" if fixed else "critical", ), "capi_pipeline": ServiceMetrics(**{ k: _jitter(v) if isinstance(v, float) else v for k, v in HEALTHY_METRICS["capi_pipeline"].items() }), "whatsapp_sync": ServiceMetrics(**{ k: _jitter(v) if isinstance(v, float) else v for k, v in HEALTHY_METRICS["whatsapp_sync"].items() }), } # ------------------------------------------------------------------ # Task 2 – silent CAPI data corruption → ROAS degradation # ------------------------------------------------------------------ def _task2_metrics(self, step: int) -> Dict[str, ServiceMetrics]: capi_fixed = "capi_pipeline" in self._fixed_services ad_recovering = capi_fixed and step > 2 # needs a few steps to propagate return { "capi_pipeline": ServiceMetrics( cpu_percent=_jitter(8.0), memory_mb=_jitter(180.0), error_rate=0.0, # no crash – silent corruption p99_latency_ms=_jitter(20.0), request_queue=0, last_deploy="2026-04-23 02:14 UTC", status="healthy", # deceptive – looks fine ), "ad_ranking": ServiceMetrics( cpu_percent=_jitter(12.0), memory_mb=_jitter(256.0), error_rate=0.0, p99_latency_ms=_jitter(45.0), request_queue=3, last_deploy="2026-04-22 18:00 UTC", # ROAS in custom_data would be degraded but not visible here status="healthy" if ad_recovering else "degraded", ), "whatsapp_sync": ServiceMetrics(**{ k: _jitter(v) if isinstance(v, float) else v for k, v in HEALTHY_METRICS["whatsapp_sync"].items() }), } # ------------------------------------------------------------------ # Task 3 – memory leak in whatsapp_sync under load # ------------------------------------------------------------------ def _task3_metrics(self, step: int) -> Dict[str, ServiceMetrics]: fixed = "whatsapp_sync" in self._fixed_services # Memory climbs 50 MB per step until fixed leaked_mb = min(128.0 + (step * 50.0), 1800.0) return { "ad_ranking": ServiceMetrics(**{ k: _jitter(v) if isinstance(v, float) else v for k, v in HEALTHY_METRICS["ad_ranking"].items() }), "capi_pipeline": ServiceMetrics(**{ k: _jitter(v) if isinstance(v, float) else v for k, v in HEALTHY_METRICS["capi_pipeline"].items() }), "whatsapp_sync": ServiceMetrics( cpu_percent=_jitter(10.0 if fixed else min(15 + step * 3, 90)), memory_mb=_jitter(256.0 if fixed else leaked_mb), error_rate=0.0 if fixed else _jitter(0.05 * max(step - 3, 0)), p99_latency_ms=_jitter(35.0 if fixed else min(35 + step * 80, 8000)), request_queue=5 if fixed else min(5 + step * 20, 500), last_deploy="2026-04-22 18:30 UTC", status="healthy" if fixed else ( "critical" if leaked_mb > 1200 else "degraded" ), ), } # ------------------------------------------------------------------ # Task 4 – bad migration cascades to all three services # ------------------------------------------------------------------ def _task4_metrics(self, step: int) -> Dict[str, ServiceMetrics]: migration_rolled_back = "whatsapp_sync" in self._fixed_services return { "ad_ranking": ServiceMetrics( cpu_percent=_jitter(12.0), memory_mb=_jitter(256.0), error_rate=0.0 if migration_rolled_back else _jitter(3.5), p99_latency_ms=_jitter(45.0 if migration_rolled_back else 2200.0), request_queue=3 if migration_rolled_back else 150, last_deploy="2026-04-23 02:00 UTC", status="healthy" if migration_rolled_back else "degraded", ), "capi_pipeline": ServiceMetrics( cpu_percent=_jitter(8.0), memory_mb=_jitter(180.0), error_rate=0.0 if migration_rolled_back else _jitter(2.1), p99_latency_ms=_jitter(20.0 if migration_rolled_back else 1100.0), request_queue=0 if migration_rolled_back else 80, last_deploy="2026-04-23 02:14 UTC", status="healthy" if migration_rolled_back else "degraded", ), "whatsapp_sync": ServiceMetrics( cpu_percent=_jitter(10.0), memory_mb=_jitter(200.0), error_rate=0.0 if migration_rolled_back else _jitter(8.0), p99_latency_ms=_jitter(35.0 if migration_rolled_back else 5000.0), request_queue=5 if migration_rolled_back else 400, last_deploy="2026-04-23 02:14 UTC", status="healthy" if migration_rolled_back else "critical", ), } # ------------------------------------------------------------------ # Task 5 – PII data-leak (metrics look fine but security tests fail) # ------------------------------------------------------------------ def _task5_metrics(self, step: int) -> Dict[str, ServiceMetrics]: fixed = "capi_pipeline" in self._fixed_services return { "capi_pipeline": ServiceMetrics( cpu_percent=_jitter(8.0), memory_mb=_jitter(180.0), error_rate=0.0, # no crash – silent security hole p99_latency_ms=_jitter(20.0), request_queue=0, last_deploy="2026-04-23 02:14 UTC", status="healthy", # deliberately deceptive ), "ad_ranking": ServiceMetrics(**{ k: _jitter(v) if isinstance(v, float) else v for k, v in HEALTHY_METRICS["ad_ranking"].items() }), "whatsapp_sync": ServiceMetrics(**{ k: _jitter(v) if isinstance(v, float) else v for k, v in HEALTHY_METRICS["whatsapp_sync"].items() }), } # ------------------------------------------------------------------ # Alerts # ------------------------------------------------------------------ def get_alerts(self, step: int) -> List[Alert]: alert_map = { 1: self._task1_alerts, 2: self._task2_alerts, 3: self._task3_alerts, 4: self._task4_alerts, 5: self._task5_alerts, } fn = alert_map.get(self._task_id, lambda s: []) return fn(step) def _task1_alerts(self, step: int) -> List[Alert]: if "ad_ranking" in self._fixed_services: return [] return [ Alert( alert_id="ALT-001", severity="P0", service="ad_ranking", message=( "AttributeError: 'dict' object has no attribute 'get_clicks' " "in ranker.py score_ads() — all ranking requests failing" ), triggered_at_step=0, is_red_herring=False, ) ] def _task2_alerts(self, step: int) -> List[Alert]: alerts = [] if "capi_pipeline" not in self._fixed_services: alerts.append(Alert( alert_id="ALT-002", severity="P1", service="ad_ranking", message="ROAS dropped 68% vs 7-day average — attribution model seeing events from 1970", triggered_at_step=0, is_red_herring=False, )) # Red herring – ad_ranking looks degraded but it's CAPI's fault alerts.append(Alert( alert_id="ALT-003", severity="P2", service="ad_ranking", message="High memory pressure on ad-ranking pod — possible cache thrash", triggered_at_step=0, is_red_herring=True, )) return alerts def _task3_alerts(self, step: int) -> List[Alert]: if "whatsapp_sync" in self._fixed_services: return [] alerts = [Alert( alert_id="ALT-004", severity="P1" if step < 4 else "P0", service="whatsapp_sync", message=f"DB connection pool exhausted ({min(step * 20, 500)}/500 connections in use) — sync requests queuing", triggered_at_step=1, is_red_herring=False, )] if step > 3: alerts.append(Alert( alert_id="ALT-005", severity="P1", service="whatsapp_sync", message="p99 latency > 5 s — SLA breach imminent", triggered_at_step=4, is_red_herring=False, )) return alerts def _task4_alerts(self, step: int) -> List[Alert]: if "whatsapp_sync" in self._fixed_services: return [] return [ Alert( alert_id="ALT-006", severity="P0", service="whatsapp_sync", message="FK violation: insert into user_preferences fails — migration 003 circular FK", triggered_at_step=0, is_red_herring=False, ), # Red herrings – symptoms of the underlying migration failure Alert( alert_id="ALT-007", severity="P1", service="ad_ranking", message="High error rate on /rank endpoint — upstream DB errors propagating", triggered_at_step=0, is_red_herring=True, ), Alert( alert_id="ALT-008", severity="P1", service="capi_pipeline", message="Event ingest latency spike — shared DB pool contention", triggered_at_step=0, is_red_herring=True, ), ] def _task5_alerts(self, step: int) -> List[Alert]: if "capi_pipeline" in self._fixed_services: return [] return [ Alert( alert_id="ALT-009", severity="P0", service="capi_pipeline", message=( "SECURITY: Unusual response payload size on /ingest (avg 14 KB vs 0.2 KB) " "— possible PII exposure in debug response body" ), triggered_at_step=0, is_red_herring=False, ), Alert( alert_id="ALT-010", severity="P2", service="capi_pipeline", message="Slightly elevated memory on ingestor pod — likely buffer growth", triggered_at_step=0, is_red_herring=True, ), ] # ------------------------------------------------------------------ # Terminal output (simulated stack traces / logs) # ------------------------------------------------------------------ def get_terminal_output(self, step: int, last_test_result: Optional[str] = None) -> str: if last_test_result: return last_test_result outputs = { 1: ( "Traceback (most recent call last):\n" " File 'ad_ranking/ranker.py', line 22, in score_ads\n" " click_rate = ad.get_clicks() / max(ad.get('impressions', 1), 1)\n" "AttributeError: 'dict' object has no attribute 'get_clicks'\n" "[CRITICAL] /rank endpoint returning 500 for all requests" ), 2: ( "[WARNING] ad_ranking: ROAS attribution anomaly detected\n" " Expected event_time range: 1700000000 – 1745500000\n" " Actual event_time range: 1700 – 1745500 (← timestamps in seconds / 1000!)\n" "[INFO] capi_pipeline: All unit tests PASS\n" "[INFO] capi_pipeline: Throughput 12,000 events/s — nominal\n" "[WARNING] ad_ranking: Conversion window showing data from 1970-01-20" ), 3: ( "[INFO] whatsapp_sync: process_queue started\n" "[ERROR] asyncpg.exceptions.TooManyConnectionsError: " "connection pool exhausted (max=100)\n" " Traceback: handler.py:sync_user_messages — acquire() blocked\n" "[ERROR] Sync request for user 8841923 timed out after 30s\n" "[CRITICAL] 487 pending sync requests queued" ), 4: ( "[ERROR] asyncpg.exceptions.ForeignKeyViolationError:\n" " insert into user_preferences violates FK constraint " "\"user_preferences_user_id_fkey\"\n" " DETAIL: Key (user_id)=(48291) is not present in table \"users\".\n" "[ERROR] whatsapp_sync: message thread creation failing\n" "[WARNING] ad_ranking: upstream DB pool returning errors\n" "[WARNING] capi_pipeline: event association latency +340ms\n" " [HINT] Last DB migration was version 003 at 02:14 UTC today" ), 5: ( "[SECURITY SCAN] capi_pipeline /ingest endpoint\n" " Response body contains keys: ['status', 'processed', 'debug_data']\n" " debug_data.user_emails contains raw PII hashes + plaintext fields\n" " debug_data.raw_payload contains full user submission data\n" "[FAIL] Security test suite: test_no_pii_in_response FAILED\n" "[INFO] Unit tests: all PASSING — bug invisible to standard tests" ), } return outputs.get(self._task_id, "[INFO] All systems operational") # --------------------------------------------------------------------------- # Difficulty Controller (Theme 4 – Self-Improvement Loop) # --------------------------------------------------------------------------- class DifficultyController: """ After each episode, analyse which bug categories the agent failed on. Weight those categories higher so the next generated episode targets the agent's current weaknesses. """ BUG_CATEGORY_MAP: Dict[int, str] = { 1: "data_corruption", # hallucinated attribute 2: "data_corruption", # silent timestamp corruption 3: "async_bugs", # connection leak 4: "red_herrings", # cascading failure + red herrings 5: "security_bugs", # PII leak } def __init__(self): self.state = DifficultyState() def update(self, task_id: int, normalized_score: float) -> None: """Increase weight for the bug category this agent struggled with.""" category = self.BUG_CATEGORY_MAP.get(task_id) if category is None: return current = getattr(self.state, category) if normalized_score < 0.5: # Agent struggled – raise difficulty weight setattr(self.state, category, min(current * 1.25, 3.0)) elif normalized_score > 0.8: # Agent mastered it – slightly reduce weight setattr(self.state, category, max(current * 0.9, 0.3)) def next_task_id(self) -> int: """Sample next task weighted by current weakness scores.""" import random weights = [ (1, self.state.data_corruption), (2, self.state.data_corruption), (3, self.state.async_bugs), (4, self.state.red_herrings), (5, self.state.security_bugs), ] task_ids, task_weights = zip(*weights) total = sum(task_weights) probs = [w / total for w in task_weights] return random.choices(task_ids, weights=probs, k=1)[0] def weakness_tags(self) -> List[str]: d = self.state.dict() return [k for k, v in d.items() if v > 0.7]