"""
Layer 2 — Metrics Engine & Difficulty Controller.

Simulates realistic service metrics that evolve based on:
  • which task is active
  • what step we are on
  • whether any fixes have been applied (VFS state)
"""
| from __future__ import annotations |
| import random |
| from typing import Dict, List, Optional, Tuple |
|
|
| from app.models import ServiceMetrics, Alert, DifficultyState |
|
|
|
|
| |
| |
| |
|
|
# Baseline ("nothing is wrong") readings for each simulated service, keyed by
# service name. Each inner mapping holds the keyword arguments used to build a
# ServiceMetrics for that service.
HEALTHY_METRICS: Dict[str, dict] = {
    "ad_ranking": {
        "cpu_percent": 12.0,
        "memory_mb": 256.0,
        "error_rate": 0.0,
        "p99_latency_ms": 45.0,
        "request_queue": 3,
        "last_deploy": "2026-04-23 01:00 UTC",
        "status": "healthy",
    },
    "capi_pipeline": {
        "cpu_percent": 8.0,
        "memory_mb": 180.0,
        "error_rate": 0.0,
        "p99_latency_ms": 20.0,
        "request_queue": 0,
        "last_deploy": "2026-04-23 02:14 UTC",
        "status": "healthy",
    },
    "whatsapp_sync": {
        "cpu_percent": 10.0,
        "memory_mb": 200.0,
        "error_rate": 0.0,
        "p99_latency_ms": 35.0,
        "request_queue": 5,
        "last_deploy": "2026-04-22 18:30 UTC",
        "status": "healthy",
    },
}
|
|
|
|
| def _jitter(val: float, pct: float = 0.05) -> float: |
| return round(val * (1 + random.uniform(-pct, pct)), 2) |
|
|
|
|
class MetricsEngine:
    """Generate per-step simulated metrics, alerts and terminal output.

    The active task id (set through :meth:`reset`) selects which incident
    scenario is simulated. The set of services marked as fixed decides
    whether a scenario reports its broken or its recovered readings. Task ids
    outside 1-5 produce an all-healthy system with no alerts.
    """

    def __init__(self):
        self._task_id: int = 0  # 0 is not a known scenario -> all healthy
        self._fixed_services: set = set()  # names of services marked fixed

    # ------------------------------------------------------------------
    # State management
    # ------------------------------------------------------------------

    def reset(self, task_id: int) -> None:
        """Select the scenario ``task_id`` and forget every recorded fix."""
        self._task_id = task_id
        self._fixed_services.clear()

    def mark_fixed(self, service: str) -> None:
        """Record ``service`` as fixed."""
        self._fixed_services.add(service)

    def mark_unfixed(self, service: str) -> None:
        """Remove ``service`` from the fixed set (no error if absent)."""
        self._fixed_services.discard(service)

    # ------------------------------------------------------------------
    # Metrics
    # ------------------------------------------------------------------

    def get_metrics(self, step: int) -> Dict[str, ServiceMetrics]:
        """Return the metrics of every service for the active task at ``step``.

        Unknown task ids fall back to the all-healthy baseline.
        """
        builders = {
            1: self._task1_metrics,
            2: self._task2_metrics,
            3: self._task3_metrics,
            4: self._task4_metrics,
            5: self._task5_metrics,
        }
        build = builders.get(self._task_id, self._all_healthy)
        return build(step)

    @staticmethod
    def _healthy_service(service: str) -> ServiceMetrics:
        """Build the jittered healthy baseline for ``service``.

        Float fields of ``HEALTHY_METRICS[service]`` are passed through
        ``_jitter``; every other field is used unchanged.
        """
        return ServiceMetrics(**{
            field: _jitter(value) if isinstance(value, float) else value
            for field, value in HEALTHY_METRICS[service].items()
        })

    def _all_healthy(self, step: int) -> Dict[str, ServiceMetrics]:
        """Return the healthy baseline for every service (``step`` is unused)."""
        return {svc: self._healthy_service(svc) for svc in HEALTHY_METRICS}

    def _task1_metrics(self, step: int) -> Dict[str, ServiceMetrics]:
        """Task 1: ``ad_ranking`` is critical until it is marked fixed.

        The other two services stay at their healthy baseline. ``step`` is
        unused.
        """
        fixed = "ad_ranking" in self._fixed_services
        return {
            "ad_ranking": ServiceMetrics(
                cpu_percent=_jitter(5.0 if fixed else 2.0),
                memory_mb=_jitter(256.0),
                error_rate=0.0 if fixed else _jitter(12.0),
                p99_latency_ms=_jitter(45.0 if fixed else 0.0),
                request_queue=3 if fixed else 0,
                last_deploy="2026-04-23 02:14 UTC",
                status="healthy" if fixed else "critical",
            ),
            "capi_pipeline": self._healthy_service("capi_pipeline"),
            "whatsapp_sync": self._healthy_service("whatsapp_sync"),
        }

    def _task2_metrics(self, step: int) -> Dict[str, ServiceMetrics]:
        """Task 2: ``capi_pipeline`` always looks healthy; ``ad_ranking`` is degraded.

        ``ad_ranking`` only reports "healthy" once ``capi_pipeline`` has been
        marked fixed *and* ``step`` is greater than 2. Its numeric readings
        are the same in both states; only ``status`` changes.
        """
        capi_fixed = "capi_pipeline" in self._fixed_services
        ad_recovering = capi_fixed and step > 2
        return {
            "capi_pipeline": ServiceMetrics(
                cpu_percent=_jitter(8.0),
                memory_mb=_jitter(180.0),
                error_rate=0.0,
                p99_latency_ms=_jitter(20.0),
                request_queue=0,
                last_deploy="2026-04-23 02:14 UTC",
                status="healthy",
            ),
            "ad_ranking": ServiceMetrics(
                cpu_percent=_jitter(12.0),
                memory_mb=_jitter(256.0),
                error_rate=0.0,
                p99_latency_ms=_jitter(45.0),
                request_queue=3,
                last_deploy="2026-04-22 18:00 UTC",
                status="healthy" if ad_recovering else "degraded",
            ),
            "whatsapp_sync": self._healthy_service("whatsapp_sync"),
        }

    def _task3_metrics(self, step: int) -> Dict[str, ServiceMetrics]:
        """Task 3: ``whatsapp_sync`` degrades further with every step until fixed.

        While unfixed, memory grows by 50 per step from 128 (capped at 1800),
        and CPU, latency and queue depth grow linearly up to their caps. The
        error rate stays 0 through step 3 and grows afterwards. Status turns
        "critical" once the leaked memory exceeds 1200.
        """
        fixed = "whatsapp_sync" in self._fixed_services
        leaked_mb = min(128.0 + (step * 50.0), 1800.0)
        return {
            "ad_ranking": self._healthy_service("ad_ranking"),
            "capi_pipeline": self._healthy_service("capi_pipeline"),
            "whatsapp_sync": ServiceMetrics(
                cpu_percent=_jitter(10.0 if fixed else min(15 + step * 3, 90)),
                # NOTE(review): the fixed value is 256.0 while HEALTHY_METRICS
                # lists 200.0 for whatsapp_sync -- confirm which is intended.
                memory_mb=_jitter(256.0 if fixed else leaked_mb),
                error_rate=0.0 if fixed else _jitter(0.05 * max(step - 3, 0)),
                p99_latency_ms=_jitter(35.0 if fixed else min(35 + step * 80, 8000)),
                request_queue=5 if fixed else min(5 + step * 20, 500),
                last_deploy="2026-04-22 18:30 UTC",
                status="healthy" if fixed else (
                    "critical" if leaked_mb > 1200 else "degraded"
                ),
            ),
        }

    def _task4_metrics(self, step: int) -> Dict[str, ServiceMetrics]:
        """Task 4: all three services suffer until ``whatsapp_sync`` is fixed.

        Marking ``whatsapp_sync`` as fixed is treated as the migration being
        rolled back, which restores every service at once. ``step`` is unused.
        """
        migration_rolled_back = "whatsapp_sync" in self._fixed_services
        return {
            "ad_ranking": ServiceMetrics(
                cpu_percent=_jitter(12.0),
                memory_mb=_jitter(256.0),
                error_rate=0.0 if migration_rolled_back else _jitter(3.5),
                p99_latency_ms=_jitter(45.0 if migration_rolled_back else 2200.0),
                request_queue=3 if migration_rolled_back else 150,
                last_deploy="2026-04-23 02:00 UTC",
                status="healthy" if migration_rolled_back else "degraded",
            ),
            "capi_pipeline": ServiceMetrics(
                cpu_percent=_jitter(8.0),
                memory_mb=_jitter(180.0),
                error_rate=0.0 if migration_rolled_back else _jitter(2.1),
                p99_latency_ms=_jitter(20.0 if migration_rolled_back else 1100.0),
                request_queue=0 if migration_rolled_back else 80,
                last_deploy="2026-04-23 02:14 UTC",
                status="healthy" if migration_rolled_back else "degraded",
            ),
            "whatsapp_sync": ServiceMetrics(
                cpu_percent=_jitter(10.0),
                memory_mb=_jitter(200.0),
                error_rate=0.0 if migration_rolled_back else _jitter(8.0),
                p99_latency_ms=_jitter(35.0 if migration_rolled_back else 5000.0),
                request_queue=5 if migration_rolled_back else 400,
                last_deploy="2026-04-23 02:14 UTC",
                status="healthy" if migration_rolled_back else "critical",
            ),
        }

    def _task5_metrics(self, step: int) -> Dict[str, ServiceMetrics]:
        """Task 5: every service reports healthy metrics.

        The readings do not depend on ``step`` or on which services have been
        marked fixed; this scenario is only visible through its alerts and
        terminal output.
        """
        return {
            "capi_pipeline": ServiceMetrics(
                cpu_percent=_jitter(8.0),
                memory_mb=_jitter(180.0),
                error_rate=0.0,
                p99_latency_ms=_jitter(20.0),
                request_queue=0,
                last_deploy="2026-04-23 02:14 UTC",
                status="healthy",
            ),
            "ad_ranking": self._healthy_service("ad_ranking"),
            "whatsapp_sync": self._healthy_service("whatsapp_sync"),
        }

    # ------------------------------------------------------------------
    # Alerts
    # ------------------------------------------------------------------

    def get_alerts(self, step: int) -> List[Alert]:
        """Return the alerts firing for the active task at ``step``.

        Unknown task ids produce an empty list.
        """
        alert_map = {
            1: self._task1_alerts,
            2: self._task2_alerts,
            3: self._task3_alerts,
            4: self._task4_alerts,
            5: self._task5_alerts,
        }
        build = alert_map.get(self._task_id, lambda s: [])
        return build(step)

    def _task1_alerts(self, step: int) -> List[Alert]:
        """Task 1: one P0 alert on ``ad_ranking`` until it is fixed."""
        if "ad_ranking" in self._fixed_services:
            return []
        return [
            Alert(
                alert_id="ALT-001",
                severity="P0",
                service="ad_ranking",
                message=(
                    "AttributeError: 'dict' object has no attribute 'get_clicks' "
                    "in ranker.py score_ads() β all ranking requests failing"
                ),
                triggered_at_step=0,
                is_red_herring=False,
            )
        ]

    def _task2_alerts(self, step: int) -> List[Alert]:
        """Task 2: a real ROAS alert until ``capi_pipeline`` is fixed.

        The red-herring memory alert (ALT-003) is always present, even after
        the fix.
        """
        alerts = []
        if "capi_pipeline" not in self._fixed_services:
            alerts.append(Alert(
                alert_id="ALT-002",
                severity="P1",
                service="ad_ranking",
                message="ROAS dropped 68% vs 7-day average β attribution model seeing events from 1970",
                triggered_at_step=0,
                is_red_herring=False,
            ))
        alerts.append(Alert(
            alert_id="ALT-003",
            severity="P2",
            service="ad_ranking",
            message="High memory pressure on ad-ranking pod β possible cache thrash",
            triggered_at_step=0,
            is_red_herring=True,
        ))
        return alerts

    def _task3_alerts(self, step: int) -> List[Alert]:
        """Task 3: connection-pool alert that escalates, plus a latency alert.

        The pool alert is P1 before step 4 and P0 from step 4 on. The latency
        alert is added once ``step`` exceeds 3. Both disappear when
        ``whatsapp_sync`` is fixed.
        """
        if "whatsapp_sync" in self._fixed_services:
            return []
        alerts = [Alert(
            alert_id="ALT-004",
            severity="P1" if step < 4 else "P0",
            service="whatsapp_sync",
            message=f"DB connection pool exhausted ({min(step * 20, 500)}/500 connections in use) β sync requests queuing",
            triggered_at_step=1,
            is_red_herring=False,
        )]
        if step > 3:
            # NOTE(review): the text says "> 5 s", but _task3_metrics reports
            # a p99 of about 355 ms at step 4 -- confirm the intended threshold.
            alerts.append(Alert(
                alert_id="ALT-005",
                severity="P1",
                service="whatsapp_sync",
                message="p99 latency > 5 s β SLA breach imminent",
                triggered_at_step=4,
                is_red_herring=False,
            ))
        return alerts

    def _task4_alerts(self, step: int) -> List[Alert]:
        """Task 4: one real FK-violation alert plus two red herrings.

        All three vanish once ``whatsapp_sync`` is marked fixed.
        """
        if "whatsapp_sync" in self._fixed_services:
            return []
        return [
            Alert(
                alert_id="ALT-006",
                severity="P0",
                service="whatsapp_sync",
                message="FK violation: insert into user_preferences fails β migration 003 circular FK",
                triggered_at_step=0,
                is_red_herring=False,
            ),
            Alert(
                alert_id="ALT-007",
                severity="P1",
                service="ad_ranking",
                message="High error rate on /rank endpoint β upstream DB errors propagating",
                triggered_at_step=0,
                is_red_herring=True,
            ),
            Alert(
                alert_id="ALT-008",
                severity="P1",
                service="capi_pipeline",
                message="Event ingest latency spike β shared DB pool contention",
                triggered_at_step=0,
                is_red_herring=True,
            ),
        ]

    def _task5_alerts(self, step: int) -> List[Alert]:
        """Task 5: a P0 security alert and a red herring on ``capi_pipeline``.

        Both vanish once ``capi_pipeline`` is marked fixed.
        """
        if "capi_pipeline" in self._fixed_services:
            return []
        return [
            Alert(
                alert_id="ALT-009",
                severity="P0",
                service="capi_pipeline",
                message=(
                    "SECURITY: Unusual response payload size on /ingest (avg 14 KB vs 0.2 KB) "
                    "β possible PII exposure in debug response body"
                ),
                triggered_at_step=0,
                is_red_herring=False,
            ),
            Alert(
                alert_id="ALT-010",
                severity="P2",
                service="capi_pipeline",
                message="Slightly elevated memory on ingestor pod β likely buffer growth",
                triggered_at_step=0,
                is_red_herring=True,
            ),
        ]

    # ------------------------------------------------------------------
    # Terminal output
    # ------------------------------------------------------------------

    def get_terminal_output(self, step: int, last_test_result: Optional[str] = None) -> str:
        """Return the simulated terminal text for the active task.

        A non-empty ``last_test_result`` takes precedence and is returned
        verbatim. Otherwise the canned log of the active task is returned, or
        an "all systems operational" line for unknown task ids. ``step`` is
        not used; the canned text is the same at every step.
        """
        if last_test_result:
            return last_test_result

        outputs = {
            1: (
                "Traceback (most recent call last):\n"
                " File 'ad_ranking/ranker.py', line 22, in score_ads\n"
                " click_rate = ad.get_clicks() / max(ad.get('impressions', 1), 1)\n"
                "AttributeError: 'dict' object has no attribute 'get_clicks'\n"
                "[CRITICAL] /rank endpoint returning 500 for all requests"
            ),
            2: (
                "[WARNING] ad_ranking: ROAS attribution anomaly detected\n"
                " Expected event_time range: 1700000000 β 1745500000\n"
                " Actual event_time range: 1700 β 1745500 (β timestamps in seconds / 1000!)\n"
                "[INFO] capi_pipeline: All unit tests PASS\n"
                "[INFO] capi_pipeline: Throughput 12,000 events/s β nominal\n"
                "[WARNING] ad_ranking: Conversion window showing data from 1970-01-20"
            ),
            3: (
                "[INFO] whatsapp_sync: process_queue started\n"
                "[ERROR] asyncpg.exceptions.TooManyConnectionsError: "
                "connection pool exhausted (max=100)\n"
                " Traceback: handler.py:sync_user_messages β acquire() blocked\n"
                "[ERROR] Sync request for user 8841923 timed out after 30s\n"
                "[CRITICAL] 487 pending sync requests queued"
            ),
            4: (
                "[ERROR] asyncpg.exceptions.ForeignKeyViolationError:\n"
                " insert into user_preferences violates FK constraint "
                "\"user_preferences_user_id_fkey\"\n"
                " DETAIL: Key (user_id)=(48291) is not present in table \"users\".\n"
                "[ERROR] whatsapp_sync: message thread creation failing\n"
                "[WARNING] ad_ranking: upstream DB pool returning errors\n"
                "[WARNING] capi_pipeline: event association latency +340ms\n"
                " [HINT] Last DB migration was version 003 at 02:14 UTC today"
            ),
            5: (
                "[SECURITY SCAN] capi_pipeline /ingest endpoint\n"
                " Response body contains keys: ['status', 'processed', 'debug_data']\n"
                " debug_data.user_emails contains raw PII hashes + plaintext fields\n"
                " debug_data.raw_payload contains full user submission data\n"
                "[FAIL] Security test suite: test_no_pii_in_response FAILED\n"
                "[INFO] Unit tests: all PASSING β bug invisible to standard tests"
            ),
        }
        return outputs.get(self._task_id, "[INFO] All systems operational")
