ai-response-validator / eval /simulate_traffic.py
mbochniak01
Replace HHEM with sentence-level NLI, add claim decomposition and drift detection
ffbf46f
Raw
History Blame Contribute Delete
8.01 kB
"""
Populate telemetry with simulated traffic, then run drift detection.
Two batches:
  clean — golden-dataset expected_answers (should match reference distribution)
  dirty — same questions, hallucinated responses (should show faithfulness drift)
Bypasses the API entirely: runs graders + telemetry.record() directly.
Usage:
cd /Users/praca/ai-response-validator && .venv/bin/python eval/simulate_traffic.py
"""
import sys
import time
from pathlib import Path
import yaml
sys.path.insert(0, str(Path(__file__).parent.parent / "backend"))
import telemetry
from config import CLIENT_DOMAIN
from grader import GradeReport, grade
# Golden dataset YAML, located in the same directory as this script.
DATASET_PATH = Path(__file__).parent / "golden-dataset.yaml"
# Root of the per-domain knowledge-base folders, one level above this script's directory.
KNOWLEDGE_ROOT = Path(__file__).parent.parent / "knowledge"
# Hallucinated responses — plausible-sounding, but each contradicts KB facts.
# Keyed by golden-dataset pair id; only pairs listed here are replayed in the
# "dirty" batch.
HALLUCINATED: dict[str, str] = {
    # retail — NovaMart
    "retail-nm-001": (
        "When a product runs out of stock, the system automatically places a reorder after 72 hours "
        "with no alerts sent to any manager. The supplier is notified only at month-end review."
    ),
    "retail-nm-002": (
        "To add a new supplier, send an email to the procurement team with the company name. "
        "No tax ID or payment terms are required at this stage. "
        "Purchase orders can be created immediately without waiting for validation."
    ),
    "retail-nm-003": (
        "Feature flags are permanent once enabled and cannot be disabled without a code deployment. "
        "There is no expiry date or activation scope. Any employee can enable a flag in production."
    ),
    "retail-nm-004": (
        "The authoritative source for product information is the pricing portal. "
        "SKU records are updated manually once per week by the merchandising team. "
        "Archived products can be reactivated instantly by any store manager."
    ),
    "retail-nm-005": (
        "Price changes take effect immediately upon submission with no approval required. "
        "There is no sync window; prices update in real time. "
        "Emergency corrections are handled automatically without escalation."
    ),
    # retail — ShelfWise
    "retail-sw-001": (
        "An out-of-stock alert fires only after a manual stock check is initiated by a store manager. "
        "The alert is sent exclusively to the regional director. "
        "No escalation occurs if the alert is unacknowledged."
    ),
    "retail-sw-002": (
        "Feature toggles are permanent once enabled. "
        "There is no activation scope and no expiry date requirement. "
        "Any user can enable toggles in production without sign-off."
    ),
    "retail-sw-004": (
        "Compliance reports are editable for up to 30 days after creation and are stored for 2 years. "
        "Any user can access compliance reports from the standard dashboard. "
        "Reports are generated on demand only."
    ),
    "retail-sw-005": (
        "Product catalog updates require manual approval for each SKU and can take up to 48 hours. "
        "Deactivated products are permanently deleted and cannot be recovered."
    ),
    # pharma — ClinixOne
    "pharma-cx-001": (
        "Prior authorization is optional and payers respond within 7 business days. "
        "Denied requests cannot be appealed and the prescriber must choose an alternative drug."
    ),
    "pharma-cx-003": (
        "Adverse events must be reported to regulators within 30 days for all event types. "
        "A safety signal is raised automatically by the system when 3 or more events occur. "
        "Expected events do not require regulatory reporting."
    ),
    "pharma-cx-004": (
        "Clinical trials have two phases: Phase I for safety and Phase II for market approval. "
        "Enrollment eligibility is determined by the treating physician with no formal criteria."
    ),
    # pharma — PharmaLink
    "pharma-pl-001": (
        "Formulary pre-approval is automatically granted for all branded drugs. "
        "The payer responds within 30 days and denied requests cannot be appealed."
    ),
    "pharma-pl-003": (
        "The formulary has two tiers: generic and branded. "
        "Moving a drug to a higher tier requires a 7-day notice to prescribers. "
        "Tier assignment is reviewed every 5 years."
    ),
    "pharma-pl-004": (
        "A prescribing pathway is a marketing document produced by pharmaceutical companies. "
        "Pathways are reviewed every 5 years and payers do not use them in coverage decisions. "
        "Deviation from a pathway requires no documentation."
    ),
    "pharma-pl-005": (
        # The em-dash was previously stored as mojibake; keep this literal as real UTF-8.
        "Enrollment authorization is a formality — patients sign a standard waiver. "
        "Consent is obtained after the first study procedure, not before. "
        "Protocol changes do not require re-consent from existing participants."
    ),
}
def _load_kb_context(domain: str) -> str:
    """Build the grading context string for one knowledge-base domain.

    Reads ``KNOWLEDGE_ROOT/<domain>/features.yaml`` and renders every entry
    under its ``documents`` key as a ``[title]`` line followed by the
    stripped ``content``, separating documents with a blank line.

    Args:
        domain: Name of the sub-directory under ``KNOWLEDGE_ROOT``.

    Returns:
        All documents of the domain joined into a single string.

    Raises:
        OSError: If the features file cannot be read.
        KeyError: If the file lacks ``documents`` or a document lacks
            ``title`` / ``content``.
    """
    path = KNOWLEDGE_ROOT / domain / "features.yaml"
    # Decode explicitly as UTF-8: without it read_text() falls back to the
    # locale's preferred encoding, which garbles non-ASCII text on some platforms.
    data = yaml.safe_load(path.read_text(encoding="utf-8"))
    chunks = [f"[{doc['title']}]\n{doc['content'].strip()}" for doc in data["documents"]]
    return "\n\n".join(chunks)
