"""
Populate telemetry with simulated traffic, then run drift detection.

Two batches:
  clean — golden-dataset expected_answers (should match reference distribution)
  dirty — same questions, hallucinated responses (should show faithfulness drift)

Bypasses the API entirely: runs graders + telemetry.record() directly.

Usage:
    cd /Users/praca/ai-response-validator && .venv/bin/python eval/simulate_traffic.py
"""
import sys
import time
from pathlib import Path
import yaml
sys.path.insert(0, str(Path(__file__).parent.parent / "backend"))
import telemetry
from config import CLIENT_DOMAIN
from grader import GradeReport, grade
# The golden dataset sits in the same directory as this script.
DATASET_PATH = Path(__file__).with_name("golden-dataset.yaml")
# The knowledge base lives in "knowledge/" one directory above this script's directory.
KNOWLEDGE_ROOT = Path(__file__).parent.parent.joinpath("knowledge")
# Hallucinated responses — plausible-sounding text that contradicts KB facts.
# Keys are pair ids; run() only replays an entry when a pair with the same id
# exists in the loaded dataset.
HALLUCINATED: dict[str, str] = {
    # retail — NovaMart
    "retail-nm-001": (
        "When a product runs out of stock, the system automatically places a reorder after 72 hours "
        "with no alerts sent to any manager. The supplier is notified only at month-end review."
    ),
    "retail-nm-002": (
        "To add a new supplier, send an email to the procurement team with the company name. "
        "No tax ID or payment terms are required at this stage. "
        "Purchase orders can be created immediately without waiting for validation."
    ),
    "retail-nm-003": (
        "Feature flags are permanent once enabled and cannot be disabled without a code deployment. "
        "There is no expiry date or activation scope. Any employee can enable a flag in production."
    ),
    "retail-nm-004": (
        "The authoritative source for product information is the pricing portal. "
        "SKU records are updated manually once per week by the merchandising team. "
        "Archived products can be reactivated instantly by any store manager."
    ),
    "retail-nm-005": (
        "Price changes take effect immediately upon submission with no approval required. "
        "There is no sync window; prices update in real time. "
        "Emergency corrections are handled automatically without escalation."
    ),
    # retail — ShelfWise
    "retail-sw-001": (
        "An out-of-stock alert fires only after a manual stock check is initiated by a store manager. "
        "The alert is sent exclusively to the regional director. "
        "No escalation occurs if the alert is unacknowledged."
    ),
    "retail-sw-002": (
        "Feature toggles are permanent once enabled. "
        "There is no activation scope and no expiry date requirement. "
        "Any user can enable toggles in production without sign-off."
    ),
    "retail-sw-004": (
        "Compliance reports are editable for up to 30 days after creation and are stored for 2 years. "
        "Any user can access compliance reports from the standard dashboard. "
        "Reports are generated on demand only."
    ),
    "retail-sw-005": (
        "Product catalog updates require manual approval for each SKU and can take up to 48 hours. "
        "Deactivated products are permanently deleted and cannot be recovered."
    ),
    # pharma — ClinixOne
    "pharma-cx-001": (
        "Prior authorization is optional and payers respond within 7 business days. "
        "Denied requests cannot be appealed and the prescriber must choose an alternative drug."
    ),
    "pharma-cx-003": (
        "Adverse events must be reported to regulators within 30 days for all event types. "
        "A safety signal is raised automatically by the system when 3 or more events occur. "
        "Expected events do not require regulatory reporting."
    ),
    "pharma-cx-004": (
        "Clinical trials have two phases: Phase I for safety and Phase II for market approval. "
        "Enrollment eligibility is determined by the treating physician with no formal criteria."
    ),
    # pharma — PharmaLink
    "pharma-pl-001": (
        "Formulary pre-approval is automatically granted for all branded drugs. "
        "The payer responds within 30 days and denied requests cannot be appealed."
    ),
    "pharma-pl-003": (
        "The formulary has two tiers: generic and branded. "
        "Moving a drug to a higher tier requires a 7-day notice to prescribers. "
        "Tier assignment is reviewed every 5 years."
    ),
    "pharma-pl-004": (
        "A prescribing pathway is a marketing document produced by pharmaceutical companies. "
        "Pathways are reviewed every 5 years and payers do not use them in coverage decisions. "
        "Deviation from a pathway requires no documentation."
    ),
    "pharma-pl-005": (
        # The dash below was mojibake ("β") in the damaged source; restored as an em dash.
        "Enrollment authorization is a formality — patients sign a standard waiver. "
        "Consent is obtained after the first study procedure, not before. "
        "Protocol changes do not require re-consent from existing participants."
    ),
}
def _load_kb_context(domain: str) -> str:
    """Return the knowledge-base documents for *domain* as one context string.

    Reads ``KNOWLEDGE_ROOT/<domain>/features.yaml`` and renders each entry of its
    ``documents`` list as ``[title]`` on one line followed by the stripped
    ``content``; documents are separated by a blank line.

    Args:
        domain: Name of the sub-directory of ``KNOWLEDGE_ROOT`` to load.

    Returns:
        The concatenated text of every document in the domain's features file.

    Raises:
        OSError: If the features file cannot be read.
        KeyError: If the file has no ``documents`` key or a document lacks
            ``title`` / ``content``.
    """
    path = KNOWLEDGE_ROOT / domain / "features.yaml"
    # Decode explicitly as UTF-8 so the result does not depend on the platform's
    # locale default encoding.
    data = yaml.safe_load(path.read_text(encoding="utf-8"))
    chunks = [f"[{doc['title']}]\n{doc['content'].strip()}" for doc in data["documents"]]
    return "\n\n".join(chunks)
