# meta-signal/tests/test_server.py
# Anvit25's picture
# fix: Q4 simultaneous-placement fix across env, models, privacy, data_loader
# aff2229
"""
End-to-end tests for the Meta-Signal FastAPI server.
Uses FastAPI's TestClient -- no separate server process needed.
Run with: pytest tests/test_server.py -v
"""
import pytest
from fastapi.testclient import TestClient
from app.main import app
# Module-wide TestClient bound to the FastAPI `app`; requests are served
# in-process, so no separate server has to be running.
client = TestClient(app)
# ---------------------------------------------------------------------------
# /health
# ---------------------------------------------------------------------------
def test_health():
    """GET /health answers 200 and reports a "healthy" status."""
    response = client.get("/health")
    assert response.status_code == 200
    payload = response.json()
    assert payload["status"] == "healthy"
# ---------------------------------------------------------------------------
# /tasks
# ---------------------------------------------------------------------------
def test_tasks_returns_three():
    """GET /tasks succeeds and lists seven tasks.

    NOTE: the function name says "three", but the assertion expects seven
    entries.
    """
    response = client.get("/tasks")
    assert response.status_code == 200
    task_list = response.json()
    assert len(task_list) == 7
def test_tasks_ids_are_1_2_3():
    """The task ids listed by /tasks are exactly 1 through 7 (despite the name)."""
    listed_ids = set()
    for entry in client.get("/tasks").json():
        listed_ids.add(entry["task_id"])
    assert listed_ids == set(range(1, 8))
def test_tasks_have_required_fields():
    """Every entry returned by /tasks carries the expected configuration keys."""
    required_keys = (
        "name",
        "max_steps",
        "target_roas",
        "grader_weights",
        "initial_epsilon",
    )
    for entry in client.get("/tasks").json():
        for required in required_keys:
            assert required in entry
# ---------------------------------------------------------------------------
# /reset
# ---------------------------------------------------------------------------
def test_reset_task1():
    """Resetting Task 1 returns a step-0 observation with the expected start values."""
    response = client.post("/reset", json={"task_id": 1, "seed": 42})
    assert response.status_code == 200
    observation = response.json()
    # Checked in insertion order, one scalar field at a time.
    expected_scalars = {
        "step": 0,
        "total_budget_remaining": 1000.0,
        "epsilon_remaining": 3.0,
        "privacy_regime": "standard",
    }
    for field_name, expected in expected_scalars.items():
        assert observation[field_name] == expected
    assert len(observation["campaigns"]) == 3
def test_reset_task3_regime_is_minimal_data():
    """A fresh Task 3 observation reports the "minimal_data" privacy regime."""
    response = client.post("/reset", json={"task_id": 3, "seed": 42})
    assert response.status_code == 200
    observation = response.json()
    assert observation["privacy_regime"] == "minimal_data"
def test_reset_invalid_task_id():
    """Resetting with task_id 99 is rejected with HTTP 422."""
    # 422 is expected to come from request-model (Pydantic) validation.
    # NOTE(review): the earlier comment cited bounds "ge=1 le=4", yet other
    # tests in this file reset tasks 5 and 7 -- confirm the actual bounds.
    payload = {"task_id": 99}
    response = client.post("/reset", json=payload)
    assert response.status_code == 422
def test_reset_reproducible_with_seed():
    """Two resets with the same task and seed report the same epsilon and budget."""
    payload = {"task_id": 1, "seed": 7}
    first = client.post("/reset", json=payload).json()
    second = client.post("/reset", json=payload).json()
    assert first["epsilon_remaining"] == second["epsilon_remaining"]
    assert first["total_budget_remaining"] == second["total_budget_remaining"]
# ---------------------------------------------------------------------------
# /step
# ---------------------------------------------------------------------------
# Baseline /step payload reused across tests: spend on all three campaigns,
# last-click attribution and a single feature in the mask.
VALID_STEP = {
    "allocations": {
        "camp_feed": 100.0,
        "camp_reels": 50.0,
        "camp_stories": 50.0,
    },
    "attribution": "last_click",
    "feature_mask": ["I1"],
}
def test_step_returns_correct_shape():
    """A successful /step response has observation, reward, done and info keys."""
    client.post("/reset", json={"task_id": 1, "seed": 42})
    response = client.post("/step", json=VALID_STEP)
    assert response.status_code == 200
    body = response.json()
    for top_level_key in ("observation", "reward", "done", "info"):
        assert top_level_key in body
