Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import json | |
| import logging | |
| import os | |
| import shutil | |
| import time | |
| from collections import defaultdict, deque | |
| from pathlib import Path | |
| import pandas as pd | |
| from fastapi import BackgroundTasks, FastAPI, HTTPException, Request | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import FileResponse, JSONResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from web.job_store import JobStore | |
| from web.schemas import JobPreview, JobRequest, JobSummary, JobStatus | |
# --- Paths -------------------------------------------------------------------
# NOTE(review): parents[2] assumes this file lives two directories below the
# repo root (e.g. <repo>/web/api/app.py) — confirm against the actual layout.
REPO_ROOT = Path(__file__).resolve().parents[2]
STATIC_DIR = REPO_ROOT / "web" / "static"   # served at "/" when present (see mount at bottom)
JOBS_DIR = REPO_ROOT / "tmp" / "web_jobs"   # per-job working directories for the JobStore
LOG_DIR = REPO_ROOT / "logs"
# Must exist before basicConfig below opens the log file for appending.
LOG_DIR.mkdir(parents=True, exist_ok=True)

# --- Logging -----------------------------------------------------------------
logging.basicConfig(
    level=logging.INFO,
    filename=str(LOG_DIR / "webapp.log"),
    filemode="a",
    format="%(asctime)s %(levelname)s %(message)s",
)
logger = logging.getLogger("equitia.web")

# --- FastAPI application -----------------------------------------------------
app = FastAPI(title="EQUITIA Web API", version="0.1.0")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],        # open CORS; acceptable only with credentials disabled
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# --- Job store (limits overridable via environment) --------------------------
MAX_PENDING_JOBS = int(os.getenv("EQUITIA_MAX_PENDING_JOBS", "20"))
RETENTION_MINUTES = int(os.getenv("EQUITIA_RETENTION_MINUTES", "60"))
job_store = JobStore(JOBS_DIR, max_pendientes=MAX_PENDING_JOBS, retention_minutes=RETENTION_MINUTES)

# --- Rate limiting / request-size guard state --------------------------------
MAX_BODY_BYTES = 300_000    # reject declared request bodies larger than ~300 kB
RATE_LIMIT_WINDOW = 60      # sliding-window length, seconds
RATE_LIMIT_REQUESTS = 30    # max requests per window per client IP
request_buckets: dict[str, deque[float]] = defaultdict(deque)  # ip -> recent request timestamps
blocked_ips: dict[str, float] = {}  # ip -> epoch time when the temporary block expires
BLOCK_SECONDS = 300         # block duration imposed after repeated 429s
MAX_429_BEFORE_BLOCK = 3    # consecutive 429s before the IP is blocked
rate_limit_hits: dict[str, int] = defaultdict(int)  # ip -> consecutive 429 counter
async def rate_limit_and_size_guard(request: Request, call_next):
    """HTTP middleware: per-IP sliding-window rate limiting plus a body-size cap.

    Order of checks: active IP block -> sliding-window rate limit ->
    declared Content-Length cap -> pass through to the next handler.

    NOTE(review): no ``@app.middleware("http")`` decorator is visible here
    (possibly lost in extraction) — confirm this function is actually
    registered with the app, otherwise it is dead code.
    """
    client_ip = request.client.host if request.client else "unknown"
    now = time.time()

    # Temporary block imposed after repeated rate-limit violations.
    blocked_until = blocked_ips.get(client_ip)
    if blocked_until and blocked_until > now:
        return JSONResponse(status_code=429, content={"detail": "IP temporalmente bloqueada por exceso de uso."})
    if blocked_until and blocked_until <= now:
        # Block expired: forget both the block and the strike counter.
        blocked_ips.pop(client_ip, None)
        rate_limit_hits.pop(client_ip, None)

    # Sliding window: discard timestamps older than the window, then count.
    bucket = request_buckets[client_ip]
    while bucket and now - bucket[0] > RATE_LIMIT_WINDOW:
        bucket.popleft()
    if len(bucket) >= RATE_LIMIT_REQUESTS:
        rate_limit_hits[client_ip] += 1
        if rate_limit_hits[client_ip] >= MAX_429_BEFORE_BLOCK:
            blocked_ips[client_ip] = now + BLOCK_SECONDS
        return JSONResponse(status_code=429, content={"detail": "Rate limit excedido. Inténtalo más tarde."})
    rate_limit_hits[client_ip] = 0  # an accepted request resets the strike counter
    bucket.append(now)

    # Cap the *declared* payload size (a lying or absent Content-Length is
    # not caught here — only what the client announces).
    content_length = request.headers.get("content-length")
    if content_length:
        try:
            declared = int(content_length)
        except ValueError:
            # Fix: a malformed Content-Length previously raised ValueError and
            # surfaced as an unhandled 500; answer 400 instead.
            return JSONResponse(status_code=400, content={"detail": "Cabecera Content-Length inválida."})
        if declared > MAX_BODY_BYTES:
            return JSONResponse(status_code=413, content={"detail": "Payload demasiado grande."})
    return await call_next(request)
def health() -> dict[str, str]:
    """Liveness probe: always reports the service as healthy.

    NOTE(review): no route decorator is visible (possibly lost in
    extraction) — confirm this is registered, e.g. as ``GET /health``.
    """
    return dict(status="ok")
