""" AutoClean-Ai v1.0.0 — Production FastAPI Server Endpoints: Standard : POST /reset POST /step GET /state GET /health OpenEnv : GET /tasks POST /grader POST /baseline Extra : GET /leaderboard POST /leaderboard/submit GET /datasets """ import sys, os, uuid, logging, dataclasses, enum, time, threading sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from fastapi import FastAPI, HTTPException, Header, Request from fastapi.responses import JSONResponse, RedirectResponse, HTMLResponse, Response, FileResponse from fastapi.middleware.cors import CORSMiddleware from contextlib import asynccontextmanager from typing import Dict, Any, Optional, List from models import DataCleaningAction, DataCleaningObservation, DataCleaningState from environment import DataCleaningEnvironment from metrics import get_tracker from tasks import ( ALL_TASKS, get_task, task_id_for_difficulty, compute_task_score, ACTION_SCHEMA, ) logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s") logger = logging.getLogger(__name__) # ═══════════════════════════════════════════════════════════════════════════════ # ═══════════════════════════════════════════════════════════════════════════════ # POLISHED DEMO PAGE # ═══════════════════════════════════════════════════════════════════════════════ STUNNING_DOCS_HTML = """ DataQualityGuard-Env · OpenEnv
OpenEnv · RL Environment
v4.2.0

DataQualityGuard‑Env

Train AI models to answer only from verified context — with a 9-component reward system that penalizes fabrication and rewards factual grounding, citation accuracy, and calibrated confidence.

1M+ Examples
38 Datasets
3 Task Tiers
9 Reward Components

How it works

Three primitives. Nine reward signals. One goal: no fabrications.

01
🔄

reset()

Sample a question + context document from one of 38 curated datasets, stratified by difficulty tier.

02
📤

step(answer)

Submit your answer with confidence and a source quote. Receive a dense reward signal across all 9 components.

03
📊

grade()

Aggregate episode rewards into a task score. Track accuracy, data_quality rate, and skill rating over time.
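
For illustration, a finished episode can be scored over HTTP like this (the reward values are made up; step_infos may carry per-step metadata and can be left empty, in which case the score falls back to the mean reward):

python
import requests

scores = requests.post("http://localhost:7860/grader", json={
    "task_id": "task_1_factual_grounding",
    "step_rewards": [0.82, 0.71, 0.90],  # collected from earlier /step responses
    "step_infos": [],                    # empty -> score is the mean reward
}).json()
print(scores["score"])  # 0.81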

9-Component Reward System

Every answer is graded on factual correctness, source grounding, citation accuracy, confidence calibration, semantic consistency, data_quality detection, ROUGE-L, BERTScore, and AlignScore. Each component is weighted and combined into a single scalar reward in [0, 1]. Confident wrong answers are penalized harder than uncertain ones.
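
A sketch of how such a weighted combination behaves — the weights below are illustrative placeholders, not the grader's actual values:

python
# Illustrative only: the real component weights live in the server-side grader
WEIGHTS = {
    "correctness": 0.30, "grounding": 0.15, "citation": 0.10,
    "calibration": 0.10, "semantic_consistency": 0.10,
    "data_quality_detection": 0.10, "rouge_l": 0.05,
    "bert_score": 0.05, "align_score": 0.05,
}

def combine(components: dict, confidence: float) -> float:
    reward = sum(w * components.get(k, 0.0) for k, w in WEIGHTS.items())
    # Confident wrong answers are penalized harder than uncertain ones
    if components.get("correctness", 0.0) < 0.5:
        reward *= 1.0 - confidence
    return max(0.0, min(1.0, reward))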

Curriculum Progression

Episodes advance from Beginner (single-hop factual QA with unambiguous ground truth) through Intermediate (multi-hop synthesis across multiple context sentences) to Advanced (adversarial prompts where well-calibrated refusals score best). The environment tracks a live skill rating and adjusts difficulty sampling accordingly, as sketched below.
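
One way to picture the adaptive sampling (a hypothetical sketch — the environment's actual update rule may differ):

python
import random

def sample_difficulty(skill_rating: float) -> str:
    """Hypothetical: shift sampling mass toward harder tiers as skill (0..1) grows."""
    weights = [
        max(0.05, 1.0 - skill_rating),  # beginner
        0.50,                           # intermediate
        max(0.05, skill_rating),        # advanced
    ]
    return random.choices(["beginner", "intermediate", "advanced"], weights=weights)[0]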

Task Tiers

Three progressively harder tasks drawn from 38 datasets with 1M+ examples.

🟢

Factual Grounding

Beginner ~450K examples

Answer straightforward factual questions from a short context passage. Single-hop retrieval with unambiguous ground truth. The grader rewards precise citation and heavily penalizes adding information not found in the context.

SQuAD BoolQ OpenBookQA ARC TriviaQA +8 more
🔵

Multi-Hop Synthesis

Intermediate ~380K examples

Synthesize evidence from multiple context sentences to reach an answer. Requires connecting disparate facts without fabricating bridge claims. AlignScore and BERTScore are weighted more heavily at this tier.

HotpotQA CoQA NQ-Open MS-MARCO MuSiQue +7 more
🔴

Adversarial Resistance

Advanced ~210K examples

Resist adversarial prompts designed to elicit fabrications. Many questions are deliberately unanswerable — a refusal offered with low confidence scores better than a fabricated, plausible-sounding answer.

DataQualityEval TruthfulQA FEVER Climate-FEVER WittyQA +6 more

API Reference

RESTful JSON API. All endpoints accept and return application/json. No auth required.

Method  Endpoint             Description
POST    /reset               Start episode — returns question, context, difficulty, episode_id
POST    /step                Submit answer with confidence + source_quote, receive reward breakdown
GET     /state               Current episode metadata — accuracy, data_quality_rate, skill_rating
GET     /tasks               List all 3 tasks with action schema
POST    /grader              Score a completed episode (0.0 – 1.0) from rewards + infos
POST    /baseline            Run heuristic baseline across all 3 tasks
GET     /metadata            Environment name, version, license
GET     /schema              Full JSON schemas for action, observation, state
GET     /health              Health check — returns {"status":"healthy"}
POST    /mcp                 JSON-RPC 2.0 tool discovery for MCP clients
GET     /leaderboard         Ranked leaderboard by avg_reward
POST    /leaderboard/submit  Submit model results for ranking

Quick Start

Three commands to run your first episode.

bash
# Install and launch
pip install -e .
uvicorn server.app:app --port 7860

# Run heuristic baseline
python inference.py --heuristic --env-url http://localhost:7860
python
import requests

BASE = "http://localhost:7860"

# 1. Reset — get a question + context
obs = requests.post(f"{BASE}/reset", json={"difficulty": "beginner"}).json()
session_id = obs["session_id"]
print(obs["question"])

# 2. Step — submit your answer
result = requests.post(f"{BASE}/step", json={
    "answer": "Based on the context, ...",
    "confidence": 0.85,
    "source_quote": "verbatim text from context",
    "session_id": session_id,
}).json()
print(result["reward"])           # scalar in [0, 1]
print(result["is_data_quality"])  # bool

Interactive Playground

Reset an episode, read the context, craft your answer, and see the live reward breakdown.

No episode active
Click Reset to load a question and context...
0.70
Reward Breakdown
Submit an answer to see the 9-component reward breakdown
Raw JSON response
Observation
Click Reset to start an episode.
""" # ═══════════════════════════════════════════════════════════════════════════════ # FASTAPI APP — session-isolated environments for thread safety # ═══════════════════════════════════════════════════════════════════════════════ _default_env: Optional[DataCleaningEnvironment] = None _env_loading = False _env_lock = threading.Lock() def _get_default_env() -> DataCleaningEnvironment: """Get or create the shared dataset-loader environment (used only for dataset access).""" global _default_env, _env_loading if _default_env is not None: return _default_env with _env_lock: if _default_env is not None: return _default_env _env_loading = True try: logger.info("Creating DataCleaningEnvironment (dataset loader)...") _default_env = DataCleaningEnvironment() logger.info(f"Environment ready — {_default_env.dataset_loader.get_total_examples():,} examples loaded.") return _default_env except Exception as e: logger.error(f"Failed to create environment: {e}") from dataset_loader import DatasetLoader class MinimalEnv: def __init__(self): self.dataset_loader = DatasetLoader() self.dataset_loader.examples = [] def reset(self, **kwargs): return type('Obs', (), {'question': 'Placeholder', 'context': 'Context', 'reward': 0.0, 'done': False, 'info': {}})() def step(self, action): return type('Obs', (), {'reward': 0.0, 'done': False, 'is_data_quality': False, 'info': {}})() def state(self): return {} def close(self): pass _default_env = MinimalEnv() return _default_env finally: _env_loading = False def _create_session_env(session_id: str) -> DataCleaningEnvironment: """Create a fresh per-session environment that shares the dataset loader (expensive to load) but has its own episode state (safe for concurrent use).""" loader_env = _get_default_env() # Pass the shared loader directly into __init__ so we skip the expensive # DatasetLoader() construction and dataset loading that would otherwise # happen inside DataQualityEnvironment.__init__ env = DataCleaningEnvironment(session_id=session_id, dataset_loader=loader_env.dataset_loader) return env _sessions: Dict[str, DataCleaningEnvironment] = {} _session_lock = threading.Lock() def _get_session(session_id: str) -> Optional[DataCleaningEnvironment]: """Retrieve an existing session environment.""" with _session_lock: return _sessions.get(session_id) def _cleanup_session(session_id: str): """Remove and clean up a session environment.""" with _session_lock: env = _sessions.pop(session_id, None) if env: try: env.close() except: pass @asynccontextmanager async def lifespan(app: FastAPI): global _default_env def preload_models(): try: logger.info("Preloading ML models...") import transformers transformers.logging.set_verbosity_error() from sentence_transformers import SentenceTransformer, CrossEncoder SentenceTransformer('all-MiniLM-L6-v2') CrossEncoder('cross-encoder/nli-deberta-v3-small') from rouge_score import rouge_scorer rouge_scorer.RougeScorer(["rouge1", "rouge2", "rougeL"], use_stemmer=True) try: from grader import _get_bert_scorer _get_bert_scorer() except: pass logger.info("All ML models preloaded!") except Exception as e: logger.error(f"Model preload failed: {e}") threading.Thread(target=preload_models, daemon=True).start() def background_load(): try: logger.info("Background dataset loading...") env = _get_default_env() logger.info(f"Loaded {env.dataset_loader.get_total_examples():,} examples.") except Exception as e: logger.error(f"Background loading failed: {e}") threading.Thread(target=background_load, daemon=True).start() yield if _default_env: try: 
            _default_env.close()
        except Exception:
            pass


app = FastAPI(
    lifespan=lifespan,
    title="DataQualityGuard-Env",
    version="4.2.0",
    docs_url="/swagger",
    redoc_url="/redoc",
)
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])

import json as _json

_LEADERBOARD_FILE = "/tmp/data_quality_guard_leaderboard.json"


def _load_leaderboard():
    if os.path.exists(_LEADERBOARD_FILE):
        try:
            with open(_LEADERBOARD_FILE, "r", encoding="utf-8") as f:
                return _json.load(f)
        except Exception:
            pass
    return {}


def _save_leaderboard(lb):
    try:
        with open(_LEADERBOARD_FILE, "w", encoding="utf-8") as f:
            _json.dump(lb, f, indent=2)
    except Exception:
        pass


_leaderboard: Dict[str, Dict[str, Any]] = _load_leaderboard()


def _safe_dict(obj):
    if hasattr(obj, 'model_dump'):
        return _safe_dict(obj.model_dump())
    if hasattr(obj, 'dict'):
        return _safe_dict(obj.dict())
    if dataclasses.is_dataclass(obj):
        return {f.name: _safe_dict(getattr(obj, f.name)) for f in dataclasses.fields(obj)}
    if isinstance(obj, enum.Enum):
        return obj.value
    if isinstance(obj, dict):
        return {k: _safe_dict(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [_safe_dict(i) for i in obj]
    if isinstance(obj, (str, int, float, bool, type(None))):
        return obj
    return str(obj)


# ═══════════════════════════════════════════════════════════════════════════════
# ROUTES
# ═══════════════════════════════════════════════════════════════════════════════

@app.get("/", include_in_schema=False, response_class=HTMLResponse)
async def root():
    return STUNNING_DOCS_HTML


@app.get("/favicon.ico", include_in_schema=False)
async def favicon():
    # SVG favicon is served as a data URI in the HTML; return 204 so browsers stop logging 404s
    return Response(status_code=204)


@app.get("/docs", include_in_schema=False, response_class=HTMLResponse)
async def docs():
    return STUNNING_DOCS_HTML


@app.post("/reset", tags=["Environment"])
async def reset(body: Dict[str, Any] = {}):
    try:
        # Create a per-session environment for thread safety
        session_id = body.get("session_id") or f"ses_{uuid.uuid4().hex[:8]}"
        env = _create_session_env(session_id)
        obs = env.reset(**{k: v for k, v in body.items() if k in ("seed", "episode_id", "difficulty")})
        # Store the episode_id -> session mapping so /step can find this env
        episode_id = getattr(obs, 'episode_id', None) or body.get("episode_id") or session_id
        with _session_lock:
            _sessions[episode_id] = env
            _sessions[session_id] = env
        result = _safe_dict(obs)
        result["session_id"] = session_id
        return JSONResponse(content=result)
    except Exception as e:
        import traceback
        logger.error(f"Reset error: {e}\n{traceback.format_exc()}")
        raise HTTPException(500, str(e))


@app.post("/step", tags=["Environment"])
async def step(action_data: Dict[str, Any]):
    try:
        # Look up session by episode_id or session_id for thread safety
        session_id = action_data.pop("session_id", None) or action_data.pop("episode_id", None)
        env = _get_session(session_id) if session_id else None
        if env is None:
            # Fallback: use default env (single-user mode)
            env = _get_default_env()
        valid = set(DataCleaningAction.model_fields.keys()) if hasattr(DataCleaningAction, 'model_fields') else set(DataCleaningAction.__fields__.keys())
        action = DataCleaningAction(**{k: v for k, v in action_data.items() if k in valid})
        result = _safe_dict(env.step(action))
        # If episode is done, clean up session
        if result.get("done", False) and session_id:
            _cleanup_session(session_id)
        return JSONResponse(content=result)
    except Exception as e:
        import traceback
        logger.error(f"Step error: {e}\n{traceback.format_exc()}")
        raise HTTPException(500, str(e))
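
# Illustrative client flow for the two routes above (assumes a local server):
#
#   obs = requests.post("http://localhost:7860/reset", json={"difficulty": "beginner"}).json()
#   result = requests.post("http://localhost:7860/step", json={
#       "answer": "...", "confidence": 0.7, "source_quote": "...",
#       "session_id": obs["session_id"],  # routes the step back to this episode's env
#   }).json()
#
# Passing session_id (or episode_id) keeps concurrent clients isolated; omitting it
# falls back to the shared default environment (single-user mode).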


@app.get("/state", tags=["Environment"])
async def get_state(session_id: Optional[str] = None):
    try:
        env = _get_session(session_id) if session_id else _get_default_env()
        return JSONResponse(content=_safe_dict(env.state()))
    except Exception as e:
        raise HTTPException(500, str(e))


@app.get("/tasks", tags=["OpenEnv"])
async def list_tasks():
    ordered = ["task_1_factual_grounding", "task_2_multi_hop_synthesis", "task_3_adversarial_resistance"]
    return {"tasks": [ALL_TASKS[t].to_dict() for t in ordered if t in ALL_TASKS], "action_schema": ACTION_SCHEMA}


@app.post("/grader", tags=["OpenEnv"])
async def grade_episode(body: Dict[str, Any]):
    task_id = body.get("task_id")
    if not task_id:
        raise HTTPException(422, "'task_id' required")
    task = get_task(task_id)
    if not task:
        raise HTTPException(404, f"task_id '{task_id}' not found")
    rewards, infos = body.get("step_rewards", []), body.get("step_infos", [])
    if not infos and rewards:
        return {"task_id": task_id, "score": round(sum(rewards) / len(rewards), 4)}
    return compute_task_score(task, rewards, infos)


@app.post("/baseline", tags=["OpenEnv"])
async def run_baseline(body: Dict[str, Any] = {}):
    steps = max(3, min(10, int(body.get("steps_per_task", 5))))
    seed = int(body.get("seed", 42))
    results = []
    for task_id, diff in [("task_1_factual_grounding", "beginner"),
                          ("task_2_multi_hop_synthesis", "intermediate"),
                          ("task_3_adversarial_resistance", "advanced")]:
        task = get_task(task_id)
        if not task:
            continue
        sid = f"bl_{task_id}_{seed}"
        # Use session-based env with shared dataset loader
        env = _create_session_env(sid)
        obs_dict = _safe_dict(env.reset(seed=seed, difficulty=diff))
        rewards, infos = [], []
        for _ in range(steps):
            if obs_dict.get("done"):
                break
            ctx = obs_dict.get("context", "")
            action = DataCleaningAction(answer=ctx[:100], confidence=0.6, source_quote=ctx[:80])
            obs_dict = _safe_dict(env.step(action))
            rewards.append(float(obs_dict.get("reward") or 0))
            obs_meta = obs_dict.get("metadata", {})
            if isinstance(obs_meta, dict):
                obs_correctness = obs_meta.get("correctness", 0.0)
                obs_calibration = obs_meta.get("calibration", 0.6)
                rb = obs_meta.get("reward_breakdown", {})
                infos.append({
                    "correctness": obs_correctness,
                    "grounding": obs_dict.get("grounding_score", 0),
                    "calibration": obs_calibration,
                    "data_quality_score": 1.0 if obs_dict.get("is_data_quality") else 0.0,
                    "is_data_quality": bool(obs_dict.get("is_data_quality", False)),
                    "semantic_consistency": rb.get("semantic_consistency", 0.0),
                    "rouge_l": rb.get("rouge_l", 0.0),
                    "bert_score": rb.get("bert_score", 0.0),
                    "align_score": rb.get("align_score", 0.0),
                })
            else:
                infos.append({
                    "correctness": 0.0,
                    "grounding": obs_dict.get("grounding_score", 0),
                    "calibration": 0.6,
                    "data_quality_score": 1.0 if obs_dict.get("is_data_quality") else 0.0,
                    "is_data_quality": bool(obs_dict.get("is_data_quality", False)),
                })
        results.append(compute_task_score(task, rewards, infos))
        try:
            env.close()
        except Exception:
            pass
    return {"tasks": results, "summary": {"overall_score": round(sum(r["score"] for r in results) / max(len(results), 1), 4)}}


@app.post("/batch/evaluate", tags=["Evaluation"])
async def batch_evaluate(body: Dict[str, Any]):
    items = body.get("items", [])
    if not items:
        raise HTTPException(422, "'items' required")
    from server.grader import calculate_reward
    results = []
    for i, item in enumerate(items):
        r, info = calculate_reward(item.get("answer", ""), item.get("confidence", 0.5),
                                   item.get("source_quote", ""), item.get("context", ""),
                                   item.get("ground_truth", ""))
        results.append({"index": i, "reward": round(r, 4),
                        "is_data_quality": info.get("is_data_quality", False)})
    return {"total_items": len(results), "results": results}
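
# Illustrative /batch/evaluate payload (values are hypothetical):
#
#   requests.post("http://localhost:7860/batch/evaluate", json={"items": [
#       {"answer": "Paris", "confidence": 0.9,
#        "source_quote": "The capital of France is Paris.",
#        "context": "The capital of France is Paris.",
#        "ground_truth": "Paris"},
#   ]}).json()
#   # -> {"total_items": 1, "results": [{"index": 0, "reward": ..., "is_data_quality": false}]}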
"is_data_quality": info.get("is_data_quality", False)}) return {"total_items": len(results), "results": results} @app.get("/leaderboard", tags=["Leaderboard"]) async def leaderboard(): if not _leaderboard: return {"leaderboard": [], "message": "No submissions"} ranked = sorted(_leaderboard.values(), key=lambda x: x.get("avg_reward",0), reverse=True) for i, e in enumerate(ranked): e["rank"] = i+1 return {"leaderboard": ranked} @app.post("/leaderboard/submit", tags=["Leaderboard"]) async def submit_leaderboard(data: Dict[str, Any]): required = ["model_name", "avg_reward", "avg_accuracy", "data_quality_rate", "total_episodes", "total_steps"] if missing := [f for f in required if f not in data]: raise HTTPException(422, f"Missing: {missing}") _leaderboard[data["model_name"]] = {**data, "submitted_at": time.time()} _save_leaderboard(_leaderboard) return {"status": "submitted", "model_name": data["model_name"]} @app.get("/health", tags=["Info"]) async def health(): return {"status": "healthy", "version": "4.2.0"} @app.get("/metadata", tags=["OpenEnv"]) async def metadata(): return { "name": "data_quality-guard-env", "version": "4.2.0", "license": "MIT", "description": ( "An OpenEnv RL environment that trains AI models to answer questions " "ONLY from verified context documents — penalizing data_quality and " "rewarding factual grounding." ), } @app.get("/schema", tags=["OpenEnv"]) async def schema(): return { "action": { "type": "object", "required": ["answer"], "properties": { "answer": {"type": "string", "description": "Answer derived ONLY from the provided context document."}, "confidence": {"type": "number", "minimum": 0.0, "maximum": 1.0, "default": 0.5}, "source_quote": {"type": "string", "default": ""}, "reasoning": {"type": "string", "default": ""}, }, }, "observation": { "type": "object", "properties": { "question": {"type": "string"}, "context": {"type": "string"}, "ground_truth": {"type": "string"}, "done": {"type": "boolean"}, "reward": {"type": "number"}, "feedback": {"type": "string"}, "is_data_quality": {"type": "boolean"}, "grounding_score": {"type": "number"}, "difficulty_level": {"type": "string"}, "attempts_remaining": {"type": "integer"}, }, }, "state": { "type": "object", "properties": { "episode_id": {"type": "string"}, "step_count": {"type": "integer"}, "accuracy": {"type": "number"}, "data_quality_rate": {"type": "number"}, "average_reward": {"type": "number"}, "current_difficulty": {"type": "string"}, "skill_rating": {"type": "number"}, "current_streak": {"type": "integer"}, "best_streak": {"type": "integer"}, }, }, } @app.get("/web", include_in_schema=False) async def web(): return FileResponse("server/static/index.html") @app.get("/datasets", tags=["Info"]) async def datasets(): try: return {"total_examples": _get_default_env().dataset_loader.get_total_examples()} except: return {"total_examples": 0} @app.post("/mcp", tags=["OpenEnv"]) async def mcp(body: Dict[str, Any]): if body.get("method") == "tools/list": return {"jsonrpc": "2.0", "id": body.get("id",1), "result": {"tools": [{"name": "reset", "inputSchema": {"type": "object"}}, {"name": "step", "inputSchema": {"type": "object"}}]}} return {"jsonrpc": "2.0", "id": body.get("id",1), "result": {"name": "data_quality-guard-env", "version": "4.2.0"}} @app.middleware("http") async def log_req(request, call_next): resp = await call_next(request) logger.info(f"{request.method} {request.url.path} → {resp.status_code}") return resp def main(): import uvicorn uvicorn.run(app, host="0.0.0.0", port=7860) if __name__ == 
"__main__": main()