def test_step_reduces_budget():
    """After one step on Task 1 the remaining budget is below 1000.0."""
    client.post("/reset", json={"task_id": 1, "seed": 42})
    step_body = client.post("/step", json=VALID_STEP).json()
    remaining = step_body["observation"]["total_budget_remaining"]
    assert remaining < 1000.0
def test_step_reduces_epsilon():
    """After one step on Task 1 the remaining epsilon is below 3.0."""
    client.post("/reset", json={"task_id": 1, "seed": 42})
    step_body = client.post("/step", json=VALID_STEP).json()
    remaining = step_body["observation"]["epsilon_remaining"]
    assert remaining < 3.0
def test_step_without_reset_raises():
    """Stepping while the shared environment has no state yields HTTP 400."""
    from app.main import _env as shared_env

    # Wipe the server's environment state directly so /step runs as if no
    # /reset had been issued.
    shared_env._state = None
    response = client.post("/step", json=VALID_STEP)
    assert response.status_code == 400
def test_step_negative_allocation_rejected():
    """A /step payload containing a negative allocation is rejected with 422."""
    client.post("/reset", json={"task_id": 1, "seed": 42})
    negative_spend = {"camp_feed": -100.0, "camp_reels": 50.0, "camp_stories": 50.0}
    payload = {
        "allocations": negative_spend,
        "attribution": "last_click",
        "feature_mask": ["I1"],
    }
    response = client.post("/step", json=payload)
    assert response.status_code == 422
def test_step_invalid_feature_raises():
    """An unknown feature name in feature_mask is answered with HTTP 400."""
    client.post("/reset", json={"task_id": 1, "seed": 42})
    # Build a new dict so the shared VALID_STEP payload is left untouched.
    payload = {**VALID_STEP, "feature_mask": ["FAKE_FEATURE"]}
    response = client.post("/step", json=payload)
    assert response.status_code == 400
def test_step_info_has_oracle_roas():
    """The step info dict exposes a non-negative oracle_roas value."""
    client.post("/reset", json={"task_id": 1, "seed": 42})
    step_body = client.post("/step", json=VALID_STEP).json()
    info = step_body["info"]
    assert "oracle_roas" in info
    assert info["oracle_roas"] >= 0.0
def test_step_regulatory_violation_flagged():
    """Sending three features on Task 3 sets regulatory_violation on the observation."""
    client.post("/reset", json={"task_id": 3, "seed": 42})
    # Task 3 permits a single feature (max_features=1); three exceeds it.
    too_many_features = ["I1", "I2", "I3"]
    payload = {
        "allocations": {"camp_feed": 100.0, "camp_reels": 50.0, "camp_stories": 50.0},
        "attribution": "last_click",
        "feature_mask": too_many_features,
    }
    step_body = client.post("/step", json=payload).json()
    assert step_body["observation"]["regulatory_violation"] is True
# ---------------------------------------------------------------------------
# /state
# ---------------------------------------------------------------------------
def test_state_after_reset():
    """Right after resetting Task 2, /state reports task 2, step 0 and 15 total steps."""
    client.post("/reset", json={"task_id": 2, "seed": 10})
    response = client.get("/state")
    assert response.status_code == 200
    snapshot = response.json()
    for field_name, expected in (("task_id", 2), ("step", 0), ("total_steps", 15)):
        assert snapshot[field_name] == expected
def test_state_step_increments():
    """Two /step calls after a reset leave /state at step 2."""
    client.post("/reset", json={"task_id": 1, "seed": 42})
    for _ in range(2):
        client.post("/step", json=VALID_STEP)
    snapshot = client.get("/state").json()
    assert snapshot["step"] == 2
def test_state_history_grows():
    """Three /step calls after a reset produce three history entries in /state."""
    client.post("/reset", json={"task_id": 1, "seed": 42})
    steps_taken = 0
    while steps_taken < 3:
        client.post("/step", json=VALID_STEP)
        steps_taken += 1
    snapshot = client.get("/state").json()
    assert len(snapshot["history"]) == 3
