Meta-SRE / app /engine /observability.py
Anvit25's picture
Deploy Meta-SRE OpenEnv benchmark FastAPI server
ad6248e
"""
Layer 2 – Metrics Engine & Difficulty Controller.
Simulates realistic service metrics that evolve based on:
  • which task is active
  • what step we are on
  • whether any fixes have been applied (VFS state)
"""
from __future__ import annotations
import random
from typing import Dict, List, Optional, Tuple
from app.models import ServiceMetrics, Alert, DifficultyState
# ---------------------------------------------------------------------------
# Per-task metric profiles
# ---------------------------------------------------------------------------
# Baseline ("healthy") metric values per service.  Consumers copy ints and
# strings verbatim and jitter the float fields while iterating ``.items()``,
# so the key order inside each entry determines which random draw each field
# receives -- keep it stable.
HEALTHY_METRICS: Dict[str, dict] = {
    "ad_ranking": {
        "cpu_percent": 12.0,
        "memory_mb": 256.0,
        "error_rate": 0.0,
        "p99_latency_ms": 45.0,
        "request_queue": 3,
        "last_deploy": "2026-04-23 01:00 UTC",
        "status": "healthy",
    },
    "capi_pipeline": {
        "cpu_percent": 8.0,
        "memory_mb": 180.0,
        "error_rate": 0.0,
        "p99_latency_ms": 20.0,
        "request_queue": 0,
        "last_deploy": "2026-04-23 02:14 UTC",
        "status": "healthy",
    },
    "whatsapp_sync": {
        "cpu_percent": 10.0,
        "memory_mb": 200.0,
        "error_rate": 0.0,
        "p99_latency_ms": 35.0,
        "request_queue": 5,
        "last_deploy": "2026-04-22 18:30 UTC",
        "status": "healthy",
    },
}
def _jitter(val: float, pct: float = 0.05) -> float:
    """Return *val* scaled by a random factor in ``[1 - pct, 1 + pct]``.

    The factor is drawn from the global :mod:`random` generator, and the
    result is rounded to two decimal places.
    """
    factor = 1 + random.uniform(-pct, pct)
    return round(val * factor, 2)