def obtener_schema_plantilla() -> dict:
    """Load and return the example template schema from ``config/schemas``.

    Returns:
        The parsed JSON document as a dict.

    Raises:
        HTTPException: 404 when the schema file does not exist.

    NOTE(review): no route decorator is visible (possibly lost in
    extraction) — confirm this handler is registered.
    """
    ruta = REPO_ROOT / "config" / "schemas" / "plantilla_general_ejemplo.json"
    # Fix: EAFP — open directly instead of exists()-then-open, removing the
    # TOCTOU window between the check and the read.
    try:
        with open(ruta, "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError as exc:
        raise HTTPException(status_code=404, detail="Schema no encontrado.") from exc
def crear_job(payload: JobRequest) -> JobSummary:
    """Create a new evaluation job in the store and return its summary.

    Raises:
        HTTPException: 429 when the store refuses the job (pending queue full).

    NOTE(review): no route decorator is visible (possibly lost in
    extraction) — confirm this handler is registered.
    """
    try:
        nuevo = job_store.create_job(payload)
        # The store raises RuntimeError when the pending-job cap is reached.
        logger.info("Job creado id=%s modo=%s tipo=%s", nuevo.id, payload.modo_evaluacion, payload.tipo_evaluacion)
        resumen = JobSummary(
            id=nuevo.id,
            estado=nuevo.estado,
            creado_en=nuevo.creado_en,
            actualizado_en=nuevo.actualizado_en,
            error=nuevo.error,
        )
    except RuntimeError as exc:
        raise HTTPException(status_code=429, detail=str(exc)) from exc
    return resumen
def estado_job(job_id: str) -> JobSummary:
    """Look up a job by id and return its current status summary.

    Raises:
        HTTPException: 404 when no job matches ``job_id``.

    NOTE(review): no route decorator is visible (possibly lost in
    extraction) — confirm this handler is registered.
    """
    encontrado = job_store.get_job(job_id)
    if not encontrado:
        raise HTTPException(status_code=404, detail="Job no encontrado.")
    return JobSummary(
        id=encontrado.id,
        estado=encontrado.estado,
        creado_en=encontrado.creado_en,
        actualizado_en=encontrado.actualizado_en,
        error=encontrado.error,
    )
def preview_job(job_id: str) -> JobPreview:
    """Return a lightweight preview of a job: status, summary, and sample rows.

    The summary comes from ``resumen.json`` in the job directory; when the
    results CSV also exists, the first 10 rows are attached under "muestra".

    Raises:
        HTTPException: 404 when no job matches ``job_id``.

    NOTE(review): no route decorator is visible (possibly lost in
    extraction) — confirm this handler is registered.
    """
    job = job_store.get_job(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job no encontrado.")
    resumen = None
    if job.job_dir and (job.job_dir / "resumen.json").exists():
        with open(job.job_dir / "resumen.json", "r", encoding="utf-8") as f:
            resumen = json.load(f)
        # Fix: the CSV augmentation must stay inside this branch — at the
        # outer level it would raise AttributeError when job.job_dir is None,
        # and TypeError (resumen["muestra"] on None) when the summary file is
        # missing but the CSV exists.
        resultados_csv = job.job_dir / "graficos" / "resultados.csv"
        if resultados_csv.exists():
            # NOTE(review): pipe-separated output assumed — confirm the
            # pipeline really writes "|"-delimited CSVs.
            df = pd.read_csv(resultados_csv, sep="|")
            resumen["muestra"] = df.head(10).to_dict(orient="records")
    return JobPreview(id=job.id, estado=job.estado, resumen=resumen)
def descargar_job(job_id: str, background_tasks: BackgroundTasks):
    """Serve a finished job's artifacts as a ZIP, cleaning up after delivery.

    Raises:
        HTTPException: 404 when the job or its artifacts are missing,
            409 when the job has not finished yet.

    NOTE(review): no route decorator is visible (possibly lost in
    extraction) — confirm this handler is registered.
    """
    job = job_store.get_job(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job no encontrado.")
    if job.estado != JobStatus.FINALIZADA:
        raise HTTPException(status_code=409, detail="El job no ha finalizado todavía.")
    if not job.job_dir or not job.job_dir.exists():
        raise HTTPException(status_code=404, detail="No se encontraron artefactos para descargar.")

    # Build the archive next to the job directory: <parent>/<id>_resultados.zip
    archive_base = job.job_dir.parent / f"{job.id}_resultados"
    archive = Path(shutil.make_archive(str(archive_base), "zip", str(job.job_dir)))

    def _cleanup() -> None:
        # Background task: runs only after the response has been sent, so the
        # zip and the job's working directory can be removed safely.
        try:
            if archive.exists():
                archive.unlink(missing_ok=True)
            job_store.delete_job_artifacts(job_id)
            logger.info("Artefactos eliminados tras descarga job=%s", job_id)
        except Exception as exc:
            # Best-effort cleanup: never let a cleanup failure break delivery.
            logger.error("Error limpiando artefactos job=%s error=%s", job_id, exc)

    background_tasks.add_task(_cleanup)
    return FileResponse(path=archive, filename=f"resultados_{job.id}.zip", media_type="application/zip")
# Serve the static frontend at the root path when the directory exists.
# NOTE(review): a mount at "/" shadows any routes registered after it — the
# handlers above lack visible decorators; confirm registration order.
if STATIC_DIR.exists():
    app.mount("/", StaticFiles(directory=str(STATIC_DIR), html=True), name="static")