#!/usr/bin/env python3
"""CLI test harness for the CRCS pipeline.

Two modes:
  1. API mode (default): talks to an already-running server (local or remote)
     via POST /api/run, then follows GET /api/stream/<run_id> (Server-Sent
     Events). This script does NOT start a server; launch it yourself first.
  2. Direct mode (--direct): runs pipeline_v10 in-process, no server needed.

Usage:
  # Test against a local server already listening on the default URL
  # (http://localhost:7860)
  python test_cli.py "What is the best treatment for IBS-D?"

  # Test against a specific server (local or HF Spaces)
  python test_cli.py --url http://localhost:7860 "What is the best treatment for IBS-D?"
  python test_cli.py --url https://nipun-crcs-live.hf.space "query here"

  # Direct mode — no server, fastest for dev iteration
  python test_cli.py --direct "What is the best treatment for IBS-D?"

  # Options
  --domain medical|education|legal|finance  (default: medical)
  --code ACCESS_CODE      (if auth is set; defaults to $ACCESS_CODE)
  --disable-review-gate   (API mode: set high_stakes=false before the run)
  --quiet                 (API mode: only show final answer + score)
  --json                  (API mode: dump raw SSE events as JSON)
"""
import argparse
import json
import os
import pathlib
import re
import sys
import time
import urllib.error
import urllib.parse
import urllib.request

# Matches the "## Generated Answer" section of a report, up to the next
# level-2 heading or the end of the document.
_ANSWER_SECTION_RE = re.compile(r"## Generated Answer\s+([\s\S]*?)(?:\n## |\Z)")

# Symbols shown for "layer" events, keyed by the event's "status" field.
_LAYER_SYMBOLS = {"running": ">>>", "pass": " OK", "fail": "ERR"}

# Log levels that are still printed when --quiet is set.
_QUIET_LOG_LEVELS = ("ERROR", "WARN", "HEAD", "SUCCESS")

# Number of answer characters shown in direct mode.
_PREVIEW_CHARS = 500


def _read_final_answer_from_raw(raw_path):
    """Return the answer of the last iteration stored in a raw-results JSON file.

    The file is expected to contain a JSON object with an ``iterations`` list
    whose items are objects carrying an ``answer`` string. This is a
    best-effort preview for the CLI: a missing, unreadable or malformed file
    (or an unexpected structure) yields ``""`` instead of raising.
    """
    if not raw_path:
        return ""
    try:
        payload = json.loads(pathlib.Path(raw_path).read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # Missing file / permission error / directory, bad UTF-8, invalid JSON.
        return ""
    if not isinstance(payload, dict):
        return ""
    iterations = payload.get("iterations") or []
    if not isinstance(iterations, list) or not iterations:
        return ""
    last = iterations[-1]
    if not isinstance(last, dict):
        return ""
    answer = last.get("answer")
    return answer if isinstance(answer, str) else ""


def _read_final_answer_from_report(report_md):
    """Extract the blockquoted answer from a report's "## Generated Answer" section.

    Only lines starting with ``>`` are kept. The ``>`` marker and at most one
    following space are removed, so indentation inside the quote (e.g. nested
    list items) is preserved. Returns ``""`` if the section is absent/empty.
    """
    if not report_md:
        return ""
    match = _ANSWER_SECTION_RE.search(report_md)
    if not match:
        return ""
    lines = []
    for line in match.group(1).splitlines():
        stripped = line.lstrip()
        if not stripped.startswith(">"):
            continue
        text = stripped[1:]
        # Markdown blockquotes are "> text": drop only the single separator space.
        if text.startswith(" "):
            text = text[1:]
        lines.append(text)
    return "\n".join(lines).strip()


def _iter_sse_events(lines):
    """Yield ``(event_type, data)`` pairs parsed from Server-Sent Events lines.

    Follows the SSE wire format: ``event:`` / ``data:`` fields (one optional
    space after the colon is dropped), multiple ``data:`` lines joined with
    ``"\\n"``, ``:``-prefixed comment/heartbeat lines ignored, and a blank line
    dispatching the pending event. The event type defaults to ``"message"``.
    Lines may be ``bytes`` (decoded as UTF-8) or ``str`` and may end in LF or
    CRLF. An event still pending when input ends (no closing blank line) is
    discarded, as the SSE specification requires.

    Args:
        lines: Iterable of raw lines, e.g. an ``http.client.HTTPResponse``.
    """
    event_type = "message"
    data_lines = []
    for raw in lines:
        if isinstance(raw, bytes):
            # Whole lines are safe to decode independently: the newline byte
            # never occurs inside a multi-byte UTF-8 sequence.
            raw = raw.decode("utf-8", errors="replace")
        line = raw.rstrip("\r\n")
        if not line:
            if data_lines:
                yield event_type, "\n".join(data_lines)
            event_type = "message"
            data_lines = []
            continue
        if line.startswith(":"):
            continue  # comment line (used by servers as a heartbeat)
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]
        if field == "event":
            event_type = value
        elif field == "data":
            data_lines.append(value)