class MetricsEngine:
    """Simulate per-step service metrics, alerts and terminal output for one task.

    State that drives the simulation:

    * ``_task_id`` - active scenario (1-5, see the ``_taskN_*`` methods); any
      other id yields all-healthy metrics, no alerts and an
      "all systems operational" terminal line.
    * ``step`` - passed to each getter; some scenarios escalate with it.
    * ``_fixed_services`` - services the caller marked via :meth:`mark_fixed`;
      each scenario checks it to switch between broken and recovered values.

    Float metrics carry ±5 % noise.  Without ``seed`` the noise comes from the
    module-level ``_jitter`` helper (global :mod:`random` state, as before);
    with ``seed`` the engine uses its own ``random.Random`` so episodes are
    reproducible regardless of other users of the global generator.
    """

    # Canned terminal text per task, shown when no test result is pending.
    _TERMINAL_OUTPUTS: Dict[int, str] = {
        1: (
            "Traceback (most recent call last):\n"
            " File 'ad_ranking/ranker.py', line 22, in score_ads\n"
            " click_rate = ad.get_clicks() / max(ad.get('impressions', 1), 1)\n"
            "AttributeError: 'dict' object has no attribute 'get_clicks'\n"
            "[CRITICAL] /rank endpoint returning 500 for all requests"
        ),
        2: (
            "[WARNING] ad_ranking: ROAS attribution anomaly detected\n"
            " Expected event_time range: 1700000000 – 1745500000\n"
            " Actual event_time range: 1700 – 1745500 (← timestamps in seconds / 1000!)\n"
            "[INFO] capi_pipeline: All unit tests PASS\n"
            "[INFO] capi_pipeline: Throughput 12,000 events/s — nominal\n"
            "[WARNING] ad_ranking: Conversion window showing data from 1970-01-20"
        ),
        3: (
            "[INFO] whatsapp_sync: process_queue started\n"
            "[ERROR] asyncpg.exceptions.TooManyConnectionsError: "
            "connection pool exhausted (max=100)\n"
            " Traceback: handler.py:sync_user_messages — acquire() blocked\n"
            "[ERROR] Sync request for user 8841923 timed out after 30s\n"
            "[CRITICAL] 487 pending sync requests queued"
        ),
        4: (
            "[ERROR] asyncpg.exceptions.ForeignKeyViolationError:\n"
            " insert into user_preferences violates FK constraint "
            "\"user_preferences_user_id_fkey\"\n"
            " DETAIL: Key (user_id)=(48291) is not present in table \"users\".\n"
            "[ERROR] whatsapp_sync: message thread creation failing\n"
            "[WARNING] ad_ranking: upstream DB pool returning errors\n"
            "[WARNING] capi_pipeline: event association latency +340ms\n"
            " [HINT] Last DB migration was version 003 at 02:14 UTC today"
        ),
        5: (
            "[SECURITY SCAN] capi_pipeline /ingest endpoint\n"
            " Response body contains keys: ['status', 'processed', 'debug_data']\n"
            " debug_data.user_emails contains raw PII hashes + plaintext fields\n"
            " debug_data.raw_payload contains full user submission data\n"
            "[FAIL] Security test suite: test_no_pii_in_response FAILED\n"
            "[INFO] Unit tests: all PASSING — bug invisible to standard tests"
        ),
    }

    def __init__(self, seed: Optional[int] = None):
        self._task_id: int = 0
        self._fixed_services: set[str] = set()
        # None -> delegate to module-level `_jitter` (original global-random path).
        self._rng: Optional[random.Random] = (
            None if seed is None else random.Random(seed)
        )

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------
    def reset(self, task_id: int) -> None:
        """Switch to scenario ``task_id`` and forget all previously applied fixes."""
        self._task_id = task_id
        self._fixed_services.clear()

    def mark_fixed(self, service: str) -> None:
        """Record ``service`` as repaired so scenarios report recovered values."""
        self._fixed_services.add(service)

    def mark_unfixed(self, service: str) -> None:
        """Undo :meth:`mark_fixed` for ``service`` (no-op if it was not marked)."""
        self._fixed_services.discard(service)

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------
    def _noise(self, val: float, pct: float = 0.05) -> float:
        """Return ``val`` with ±``pct`` relative noise, rounded to 2 decimals."""
        if self._rng is None:
            return _jitter(val, pct)
        return round(val * (1 + self._rng.uniform(-pct, pct)), 2)

    def _healthy(self, service: str) -> ServiceMetrics:
        """Build jittered healthy-baseline metrics for ``service``.

        Values come from ``HEALTHY_METRICS``; only float fields are jittered,
        ints and strings are copied verbatim.
        """
        return ServiceMetrics(**{
            key: self._noise(value) if isinstance(value, float) else value
            for key, value in HEALTHY_METRICS[service].items()
        })

    # ------------------------------------------------------------------
    # Core metric generation
    # ------------------------------------------------------------------
    def get_metrics(self, step: int) -> Dict[str, ServiceMetrics]:
        """Return metrics for every service at ``step``, keyed by service name.

        Unknown task ids fall back to the all-healthy baseline.
        """
        builders = {
            1: self._task1_metrics,
            2: self._task2_metrics,
            3: self._task3_metrics,
            4: self._task4_metrics,
            5: self._task5_metrics,
        }
        return builders.get(self._task_id, self._all_healthy)(step)

    def _all_healthy(self, step: int) -> Dict[str, ServiceMetrics]:
        """Jittered healthy baseline for every service (``step`` is unused)."""
        return {svc: self._healthy(svc) for svc in HEALTHY_METRICS}

    # ------------------------------------------------------------------
    # Task 1 – ad_ranking crashes with AttributeError
    # ------------------------------------------------------------------
    def _task1_metrics(self, step: int) -> Dict[str, ServiceMetrics]:
        """ad_ranking is critical (errors, no throughput) until marked fixed."""
        fixed = "ad_ranking" in self._fixed_services
        return {
            "ad_ranking": ServiceMetrics(
                cpu_percent=self._noise(5.0 if fixed else 2.0),
                memory_mb=self._noise(256.0),
                error_rate=0.0 if fixed else self._noise(12.0),
                p99_latency_ms=self._noise(45.0 if fixed else 0.0),
                request_queue=3 if fixed else 0,
                last_deploy="2026-04-23 02:14 UTC",
                status="healthy" if fixed else "critical",
            ),
            "capi_pipeline": self._healthy("capi_pipeline"),
            "whatsapp_sync": self._healthy("whatsapp_sync"),
        }

    # ------------------------------------------------------------------
    # Task 2 – silent CAPI data corruption → ROAS degradation
    # ------------------------------------------------------------------
    def _task2_metrics(self, step: int) -> Dict[str, ServiceMetrics]:
        """capi_pipeline looks healthy; ad_ranking is 'degraded' until recovery."""
        capi_fixed = "capi_pipeline" in self._fixed_services
        # NOTE(review): `step` is the absolute episode step, so a fix applied
        # after step 2 recovers immediately -- the "propagation delay" is not
        # measured from when the fix was applied.
        ad_recovering = capi_fixed and step > 2
        return {
            "capi_pipeline": ServiceMetrics(
                cpu_percent=self._noise(8.0),
                memory_mb=self._noise(180.0),
                error_rate=0.0,  # no crash – silent corruption
                p99_latency_ms=self._noise(20.0),
                request_queue=0,
                last_deploy="2026-04-23 02:14 UTC",
                status="healthy",  # deceptive – looks fine
            ),
            "ad_ranking": ServiceMetrics(
                cpu_percent=self._noise(12.0),
                memory_mb=self._noise(256.0),
                error_rate=0.0,
                p99_latency_ms=self._noise(45.0),
                request_queue=3,
                last_deploy="2026-04-22 18:00 UTC",
                # ROAS degradation is not a field here; only the status shows it.
                status="healthy" if ad_recovering else "degraded",
            ),
            "whatsapp_sync": self._healthy("whatsapp_sync"),
        }

    # ------------------------------------------------------------------
    # Task 3 – memory leak in whatsapp_sync under load
    # ------------------------------------------------------------------
    def _task3_metrics(self, step: int) -> Dict[str, ServiceMetrics]:
        """whatsapp_sync degrades further each step until marked fixed."""
        fixed = "whatsapp_sync" in self._fixed_services
        # Memory climbs 50 MB per step from 128 MB, capped at 1800 MB.
        leaked_mb = min(128.0 + (step * 50.0), 1800.0)
        return {
            "ad_ranking": self._healthy("ad_ranking"),
            "capi_pipeline": self._healthy("capi_pipeline"),
            "whatsapp_sync": ServiceMetrics(
                cpu_percent=self._noise(10.0 if fixed else min(15 + step * 3, 90)),
                memory_mb=self._noise(256.0 if fixed else leaked_mb),
                # Errors only start appearing after step 3.
                error_rate=0.0 if fixed else self._noise(0.05 * max(step - 3, 0)),
                p99_latency_ms=self._noise(35.0 if fixed else min(35 + step * 80, 8000)),
                request_queue=5 if fixed else min(5 + step * 20, 500),
                last_deploy="2026-04-22 18:30 UTC",
                status="healthy" if fixed else (
                    "critical" if leaked_mb > 1200 else "degraded"
                ),
            ),
        }

    # ------------------------------------------------------------------
    # Task 4 – bad migration cascades to all three services
    # ------------------------------------------------------------------
    def _task4_metrics(self, step: int) -> Dict[str, ServiceMetrics]:
        """All services are impaired until whatsapp_sync (the migration) is fixed."""
        migration_rolled_back = "whatsapp_sync" in self._fixed_services
        return {
            "ad_ranking": ServiceMetrics(
                cpu_percent=self._noise(12.0),
                memory_mb=self._noise(256.0),
                error_rate=0.0 if migration_rolled_back else self._noise(3.5),
                p99_latency_ms=self._noise(45.0 if migration_rolled_back else 2200.0),
                request_queue=3 if migration_rolled_back else 150,
                last_deploy="2026-04-23 02:00 UTC",
                status="healthy" if migration_rolled_back else "degraded",
            ),
            "capi_pipeline": ServiceMetrics(
                cpu_percent=self._noise(8.0),
                memory_mb=self._noise(180.0),
                error_rate=0.0 if migration_rolled_back else self._noise(2.1),
                p99_latency_ms=self._noise(20.0 if migration_rolled_back else 1100.0),
                request_queue=0 if migration_rolled_back else 80,
                last_deploy="2026-04-23 02:14 UTC",
                status="healthy" if migration_rolled_back else "degraded",
            ),
            "whatsapp_sync": ServiceMetrics(
                cpu_percent=self._noise(10.0),
                memory_mb=self._noise(200.0),
                error_rate=0.0 if migration_rolled_back else self._noise(8.0),
                p99_latency_ms=self._noise(35.0 if migration_rolled_back else 5000.0),
                request_queue=5 if migration_rolled_back else 400,
                last_deploy="2026-04-23 02:14 UTC",
                status="healthy" if migration_rolled_back else "critical",
            ),
        }

    # ------------------------------------------------------------------
    # Task 5 – PII data-leak (metrics look fine but security tests fail)
    # ------------------------------------------------------------------
    def _task5_metrics(self, step: int) -> Dict[str, ServiceMetrics]:
        """Every service looks healthy; the fix state does not affect metrics."""
        return {
            "capi_pipeline": ServiceMetrics(
                cpu_percent=self._noise(8.0),
                memory_mb=self._noise(180.0),
                error_rate=0.0,  # no crash – silent security hole
                p99_latency_ms=self._noise(20.0),
                request_queue=0,
                last_deploy="2026-04-23 02:14 UTC",
                status="healthy",  # deliberately deceptive
            ),
            "ad_ranking": self._healthy("ad_ranking"),
            "whatsapp_sync": self._healthy("whatsapp_sync"),
        }

    # ------------------------------------------------------------------
    # Alerts
    # ------------------------------------------------------------------
    def get_alerts(self, step: int) -> List[Alert]:
        """Return the alerts firing at ``step``; unknown task ids have none."""
        alert_map = {
            1: self._task1_alerts,
            2: self._task2_alerts,
            3: self._task3_alerts,
            4: self._task4_alerts,
            5: self._task5_alerts,
        }
        return alert_map.get(self._task_id, lambda s: [])(step)

    def _task1_alerts(self, step: int) -> List[Alert]:
        """One P0 crash alert on ad_ranking until it is marked fixed."""
        if "ad_ranking" in self._fixed_services:
            return []
        return [
            Alert(
                alert_id="ALT-001",
                severity="P0",
                service="ad_ranking",
                message=(
                    "AttributeError: 'dict' object has no attribute 'get_clicks' "
                    "in ranker.py score_ads() — all ranking requests failing"
                ),
                triggered_at_step=0,
                is_red_herring=False,
            )
        ]

    def _task2_alerts(self, step: int) -> List[Alert]:
        """ROAS alert until capi_pipeline is fixed, plus a red herring."""
        alerts = []
        if "capi_pipeline" not in self._fixed_services:
            alerts.append(Alert(
                alert_id="ALT-002",
                severity="P1",
                service="ad_ranking",
                message="ROAS dropped 68% vs 7-day average — attribution model seeing events from 1970",
                triggered_at_step=0,
                is_red_herring=False,
            ))
        # Red herring – ad_ranking looks degraded but it's CAPI's fault.
        # NOTE(review): unlike tasks 4/5, this red herring keeps firing after the
        # fix -- confirm that is intentional.
        alerts.append(Alert(
            alert_id="ALT-003",
            severity="P2",
            service="ad_ranking",
            message="High memory pressure on ad-ranking pod — possible cache thrash",
            triggered_at_step=0,
            is_red_herring=True,
        ))
        return alerts

    def _task3_alerts(self, step: int) -> List[Alert]:
        """Connection-pool alert (escalates to P0 at step 4) plus a latency alert after step 3."""
        if "whatsapp_sync" in self._fixed_services:
            return []
        alerts = [Alert(
            alert_id="ALT-004",
            severity="P1" if step < 4 else "P0",
            service="whatsapp_sync",
            message=f"DB connection pool exhausted ({min(step * 20, 500)}/500 connections in use) — sync requests queuing",
            triggered_at_step=1,
            is_red_herring=False,
        )]
        if step > 3:
            alerts.append(Alert(
                alert_id="ALT-005",
                severity="P1",
                service="whatsapp_sync",
                message="p99 latency > 5 s — SLA breach imminent",
                triggered_at_step=4,
                is_red_herring=False,
            ))
        return alerts

    def _task4_alerts(self, step: int) -> List[Alert]:
        """Root-cause FK alert plus two downstream red herrings until rollback."""
        if "whatsapp_sync" in self._fixed_services:
            return []
        return [
            Alert(
                alert_id="ALT-006",
                severity="P0",
                service="whatsapp_sync",
                message="FK violation: insert into user_preferences fails — migration 003 circular FK",
                triggered_at_step=0,
                is_red_herring=False,
            ),
            # Red herrings – symptoms of the underlying migration failure
            Alert(
                alert_id="ALT-007",
                severity="P1",
                service="ad_ranking",
                message="High error rate on /rank endpoint — upstream DB errors propagating",
                triggered_at_step=0,
                is_red_herring=True,
            ),
            Alert(
                alert_id="ALT-008",
                severity="P1",
                service="capi_pipeline",
                message="Event ingest latency spike — shared DB pool contention",
                triggered_at_step=0,
                is_red_herring=True,
            ),
        ]

    def _task5_alerts(self, step: int) -> List[Alert]:
        """Security alert plus one red herring until capi_pipeline is fixed."""
        if "capi_pipeline" in self._fixed_services:
            return []
        return [
            Alert(
                alert_id="ALT-009",
                severity="P0",
                service="capi_pipeline",
                message=(
                    "SECURITY: Unusual response payload size on /ingest (avg 14 KB vs 0.2 KB) "
                    "— possible PII exposure in debug response body"
                ),
                triggered_at_step=0,
                is_red_herring=False,
            ),
            Alert(
                alert_id="ALT-010",
                severity="P2",
                service="capi_pipeline",
                message="Slightly elevated memory on ingestor pod — likely buffer growth",
                triggered_at_step=0,
                is_red_herring=True,
            ),
        ]

    # ------------------------------------------------------------------
    # Terminal output (simulated stack traces / logs)
    # ------------------------------------------------------------------
    def get_terminal_output(self, step: int, last_test_result: Optional[str] = None) -> str:
        """Return the simulated terminal text for the current step.

        A non-empty ``last_test_result`` is returned verbatim. Otherwise the
        task's canned log/traceback is returned, or an "all systems operational"
        line for unknown task ids.
        """
        if last_test_result:
            return last_test_result
        # NOTE(review): canned output ignores `_fixed_services`, so the crash log
        # is still shown after a fix unless the caller passes a test result.
        return self._TERMINAL_OUTPUTS.get(self._task_id, "[INFO] All systems operational")