# ---------------------------------------------------------------------------
# /grader
# ---------------------------------------------------------------------------
def test_grader_returns_score_in_range():
    """After ten Task 1 steps, /grader answers 200 with a score in [0, 1]."""
    client.post("/reset", json={"task_id": 1, "seed": 42})
    for _ in range(10):
        client.post("/step", json=VALID_STEP)
    response = client.post("/grader", json={"task_id": 1})
    assert response.status_code == 200
    score = response.json()["score"]
    assert 0.0 <= score <= 1.0
def test_grader_returns_breakdown():
    """The Task 1 grader breakdown includes roas_score and allocation_trend."""
    client.post("/reset", json={"task_id": 1, "seed": 42})
    for _ in range(10):
        client.post("/step", json=VALID_STEP)
    grade = client.post("/grader", json={"task_id": 1}).json()
    for breakdown_key in ("roas_score", "allocation_trend"):
        assert breakdown_key in grade["breakdown"]
def test_grader_wrong_task_id_rejected():
    """Grading task 2 while the active episode is task 1 is answered with 400."""
    client.post("/reset", json={"task_id": 1, "seed": 42})
    mismatched_request = {"task_id": 2}
    response = client.post("/grader", json=mismatched_request)
    assert response.status_code == 400
def test_grader_task2_breakdown_keys():
    """The Task 2 breakdown has oracle_proximity, budget_efficiency and clean_run."""
    client.post("/reset", json={"task_id": 2, "seed": 42})
    for _ in range(15):
        client.post("/step", json=VALID_STEP)
    grade = client.post("/grader", json={"task_id": 2}).json()
    breakdown = grade["breakdown"]
    for expected_key in ("oracle_proximity", "budget_efficiency", "clean_run"):
        assert expected_key in breakdown
def test_grader_task3_breakdown_keys():
    """The Task 3 breakdown has roas_score, compliance_score and epsilon_remaining."""
    client.post("/reset", json={"task_id": 3, "seed": 42})
    small_spend = {"camp_feed": 20.0, "camp_reels": 10.0, "camp_stories": 10.0}
    payload = {
        "allocations": small_spend,
        "attribution": "last_click",
        "feature_mask": ["I1"],
    }
    for _ in range(15):
        client.post("/step", json=payload)
    grade = client.post("/grader", json={"task_id": 3}).json()
    breakdown = grade["breakdown"]
    for expected_key in ("roas_score", "compliance_score", "epsilon_remaining"):
        assert expected_key in breakdown
# ---------------------------------------------------------------------------
# Full episode flow (all three tasks)
# ---------------------------------------------------------------------------
def _run_full_episode(task_id: int, feature_mask: list) -> dict:
    """Play one seeded episode with a fixed small allocation and return its grade.

    The episode is reset with seed 42, then stepped with the same 20/10/10
    last-click action up to the task's ``max_steps`` times, stopping early
    once a step reports ``done``. The JSON body of the subsequent /grader
    call is returned.
    """
    from app.tasks import TASK_CONFIGS

    step_limit_cfg = TASK_CONFIGS[task_id]
    client.post("/reset", json={"task_id": task_id, "seed": 42})
    # The action never changes between steps, so build it once.
    payload = {
        "allocations": {"camp_feed": 20.0, "camp_reels": 10.0, "camp_stories": 10.0},
        "attribution": "last_click",
        "feature_mask": feature_mask,
    }
    steps_left = step_limit_cfg.max_steps
    while steps_left > 0:
        outcome = client.post("/step", json=payload).json()
        if outcome["done"]:
            break
        steps_left -= 1
    return client.post("/grader", json={"task_id": task_id}).json()
def test_full_episode_task1():
    """A full Task 1 episode using features I1 and I2 grades to a score in [0, 1]."""
    score = _run_full_episode(1, ["I1", "I2"])["score"]
    assert 0.0 <= score <= 1.0
def test_full_episode_task2_high_noise_fires():
    """After three Task 2 steps, /state reports the high_noise or exhausted regime."""
    client.post("/reset", json={"task_id": 2, "seed": 42})
    for _ in range(3):
        client.post("/step", json=VALID_STEP)
    regime = client.get("/state").json()["privacy_regime"]
    assert regime in ("high_noise", "exhausted")
def test_full_episode_task3_compliance():
    """A Task 3 episode using one feature per step has compliance_score 1.0."""
    breakdown = _run_full_episode(3, ["I1"])["breakdown"]
    assert breakdown["compliance_score"] == 1.0