def _as_float(value, default=0.0):
    """Return *value* as a float, or *default* if it is missing or not numeric."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def _str_at(items, index):
    """Return ``items[index]`` if that position exists and holds a ``str``, else ``""``."""
    if index < len(items) and isinstance(items[index], str):
        return items[index]
    return ""


def _post_json(url, body, headers, timeout, label):
    """POST *body* as JSON to *url* and return the decoded JSON object reply.

    On an HTTP error status, a network/timeout error, or a reply that is not a
    JSON object, prints ``[<label>] FAILED: ...`` and exits with status 1.
    """
    req = urllib.request.Request(
        url, data=json.dumps(body).encode(), headers=headers, method="POST"
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            data = json.loads(resp.read())
    except urllib.error.HTTPError as e:
        detail = f"{e.code} {e.read().decode('utf-8', errors='replace')}"
    except OSError as e:
        # URLError (DNS failure, connection refused, ...) or a socket timeout.
        detail = str(getattr(e, "reason", e))
    except ValueError as e:
        # 2xx reply whose body is not valid UTF-8 JSON.
        detail = f"invalid JSON response ({e})"
    else:
        if isinstance(data, dict):
            return data
        detail = f"unexpected response: {data!r}"
    print(f"[{label}] FAILED: {detail}")
    sys.exit(1)


def _print_event(event_type, payload, elapsed, quiet, final_answer):
    """Print one decoded pipeline event and return the updated final answer.

    Args:
        event_type: SSE event name. "log", "layer", "status", "polished",
            "done", "judge" and "error" are rendered; others are ignored.
        payload: Decoded JSON data; a non-object payload is treated as
            ``{"msg": payload}``.
        elapsed: Seconds since the stream request was made (shown as a stamp).
        quiet: Suppress routine output (important log levels, errors and the
            final summary are still printed).
        final_answer: Best answer seen so far.

    Returns:
        *final_answer*, replaced by the answer carried by a "polished" event
        or by a non-empty answer on a "done" event.
    """
    if not isinstance(payload, dict):
        payload = {"msg": payload}
    stamp = f"  [{elapsed:6.1f}s]"

    if event_type == "log":
        level = str(payload.get("level", "INFO"))
        if not quiet or level in _QUIET_LOG_LEVELS:
            print(f"{stamp} {level:7s} {payload.get('msg', '')}")
    elif event_type == "layer":
        if not quiet:
            sym = _LAYER_SYMBOLS.get(payload.get("status", ""), "   ")
            name = payload.get("name", payload.get("id", ""))
            print(f"{stamp} {sym} {name} ({payload.get('model', '')})")
    elif event_type == "status":
        if not quiet:
            print(f"{stamp} ... {payload.get('msg', '')}")
    elif event_type == "polished":
        final_answer = str(payload.get("answer") or "")
        if not quiet:
            print(f"{stamp} POLISHED ({len(final_answer)} chars)")
    elif event_type == "done":
        answer = payload.get("answer")
        # Don't let an empty/null final answer erase the polished one.
        if answer:
            final_answer = str(answer)
        score = _as_float(payload.get("score"))
        outcome = "CONVERGED" if payload.get("converged", False) else "MAX_ITER"
        iters = payload.get("iterations", 0)
        rule = "=" * 60
        print(f"\n{rule}")
        print(f"  DONE  score={score:.4f}  {outcome} ({iters} iter)  {elapsed:.1f}s")
        print(rule)
    elif event_type == "judge":
        if not quiet:
            print(f"{stamp} JUDGE: winner={payload.get('winner', '?')}")
    elif event_type == "error":
        print(f"{stamp} ERROR: {payload.get('msg', payload)}")
    return final_answer


def test_api(url, query, domain, access_code, quiet, dump_json, disable_review_gate):
    """Run *query* against a live server via POST /api/run + GET /api/stream/<run_id>.

    Args:
        url: Server base URL (trailing slash optional).
        query: Question sent to the pipeline.
        domain: Domain name forwarded to /api/run.
        access_code: If non-empty, exchanged at /api/auth for a session token,
            which is then sent as the ``X-Session-Token`` header and as the
            stream's ``token`` query parameter.
        quiet: Only print important log levels, errors and the final result.
        dump_json: Print every SSE event as one JSON object per line instead of
            the human-readable view.
        disable_review_gate: POST ``high_stakes=false`` to /api/simulations
            before starting the run.

    Exits the process with status 1 if auth, the simulation toggle, starting
    the run, or opening the event stream fails.
    """
    base = url.rstrip("/")
    headers = {"Content-Type": "application/json"}

    token = ""
    if access_code:
        auth_data = _post_json(
            f"{base}/api/auth", {"access_code": access_code}, headers, 10, "auth"
        )
        token = str(auth_data.get("session_token") or "")
        if token:
            print("[auth] session acquired")
        else:
            print(f"[auth] no session_token in response: {auth_data}")
    if token:
        headers["X-Session-Token"] = token

    if disable_review_gate:
        sim_data = _post_json(
            f"{base}/api/simulations", {"high_stakes": False}, headers, 10, "sim"
        )
        if not quiet:
            print(f"[sim] high_stakes={sim_data.get('high_stakes')}")

    run_body = {"query": query, "domain": domain, "max_iter": 3, "n_candidates": 3}
    start_data = _post_json(f"{base}/api/run", run_body, headers, 30, "run")
    run_id = start_data.get("run_id")
    if not run_id:
        print(f"[run] No run_id returned: {start_data}")
        sys.exit(1)
    print(f"[run] started: {run_id}")

    stream_url = f"{base}/api/stream/{run_id}"
    if token:
        # Tokens may contain '+', '/', '=' which must be percent-encoded.
        stream_url += "?" + urllib.parse.urlencode({"token": token})
    stream_req = urllib.request.Request(stream_url, headers={"Accept": "text/event-stream"})

    t0 = time.time()
    try:
        resp = urllib.request.urlopen(stream_req, timeout=600)
    except urllib.error.HTTPError as e:
        print(f"[stream] FAILED: {e.code} {e.read().decode('utf-8', errors='replace')}")
        sys.exit(1)
    except OSError as e:
        print(f"[stream] FAILED: {getattr(e, 'reason', e)}")
        sys.exit(1)

    final_answer = None
    saw_done = False
    with resp:
        # Iterate line by line: HTTPResponse.read(n) blocks until n bytes (or
        # EOF) arrive, which would batch and delay events; readline returns as
        # soon as each line has been received.
        for event_type, data in _iter_sse_events(resp):
            try:
                payload = json.loads(data)
            except json.JSONDecodeError:
                continue  # ignore non-JSON data frames
            if event_type == "done":
                saw_done = True
            if dump_json:
                print(json.dumps({"event": event_type, "data": payload}, ensure_ascii=False))
                continue
            final_answer = _print_event(
                event_type, payload, time.time() - t0, quiet, final_answer
            )

    if not dump_json and not saw_done:
        print("[stream] WARNING: stream ended without a 'done' event")
    if final_answer:
        print(f"\n--- COMMITTED OUTPUT ---\n{final_answer}\n")


def test_direct(query, domain):
    """Run the pipeline in-process (no server) and print progress + final answer.

    ``pipeline_v10.run_pipeline()`` is a generator; each yielded item is
    expected to be a tuple of outputs (previously documented here as an
    11-tuple). This harness reads only:
      * index 4 — report markdown (fallback source of the answer),
      * index 5 — status string (printed on each step),
      * index 9 — path to the raw-results JSON (preferred answer source).
    NOTE(review): these indices come from this harness, not from
    pipeline_v10 itself — confirm them if the pipeline's yield layout changes.
    Any other yielded value is printed verbatim.
    """
    from config import DOMAIN_WEIGHT_PRESETS, MODEL_GENERATION
    from pipeline_v10 import run_pipeline

    weights = DOMAIN_WEIGHT_PRESETS.get(domain, DOMAIN_WEIGHT_PRESETS["medical"])
    print(f"[direct] domain={domain}  model={MODEL_GENERATION}")
    print(
        f"[direct] weights: P={weights['P']:.2f} L={weights['L']:.2f} "
        f"E={weights['E']:.2f} S={weights['S']:.2f}"
    )
    print(f"[direct] query: {query}")
    print()

    t0 = time.time()
    last_answer = ""
    steps = run_pipeline(
        "", MODEL_GENERATION, query, domain,
        weights["P"], weights["L"], weights["E"], weights["S"],
        max_iter=3,
    )
    for result in steps:
        elapsed = time.time() - t0
        if not (isinstance(result, tuple) and len(result) >= 6):
            print(f"  [{elapsed:6.1f}s] {result}")
            continue
        status = _str_at(result, 5)
        report_md = _str_at(result, 4)
        raw_path = _str_at(result, 9)
        # Prefer the raw JSON; fall back to parsing the report markdown.
        if raw_path:
            last_answer = _read_final_answer_from_raw(raw_path) or last_answer
        elif report_md:
            last_answer = _read_final_answer_from_report(report_md) or last_answer
        if status:
            print(f"  [{elapsed:6.1f}s] {status}")

    print(f"\n  Total: {time.time() - t0:.1f}s")
    if last_answer:
        preview = last_answer[:_PREVIEW_CHARS]
        if len(last_answer) > _PREVIEW_CHARS:
            preview += "..."
        print(f"\n--- COMMITTED OUTPUT ({len(last_answer)} chars) ---\n{preview}\n")


# pytest collects ``test_*`` functions from ``test_*.py`` files. These are CLI
# entry points whose parameters are not fixtures, so opt them out explicitly.
test_api.__test__ = False
test_direct.__test__ = False


def main():
    """Parse command-line arguments and dispatch to API or direct mode."""
    p = argparse.ArgumentParser(description="CLI test for CRCS pipeline")
    p.add_argument("query", help="Query to test")
    p.add_argument("--domain", default="medical",
                   choices=["medical", "education", "legal", "finance"])
    p.add_argument("--url", default="http://localhost:7860",
                   help="API base URL (server must already be running)")
    p.add_argument("--code", default=os.environ.get("ACCESS_CODE", ""),
                   help="Access code for auth (default: $ACCESS_CODE)")
    p.add_argument("--direct", action="store_true",
                   help="Run pipeline directly (no server)")
    p.add_argument("--disable-review-gate", action="store_true",
                   help="Set high_stakes=false via /api/simulations before API runs")
    p.add_argument("--quiet", "-q", action="store_true",
                   help="Only show final answer + score (API mode)")
    p.add_argument("--json", action="store_true", dest="dump_json",
                   help="Dump raw SSE events as JSON (API mode)")
    args = p.parse_args()

    if args.direct:
        test_direct(args.query, args.domain)
    else:
        test_api(args.url, args.query, args.domain, args.code,
                 args.quiet, args.dump_json, args.disable_review_gate)


if __name__ == "__main__":
    main()