# ---------------------------------------------------------------------------
# Difficulty Controller (Theme 4 – Self-Improvement Loop)
# ---------------------------------------------------------------------------
class DifficultyController:
    """Adaptive curriculum that biases task sampling toward the agent's weaknesses.

    After each episode, :meth:`update` raises the weight of the task's bug
    category when the agent scored poorly and lowers it when the agent scored
    well. :meth:`next_task_id` then samples the next task in proportion to
    those weights. Weights live on ``self.state`` under the attribute names
    listed in :attr:`BUG_CATEGORY_MAP`.
    """

    # task_id -> attribute of `self.state` holding that task's sampling weight.
    # Tasks 1 and 2 share a category, so they share one weight.
    BUG_CATEGORY_MAP: Dict[int, str] = {
        1: "data_corruption",  # hallucinated attribute
        2: "data_corruption",  # silent timestamp corruption
        3: "async_bugs",       # connection leak
        4: "red_herrings",     # cascading failure + red herrings
        5: "security_bugs",    # PII leak
    }

    # Tuning knobs for `update`.
    STRUGGLE_BELOW: float = 0.5   # score < this -> raise weight
    MASTERED_ABOVE: float = 0.8   # score > this -> lower weight
    RAISE_FACTOR: float = 1.25
    LOWER_FACTOR: float = 0.9
    MAX_WEIGHT: float = 3.0
    MIN_WEIGHT: float = 0.3

    def __init__(self):
        self.state = DifficultyState()

    def update(self, task_id: int, normalized_score: float) -> None:
        """Adjust the weight of ``task_id``'s bug category after an episode.

        Scores below ``STRUGGLE_BELOW`` multiply the weight by ``RAISE_FACTOR``,
        capped at ``MAX_WEIGHT``. Scores above ``MASTERED_ABOVE`` multiply it by
        ``LOWER_FACTOR``, floored at ``MIN_WEIGHT``. Scores in between, and
        unknown task ids, leave the state untouched.

        The cap and floor never reverse the direction of the adjustment. A
        struggle never lowers a weight already above the cap, and mastery never
        raises a weight already below the floor.
        """
        category = self.BUG_CATEGORY_MAP.get(task_id)
        if category is None:
            return
        current = getattr(self.state, category)
        if normalized_score < self.STRUGGLE_BELOW:
            # Agent struggled – raise difficulty weight.
            new = max(current, min(current * self.RAISE_FACTOR, self.MAX_WEIGHT))
        elif normalized_score > self.MASTERED_ABOVE:
            # Agent mastered it – slightly reduce weight.
            new = min(current, max(current * self.LOWER_FACTOR, self.MIN_WEIGHT))
        else:
            return
        setattr(self.state, category, new)

    def next_task_id(self) -> int:
        """Sample the next task id, weighted by its category's current weight.

        Task ids and categories come from ``BUG_CATEGORY_MAP``, so the two
        cannot drift apart. If every weight is zero (or the total is not
        positive), fall back to uniform sampling instead of dividing by zero.
        """
        task_ids = list(self.BUG_CATEGORY_MAP)
        weights = [getattr(self.state, cat) for cat in self.BUG_CATEGORY_MAP.values()]
        if sum(weights) <= 0:
            return random.choice(task_ids)
        return random.choices(task_ids, weights=weights, k=1)[0]

    def weakness_tags(self, threshold: float = 0.7) -> List[str]:
        """Return names of state fields whose weight exceeds ``threshold``."""
        # Assumes `state` is a pydantic model (it exposes `.dict()`); prefer the
        # v2 `model_dump` when available to avoid the deprecated v1 alias.
        dump = getattr(self.state, "model_dump", None) or self.state.dict
        return [name for name, weight in dump().items() if weight > threshold]