def _record(pair: dict, response: str, context: str, tag: str) -> GradeReport:
    """Grade one simulated response, record it in telemetry, and print a summary line.

    Args:
        pair: Dataset entry; the keys ``client``, ``question``, ``domain`` and
            ``id`` are read here.
        response: The answer text to grade for this pair.
        context: Knowledge-base text passed to the grader as context.
        tag: Short label printed in the summary line (the caller passes
            ``"clean"`` or ``"dirty"``).

    Returns:
        The ``GradeReport`` produced by ``grade``.
    """
    client = pair["client"]
    report = grade(
        query=pair["question"],
        response=response,
        context=context,
        client=client,
    )
    # Latency and retrieval figures are fixed placeholder values: nothing is
    # actually retrieved or generated in this simulation.
    telemetry.record(
        client=client,
        domain=pair["domain"],
        query_len=len(pair["question"].split()),  # query length in whitespace-separated words
        latency_ms={"retrieve": 12.0, "generate": 180.0, "grade": 45.0},
        report=report,
        docs_retrieved=3,
        min_retrieval_score=0.72,
    )
    status = "PASS" if report.overall else "FAIL"
    # Use a default so a report without a "faithfulness" result yields a readable
    # line instead of an opaque StopIteration after the event was already recorded.
    faith = next((r for r in report.results if r.metric == "faithfulness"), None)
    if faith is None:
        faith_text = "faith=n/a (no faithfulness result in report)"
    else:
        faith_text = f"faith={faith.score:.3f} {faith.detail}"
    print(f" [{tag}] {pair['id']:<20} {status} {faith_text}")
    return report
def run() -> None:
    """Replay clean and hallucinated traffic into telemetry, then report drift.

    Steps:
      1. Grade and record every dataset pair using its ``expected_answer``.
      2. Grade and record again, using the ``HALLUCINATED`` text, every pair
         whose id has an entry in ``HALLUCINATED``.
      3. Print the telemetry event count, then build the reference and current
         distributions with the sibling ``drift`` module and print its report.

    Raises:
        OSError: If the dataset or a knowledge-base file cannot be read.
        KeyError: If the dataset lacks ``pairs`` or a pair lacks a required key.
    """
    # Decode explicitly as UTF-8 so loading does not depend on the locale default.
    pairs = yaml.safe_load(DATASET_PATH.read_text(encoding="utf-8"))["pairs"]
    kb: dict[str, str] = {}  # domain -> KB context, loaded once per domain

    # ── Batch 1: clean traffic ──
    print("\n── Batch 1: clean traffic (expected answers) ──\n")
    for pair in pairs:
        domain = pair["domain"]
        if domain not in kb:
            kb[domain] = _load_kb_context(domain)
        response = pair["expected_answer"].strip()
        _record(pair, response, kb[domain], "clean")
        time.sleep(0.05)  # short pause between events

    # ── Batch 2: dirty traffic (hallucinated responses) ──
    print("\n── Batch 2: dirty traffic (hallucinated responses) ──\n")
    # Every dirty pair was also seen in batch 1, so its domain is already in `kb`.
    dirty_pairs = [p for p in pairs if p["id"] in HALLUCINATED]
    for pair in dirty_pairs:
        domain = pair["domain"]
        response = HALLUCINATED[pair["id"]]
        _record(pair, response, kb[domain], "dirty")
        time.sleep(0.05)

    total = telemetry.live_stats()["total_queries"]
    print(f"\nTelemetry buffer: {total} events ({len(pairs)} clean + {len(dirty_pairs)} dirty)\n")

    # ── Drift detection ──
    print("=" * 60)
    print("Running drift detection vs golden-dataset baseline...")
    print("=" * 60)

    # `drift` is a sibling module of this script; make its directory importable
    # and import it only at this point, as the original code does.
    sys.path.insert(0, str(Path(__file__).parent))
    from drift import build_current, build_reference, detect_drift, report_drift

    print("\nBuilding reference distribution...")
    reference = build_reference()
    current = build_current()
    # Both sizes are read from the first value of each mapping; the `[]` default
    # makes an empty mapping report 0 instead of raising StopIteration.
    ref_n = len(next(iter(reference.values()), []))
    cur_n = len(next(iter(current.values()), []))
    print(f"Reference: {ref_n} pairs")
    print(f"Current: {cur_n} events\n")

    results = detect_drift(current, reference)
    report_drift(results)
    print()
# Run the simulation only when this file is executed as a script, not on import.
if __name__ == "__main__":
    run()