def test_task3_compliance_penalised_with_excess_features():
    """A Task 3 episode sending three features per step scores below 1.0 on compliance."""
    breakdown = _run_full_episode(3, ["I1", "I2", "I3"])["breakdown"]
    assert breakdown["compliance_score"] < 1.0
# ---------------------------------------------------------------------------
# Task 4 -- The Adversarial Regulator
# ---------------------------------------------------------------------------
def test_reset_task4_succeeds():
    """A fresh Task 4 observation is at step 0 with no audit and no flagged campaign."""
    response = client.post("/reset", json={"task_id": 4, "seed": 42})
    assert response.status_code == 200
    observation = response.json()
    assert observation["step"] == 0
    assert observation["audit_active"] is False
    assert observation["flagged_campaign"] is None
def test_task4_confidence_interval_present():
    """Each campaign entry after a Task 4 step has a two-element, ordered confidence_interval."""
    client.post("/reset", json={"task_id": 4, "seed": 42})
    response = client.post("/step", json=VALID_STEP)
    assert response.status_code == 200
    for campaign_stats in response.json()["observation"]["campaigns"]:
        assert "confidence_interval" in campaign_stats
        interval = campaign_stats["confidence_interval"]
        assert len(interval) == 2
        # Second element (upper bound) must not fall below the first (lower bound).
        assert interval[1] >= interval[0]
def test_task4_audit_fires_at_step5():
    """The observation from the fifth Task 4 step has an active audit and a flagged campaign."""
    client.post("/reset", json={"task_id": 4, "seed": 42})
    latest = None
    steps_taken = 0
    while steps_taken < 5:
        latest = client.post("/step", json=VALID_STEP).json()["observation"]
        steps_taken += 1
    assert latest["audit_active"] is True
    known_campaigns = ("camp_feed", "camp_reels", "camp_stories")
    assert latest["flagged_campaign"] in known_campaigns
def test_task4_full_episode_score_in_range():
    """Full Task 4 episode with a compliant agent returns a valid score.

    Once an observation reports an active audit together with a flagged
    campaign, every later action sends zero spend to that campaign, lists it
    in ``halted_campaigns`` and attaches the "GDPR_ART17" reason code. The
    final grade must lie in [0, 1] and its breakdown must contain the
    roas_recovery, audit_compliance and legal_code_quality keys.
    """
    from app.tasks import TASK_CONFIGS

    cfg = TASK_CONFIGS[4]
    base_spend = {"camp_feed": 20.0, "camp_reels": 10.0, "camp_stories": 10.0}
    client.post("/reset", json={"task_id": 4, "seed": 42})
    flagged = None
    # The step index itself is never needed, only the step budget.
    for _ in range(cfg.max_steps):
        action = {
            # Zero out the flagged campaign; everything else keeps its base spend.
            "allocations": {
                campaign: 0.0 if campaign == flagged else amount
                for campaign, amount in base_spend.items()
            },
            "attribution": "last_click",
            "feature_mask": ["I1"],
            "halted_campaigns": [flagged] if flagged else [],
            "legal_reason_code": "GDPR_ART17" if flagged else None,
        }
        result = client.post("/step", json=action).json()
        obs = result["observation"]
        if obs.get("audit_active") and obs.get("flagged_campaign"):
            flagged = obs["flagged_campaign"]
        if result["done"]:
            break
    grade = client.post("/grader", json={"task_id": 4}).json()
    assert 0.0 <= grade["score"] <= 1.0
    for breakdown_key in ("roas_recovery", "audit_compliance", "legal_code_quality"):
        assert breakdown_key in grade["breakdown"]
# ---------------------------------------------------------------------------
# GraderResult -- explanation field
# ---------------------------------------------------------------------------
def test_grader_result_has_explanation_field():
    """GraderResult must include a non-empty explanation string for Tasks 1-3.

    Each task is played to completion through ``_run_full_episode`` (seed 42,
    fixed 20/10/10 last-click allocation). Task 3 is run with a single
    feature; Tasks 1 and 2 with two. Task 4 is covered separately by
    ``test_grader_task4_explanation_present``.
    """
    for task_id in (1, 2, 3):
        feature_mask = ["I1"] if task_id == 3 else ["I1", "I2"]
        grade = _run_full_episode(task_id, feature_mask)
        assert "explanation" in grade, f"Task {task_id} grader missing explanation"
        assert isinstance(grade["explanation"], str)
        assert len(grade["explanation"]) > 10, f"Task {task_id} explanation too short"
