Spaces:
Running
Running
| """Stateless FastAPI backend for the hosted DataForge playground. | |
| The Week 5 playground is intentionally split across two free-tier hosts: | |
| - Cloudflare Pages serves the static frontend. | |
| - Hugging Face Spaces serves this API-only backend. | |
| All uploaded data is processed in memory or under a per-request temporary | |
| directory and is discarded before the request completes. | |
| """ | |
| import io | |
| import logging | |
| import os | |
| import tempfile | |
| from datetime import UTC, datetime | |
| from pathlib import Path | |
| from typing import Any | |
| import pandas as pd | |
| from fastapi import FastAPI, HTTPException, Request, UploadFile | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse, StreamingResponse | |
| from slowapi import Limiter | |
| from slowapi.errors import RateLimitExceeded | |
| from slowapi.util import get_remote_address | |
| from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint | |
| from starlette.responses import Response | |
| from starlette.types import ASGIApp | |
| from dataforge.cli.repair import _propose_repairs | |
| from dataforge.detectors import run_all_detectors | |
| from dataforge.detectors.base import Issue, Severity | |
| from dataforge.repairers.base import ProposedFix | |
| from dataforge.safety import SafetyFilter, SafetyVerdict | |
| from dataforge.transactions.log import ( | |
| append_created_transaction, | |
| sha256_bytes, | |
| snapshot_path_for, | |
| ) | |
| from dataforge.transactions.txn import RepairTransaction, generate_txn_id | |
| logger = logging.getLogger("playground.api") | |
| logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s") | |
| MAX_UPLOAD_BYTES = 1_048_576 | |
| SAMPLES_DIR = Path(__file__).resolve().parent / "samples" | |
| ALLOWED_SAMPLES = {"hospital_10rows", "flights_10rows", "beers_10rows"} | |
| class SizeCapMiddleware(BaseHTTPMiddleware): | |
| """Reject requests whose declared Content-Length exceeds the upload cap.""" | |
| def __init__(self, app: ASGIApp, max_bytes: int = MAX_UPLOAD_BYTES) -> None: | |
| super().__init__(app) | |
| self.max_bytes = max_bytes | |
| async def dispatch( | |
| self, | |
| request: Request, | |
| call_next: RequestResponseEndpoint, | |
| ) -> Response: | |
| """Check Content-Length before any request body is read.""" | |
| content_length = request.headers.get("content-length") | |
| if content_length is not None: | |
| try: | |
| length = int(content_length) | |
| except ValueError: | |
| return JSONResponse(status_code=400, content={"error": "invalid_content_length"}) | |
| if length > self.max_bytes: | |
| logger.warning( | |
| "Rejected request: Content-Length %d exceeds max %d", | |
| length, | |
| self.max_bytes, | |
| ) | |
| return JSONResponse( | |
| status_code=413, | |
| content={"error": "file_too_large", "max_bytes": self.max_bytes}, | |
| ) | |
| return await call_next(request) | |
| limiter = Limiter(key_func=get_remote_address) | |
| def _advanced_available() -> bool: | |
| """Return whether at least one backend LLM provider is configured.""" | |
| return bool(os.environ.get("GROQ_API_KEY") or os.environ.get("GEMINI_API_KEY")) | |
| def _build_cors_origins() -> list[str]: | |
| """Build the explicit CORS allowlist from the environment.""" | |
| env_origins = os.environ.get("DATAFORGE_PLAYGROUND_ORIGINS", "") | |
| return [origin.strip() for origin in env_origins.split(",") if origin.strip()] | |
| def _build_cors_origin_regex() -> str: | |
| """Build the regex allowlist for Pages and optional localhost development.""" | |
| patterns = [r"https://.*\.pages\.dev"] | |
| if os.environ.get("DATAFORGE_PLAYGROUND_DEV") == "1": | |
| patterns.append(r"http://(?:localhost|127(?:\.\d{1,3}){3})(?::\d+)?") | |
| return "^(" + "|".join(patterns) + ")$" | |
| app = FastAPI( | |
| title="DataForge Playground API", | |
| description="Stateless backend for the hosted DataForge playground.", | |
| version="0.1.0", | |
| docs_url="/api/docs", | |
| redoc_url=None, | |
| ) | |
| app.add_middleware(SizeCapMiddleware, max_bytes=MAX_UPLOAD_BYTES) | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=_build_cors_origins(), | |
| allow_origin_regex=_build_cors_origin_regex(), | |
| allow_methods=["GET", "POST", "OPTIONS"], | |
| allow_headers=["*"], | |
| allow_credentials=False, | |
| ) | |
| app.state.limiter = limiter | |
| async def _rate_limit_handler(request: Request, exc: RateLimitExceeded) -> JSONResponse: | |
| """Return a machine-readable 429 response.""" | |
| return JSONResponse( | |
| status_code=429, | |
| content={"error": "rate_limit_exceeded", "detail": str(exc.detail)}, | |
| ) | |
| async def _read_upload(file: UploadFile) -> bytes: | |
| """Read an uploaded file with a defensive hard cap.""" | |
| data = await file.read(MAX_UPLOAD_BYTES + 1) | |
| if len(data) > MAX_UPLOAD_BYTES: | |
| raise HTTPException( | |
| status_code=413, | |
| detail={"error": "file_too_large", "max_bytes": MAX_UPLOAD_BYTES}, | |
| ) | |
| return data | |
| def _csv_to_df(data: bytes) -> pd.DataFrame: | |
| """Parse CSV bytes into a string-preserving DataFrame.""" | |
| return pd.read_csv( | |
| io.BytesIO(data), | |
| dtype=str, | |
| keep_default_na=False, | |
| na_filter=False, | |
| ) | |
| def _severity_to_str(severity: Severity) -> str: | |
| """Convert a Severity enum into the JSON response value.""" | |
| return severity.value | |
| def _issues_to_response( | |
| issues: list[Issue], | |
| df: pd.DataFrame, | |
| *, | |
| advanced_requested: bool, | |
| ) -> dict[str, Any]: | |
| """Format detected issues into the public playground JSON contract.""" | |
| grouped: dict[tuple[str, str, str], list[int]] = {} | |
| for issue in issues: | |
| key = (issue.column, issue.issue_type, _severity_to_str(issue.severity)) | |
| grouped.setdefault(key, []).append(issue.row) | |
| payload_issues: list[dict[str, Any]] = [] | |
| for (column, issue_type, severity), row_indices in grouped.items(): | |
| unique_rows = sorted(set(row_indices)) | |
| payload_issues.append( | |
| { | |
| "column": column, | |
| "issue_type": issue_type, | |
| "severity": severity, | |
| "row_indices": unique_rows, | |
| "count": len(unique_rows), | |
| } | |
| ) | |
| return { | |
| "issues": payload_issues, | |
| "meta": { | |
| "rows": len(df), | |
| "columns": len(df.columns), | |
| "column_names": list(df.columns), | |
| "total_issues": len(issues), | |
| "advanced_requested": advanced_requested, | |
| }, | |
| } | |
| def _fixes_to_response( | |
| fixes: list[ProposedFix], | |
| transaction: RepairTransaction, | |
| *, | |
| source_name: str, | |
| ) -> dict[str, Any]: | |
| """Format accepted repair proposals plus a redacted transaction journal.""" | |
| payload_fixes: list[dict[str, Any]] = [] | |
| for proposed_fix in fixes: | |
| payload_fixes.append( | |
| { | |
| "row": proposed_fix.fix.row, | |
| "column": proposed_fix.fix.column, | |
| "old_value": proposed_fix.fix.old_value, | |
| "new_value": proposed_fix.fix.new_value, | |
| "detector_id": proposed_fix.fix.detector_id, | |
| "reason": proposed_fix.reason, | |
| "confidence": proposed_fix.confidence, | |
| "provenance": proposed_fix.provenance, | |
| } | |
| ) | |
| return { | |
| "fixes": payload_fixes, | |
| "txn_journal": { | |
| "txn_id": transaction.txn_id, | |
| "created_at": transaction.created_at.isoformat(), | |
| "source_name": source_name, | |
| "source_sha256": transaction.source_sha256, | |
| "fixes_count": len(transaction.fixes), | |
| "applied": transaction.applied, | |
| "events": [{"event_type": "created"}], | |
| "note": ( | |
| "Playground is stateless. This journal is ephemeral and discarded " | |
| "after the response. Install the CLI to apply and revert repairs." | |
| ), | |
| }, | |
| } | |
| def _require_advanced_mode(advanced_requested: bool) -> None: | |
| """Reject advanced mode requests unless a provider key is configured.""" | |
| if advanced_requested and not _advanced_available(): | |
| raise HTTPException(status_code=400, detail={"error": "advanced_mode_unavailable"}) | |
| def _write_ephemeral_transaction( | |
| *, | |
| upload_path: Path, | |
| source_bytes: bytes, | |
| fixes: list[ProposedFix], | |
| ) -> RepairTransaction: | |
| """Persist an ephemeral dry-run transaction under the temporary request directory.""" | |
| txn_id = generate_txn_id() | |
| snapshot_path = snapshot_path_for(upload_path, txn_id) | |
| snapshot_path.parent.mkdir(parents=True, exist_ok=True) | |
| snapshot_path.write_bytes(source_bytes) | |
| transaction = RepairTransaction( | |
| txn_id=txn_id, | |
| created_at=datetime.now(UTC), | |
| source_path=str(upload_path.resolve()), | |
| source_sha256=sha256_bytes(source_bytes), | |
| source_snapshot_path=str(snapshot_path.resolve()), | |
| fixes=[proposed_fix.fix for proposed_fix in fixes], | |
| applied=False, | |
| ) | |
| append_created_transaction(transaction) | |
| return transaction | |
| def _run_repair_pipeline( | |
| *, | |
| upload_name: str, | |
| source_bytes: bytes, | |
| allow_llm: bool, | |
| ) -> tuple[list[ProposedFix], RepairTransaction]: | |
| """Run the real dry-run repair pipeline inside a temporary workspace.""" | |
| with tempfile.TemporaryDirectory() as tmpdir: | |
| temp_root = Path(tmpdir) | |
| upload_path = temp_root / upload_name | |
| upload_path.write_bytes(source_bytes) | |
| working_df = _csv_to_df(source_bytes) | |
| issues = run_all_detectors(working_df, schema=None) | |
| accepted_fixes, _attempt_groups = _propose_repairs( | |
| issues, | |
| upload_path, | |
| working_df.copy(deep=True), | |
| None, | |
| allow_llm=allow_llm, | |
| model="gemini-2.0-flash", | |
| allow_pii=False, | |
| confirm_pii=False, | |
| confirm_escalations=False, | |
| interactive=False, | |
| ) | |
| batch_safety = SafetyFilter().evaluate_batch(accepted_fixes) | |
| if batch_safety.verdict != SafetyVerdict.ALLOW: | |
| raise RuntimeError(f"Batch safety rejected dry-run fixes: {batch_safety.reason}") | |
| transaction = _write_ephemeral_transaction( | |
| upload_path=upload_path, | |
| source_bytes=source_bytes, | |
| fixes=accepted_fixes, | |
| ) | |
| return accepted_fixes, transaction | |
| async def root() -> dict[str, Any]: | |
| """Return service metadata for humans and uptime probes.""" | |
| return { | |
| "service": "DataForge Playground API", | |
| "status": "ok", | |
| "docs_url": "/api/docs", | |
| "frontend_hosting": "cloudflare_pages", | |
| } | |
| async def health() -> dict[str, Any]: | |
| """Return backend readiness plus UI-facing capability metadata.""" | |
| return { | |
| "status": "ok", | |
| "advanced_available": _advanced_available(), | |
| "max_upload_bytes": MAX_UPLOAD_BYTES, | |
| } | |
| async def get_sample(name: str) -> StreamingResponse: | |
| """Return a bundled sample CSV by name.""" | |
| if name not in ALLOWED_SAMPLES: | |
| raise HTTPException( | |
| status_code=404, | |
| detail={"error": "sample_not_found", "available": sorted(ALLOWED_SAMPLES)}, | |
| ) | |
| csv_path = SAMPLES_DIR / f"{name}.csv" | |
| if not csv_path.exists(): | |
| logger.error("Sample file missing on disk: %s", csv_path) | |
| raise HTTPException(status_code=500, detail={"error": "sample_file_missing"}) | |
| return StreamingResponse( | |
| io.BytesIO(csv_path.read_bytes()), | |
| media_type="text/csv", | |
| headers={"Content-Disposition": f'attachment; filename="{name}.csv"'}, | |
| ) | |
| async def profile(request: Request, file: UploadFile) -> dict[str, Any]: | |
| """Profile an uploaded CSV and return the detected issues.""" | |
| advanced_requested = request.query_params.get("advanced", "false").lower() == "true" | |
| _require_advanced_mode(advanced_requested) | |
| source_bytes = await _read_upload(file) | |
| upload_name = Path(file.filename or "upload.csv").name | |
| logger.info( | |
| "Profile request: filename=%s bytes=%d advanced=%s", | |
| upload_name, | |
| len(source_bytes), | |
| advanced_requested, | |
| ) | |
| try: | |
| df = _csv_to_df(source_bytes) | |
| issues = run_all_detectors(df, schema=None) | |
| except HTTPException: | |
| raise | |
| except Exception as exc: | |
| logger.exception("Profile endpoint failed") | |
| raise HTTPException( | |
| status_code=500, | |
| detail={ | |
| "error": "profile_failed", | |
| "message": "The profile pipeline could not complete safely.", | |
| }, | |
| ) from exc | |
| return _issues_to_response(issues, df, advanced_requested=advanced_requested) | |
| async def repair(request: Request, file: UploadFile) -> dict[str, Any]: | |
| """Return dry-run repair proposals plus an ephemeral transaction journal.""" | |
| dry_run = request.query_params.get("dry_run", "true").lower() == "true" | |
| advanced_requested = request.query_params.get("advanced", "false").lower() == "true" | |
| if not dry_run: | |
| raise HTTPException(status_code=400, detail={"error": "apply_not_supported"}) | |
| _require_advanced_mode(advanced_requested) | |
| source_bytes = await _read_upload(file) | |
| upload_name = Path(file.filename or "upload.csv").name | |
| logger.info( | |
| "Repair request: filename=%s bytes=%d advanced=%s", | |
| upload_name, | |
| len(source_bytes), | |
| advanced_requested, | |
| ) | |
| try: | |
| fixes, transaction = _run_repair_pipeline( | |
| upload_name=upload_name, | |
| source_bytes=source_bytes, | |
| allow_llm=advanced_requested, | |
| ) | |
| except HTTPException: | |
| raise | |
| except Exception as exc: | |
| logger.exception("Repair endpoint failed") | |
| raise HTTPException( | |
| status_code=500, | |
| detail={ | |
| "error": "repair_failed", | |
| "message": "The repair pipeline could not complete safely.", | |
| }, | |
| ) from exc | |
| return _fixes_to_response(fixes, transaction, source_name=upload_name) | |