"""Stateless FastAPI backend for the hosted DataForge playground. The hosted playground is intentionally split across two free-tier hosts: - Cloudflare Workers Static Assets serves the static frontend. - Hugging Face Spaces serves this API-only backend. All uploaded data is processed in memory or under a per-request temporary directory and is discarded before the request completes. """ import io import logging import os import tempfile import time from collections import defaultdict from collections.abc import Callable from importlib import import_module from pathlib import Path from typing import Any, Protocol, TypeVar, cast import pandas as pd from fastapi import FastAPI, HTTPException, Request, UploadFile from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse, StreamingResponse from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint from starlette.responses import Response from starlette.types import ASGIApp from dataforge import ( CONTRACT_VERSION, Issue, RepairPipelineRequest, RepairTransaction, Severity, VerifiedFix, run_all_detectors, run_repair_pipeline, ) from dataforge.http.problem import problem_exception_handler, problem_response from dataforge.observability import configure_fastapi_observability class FallbackRateLimitExceededError(Exception): """Fallback exception shape matching slowapi's detail attribute.""" def __init__(self, detail: str) -> None: super().__init__(detail) self.detail = detail try: _slowapi_module = import_module("slowapi") _slowapi_errors = import_module("slowapi.errors") _slowapi_util = import_module("slowapi.util") _SlowapiLimiter: Any | None = _slowapi_module.Limiter _SlowapiRateLimitExceeded: type[Exception] | None = _slowapi_errors.RateLimitExceeded get_remote_address = cast(Callable[[Request], str], _slowapi_util.get_remote_address) SLOWAPI_AVAILABLE = True except ModuleNotFoundError: _SlowapiLimiter = None _SlowapiRateLimitExceeded = None SLOWAPI_AVAILABLE = False def get_remote_address(request: Request) -> str: """Return the client host for fallback rate-limit keys.""" return request.client.host if request.client else "unknown" _CallableT = TypeVar("_CallableT", bound=Callable[..., Any]) class _StorageLike(Protocol): """Minimal storage protocol used by tests and fallback middleware.""" def reset(self) -> None: ... class _LimiterLike(Protocol): """Minimal limiter protocol shared by slowapi and the fallback.""" _storage: _StorageLike def limit(self, limit_value: str) -> Callable[[_CallableT], _CallableT]: ... class _FallbackStorage: """Small in-memory windowed counter used when slowapi is unavailable.""" def __init__(self) -> None: self._hits: dict[tuple[str, str], list[float]] = defaultdict(list) def reset(self) -> None: """Clear all fallback counters.""" self._hits.clear() def allow(self, key: tuple[str, str], *, limit: int, window_seconds: float) -> bool: """Record a hit and return whether it fits inside the window.""" now = time.monotonic() hits = [seen for seen in self._hits[key] if now - seen < window_seconds] hits.append(now) self._hits[key] = hits return len(hits) <= limit class _FallbackLimiter: """Decorator-compatible fallback limiter.""" def __init__(self) -> None: self._storage: _StorageLike = _FallbackStorage() def limit(self, _limit_value: str) -> Callable[[_CallableT], _CallableT]: """Return an identity decorator; middleware enforces the limit.""" def decorator(func: _CallableT) -> _CallableT: return func return decorator _RateLimitExceeded: type[Exception] = ( _SlowapiRateLimitExceeded if _SlowapiRateLimitExceeded is not None else FallbackRateLimitExceededError ) logger = logging.getLogger("playground.api") logging.basicConfig(level=logging.INFO, format="%(asctime)s %(name)s %(levelname)s %(message)s") MAX_UPLOAD_BYTES = 1_048_576 MAX_MULTIPART_OVERHEAD_BYTES = 16_384 SAMPLES_DIR = Path(__file__).resolve().parent / "samples" SLOWAPI_CONFIG = Path(__file__).resolve().parent / "slowapi.env" ALLOWED_SAMPLES = {"hospital_10rows", "flights_10rows", "beers_10rows"} class SizeCapMiddleware(BaseHTTPMiddleware): """Reject requests whose declared Content-Length cannot contain a valid upload.""" def __init__( self, app: ASGIApp, max_file_bytes: int = MAX_UPLOAD_BYTES, max_multipart_overhead_bytes: int = MAX_MULTIPART_OVERHEAD_BYTES, ) -> None: super().__init__(app) self.max_file_bytes = max_file_bytes self.max_body_bytes = max_file_bytes + max_multipart_overhead_bytes async def dispatch( self, request: Request, call_next: RequestResponseEndpoint, ) -> Response: """Check Content-Length before any request body is read.""" content_length = request.headers.get("content-length") if content_length is not None: try: length = int(content_length) except ValueError: return JSONResponse(status_code=400, content={"error": "invalid_content_length"}) if length > self.max_body_bytes: logger.warning( "Rejected request: Content-Length %d exceeds max body %d", length, self.max_body_bytes, ) return problem_response( status=413, type_="https://dataforge.local/problems/file_too_large", title="File Too Large", detail="The uploaded request body exceeds the playground limit.", instance=str(request.url.path), error="file_too_large", max_bytes=self.max_file_bytes, ) return await call_next(request) class FallbackRateLimitMiddleware(BaseHTTPMiddleware): """Enforce the playground POST limit when slowapi is not installed.""" async def dispatch( self, request: Request, call_next: RequestResponseEndpoint, ) -> Response: """Apply a 10/minute in-memory fallback to mutating playground endpoints.""" if request.method == "POST" and request.url.path in {"/api/profile", "/api/repair"}: storage = limiter._storage key = (get_remote_address(request), request.url.path) if isinstance(storage, _FallbackStorage) and not storage.allow( key, limit=10, window_seconds=60.0, ): return problem_response( status=429, type_="https://dataforge.local/problems/rate_limit_exceeded", title="Rate Limit Exceeded", detail="10 per 1 minute", instance=str(request.url.path), headers={"Retry-After": "60"}, error="rate_limit_exceeded", ) return await call_next(request) if _SlowapiLimiter is not None: limiter: _LimiterLike = cast( _LimiterLike, _SlowapiLimiter(key_func=get_remote_address, config_filename=str(SLOWAPI_CONFIG)), ) else: limiter = _FallbackLimiter() def _advanced_available() -> bool: """Return whether at least one backend LLM provider is configured.""" return bool(os.environ.get("GROQ_API_KEY") or os.environ.get("GEMINI_API_KEY")) def _build_cors_origins() -> list[str]: """Build the explicit CORS allowlist from the environment.""" env_origins = os.environ.get("DATAFORGE_PLAYGROUND_ORIGINS", "") return [origin.strip() for origin in env_origins.split(",") if origin.strip()] def _build_cors_origin_regex() -> str | None: """Build the regex allowlist for local development only.""" patterns: list[str] = [] if os.environ.get("DATAFORGE_PLAYGROUND_DEV") == "1": patterns.append(r"http://(?:localhost|127(?:\.\d{1,3}){3})(?::\d+)?") if not patterns: return None return "^(" + "|".join(patterns) + ")$" app = FastAPI( title="DataForge Playground API", description="Stateless backend for the hosted DataForge playground.", version="0.1.0", docs_url="/api/docs", redoc_url=None, ) app.add_middleware( SizeCapMiddleware, max_file_bytes=MAX_UPLOAD_BYTES, max_multipart_overhead_bytes=MAX_MULTIPART_OVERHEAD_BYTES, ) if not SLOWAPI_AVAILABLE: app.add_middleware(FallbackRateLimitMiddleware) app.add_middleware( CORSMiddleware, allow_origins=_build_cors_origins(), allow_origin_regex=_build_cors_origin_regex(), allow_methods=["GET", "POST", "OPTIONS"], allow_headers=["*"], allow_credentials=False, ) app.state.limiter = limiter app.add_exception_handler(HTTPException, problem_exception_handler) configure_fastapi_observability(app, service_name="dataforge-playground-api") @app.exception_handler(_RateLimitExceeded) async def _rate_limit_handler(request: Request, exc: Exception) -> JSONResponse: """Return a machine-readable 429 response.""" detail = str(getattr(exc, "detail", str(exc))) return problem_response( status=429, type_="https://dataforge.local/problems/rate_limit_exceeded", title="Rate Limit Exceeded", detail=detail, instance=str(request.url.path), headers={"Retry-After": "60"}, error="rate_limit_exceeded", ) async def _read_upload(file: UploadFile) -> bytes: """Read an uploaded file with a defensive hard cap.""" data = await file.read(MAX_UPLOAD_BYTES + 1) if len(data) > MAX_UPLOAD_BYTES: raise HTTPException( status_code=413, detail={"error": "file_too_large", "max_bytes": MAX_UPLOAD_BYTES}, ) return data def _csv_to_df(data: bytes) -> pd.DataFrame: """Parse CSV bytes into a string-preserving DataFrame.""" return pd.read_csv( io.BytesIO(data), dtype=str, keep_default_na=False, na_filter=False, ) def _severity_to_str(severity: Severity) -> str: """Convert a Severity enum into the JSON response value.""" return severity.value def _issues_to_response( issues: list[Issue], df: pd.DataFrame, *, advanced_requested: bool, ) -> dict[str, Any]: """Format detected issues into the public playground JSON contract.""" grouped: dict[tuple[str, str, str], list[int]] = {} for issue in issues: key = (issue.column, issue.issue_type, _severity_to_str(issue.severity)) grouped.setdefault(key, []).append(issue.row) payload_issues: list[dict[str, Any]] = [] for (column, issue_type, severity), row_indices in grouped.items(): unique_rows = sorted(set(row_indices)) payload_issues.append( { "column": column, "issue_type": issue_type, "severity": severity, "row_indices": unique_rows, "count": len(unique_rows), } ) return { "issues": payload_issues, "meta": { "rows": len(df), "columns": len(df.columns), "column_names": list(df.columns), "total_issues": len(issues), "advanced_requested": advanced_requested, "api_version": app.version, "contract_version": CONTRACT_VERSION, }, } def _fixes_to_response( fixes: list[VerifiedFix], transaction: RepairTransaction, *, source_name: str, ) -> dict[str, Any]: """Format accepted repair proposals plus a redacted transaction journal.""" payload_fixes: list[dict[str, Any]] = [] for proposed_fix in fixes: payload_fixes.append( { "row": proposed_fix.row, "column": proposed_fix.column, "old_value": proposed_fix.old_value, "new_value": proposed_fix.new_value, "detector_id": proposed_fix.detector_id, "reason": proposed_fix.reason, "confidence": proposed_fix.confidence, "provenance": proposed_fix.provenance, } ) return { "fixes": payload_fixes, "txn_journal": { "txn_id": transaction.txn_id, "created_at": transaction.created_at.isoformat(), "source_name": source_name, "source_sha256": transaction.source_sha256, "fixes_count": len(transaction.fixes), "applied": transaction.applied, "events": [{"event_type": "created"}], "note": ( "Playground is stateless. This journal is ephemeral and discarded " "after the response. Install the CLI to apply and revert repairs." ), }, "meta": { "api_version": app.version, "contract_version": CONTRACT_VERSION, }, } def _require_advanced_mode(advanced_requested: bool) -> None: """Reject advanced mode requests unless a provider key is configured.""" if advanced_requested and not _advanced_available(): raise HTTPException(status_code=400, detail={"error": "advanced_mode_unavailable"}) def _run_repair_pipeline( *, upload_name: str, source_bytes: bytes, allow_llm: bool, ) -> tuple[list[VerifiedFix], RepairTransaction]: """Run the real dry-run repair pipeline inside a temporary workspace.""" with tempfile.TemporaryDirectory() as tmpdir: temp_root = Path(tmpdir) upload_path = temp_root / upload_name upload_path.write_bytes(source_bytes) result = run_repair_pipeline( RepairPipelineRequest( source_path=upload_path, mode="dry_run", schema=None, create_dry_run_transaction=True, allow_llm=allow_llm, ) ) if result.transaction is None: raise RuntimeError(result.receipt.reason) return result.fixes, result.transaction @app.get("/") async def root() -> dict[str, Any]: """Return service metadata for humans and uptime probes.""" return { "service": "DataForge Playground API", "status": "ok", "docs_url": "/api/docs", "frontend_hosting": "cloudflare_static_assets", } @app.get("/api/health") async def health() -> dict[str, Any]: """Return backend readiness plus UI-facing capability metadata.""" return { "status": "ok", "advanced_available": _advanced_available(), "max_upload_bytes": MAX_UPLOAD_BYTES, } @app.get("/api/samples/{name}") async def get_sample(name: str) -> StreamingResponse: """Return a bundled sample CSV by name.""" if name not in ALLOWED_SAMPLES: raise HTTPException( status_code=404, detail={"error": "sample_not_found", "available": sorted(ALLOWED_SAMPLES)}, ) csv_path = SAMPLES_DIR / f"{name}.csv" if not csv_path.exists(): logger.error("Sample file missing on disk: %s", csv_path) raise HTTPException(status_code=500, detail={"error": "sample_file_missing"}) return StreamingResponse( io.BytesIO(csv_path.read_bytes()), media_type="text/csv", headers={"Content-Disposition": f'attachment; filename="{name}.csv"'}, ) @app.post("/api/profile") @limiter.limit("10/minute") async def profile(request: Request, file: UploadFile) -> dict[str, Any]: """Profile an uploaded CSV and return the detected issues.""" advanced_requested = request.query_params.get("advanced", "false").lower() == "true" _require_advanced_mode(advanced_requested) source_bytes = await _read_upload(file) upload_name = Path(file.filename or "upload.csv").name logger.info( "Profile request: filename=%s bytes=%d advanced=%s", upload_name, len(source_bytes), advanced_requested, ) try: df = _csv_to_df(source_bytes) issues = run_all_detectors(df, schema=None) except HTTPException: raise except Exception as exc: logger.exception("Profile endpoint failed") raise HTTPException( status_code=500, detail={ "error": "profile_failed", "message": "The profile pipeline could not complete safely.", }, ) from exc return _issues_to_response(issues, df, advanced_requested=advanced_requested) @app.post("/api/repair") @limiter.limit("10/minute") async def repair(request: Request, file: UploadFile) -> dict[str, Any]: """Return dry-run repair proposals plus an ephemeral transaction journal.""" dry_run = request.query_params.get("dry_run", "true").lower() == "true" advanced_requested = request.query_params.get("advanced", "false").lower() == "true" if not dry_run: raise HTTPException(status_code=400, detail={"error": "apply_not_supported"}) _require_advanced_mode(advanced_requested) source_bytes = await _read_upload(file) upload_name = Path(file.filename or "upload.csv").name logger.info( "Repair request: filename=%s bytes=%d advanced=%s", upload_name, len(source_bytes), advanced_requested, ) try: fixes, transaction = _run_repair_pipeline( upload_name=upload_name, source_bytes=source_bytes, allow_llm=advanced_requested, ) except HTTPException: raise except Exception as exc: logger.exception("Repair endpoint failed") raise HTTPException( status_code=500, detail={ "error": "repair_failed", "message": "The repair pipeline could not complete safely.", }, ) from exc return _fixes_to_response(fixes, transaction, source_name=upload_name)