def test_grader_task4_explanation_present():
    """The Task 4 grade carries an explanation mentioning "step 5" or "audit".

    The episode zeroes spend on the flagged campaign and sends the
    "GDPR_ART17" reason code once an audit with a flagged campaign is seen.
    """
    from app.tasks import TASK_CONFIGS

    step_budget_cfg = TASK_CONFIGS[4]
    client.post("/reset", json={"task_id": 4, "seed": 42})
    flagged_id = None
    remaining_steps = step_budget_cfg.max_steps
    while remaining_steps > 0:
        remaining_steps -= 1
        feed_spend = 0.0 if flagged_id == "camp_feed" else 20.0
        reels_spend = 0.0 if flagged_id == "camp_reels" else 10.0
        stories_spend = 0.0 if flagged_id == "camp_stories" else 10.0
        payload = {
            "allocations": {
                "camp_feed": feed_spend,
                "camp_reels": reels_spend,
                "camp_stories": stories_spend,
            },
            "attribution": "last_click",
            "feature_mask": ["I1"],
            "legal_reason_code": "GDPR_ART17" if flagged_id else None,
        }
        outcome = client.post("/step", json=payload).json()
        observation = outcome["observation"]
        if observation.get("audit_active") and observation.get("flagged_campaign"):
            flagged_id = observation["flagged_campaign"]
        if outcome["done"]:
            break
    grade = client.post("/grader", json={"task_id": 4}).json()
    assert "explanation" in grade
    assert (
        "step 5" in grade["explanation"].lower()
        or "audit" in grade["explanation"].lower()
    )
# ---------------------------------------------------------------------------
# StepInfo -- correlation_penalty_active field
# ---------------------------------------------------------------------------
def test_step_info_has_correlation_penalty_field():
    """The step info dict includes a boolean correlation_penalty_active entry."""
    client.post("/reset", json={"task_id": 1, "seed": 42})
    step_body = client.post("/step", json=VALID_STEP).json()
    assert "correlation_penalty_active" in step_body["info"]
    flag = step_body["info"]["correlation_penalty_active"]
    assert isinstance(flag, bool)
def test_correlation_penalty_fires_on_concentration():
    """An allocation with 950 of 1000 on camp_feed sets correlation_penalty_active."""
    client.post("/reset", json={"task_id": 1, "seed": 42})
    # Per the original note, more than 70% of spend on one campaign should trigger it.
    lopsided_spend = {"camp_feed": 950.0, "camp_reels": 25.0, "camp_stories": 25.0}
    payload = {
        "allocations": lopsided_spend,
        "attribution": "last_click",
        "feature_mask": ["I1"],
    }
    step_body = client.post("/step", json=payload).json()
    assert step_body["info"]["correlation_penalty_active"] is True
def test_correlation_penalty_absent_on_balanced_spend():
    """An even 200/200/200 allocation leaves correlation_penalty_active False."""
    client.post("/reset", json={"task_id": 1, "seed": 42})
    even_spend = {"camp_feed": 200.0, "camp_reels": 200.0, "camp_stories": 200.0}
    payload = {
        "allocations": even_spend,
        "attribution": "last_click",
        "feature_mask": ["I1"],
    }
    step_body = client.post("/step", json=payload).json()
    assert step_body["info"]["correlation_penalty_active"] is False
# ---------------------------------------------------------------------------
# Task 2 market shift (step 9+)
# ---------------------------------------------------------------------------
def test_task2_market_shift_at_step9():
    """
    Nine steps into Task 2, the latest observation must carry a warning that
    mentions either "market shift" or "reels" (the shift is described as
    camp_reels CVR doubling). Allocations are kept small (60 per step) so the
    nine steps stay within the $1000 Task 2 budget.
    """
    client.post("/reset", json={"task_id": 2, "seed": 42})
    payload = {
        "allocations": {"camp_feed": 30.0, "camp_reels": 15.0, "camp_stories": 15.0},
        "attribution": "last_click",
        "feature_mask": ["I1"],
    }
    latest_obs = None
    for _ in range(9):
        result = client.post("/step", json=payload).json()
        assert "observation" in result, f"Step failed: {result}"
        latest_obs = result["observation"]
    # The ninth observation is the one expected to carry the warning.
    assert latest_obs is not None
    assert latest_obs["warning"] is not None
    warning_text = latest_obs["warning"].lower()
    assert "market shift" in warning_text or "reels" in warning_text
