crcs-live / test_cli.py
Nipun's picture
Add organized audit test suite and bump to v0.22.3
d55997a
#!/usr/bin/env python3
"""CLI test harness for the CRCS pipeline.

Two modes:
  1. API mode (default): hits POST /api/run + GET /api/stream on a running server
  2. Direct mode (--direct): runs pipeline_v10 in-process, no server needed

Usage:
    # Test against a server already listening on the default http://localhost:7860
    python test_cli.py "What is the best treatment for IBS-D?"

    # Test against running server (local or HF Spaces)
    python test_cli.py --url http://localhost:7860 "What is the best treatment for IBS-D?"
    python test_cli.py --url https://nipun-crcs-live.hf.space "query here"

    # Direct mode — no server, fastest for dev iteration
    python test_cli.py --direct "What is the best treatment for IBS-D?"

Options:
    --domain medical|education|legal|finance   (default: medical)
    --code ACCESS_CODE                         (if auth is set; defaults to $ACCESS_CODE)
    --disable-review-gate                      (API mode: POST high_stakes=false to /api/simulations first)
    --quiet                                    (only show final answer + score)
    --json                                     (dump raw SSE events as JSON)
"""
import argparse
import json
import os
import pathlib
import re
import sys
import time
def _read_final_answer_from_raw(raw_path):
    """Return the last iteration's answer from a raw-run JSON file.

    The file is expected to hold a JSON object with an ``"iterations"`` list
    whose items are objects carrying an ``"answer"`` string.

    Args:
        raw_path: Path to the JSON file; may be empty or ``None``.

    Returns:
        The ``"answer"`` of the final iteration, or ``""`` when the path is
        empty, the file cannot be read or parsed, or the document does not
        have the expected shape. This is a best-effort reader and never
        raises for bad file contents.
    """
    if not raw_path:
        return ""
    try:
        # Read directly instead of checking exists() first: a missing or
        # unreadable file surfaces as OSError, malformed JSON or undecodable
        # bytes as ValueError.
        payload = json.loads(pathlib.Path(raw_path).read_text(encoding="utf-8"))
    except (OSError, ValueError):
        return ""

    # Valid JSON is not necessarily an object; guard each level of the
    # expected shape so an unexpected document yields "" instead of raising.
    if not isinstance(payload, dict):
        return ""
    iterations = payload.get("iterations")
    if not isinstance(iterations, list) or not iterations:
        return ""
    last = iterations[-1]
    if not isinstance(last, dict):
        return ""
    answer = last.get("answer", "")
    return answer if isinstance(answer, str) else ""
def _read_final_answer_from_report(report_md):
    """Extract the block-quoted answer from a Markdown report.

    Looks for the ``## Generated Answer`` heading and collects every line of
    that section that starts with ``>`` (after optional indentation), with
    the quote marker and surrounding whitespace removed. The section ends at
    the next line starting with ``## `` or at the end of the report.

    Args:
        report_md: Markdown text of the report; may be empty or ``None``.

    Returns:
        The quoted lines joined with newlines, or ``""`` when the report is
        empty, the heading is missing, or the section has no quoted lines.
    """
    if not report_md:
        return ""
    # The lookahead requires whitespace after the heading without consuming
    # it. Consuming it greedily would swallow the newline that introduces an
    # immediately following "## " heading, so an empty answer section would
    # wrongly pick up quoted lines from the next section.
    match = re.search(
        r"## Generated Answer(?=\s)([\s\S]*?)(?:\n## |\Z)", report_md
    )
    if match is None:
        return ""
    candidates = (line.lstrip() for line in match.group(1).splitlines())
    quoted = [c[1:].lstrip() for c in candidates if c.startswith(">")]
    return "\n".join(quoted).strip()