|
|
|
|
| |
| |
| |
|
|
class DifficultyController:
    """Adapt task sampling to the bug categories an agent struggles with.

    After each episode, :meth:`update` raises the weight of the category the
    agent scored poorly on and lowers it after a strong score.
    :meth:`next_task_id` then samples the next task in proportion to those
    weights, so weak categories come up more often.
    """

    # Task id -> name of the weight attribute on ``self.state``.
    BUG_CATEGORY_MAP: Dict[int, str] = {
        1: "data_corruption",
        2: "data_corruption",
        3: "async_bugs",
        4: "red_herrings",
        5: "security_bugs",
    }

    def __init__(self):
        self.state = DifficultyState()

    def update(self, task_id: int, normalized_score: float) -> None:
        """Adjust the weight of the category that ``task_id`` belongs to.

        * score below 0.5: weight is multiplied by 1.25, capped at 3.0
        * score above 0.8: weight is multiplied by 0.9, floored at 0.3
        * anything in between, or an unknown ``task_id``: no change
        """
        category = self.BUG_CATEGORY_MAP.get(task_id)
        if category is None:
            return
        current = getattr(self.state, category)
        if normalized_score < 0.5:
            # Agent struggled: make this category more likely next time.
            setattr(self.state, category, min(current * 1.25, 3.0))
        elif normalized_score > 0.8:
            # Agent did well: slowly decay the category's weight.
            setattr(self.state, category, max(current * 0.9, 0.3))

    def next_task_id(self) -> int:
        """Sample the next task id, weighted by the current category weights.

        Every task in ``BUG_CATEGORY_MAP`` is weighted by its category's
        weight. If the weights do not sum to a positive number, a task is
        picked uniformly instead of raising.
        """
        task_ids = list(self.BUG_CATEGORY_MAP)
        weights = [
            getattr(self.state, self.BUG_CATEGORY_MAP[task_id])
            for task_id in task_ids
        ]
        if sum(weights) <= 0:
            return random.choice(task_ids)
        # random.choices takes relative weights, so no normalization needed.
        return random.choices(task_ids, weights=weights, k=1)[0]

    def weakness_tags(self) -> List[str]:
        """Return the names of all state fields whose value exceeds 0.7.

        NOTE(review): ``update`` keeps weights within [0.3, 3.0], so a
        category only drops out of this list after repeated high scores.
        Confirm the 0.7 threshold against DifficultyState's defaults.
        """
        # Prefer ``model_dump`` when the state object offers it; otherwise
        # fall back to the ``dict()`` accessor the original code used.
        dump = getattr(self.state, "model_dump", None) or self.state.dict
        return [name for name, weight in dump().items() if weight > 0.7]
|
|