def _record(pair: dict, response: str, context: str, tag: str) -> GradeReport:
    """Grade one simulated response, record it in telemetry, and print a summary.

    Args:
        pair: Golden-dataset entry; the keys ``client``, ``question``,
            ``domain`` and ``id`` are read.
        response: Answer text to grade (expected answer or hallucination).
        context: Knowledge-base text the response is graded against.
        tag: Short batch label shown in the printed line.

    Returns:
        The report returned by ``grade`` for this response.
    """
    client = pair["client"]
    question = pair["question"]
    report = grade(
        query=question,
        response=response,
        context=context,
        client=client,
    )
    telemetry.record(
        client=client,
        domain=pair["domain"],
        query_len=len(question.split()),  # length in whitespace-separated words
        # Constant placeholder values: nothing is retrieved, generated or timed here.
        latency_ms={"retrieve": 12.0, "generate": 180.0, "grade": 45.0},
        report=report,
        docs_retrieved=3,
        min_retrieval_score=0.72,
    )
    status = "PASS" if report.overall else "FAIL"
    # Supply a default so a report without a faithfulness result does not abort
    # the whole simulation with a bare StopIteration.
    faith = next((r for r in report.results if r.metric == "faithfulness"), None)
    if faith is None:
        faith_summary = "faith=n/a"
    else:
        faith_summary = f"faith={faith.score:.3f} {faith.detail}"
    print(f" [{tag}] {pair['id']:<20} {status} {faith_summary}")
    return report
def run() -> None:
    """Replay clean and hallucinated traffic into telemetry, then report drift.

    Steps:
      1. Load the ``pairs`` list from ``DATASET_PATH``.
      2. Record every pair with its ``expected_answer`` (tag ``clean``).
      3. Record every pair whose id is in ``HALLUCINATED`` with the
         hallucinated answer (tag ``dirty``).
      4. Build reference and current distributions with the ``drift`` module
         and print the drift report.
    """
    # Explicit UTF-8 so the dataset is decoded the same way on every platform.
    pairs = yaml.safe_load(DATASET_PATH.read_text(encoding="utf-8"))["pairs"]
    kb: dict[str, str] = {}  # domain -> rendered knowledge-base context (loaded once)

    # ── Batch 1: clean traffic ──────────────────────────────────────────────
    print("\n── Batch 1: clean traffic (expected answers) ──\n")
    for pair in pairs:
        domain = pair["domain"]
        if domain not in kb:
            kb[domain] = _load_kb_context(domain)
        response = pair["expected_answer"].strip()
        _record(pair, response, kb[domain], "clean")
        # NOTE(review): presumably spaces out telemetry events — confirm intent.
        time.sleep(0.05)

    # ── Batch 2: dirty traffic (hallucinated responses) ─────────────────────
    print("\n── Batch 2: dirty traffic (hallucinated responses) ──\n")
    dirty_pairs = [p for p in pairs if p["id"] in HALLUCINATED]
    for pair in dirty_pairs:
        # Every domain was already cached by batch 1, since dirty_pairs is a subset of pairs.
        domain = pair["domain"]
        response = HALLUCINATED[pair["id"]]
        _record(pair, response, kb[domain], "dirty")
        time.sleep(0.05)

    total = telemetry.live_stats()["total_queries"]
    print(f"\nTelemetry buffer: {total} events ({len(pairs)} clean + {len(dirty_pairs)} dirty)\n")

    # ── Drift detection ─────────────────────────────────────────────────────
    print("=" * 60)
    print("Running drift detection vs golden-dataset baseline...")
    print("=" * 60)
    # Put this script's directory first on sys.path so the import below resolves `drift`.
    sys.path.insert(0, str(Path(__file__).parent))
    from drift import build_current, build_reference, detect_drift, report_drift

    print("\nBuilding reference distribution...")
    reference = build_reference()
    current = build_current()
    # Size of the first series in each mapping; the [] default keeps an empty
    # mapping from raising StopIteration (the reference side lacked it before).
    ref_n = len(next(iter(reference.values()), []))
    cur_n = len(next(iter(current.values()), []))
    print(f"Reference: {ref_n} pairs")
    print(f"Current: {cur_n} events\n")
    results = detect_drift(current, reference)
    report_drift(results)
    print()
# Script entry point: run the simulation only when this file is executed directly.
if __name__ == "__main__":
    run()