def _post_json(url, body, headers, label, timeout):
    """POST *body* as JSON to *url* and return the decoded JSON response.

    On an HTTP error status or a connection-level failure, prints
    ``[label] FAILED: ...`` and exits the process with status 1.
    """
    import urllib.error
    import urllib.request

    req = urllib.request.Request(
        url, data=json.dumps(body).encode(), headers=headers, method="POST"
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return json.loads(resp.read())
    except urllib.error.HTTPError as e:
        print(f"[{label}] FAILED: {e.code} {e.read().decode(errors='replace')}")
        sys.exit(1)
    except urllib.error.URLError as e:
        # HTTPError is a URLError subclass, so this only sees connection
        # failures (refused, DNS, timeout while connecting).
        print(f"[{label}] FAILED: {e.reason}")
        sys.exit(1)


def _iter_sse_frames(resp, chunk_size=4096):
    """Yield each complete SSE frame (text between blank lines) from *resp*.

    Buffers raw bytes and decodes only whole frames, so a multi-byte UTF-8
    character split across two reads is not corrupted. CRLF line endings are
    normalized to LF. A trailing frame without a terminating blank line is
    discarded.
    """
    buf = b""
    while True:
        chunk = resp.read(chunk_size)
        if not chunk:
            break
        # Normalize on the joined buffer so a CRLF split across two chunks
        # is still recognized.
        buf = (buf + chunk).replace(b"\r\n", b"\n")
        while True:
            frame, sep, rest = buf.partition(b"\n\n")
            if not sep:
                break
            buf = rest
            yield frame.decode("utf-8", errors="replace")


def _parse_sse_frame(frame):
    """Split one SSE frame into ``(event_type, data)``.

    ``event_type`` defaults to ``"message"``. Multiple ``data: `` lines are
    joined with newlines; comment lines (leading ``:``) and any other fields
    are ignored.
    """
    event_type = "message"
    data_lines = []
    for line in frame.split("\n"):
        if line.startswith("event: "):
            event_type = line[7:]
        elif line.startswith("data: "):
            data_lines.append(line[6:])
    return event_type, "\n".join(data_lines)


def test_api(url, query, domain, access_code, quiet, dump_json, disable_review_gate):
    """Test via HTTP API (POST /api/run + GET /api/stream).

    Args:
        url: Base URL of the server; a trailing slash is ignored.
        query: Query text sent to ``/api/run``.
        domain: Domain name sent to ``/api/run``.
        access_code: If non-empty, exchanged at ``/api/auth`` for a session
            token that is sent with the following requests.
        quiet: Suppress progress output; warnings, errors and the final
            summary are still printed.
        dump_json: Print every SSE event as one JSON line instead of the
            human-readable rendering (no summary or final answer is shown).
        disable_review_gate: POST ``{"high_stakes": false}`` to
            ``/api/simulations`` before starting the run.

    Exits the process with status 1 if any request fails or no ``run_id``
    is returned.
    """
    import urllib.error
    import urllib.parse
    import urllib.request

    base = url.rstrip("/")
    headers = {"Content-Type": "application/json"}

    # Auth if needed
    token = ""
    if access_code:
        auth_data = _post_json(
            f"{base}/api/auth", {"access_code": access_code}, headers, "auth", 10
        )
        token = auth_data.get("session_token", "")
        print("[auth] session acquired")
    if token:
        headers["X-Session-Token"] = token

    if disable_review_gate:
        sim_data = _post_json(
            f"{base}/api/simulations", {"high_stakes": False}, headers, "sim", 10
        )
        if not quiet:
            print(f"[sim] high_stakes={sim_data.get('high_stakes')}")

    # Start pipeline
    start_data = _post_json(
        f"{base}/api/run",
        {"query": query, "domain": domain, "max_iter": 3, "n_candidates": 3},
        headers,
        "run",
        30,
    )
    run_id = start_data.get("run_id")
    if not run_id:
        print(f"[run] No run_id returned: {start_data}")
        sys.exit(1)
    print(f"[run] started: {run_id}")

    # Stream events
    stream_url = f"{base}/api/stream/{run_id}"
    if token:
        # Escape the token so characters such as '+', '&' or '=' cannot
        # corrupt the query string.
        stream_url += f"?token={urllib.parse.quote(token, safe='')}"
    req = urllib.request.Request(stream_url, headers={"Accept": "text/event-stream"})
    t0 = time.time()
    final_answer = None
    try:
        resp = urllib.request.urlopen(req, timeout=600)
    except urllib.error.HTTPError as e:
        print(f"[stream] FAILED: {e.code} {e.read().decode(errors='replace')}")
        sys.exit(1)
    except urllib.error.URLError as e:
        print(f"[stream] FAILED: {e.reason}")
        sys.exit(1)

    with resp:
        for frame in _iter_sse_frames(resp):
            event_type, data = _parse_sse_frame(frame)
            if not data:
                continue  # heartbeat or data-less frame
            try:
                payload = json.loads(data)
            except json.JSONDecodeError:
                continue
            if dump_json:
                print(json.dumps({"event": event_type, "data": payload}, ensure_ascii=False))
                continue
            elapsed = time.time() - t0
            if event_type == "log":
                lvl = payload.get("level", "INFO")
                msg = payload.get("msg", "")
                if not quiet or lvl in ("ERROR", "WARN", "HEAD", "SUCCESS"):
                    print(f" [{elapsed:6.1f}s] {lvl:7s} {msg}")
            elif event_type == "layer":
                status = payload.get("status", "")
                name = payload.get("name", payload.get("id", ""))
                model = payload.get("model", "")
                if not quiet:
                    sym = {"running": ">>>", "pass": " OK", "fail": "ERR"}.get(status, " ")
                    print(f" [{elapsed:6.1f}s] {sym} {name} ({model})")
            elif event_type == "status":
                if not quiet:
                    print(f" [{elapsed:6.1f}s] ... {payload.get('msg', '')}")
            elif event_type == "polished":
                final_answer = payload.get("answer", "")
                if not quiet:
                    print(f" [{elapsed:6.1f}s] POLISHED ({len(final_answer)} chars)")
            elif event_type == "done":
                # A JSON null score must not crash the float formatting below.
                final_score = payload.get("score") or 0
                final_answer = payload.get("answer", final_answer)
                converged = payload.get("converged", False)
                iters = payload.get("iterations", 0)
                print(f"\n{'='*60}")
                print(f" DONE score={final_score:.4f} "
                      f"{'CONVERGED' if converged else 'MAX_ITER'} ({iters} iter) "
                      f"{elapsed:.1f}s")
                print(f"{'='*60}")
            elif event_type == "judge":
                winner = payload.get("winner", "?")
                if not quiet:
                    print(f" [{elapsed:6.1f}s] JUDGE: winner={winner}")
            elif event_type == "error":
                print(f" [{elapsed:6.1f}s] ERROR: {payload.get('msg', payload)}")

    if final_answer:
        print(f"\n--- COMMITTED OUTPUT ---\n{final_answer}\n")
def test_direct(query, domain):
    """Run the pipeline in-process and print its progress — no server needed.

    Iterates over ``pipeline_v10.run_pipeline(...)`` using the weight preset
    for *domain* (falling back to the ``"medical"`` preset). For each yielded
    tuple of at least six items, the string at index 5 is printed as the
    status line; the final answer is refreshed from the raw-run JSON path at
    index 9 when that is a non-empty string, otherwise from the report
    Markdown at index 4. Any other yielded value is printed as-is.

    Args:
        query: Query text passed to the pipeline.
        domain: Key into ``config.DOMAIN_WEIGHT_PRESETS``.
    """
    from config import DOMAIN_WEIGHT_PRESETS, MODEL_GENERATION
    from pipeline_v10 import run_pipeline

    def text_at(seq, index):
        # String at seq[index], or "" when the slot is absent or not a str.
        if len(seq) > index and isinstance(seq[index], str):
            return seq[index]
        return ""

    preset = DOMAIN_WEIGHT_PRESETS.get(domain, DOMAIN_WEIGHT_PRESETS["medical"])
    print(f"[direct] domain={domain} model={MODEL_GENERATION}")
    print(f"[direct] weights: P={preset['P']:.2f} L={preset['L']:.2f} E={preset['E']:.2f} S={preset['S']:.2f}")
    print(f"[direct] query: {query}")
    print()

    started = time.time()
    answer = ""
    updates = run_pipeline(
        "", MODEL_GENERATION, query, domain,
        preset["P"], preset["L"], preset["E"], preset["S"],
        max_iter=3,
    )
    for update in updates:
        stamp = time.time() - started
        if not isinstance(update, tuple) or len(update) < 6:
            print(f" [{stamp:6.1f}s] {update}")
            continue

        status = text_at(update, 5)
        raw_path = text_at(update, 9)
        report_md = text_at(update, 4)
        # The raw-run file takes precedence; the report is consulted only
        # when no raw path was yielded.
        if raw_path:
            answer = _read_final_answer_from_raw(raw_path) or answer
        elif report_md:
            answer = _read_final_answer_from_report(report_md) or answer
        if status:
            print(f" [{stamp:6.1f}s] {status}")

    print(f"\n Total: {time.time() - started:.1f}s")
    if not answer:
        return
    # Show just the first 500 chars of the answer.
    preview = answer[:500] + ("..." if len(answer) > 500 else "")
    print(f"\n--- COMMITTED OUTPUT ({len(answer)} chars) ---\n{preview}\n")
def main():
    """Parse the command line and dispatch to direct or API test mode."""
    parser = argparse.ArgumentParser(description="CLI test for CRCS pipeline")
    add = parser.add_argument
    add("query", help="Query to test")
    add("--domain", default="medical", choices=["medical", "education", "legal", "finance"])
    add("--url", default="http://localhost:7860", help="API base URL")
    add("--code", default=os.environ.get("ACCESS_CODE", ""), help="Access code for auth")
    add("--direct", action="store_true", help="Run pipeline directly (no server)")
    add(
        "--disable-review-gate",
        action="store_true",
        help="Set high_stakes=false via /api/simulations before API runs",
    )
    add("--quiet", "-q", action="store_true", help="Only show final answer + score")
    add("--json", action="store_true", dest="dump_json", help="Dump raw SSE events as JSON")
    opts = parser.parse_args()

    # --direct bypasses the HTTP server entirely; everything else goes
    # through the API.
    if not opts.direct:
        test_api(
            opts.url,
            opts.query,
            opts.domain,
            opts.code,
            opts.quiet,
            opts.dump_json,
            opts.disable_review_gate,
        )
        return
    test_direct(opts.query, opts.domain)


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()