# ---------------------------------------------------------------------------
# /simulate endpoint
# ---------------------------------------------------------------------------
def test_simulate_returns_valid_score():
    """Each of three strategies on Tasks 1-4 simulates to a score in [0, 1].

    The response must also echo the requested strategy and task_id.
    """
    strategies = ("equal", "greedy", "conservative")
    task_ids = (1, 2, 3, 4)
    for strategy in strategies:
        for task_id in task_ids:
            request_body = {"task_id": task_id, "strategy": strategy, "seed": 42}
            r = client.post("/simulate", json=request_body)
            assert r.status_code == 200, f"{strategy} task {task_id}: {r.text}"
            summary = r.json()
            assert 0.0 <= summary["score"] <= 1.0
            assert summary["strategy"] == strategy
            assert summary["task_id"] == task_id
def test_simulate_trace_has_correct_step_count():
    """A Task 1 "equal" simulation returns a trace with ten rows."""
    request_body = {"task_id": 1, "strategy": "equal", "seed": 42}
    trace = client.post("/simulate", json=request_body).json()["trace"]
    # Task 1 runs for 10 steps.
    assert len(trace) == 10
def test_simulate_trace_fields():
    """Every row of a simulation trace contains the required fields."""
    required_fields = (
        "step",
        "allocations",
        "step_roas",
        "oracle_roas",
        "epsilon_remaining",
        "privacy_regime",
        "reward",
        "correlation_penalty_active",
    )
    request_body = {"task_id": 1, "strategy": "greedy", "seed": 42}
    response = client.post("/simulate", json=request_body)
    for row in response.json()["trace"]:
        for field in required_fields:
            assert field in row, f"trace row missing '{field}'"
def test_simulate_invalid_strategy_returns_400():
    """An unrecognised strategy name yields 400 with "Unknown strategy" in the detail."""
    request_body = {"task_id": 1, "strategy": "yolo", "seed": 42}
    response = client.post("/simulate", json=request_body)
    assert response.status_code == 400
    detail = response.json()["detail"]
    assert "Unknown strategy" in detail
def test_simulate_does_not_clobber_active_episode():
    """A /simulate call leaves the task_id, step and total_steps in /state unchanged."""
    client.post("/reset", json={"task_id": 1, "seed": 42})
    client.post("/step", json=VALID_STEP)
    before = client.get("/state").json()
    # Simulate a different task; it is expected to run on its own env instance.
    client.post("/simulate", json={"task_id": 2, "strategy": "greedy", "seed": 99})
    after = client.get("/state").json()
    for field_name in ("task_id", "step", "total_steps"):
        assert after[field_name] == before[field_name]
def test_simulate_grader_has_explanation():
    """The grader section of a simulate response has an explanation longer than 10 chars."""
    request_body = {"task_id": 1, "strategy": "conservative", "seed": 42}
    grader_section = client.post("/simulate", json=request_body).json()["grader"]
    assert "explanation" in grader_section
    assert len(grader_section["explanation"]) > 10
# ---------------------------------------------------------------------------
# Task 1 -- 3-phase allocation trend grader
# ---------------------------------------------------------------------------
def test_task1_trend_score_penalises_naive_concentration():
    """
    Ten steps of 100% camp_feed (no exploration) must get a lower allocation
    trend score than a three-phase history that shifts toward camp_feed.
    """
    from app.tasks import _allocation_trend_score

    def phase(feed, reels, stories, steps):
        # Repeat one allocation dict `steps` times (same object, as with list * n).
        return [{"camp_feed": feed, "camp_reels": reels, "camp_stories": stories}] * steps

    naive_history = phase(100, 0, 0, 10)
    naive_s = _allocation_trend_score(naive_history, "camp_feed")[0]
    arc_history = phase(30, 40, 30, 3) + phase(55, 30, 15, 4) + phase(80, 10, 10, 3)
    arc_s = _allocation_trend_score(arc_history, "camp_feed")[0]
    assert arc_s > naive_s, (
        f"Genuine arc ({arc_s:.3f}) should outscore naive concentration ({naive_s:.3f})"
    )
