# (Hugging Face Spaces status-banner residue removed during cleanup)
# app.py
"""FastAPI service that analyzes uploaded files and caches results as snapshots."""
import json
import shutil
import tempfile
from pathlib import Path
from typing import Optional

from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Request
from fastapi.responses import HTMLResponse, JSONResponse, FileResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates

import snapshot_logic

# --- Configuration ---
# /tmp is ephemeral storage, which suits Hugging Face Spaces deployments.
SNAPSHOTS_DIR = Path("/tmp/snapshots")
EXTERNAL_API_URL = "https://triflix-testingops.hf.space/analyze"

# Create the snapshot directory tree up front so every endpoint can assume it exists.
snapshot_logic.ensure_outdir(SNAPSHOTS_DIR)

# --- FastAPI App Initialization ---
app = FastAPI(title="Data Analysis API with Snapshot Caching")
templates = Jinja2Templates(directory="templates")
# --- API Endpoints ---

# NOTE(review): the route decorator appears to have been lost when this file was
# extracted; without it FastAPI never registers the handler. "/" is the
# conventional path for the landing page — confirm against the original deployment.
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    """Serve the main HTML upload page, rendered from templates/index.html.

    Args:
        request: The incoming request; required by Jinja2Templates for URL
            generation inside the template.
    """
    return templates.TemplateResponse("index.html", {"request": request})
# NOTE(review): route decorator restored after extraction loss — confirm the
# original route path ("/upload" is presumed from the handler's purpose).
@app.post("/upload")
async def upload_and_analyze(
    file: UploadFile = File(...),
    force: bool = Form(False)
):
    """
    Upload a file, analyze it, and return chart data.

    Uses a fingerprint-based snapshot cache: if a snapshot with the same
    file/data hash already exists (and ``force`` is not set), the stored API
    response is returned instead of re-calling the external analysis API.

    Args:
        file: The uploaded file; its extension must be in
            ``snapshot_logic.ALLOWED_EXT``.
        force: When true, skip the cache lookup and always create a new snapshot.

    Returns:
        A dict with ``status`` ("reused" or "created"), ``snapshot_id``,
        ``source_filename``, and the external ``api_response``.

    Raises:
        HTTPException: 400 for a disallowed extension, 502 when the external
            API call fails, 500 when a cached snapshot is corrupted.
    """
    # 1. Validate the file extension. UploadFile.filename can be None, so
    #    guard with "" — an empty suffix then fails the allow-list check (400)
    #    instead of crashing with a TypeError (500).
    file_ext = Path(file.filename or "").suffix.lower()
    if file_ext not in snapshot_logic.ALLOWED_EXT:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid file type. Allowed types are: {', '.join(snapshot_logic.ALLOWED_EXT)}"
        )

    # 2. Persist the upload to a temporary path so it can be hashed and re-read.
    with tempfile.NamedTemporaryFile(delete=False, suffix=file_ext) as tmp_file:
        shutil.copyfileobj(file.file, tmp_file)
        tmp_file_path = Path(tmp_file.name)

    try:
        # 3. Fingerprint the file and load the master snapshot index.
        fp = snapshot_logic.fingerprint_from_file(tmp_file_path)
        index = snapshot_logic.load_index(SNAPSHOTS_DIR)

        # 4. Look for an existing snapshot unless the caller forced a refresh.
        matched_id = None
        if not force:
            matched_id = snapshot_logic.find_matching_snapshot(
                index, file_hash=fp["file_hash"], data_hash=fp["data_hash"]
            )

        if matched_id:
            # --- REUSE EXISTING SNAPSHOT ---
            snapshot_logic.log.info(f"Found matching snapshot (id={matched_id}). Reusing.")
            api_response_path = SNAPSHOTS_DIR / "snapshots" / matched_id / "api_response.json"
            if not api_response_path.exists():
                raise HTTPException(status_code=500, detail="Snapshot data is corrupted or missing.")
            api_response = json.loads(api_response_path.read_text())
            return {
                "status": "reused",
                "snapshot_id": matched_id,
                "source_filename": index[matched_id].get("source_filename", "N/A"),
                "api_response": api_response
            }

        # --- CREATE NEW SNAPSHOT ---
        snapshot_logic.log.info("No matching snapshot found or force=True. Creating new snapshot.")

        # 5. Call the external analysis API; surface failures as a 502 with
        #    the original exception chained for debuggability.
        try:
            api_response = snapshot_logic.post_file_to_endpoint(EXTERNAL_API_URL, tmp_file_path)
        except Exception as e:
            snapshot_logic.log.error(f"External API call failed: {e}")
            raise HTTPException(status_code=502, detail=f"External API request failed: {e}") from e

        # 6. Create and save the new snapshot bundle on disk.
        snapshot_id = str(snapshot_logic.uuid.uuid4())
        metadata = {
            "snapshot_id": snapshot_id,
            "source_filename": file.filename,
            **fp,  # unpack fingerprint data (includes file_hash / data_hash)
            "uploaded_at_utc": snapshot_logic.datetime.utcnow().isoformat() + "Z",
        }
        snapshot_logic.save_snapshot_bundle(
            outdir=SNAPSHOTS_DIR,
            snapshot_id=snapshot_id,
            api_response=api_response,
            src_path=tmp_file_path,
            metadata=metadata
        )

        # 7. Update the master index so future uploads can find this snapshot.
        index[snapshot_id] = {
            "file_hash": fp["file_hash"],
            "data_hash": fp["data_hash"],
            "created_at_utc": metadata["uploaded_at_utc"],
            "source_filename": file.filename,
        }
        snapshot_logic.save_index(SNAPSHOTS_DIR, index)

        return {
            "status": "created",
            "snapshot_id": snapshot_id,
            "source_filename": file.filename,
            "api_response": api_response
        }
    finally:
        # 8. Always remove the temporary upload, on success or failure.
        if tmp_file_path.exists():
            tmp_file_path.unlink()
# NOTE(review): route decorator restored after extraction loss — path is
# presumed from the snapshot directory layout; confirm against the frontend.
@app.get("/snapshots/{snapshot_id}/preprocessed")
async def get_preprocessed_data(snapshot_id: str):
    """Serve the preprocessed.csv file stored for a given snapshot.

    Raises:
        HTTPException: 404 when the snapshot or its CSV does not exist.
    """
    file_path = SNAPSHOTS_DIR / "snapshots" / snapshot_id / "preprocessed.csv"
    if not file_path.exists():
        raise HTTPException(status_code=404, detail="Preprocessed data not found.")
    return FileResponse(file_path, media_type="text/csv", filename="preprocessed.csv")
# NOTE(review): route decorator restored after extraction loss — path is
# presumed from the snapshot directory layout; confirm against the frontend.
@app.get("/snapshots/{snapshot_id}/column_stats")
async def get_column_stats(snapshot_id: str):
    """Serve the column_stats.json file stored for a given snapshot.

    Raises:
        HTTPException: 404 when the snapshot or its stats file does not exist.
    """
    file_path = SNAPSHOTS_DIR / "snapshots" / snapshot_id / "column_stats.json"
    if not file_path.exists():
        raise HTTPException(status_code=404, detail="Column stats not found.")
    return FileResponse(file_path, media_type="application/json")