| |
| """CLI test harness for the CRCS pipeline. |
| |
| Two modes: |
| 1. API mode (default): connects to an already-running server (--url, default http://localhost:7860) and hits POST /api/run + GET /api/stream |
| 2. Direct mode (--direct): runs pipeline_v10 in-process, no server needed |
| |
| Usage: |
| # Test against a local server (it must already be running; this script does not start it) |
| python test_cli.py "What is the best treatment for IBS-D?" |
| |
| # Test against running server (local or HF Spaces) |
| python test_cli.py --url http://localhost:7860 "What is the best treatment for IBS-D?" |
| python test_cli.py --url https://nipun-crcs-live.hf.space "query here" |
| |
| # Direct mode — no server, fastest for dev iteration |
| python test_cli.py --direct "What is the best treatment for IBS-D?" |
| |
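| # Options |
| --url URL (API base URL; default: http://localhost:7860; ignored with --direct) |
| --domain medical|education|legal|finance (default: medical) |
| --code ACCESS_CODE (if auth is enabled; defaults to $ACCESS_CODE) |
| --disable-review-gate (set high_stakes=false via /api/simulations before the run; API mode only) |
| --quiet, -q (hide progress; warnings, errors, the final score and the answer are still shown; API mode only) |
| --json (dump raw SSE events as JSON; API mode only) |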
| """ |
| import argparse |
| import json |
| import os |
| import pathlib |
| import re |
| import sys |
| import time |
|
|
|
|
def _read_final_answer_from_raw(raw_path):
    """Return the last iteration's answer from a pipeline raw-output JSON file.

    The file is expected to hold a JSON object with an ``"iterations"`` list
    whose items are objects carrying an ``"answer"`` string; the answer of the
    last item is returned.

    Best-effort by design: returns ``""`` when *raw_path* is empty/None, the
    file is missing or unreadable, the content is not valid UTF-8 JSON, or the
    structure differs from the above (instead of raising).
    """
    if not raw_path:
        return ""
    try:
        # EAFP: no separate exists() check, so a file vanishing between the
        # check and the read cannot raise.
        payload = json.loads(pathlib.Path(raw_path).read_text(encoding="utf-8"))
    except (OSError, ValueError):  # missing/unreadable file, bad UTF-8, bad JSON
        return ""
    if not isinstance(payload, dict):
        return ""
    iterations = payload.get("iterations")
    if not isinstance(iterations, list) or not iterations:
        return ""
    last = iterations[-1]
    if not isinstance(last, dict):
        return ""
    answer = last.get("answer")
    # Callers treat the result as text; a null/non-string answer counts as absent.
    return answer if isinstance(answer, str) else ""
|
|
|
|
def _read_final_answer_from_report(report_md):
    """Extract the answer text from the "Generated Answer" section of a report.

    The section starts at a line reading ``## Generated Answer`` (one or more
    spaces/tabs after ``##``, optional trailing whitespace) and runs up to the
    next ``## `` heading or the end of the text. Only blockquote lines
    (``> ...``) inside the section are kept, with the ``>`` marker and the
    whitespace after it removed. The kept lines are joined with newlines and
    the result is stripped.

    Returns ``""`` when *report_md* is empty/None or contains no such section.
    """
    if not report_md:
        return ""
    match = re.search(
        # Heading and terminator are both anchored to line starts. This keeps
        # "### Generated Answer" from matching, and stops an empty section
        # from swallowing the blockquotes of the section that follows it.
        r"^##[ \t]+Generated Answer[ \t]*\r?\n([\s\S]*?)(?=^##[ \t]|\Z)",
        report_md,
        re.MULTILINE,
    )
    if not match:
        return ""
    quoted = (
        line.lstrip()[1:].lstrip()
        for line in match.group(1).splitlines()
        if line.lstrip().startswith(">")
    )
    return "\n".join(quoted).strip()
|
|
|
|
def _urlopen_or_exit(req, timeout, tag):
    """Open *req* and return the response; on failure print ``[tag] FAILED: ...`` and exit(1)."""
    import urllib.error
    import urllib.request

    try:
        return urllib.request.urlopen(req, timeout=timeout)
    except urllib.error.HTTPError as e:
        print(f"[{tag}] FAILED: {e.code} {e.read().decode(errors='replace')}")
        sys.exit(1)
    except OSError as e:
        # URLError (connection refused, DNS failure) and socket timeouts: show the
        # reason instead of a traceback. HTTPError is handled above.
        print(f"[{tag}] FAILED: {getattr(e, 'reason', e)}")
        sys.exit(1)


def _request_json_or_exit(req, timeout, tag):
    """Send *req* and return its body decoded as a JSON object; exit(1) on any failure."""
    with _urlopen_or_exit(req, timeout, tag) as resp:
        raw = resp.read()
    try:
        data = json.loads(raw)
    except ValueError:
        data = None
    if not isinstance(data, dict):
        print(f"[{tag}] FAILED: expected a JSON object, got {raw[:200]!r}")
        sys.exit(1)
    return data


def _iter_sse_events(lines):
    """Yield ``(event_type, data)`` for each complete Server-Sent Event in *lines*.

    *lines* is any iterable of ``bytes`` or ``str`` lines; an HTTP response
    object iterates line by line. The parser handles:

    - LF or CRLF line endings;
    - ``field: value`` lines, with one optional space after the colon;
    - ``:`` comment lines, which are ignored;
    - multiple ``data`` lines, which are joined with newlines;
    - a blank line, which dispatches the pending event.

    An event without any ``data`` line is dropped, and so is a trailing event
    that is not followed by a blank line. The event type defaults to
    ``"message"``.
    """
    event_type = "message"
    data_lines = []
    for raw in lines:
        # Decoding whole lines means a multi-byte UTF-8 character can never be
        # split across two reads, which per-chunk decoding would corrupt.
        line = raw.decode("utf-8", errors="replace") if isinstance(raw, bytes) else raw
        line = line.rstrip("\r\n")
        if not line:
            if data_lines:
                yield event_type, "\n".join(data_lines)
            event_type, data_lines = "message", []
            continue
        if line.startswith(":"):
            continue  # comment / keep-alive
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]
        if field == "event":
            event_type = value or "message"
        elif field == "data":
            data_lines.append(value)


def _print_stream_event(event_type, payload, elapsed, quiet, final_answer):
    """Print one decoded stream event and return the final answer seen so far.

    *elapsed* is the number of seconds since the stream was opened. Only
    ``polished`` and ``done`` events can replace *final_answer*; all other
    events return it unchanged. These are printed even when *quiet* is set:
    ``done`` events, ``error`` events, and ``log`` lines at level
    ERROR/WARN/HEAD/SUCCESS.
    """
    if not isinstance(payload, dict):
        # Tolerate a bare JSON value (string/list/number) instead of crashing on .get().
        payload = {"msg": payload}

    if event_type == "log":
        lvl = str(payload.get("level", "INFO"))
        msg = payload.get("msg", "")
        if not quiet or lvl in ("ERROR", "WARN", "HEAD", "SUCCESS"):
            print(f" [{elapsed:6.1f}s] {lvl:7s} {msg}")

    elif event_type == "layer":
        if not quiet:
            status = payload.get("status", "")
            name = payload.get("name", payload.get("id", ""))
            model = payload.get("model", "")
            sym = {"running": ">>>", "pass": " OK", "fail": "ERR"}.get(status, " ")
            print(f" [{elapsed:6.1f}s] {sym} {name} ({model})")

    elif event_type == "status":
        if not quiet:
            print(f" [{elapsed:6.1f}s] ... {payload.get('msg', '')}")

    elif event_type == "polished":
        final_answer = str(payload.get("answer") or "")
        if not quiet:
            print(f" [{elapsed:6.1f}s] POLISHED ({len(final_answer)} chars)")

    elif event_type == "done":
        score = payload.get("score", 0)
        # A null/non-numeric score must not crash the summary line.
        score_txt = f"{score:.4f}" if isinstance(score, (int, float)) else str(score)
        # Keep the polished answer when "done" omits the answer or sends it empty.
        final_answer = payload.get("answer") or final_answer
        converged = payload.get("converged", False)
        iters = payload.get("iterations", 0)
        print(f"\n{'='*60}")
        print(f" DONE score={score_txt} "
              f"{'CONVERGED' if converged else 'MAX_ITER'} ({iters} iter) "
              f"{elapsed:.1f}s")
        print(f"{'='*60}")

    elif event_type == "judge":
        if not quiet:
            print(f" [{elapsed:6.1f}s] JUDGE: winner={payload.get('winner', '?')}")

    elif event_type == "error":
        print(f" [{elapsed:6.1f}s] ERROR: {payload.get('msg', payload)}")

    return final_answer


def test_api(url, query, domain, access_code, quiet, dump_json, disable_review_gate):
    """Test via HTTP API (POST /api/run + GET /api/stream).

    Talks to an already-running server at *url*; nothing is started locally.
    The requests are made in this order:

    1. ``POST /api/auth``, only when *access_code* is given.
    2. ``POST /api/simulations`` with ``high_stakes=False``, only when
       *disable_review_gate* is set.
    3. ``POST /api/run`` to start the run.
    4. ``GET /api/stream/<run_id>``, whose events are printed as they arrive.

    Args:
        url: Server base URL; a trailing ``/`` is ignored.
        query: Question sent to the pipeline.
        domain: Domain name forwarded in the run request.
        access_code: Access code for ``/api/auth``; falsy skips authentication.
        quiet: Hide progress output. Warnings, errors, the ``done`` summary
            and the final answer are still printed.
        dump_json: Print every decoded event as one JSON line instead of the
            human-readable view.
        disable_review_gate: Set ``high_stakes=False`` before starting the run.

    Exits the process with status 1 if the auth, simulation, run or
    stream-open request fails.
    """
    import http.client
    import urllib.parse
    import urllib.request

    base = url.rstrip("/")
    headers = {"Content-Type": "application/json"}

    # Optional auth: exchange the access code for a session token.
    token = ""
    if access_code:
        auth_req = urllib.request.Request(
            f"{base}/api/auth",
            data=json.dumps({"access_code": access_code}).encode(),
            headers=headers,
            method="POST",
        )
        token = _request_json_or_exit(auth_req, 10, "auth").get("session_token", "")
        print("[auth] session acquired")

    if token:
        headers["X-Session-Token"] = token

    # Optional: switch off the high-stakes review gate before the run.
    if disable_review_gate:
        sim_req = urllib.request.Request(
            f"{base}/api/simulations",
            data=json.dumps({"high_stakes": False}).encode(),
            headers=headers,
            method="POST",
        )
        sim_data = _request_json_or_exit(sim_req, 10, "sim")
        if not quiet:
            print(f"[sim] high_stakes={sim_data.get('high_stakes')}")

    # Start the run.
    body = json.dumps({"query": query, "domain": domain, "max_iter": 3, "n_candidates": 3})
    run_req = urllib.request.Request(f"{base}/api/run", data=body.encode(), headers=headers, method="POST")
    start_data = _request_json_or_exit(run_req, 30, "run")

    run_id = start_data.get("run_id")
    if not run_id:
        print(f"[run] No run_id returned: {start_data}")
        sys.exit(1)
    print(f"[run] started: {run_id}")

    # The stream request carries the token as a query parameter rather than a
    # header. URL-encode it so characters such as '+', '/', '=' or '&' survive.
    stream_url = f"{base}/api/stream/{urllib.parse.quote(str(run_id), safe='')}"
    if token:
        stream_url += "?" + urllib.parse.urlencode({"token": token})
    stream_req = urllib.request.Request(stream_url, headers={"Accept": "text/event-stream"})

    t0 = time.time()
    final_answer = None

    with _urlopen_or_exit(stream_req, 600, "stream") as resp:
        try:
            # Iterating the response reads line by line, so each event is handled
            # as soon as it arrives. A fixed-size read() on a chunked response
            # blocks until the whole buffer fills.
            for event_type, data in _iter_sse_events(resp):
                try:
                    payload = json.loads(data)
                except json.JSONDecodeError:
                    continue

                if dump_json:
                    print(json.dumps({"event": event_type, "data": payload}, ensure_ascii=False))
                    continue

                final_answer = _print_stream_event(
                    event_type, payload, time.time() - t0, quiet, final_answer)
        except (OSError, http.client.HTTPException) as e:
            # Timeout or dropped connection mid-stream: report it, then still
            # show whatever answer was already received.
            print(f"[stream] connection lost: {e!r}")

    if final_answer:
        print(f"\n--- COMMITTED OUTPUT ---\n{final_answer}\n")
|
|
|
|
def test_direct(query, domain):
    """Run the pipeline in-process (no server needed) and print its progress.

    Drives the generator ``pipeline_v10.run_pipeline(...)`` with the weight
    preset for *domain* (falling back to the ``"medical"`` preset) and
    ``max_iter=3``.

    Each yielded tuple of length >= 6 is read as follows:

    - index 5 is the status line, printed when it is a non-empty string;
    - index 9, if present, is the path of a raw-output JSON file;
    - index 4 is a markdown report.

    The latest answer comes from the raw file when a path is given, otherwise
    from the report. Any other yielded value is printed as-is. At the end, the
    total time is printed, followed by a preview (first 500 characters) of the
    last answer found.

    NOTE(review): an earlier docstring described the yield as an 11-tuple
    ``(log_html, status, bar_plot, conv_plot, ...)``, which does not match the
    indices read here — confirm the layout against pipeline_v10.
    """
    from config import DOMAIN_WEIGHT_PRESETS, MODEL_GENERATION
    from pipeline_v10 import run_pipeline

    preset = DOMAIN_WEIGHT_PRESETS.get(domain, DOMAIN_WEIGHT_PRESETS["medical"])

    print(f"[direct] domain={domain} model={MODEL_GENERATION}")
    print(f"[direct] weights: P={preset['P']:.2f} L={preset['L']:.2f} E={preset['E']:.2f} S={preset['S']:.2f}")
    print(f"[direct] query: {query}")
    print("")

    started = time.time()
    answer = ""
    stream = run_pipeline(
        "", MODEL_GENERATION, query, domain,
        preset["P"], preset["L"], preset["E"], preset["S"],
        max_iter=3,
    )
    for item in stream:
        took = time.time() - started

        # Anything that is not a >=6-tuple is echoed verbatim.
        if not isinstance(item, tuple) or len(item) < 6:
            print(f" [{took:6.1f}s] {item}")
            continue

        status_line = item[5] if isinstance(item[5], str) else ""
        raw_file = item[9] if len(item) > 9 and isinstance(item[9], str) else ""
        report = item[4] if isinstance(item[4], str) else ""

        # Prefer the raw JSON file; fall back to parsing the markdown report.
        if raw_file:
            answer = _read_final_answer_from_raw(raw_file) or answer
        elif report:
            answer = _read_final_answer_from_report(report) or answer

        if status_line:
            print(f" [{took:6.1f}s] {status_line}")

    print(f"\n Total: {time.time() - started:.1f}s")
    if not answer:
        return
    head = answer[:500]
    preview = head + ("..." if len(answer) > 500 else "")
    print(f"\n--- COMMITTED OUTPUT ({len(answer)} chars) ---\n{preview}\n")
|
|
|
|
def main():
    """Parse command-line options and run API mode (default) or direct mode.

    Direct mode receives only the query and the domain. All other options
    (--url, --code, --quiet, --json, --disable-review-gate) apply to API mode
    only.
    """
    parser = argparse.ArgumentParser(description="CLI test for CRCS pipeline")
    option = parser.add_argument
    option("query", help="Query to test")
    option("--domain", default="medical",
           choices=["medical", "education", "legal", "finance"])
    option("--url", default="http://localhost:7860", help="API base URL")
    option("--code", default=os.environ.get("ACCESS_CODE", ""),
           help="Access code for auth")
    option("--direct", action="store_true",
           help="Run pipeline directly (no server)")
    option("--disable-review-gate", action="store_true",
           help="Set high_stakes=false via /api/simulations before API runs")
    option("--quiet", "-q", action="store_true",
           help="Only show final answer + score")
    option("--json", action="store_true", dest="dump_json",
           help="Dump raw SSE events as JSON")
    opts = parser.parse_args()

    if not opts.direct:
        test_api(opts.url, opts.query, opts.domain, opts.code, opts.quiet,
                 opts.dump_json, opts.disable_review_gate)
        return
    test_direct(opts.query, opts.domain)
|
|
|
|
# Script entry point: run the CLI only when executed directly, not on import.
if __name__ == "__main__":
    main()
|
|