def test_task1_trend_score_rewards_full_arc():
    """A three-phase history shifting toward camp_feed scores at least 0.85."""
    from app.tasks import _allocation_trend_score

    explore_phase = [{"camp_feed": 25, "camp_reels": 40, "camp_stories": 35}] * 3
    learn_phase = [{"camp_feed": 50, "camp_reels": 30, "camp_stories": 20}] * 4
    exploit_phase = [{"camp_feed": 80, "camp_reels": 10, "camp_stories": 10}] * 3
    history = explore_phase + learn_phase + exploit_phase
    total_s = _allocation_trend_score(history, "camp_feed")[0]
    assert total_s >= 0.85, f"Full arc should score >= 0.85, got {total_s:.3f}"
def test_task1_grader_summary_has_phase_scores():
    """The Task 1 grade summary exposes explore, learn and exploit sub-scores."""
    client.post("/reset", json={"task_id": 1, "seed": 42})
    steps_taken = 0
    while steps_taken < 10:
        client.post("/step", json=VALID_STEP)
        steps_taken += 1
    grade = client.post("/grader", json={"task_id": 1}).json()
    phase_keys = ("explore_score", "learn_score", "exploit_score")
    for key in phase_keys:
        assert key in grade["summary"], f"summary missing '{key}'"
# ---------------------------------------------------------------------------
# Q4 Gauntlet tasks 5-7
# ---------------------------------------------------------------------------
def test_q4_step_has_all_campaign_impressions():
    """A Task 7 step reports all three campaigns, each with 100 impressions."""
    client.post("/reset", json={"task_id": 7, "seed": 42})
    step_body = client.post("/step", json=VALID_STEP).json()
    campaign_rows = step_body["observation"]["campaigns"]
    seen_ids = {row["campaign_id"] for row in campaign_rows}
    assert seen_ids == {"camp_feed", "camp_reels", "camp_stories"}
    assert all(row["impressions"] == 100 for row in campaign_rows)
def test_q4_phase_transitions():
    """Task 7 reports the expected platform_health on days 20, 21, 51 and 81."""
    client.post("/reset", json={"task_id": 7, "seed": 42})
    payload = {
        "allocations": {"camp_feed": 20.0, "camp_reels": 10.0, "camp_stories": 10.0},
        "attribution": "last_click",
        "feature_mask": ["I1"],
    }
    # Day -> platform_health expected on that day (checked in this order).
    expected_by_day = {
        20: "Nominal",
        21: "Signal_Loss",
        51: "Andromeda_Glitched",
        81: "Peak_Load",
    }
    seen_by_day = {}
    for _ in range(81):
        observation = client.post("/step", json=payload).json()["observation"]
        if observation["day"] in expected_by_day:
            seen_by_day[observation["day"]] = observation["platform_health"]
    for day, expected_health in expected_by_day.items():
        assert seen_by_day[day] == expected_health
def test_q4_capi_costs_two_epsilon_plus_feature():
    """A Task 5 step with use_capi=True reports epsilon_cost 2.05 and leaves 5.95.

    Both values are compared with ``pytest.approx`` because they are floats
    produced by server-side arithmetic and round-tripped through JSON; an
    exact ``==`` would fail on a last-bit representation difference.

    NOTE(review): going by the test name, 2.05 is presumably the 2.0 CAPI
    cost plus the cost of the single "I1" feature -- confirm against the env.
    """
    client.post("/reset", json={"task_id": 5, "seed": 42})
    action = {**VALID_STEP, "use_capi": True}
    result = client.post("/step", json=action).json()
    assert result["info"]["epsilon_cost"] == pytest.approx(2.05)
    assert result["observation"]["epsilon_remaining"] == pytest.approx(5.95)
def test_schema_exposes_q4_safety_cap():
    """The /schema response lists apply_safety_cap (action) and platform_health (observation)."""
    published = client.get("/schema").json()
    expected_properties = (
        ("action", "apply_safety_cap"),
        ("observation", "platform_health"),
    )
    for section, property_name in expected_properties:
        assert property_name in published[section]["properties"]