# Aperture / app/main.py
# KSvend
# feat: rich results dashboard with inline sparklines and overview
# 57ba197
import asyncio
import os
import pathlib
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi import Depends
from app.database import Database
from app.worker import worker_loop
from app.eo_products import registry
from app.api.jobs import router as jobs_router, init_router as init_jobs
from app.api.products_api import router as products_router
from app.api.auth import router as auth_router, get_current_user
from app.advisor import get_aoi_advice
from app.models import AoiAdviceRequest
# Anchor every filesystem location to this module's own directory so that
# lookups do not depend on the process's current working directory.
_HERE = pathlib.Path(__file__).resolve().parent
_FRONTEND_DIR = _HERE.parent.joinpath("frontend")
_INDEX_HTML = _FRONTEND_DIR.joinpath("index.html")
def _cors_origins_from_env() -> list:
    """Return the CORS allow-list derived from environment variables.

    ``APERTURE_CORS_ORIGINS`` is read as a comma-separated list of origins;
    when it names at least one origin, that list is returned as-is.
    Otherwise the result is ``["*"]`` while ``APERTURE_DEMO`` equals
    ``"true"`` (its default, compared case-insensitively) and ``[]`` when
    demo mode is switched off.
    """
    raw = os.environ.get("APERTURE_CORS_ORIGINS", "")
    configured = [origin.strip() for origin in raw.split(",") if origin.strip()]
    if configured:
        # Explicit configuration always wins.  The previous one-line
        # conditional parsed as `(configured or ["*"]) if demo else []`,
        # which silently dropped configured origins outside demo mode.
        return configured
    demo_mode = os.environ.get("APERTURE_DEMO", "true").lower() == "true"
    return ["*"] if demo_mode else []


