# app.py
"""FastAPI app that uploads data files, analyzes them via an external API,
and caches each result as a reusable snapshot keyed by file/data fingerprints."""

import json
import shutil
import tempfile
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Request
from fastapi.responses import HTMLResponse, JSONResponse, FileResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles

import snapshot_logic

# --- Configuration ---
# Use /tmp/ for ephemeral storage, suitable for Hugging Face Spaces
SNAPSHOTS_DIR = Path("/tmp/snapshots")
EXTERNAL_API_URL = "https://triflix-testingops.hf.space/analyze"

# Ensure the base directory exists on startup
snapshot_logic.ensure_outdir(SNAPSHOTS_DIR)

# --- FastAPI App Initialization ---
app = FastAPI(title="Data Analysis API with Snapshot Caching")
templates = Jinja2Templates(directory="templates")


def _utc_now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string with a 'Z' suffix."""
    # Timezone-aware replacement for the deprecated datetime.utcnow();
    # keeps the same "...Z" wire format the snapshots already use.
    return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")


# --- API Endpoints ---
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    """Serve the main HTML page for file uploads."""
    return templates.TemplateResponse("index.html", {"request": request})


@app.post("/upload-and-analyze/")
async def upload_and_analyze(
    file: UploadFile = File(...),
    force: bool = Form(False),
):
    """
    Main endpoint to upload a file, analyze it, and return chart data.

    It uses a snapshot system to avoid re-analyzing identical files: the
    upload is fingerprinted, and if a snapshot with the same file/data
    hash already exists its stored API response is returned instead of
    calling the external analysis service again. Pass ``force=True`` to
    bypass the cache and create a fresh snapshot.

    Raises:
        HTTPException 400: missing filename or disallowed extension.
        HTTPException 500: a matching snapshot exists but its stored
            response file is missing (corrupted snapshot).
        HTTPException 502: the external analysis API call failed.
    """
    # 1. Validate the upload. Starlette allows filename to be None, and
    #    Path(None) would raise TypeError — guard explicitly with a 400.
    if not file.filename:
        raise HTTPException(status_code=400, detail="Uploaded file has no filename.")
    file_ext = Path(file.filename).suffix.lower()
    if file_ext not in snapshot_logic.ALLOWED_EXT:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid file type. Allowed types are: {', '.join(snapshot_logic.ALLOWED_EXT)}"
        )

    # 2. Save uploaded file to a temporary path for processing
    #    (delete=False so the path survives the `with`; cleaned up in `finally`).
    with tempfile.NamedTemporaryFile(delete=False, suffix=file_ext) as tmp_file:
        shutil.copyfileobj(file.file, tmp_file)
        tmp_file_path = Path(tmp_file.name)

    try:
        # 3. Fingerprint the file
        fp = snapshot_logic.fingerprint_from_file(tmp_file_path)
        index = snapshot_logic.load_index(SNAPSHOTS_DIR)

        # 4. Check for an existing snapshot unless 'force' is used
        matched_id = None
        if not force:
            matched_id = snapshot_logic.find_matching_snapshot(
                index, file_hash=fp["file_hash"], data_hash=fp["data_hash"]
            )

        if matched_id:
            # --- REUSE EXISTING SNAPSHOT ---
            snapshot_logic.log.info(f"Found matching snapshot (id={matched_id}). Reusing.")
            api_response_path = SNAPSHOTS_DIR / "snapshots" / matched_id / "api_response.json"
            if not api_response_path.exists():
                raise HTTPException(status_code=500, detail="Snapshot data is corrupted or missing.")

            api_response = json.loads(api_response_path.read_text())
            return {
                "status": "reused",
                "snapshot_id": matched_id,
                "source_filename": index[matched_id].get("source_filename", "N/A"),
                "api_response": api_response
            }

        # --- CREATE NEW SNAPSHOT ---
        snapshot_logic.log.info("No matching snapshot found or force=True. Creating new snapshot.")

        # 5. Call the external analysis API
        try:
            api_response = snapshot_logic.post_file_to_endpoint(EXTERNAL_API_URL, tmp_file_path)
        except Exception as e:
            snapshot_logic.log.error(f"External API call failed: {e}")
            # Chain the cause so the original traceback survives in logs.
            raise HTTPException(status_code=502, detail=f"External API request failed: {e}") from e

        # 6. Create and save the new snapshot bundle
        snapshot_id = str(uuid.uuid4())
        metadata = {
            "snapshot_id": snapshot_id,
            "source_filename": file.filename,
            **fp,  # unpack fingerprint data (file_hash, data_hash, ...)
            "uploaded_at_utc": _utc_now_iso(),
        }
        snapshot_logic.save_snapshot_bundle(
            outdir=SNAPSHOTS_DIR,
            snapshot_id=snapshot_id,
            api_response=api_response,
            src_path=tmp_file_path,
            metadata=metadata
        )

        # 7. Update the master index
        index[snapshot_id] = {
            "file_hash": fp["file_hash"],
            "data_hash": fp["data_hash"],
            "created_at_utc": metadata["uploaded_at_utc"],
            "source_filename": file.filename,
        }
        snapshot_logic.save_index(SNAPSHOTS_DIR, index)

        return {
            "status": "created",
            "snapshot_id": snapshot_id,
            "source_filename": file.filename,
            "api_response": api_response
        }
    finally:
        # 8. Clean up the temporary file regardless of success or failure
        if tmp_file_path.exists():
            tmp_file_path.unlink()


@app.get("/snapshots/{snapshot_id}/preprocessed")
async def get_preprocessed_data(snapshot_id: str):
    """Serves the preprocessed.csv file for a given snapshot."""
    file_path = SNAPSHOTS_DIR / "snapshots" / snapshot_id / "preprocessed.csv"
    if not file_path.exists():
        raise HTTPException(status_code=404, detail="Preprocessed data not found.")
    return FileResponse(file_path, media_type="text/csv", filename="preprocessed.csv")


@app.get("/snapshots/{snapshot_id}/column-stats")
async def get_column_stats(snapshot_id: str):
    """Serves the column_stats.json file for a given snapshot."""
    file_path = SNAPSHOTS_DIR / "snapshots" / snapshot_id / "column_stats.json"
    if not file_path.exists():
        raise HTTPException(status_code=404, detail="Column stats not found.")
    return FileResponse(file_path, media_type="application/json")