def create_app(db_path: str = "aperture.db", run_worker: bool = False) -> FastAPI:
    """Build and configure the Aperture FastAPI application.

    Args:
        db_path: Path handed to ``Database``.
        run_worker: When true, ``worker_loop(db, registry)`` is started as a
            background asyncio task on startup and cancelled on shutdown.

    Returns:
        The configured ``FastAPI`` instance with CORS middleware, the jobs /
        products / auth routers, the download and dashboard endpoints, and
        (when the frontend directory exists) static file serving.
    """
    # Function-scope imports: only the JSON-serving endpoints below use them.
    import json
    from fastapi.responses import JSONResponse

    db = Database(db_path)
    # Every per-job artefact lives under <repo root>/results/<job_id>/.
    results_root = _HERE.parent / "results"

    def _load_json(path: pathlib.Path):
        """Parse *path* as UTF-8 encoded JSON and return the decoded value."""
        return json.loads(path.read_text(encoding="utf-8"))

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        """Log configuration, initialise the database, manage the worker task."""
        from app.config import OPENEO_CLIENT_ID, OPENEO_CLIENT_SECRET
        if OPENEO_CLIENT_ID and OPENEO_CLIENT_SECRET:
            print(f"[Aperture] CDSE credentials configured (client_id={OPENEO_CLIENT_ID[:8]}...)")
        else:
            print("[Aperture] WARNING: CDSE credentials NOT configured — EO products will fail")
        from app.config import ANTHROPIC_API_KEY
        if ANTHROPIC_API_KEY:
            print("[Aperture] Anthropic API key configured (AOI advisor enabled)")
        else:
            print("[Aperture] Anthropic API key not set — AOI advisor disabled")
        print(f"[Aperture] SPACE_ID={os.environ.get('SPACE_ID', '<not set>')}")
        await db.init()
        worker_task = None
        if run_worker:
            worker_task = asyncio.create_task(worker_loop(db, registry))
        try:
            yield
        finally:
            if worker_task is not None:
                worker_task.cancel()
                # Wait for the cancellation to be processed so the task is
                # not left pending when the event loop closes.
                try:
                    await worker_task
                except asyncio.CancelledError:
                    pass
                except Exception as exc:
                    # The worker had already failed; report it instead of
                    # turning shutdown into a crash.
                    print(f"[Aperture] Worker exited with error: {exc!r}")

    app = FastAPI(title="MERLx Aperture", lifespan=lifespan)

    # CORS — origins listed in APERTURE_CORS_ORIGINS are always honoured;
    # without them, demo mode allows any origin and non-demo mode allows none.
    app.add_middleware(
        CORSMiddleware,
        allow_origins=_cors_origins_from_env(),
        allow_methods=["GET", "POST"],
        allow_headers=["Authorization", "Content-Type"],
    )

    init_jobs(db)
    app.include_router(jobs_router)
    app.include_router(products_router)
    app.include_router(auth_router)

    @app.post("/api/aoi-advice")
    async def aoi_advice(request: AoiAdviceRequest, email: str = Depends(get_current_user)):
        # `email` is unused here; the dependency is declared so the route
        # requires an authenticated user.
        return await get_aoi_advice(request.bbox)

    @app.get("/health")
    async def health():
        return {"status": "ok"}

    # ── Download endpoints (auth-gated) ─────────────────────────────
    async def _verify_job_access(job_id: str, email: str):
        """Return the job if it exists and belongs to *email*.

        Job completion is NOT checked here; endpoints that need a finished
        job test ``job.status`` themselves.

        Raises:
            HTTPException: 404 when the job is unknown, 403 when it was
                requested by a different email address.
        """
        job = await db.get_job(job_id)
        if job is None:
            raise HTTPException(status_code=404, detail="Job not found")
        if job.request.email != email:
            raise HTTPException(status_code=403, detail="Not your job")
        return job

    @app.get("/api/jobs/{job_id}/report")
    async def download_report(job_id: str, email: str = Depends(get_current_user)):
        """Serve the PDF report of a completed job owned by the caller."""
        job = await _verify_job_access(job_id, email)
        if job.status.value != "complete":
            raise HTTPException(status_code=404, detail="Report not available yet")
        pdf_path = results_root / job_id / "report.pdf"
        if not pdf_path.exists():
            raise HTTPException(status_code=404, detail="Report file not found")
        return FileResponse(
            path=str(pdf_path),
            media_type="application/pdf",
            filename=f"aperture_report_{job_id}.pdf",
        )

    @app.get("/api/jobs/{job_id}/package")
    async def download_package(job_id: str, email: str = Depends(get_current_user)):
        """Serve the ZIP package of a completed job owned by the caller."""
        job = await _verify_job_access(job_id, email)
        if job.status.value != "complete":
            raise HTTPException(status_code=404, detail="Package not available yet")
        zip_path = results_root / job_id / "package.zip"
        if not zip_path.exists():
            raise HTTPException(status_code=404, detail="Package file not found")
        return FileResponse(
            path=str(zip_path),
            media_type="application/zip",
            filename=f"aperture_package_{job_id}.zip",
        )

    @app.get("/api/jobs/{job_id}/maps/{product_id}")
    async def get_product_map(job_id: str, product_id: str, email: str = Depends(get_current_user)):
        """Serve ``<product_id>_map.png`` from the job's results directory."""
        await _verify_job_access(job_id, email)
        map_path = results_root / job_id / f"{product_id}_map.png"
        if not map_path.exists():
            raise HTTPException(status_code=404, detail="Map not available for this EO product")
        return FileResponse(
            path=str(map_path),
            media_type="image/png",
        )

    @app.get("/api/jobs/{job_id}/spatial/{product_id}")
    async def get_product_spatial(job_id: str, product_id: str, email: str = Depends(get_current_user)):
        """Return the parsed ``<product_id>_spatial.json`` of the job."""
        await _verify_job_access(job_id, email)
        spatial_path = results_root / job_id / f"{product_id}_spatial.json"
        if not spatial_path.exists():
            raise HTTPException(status_code=404, detail="Spatial data not available for this EO product")
        return JSONResponse(content=_load_json(spatial_path))

    @app.get("/api/jobs/{job_id}/overview")
    async def get_job_overview(job_id: str, email: str = Depends(get_current_user)):
        """Return composite score + compound signals for the results dashboard.

        Reads the same files the PDF report renderer consumes so the web
        dashboard can show the same findings without re-running any analysis.
        A missing file yields an empty dict / empty list rather than an error.
        """
        await _verify_job_access(job_id, email)
        job_dir = results_root / job_id
        overview_path = job_dir / "overview_score.json"
        signals_path = job_dir / "compound_signals.json"
        overview = _load_json(overview_path) if overview_path.exists() else {}
        signals = _load_json(signals_path) if signals_path.exists() else []
        return JSONResponse(content={"overview": overview, "compound_signals": signals})

    # ── Static files + SPA root ───────────────────────────────────────
    if _FRONTEND_DIR.exists():
        app.mount("/static", StaticFiles(directory=str(_FRONTEND_DIR)), name="static")

    @app.get("/", response_class=HTMLResponse)
    async def serve_index():
        """Serve the frontend's index.html, or a placeholder page without it."""
        if _INDEX_HTML.exists():
            return HTMLResponse(content=_INDEX_HTML.read_text(encoding="utf-8"))
        return HTMLResponse(content="<h1>MERLx Aperture</h1><p>Frontend not found.</p>")

    return app
# Module-level application instance, built with the default database path
# and with the background worker loop enabled.
app = create_app(run_worker=True)