# NOTE: upload metadata from the hosting page (author "triflix", commit b81f462
# verified, "Create app.py") — kept as a comment so this file is valid Python.
# app.py
import shutil
import tempfile
import json
from pathlib import Path
from typing import Optional
from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Request
from fastapi.responses import HTMLResponse, JSONResponse, FileResponse
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
import snapshot_logic
# --- Configuration ---
# Use /tmp/ for ephemeral storage, suitable for Hugging Face Spaces
# (anything written here is lost on Space restart; snapshots are a cache, not durable storage).
SNAPSHOTS_DIR = Path("/tmp/snapshots")
# Remote analysis service this app proxies uploads to (another HF Space).
EXTERNAL_API_URL = "https://triflix-testingops.hf.space/analyze"
# Ensure the base directory exists on startup
snapshot_logic.ensure_outdir(SNAPSHOTS_DIR)
# --- FastAPI App Initialization ---
app = FastAPI(title="Data Analysis API with Snapshot Caching")
# Jinja templates are loaded from the local "templates/" directory (serves index.html).
templates = Jinja2Templates(directory="templates")
# --- API Endpoints ---
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    """Render and return the upload page (templates/index.html)."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
@app.post("/upload-and-analyze/")
async def upload_and_analyze(
    file: UploadFile = File(...),
    force: bool = Form(False)
):
    """
    Main endpoint to upload a file, analyze it, and return chart data.

    A content fingerprint (raw-file hash + parsed-data hash) is used to
    reuse a previously stored snapshot when an identical file was already
    analyzed; pass ``force=True`` to bypass the cache and re-analyze.

    Raises:
        HTTPException 400: missing filename or disallowed file extension.
        HTTPException 500: a matching snapshot exists in the index but its
            stored api_response.json is missing (corrupted snapshot).
        HTTPException 502: the external analysis API call failed.
    """
    # 1. Validate the filename and its extension.
    #    Starlette may report no filename for some multipart clients; guard
    #    before handing it to Path(), which would raise a TypeError.
    if not file.filename:
        raise HTTPException(status_code=400, detail="Uploaded file has no filename.")
    file_ext = Path(file.filename).suffix.lower()
    if file_ext not in snapshot_logic.ALLOWED_EXT:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid file type. Allowed types are: {', '.join(snapshot_logic.ALLOWED_EXT)}"
        )
    # 2. Persist the upload to a temp file so it can be hashed and re-read.
    #    delete=False because the path must outlive the context manager.
    with tempfile.NamedTemporaryFile(delete=False, suffix=file_ext) as tmp_file:
        shutil.copyfileobj(file.file, tmp_file)
        tmp_file_path = Path(tmp_file.name)
    try:
        # 3. Fingerprint the file.
        fp = snapshot_logic.fingerprint_from_file(tmp_file_path)
        index = snapshot_logic.load_index(SNAPSHOTS_DIR)

        # 4. Check for an existing snapshot unless 'force' is used.
        matched_id = None
        if not force:
            matched_id = snapshot_logic.find_matching_snapshot(
                index, file_hash=fp["file_hash"], data_hash=fp["data_hash"]
            )
        if matched_id:
            # --- REUSE EXISTING SNAPSHOT ---
            snapshot_logic.log.info(f"Found matching snapshot (id={matched_id}). Reusing.")
            api_response_path = SNAPSHOTS_DIR / "snapshots" / matched_id / "api_response.json"
            if not api_response_path.exists():
                raise HTTPException(status_code=500, detail="Snapshot data is corrupted or missing.")
            api_response = json.loads(api_response_path.read_text())
            return {
                "status": "reused",
                "snapshot_id": matched_id,
                "source_filename": index[matched_id].get("source_filename", "N/A"),
                "api_response": api_response
            }

        # --- CREATE NEW SNAPSHOT ---
        snapshot_logic.log.info("No matching snapshot found or force=True. Creating new snapshot.")

        # 5. Call the external analysis API.
        try:
            api_response = snapshot_logic.post_file_to_endpoint(EXTERNAL_API_URL, tmp_file_path)
        except Exception as e:
            snapshot_logic.log.error(f"External API call failed: {e}")
            # Chain the cause so the original traceback is preserved.
            raise HTTPException(status_code=502, detail=f"External API request failed: {e}") from e

        # 6. Create and save the new snapshot bundle.
        snapshot_id = str(snapshot_logic.uuid.uuid4())
        metadata = {
            "snapshot_id": snapshot_id,
            "source_filename": file.filename,
            **fp,  # unpack fingerprint data (file_hash, data_hash, ...)
            "uploaded_at_utc": snapshot_logic.datetime.utcnow().isoformat() + "Z",
        }
        snapshot_logic.save_snapshot_bundle(
            outdir=SNAPSHOTS_DIR,
            snapshot_id=snapshot_id,
            api_response=api_response,
            src_path=tmp_file_path,
            metadata=metadata
        )

        # 7. Update the master index so future uploads can match this snapshot.
        index[snapshot_id] = {
            "file_hash": fp["file_hash"],
            "data_hash": fp["data_hash"],
            "created_at_utc": metadata["uploaded_at_utc"],
            "source_filename": file.filename,
        }
        snapshot_logic.save_index(SNAPSHOTS_DIR, index)
        return {
            "status": "created",
            "snapshot_id": snapshot_id,
            "source_filename": file.filename,
            "api_response": api_response
        }
    finally:
        # 8. Always remove the temp copy, even on error paths.
        #    missing_ok avoids a TOCTOU race with the previous exists() check.
        tmp_file_path.unlink(missing_ok=True)
@app.get("/snapshots/{snapshot_id}/preprocessed")
async def get_preprocessed_data(snapshot_id: str):
    """Serve the preprocessed.csv stored for the given snapshot.

    Raises HTTPException 404 when the id is malformed or the file is absent.
    """
    # snapshot_id comes straight from the URL; reject anything containing
    # path separators or dot-segments so it cannot escape SNAPSHOTS_DIR
    # (e.g. "../../etc/passwd"). Legitimate ids are plain UUID strings.
    if snapshot_id in {".", ".."} or Path(snapshot_id).name != snapshot_id:
        raise HTTPException(status_code=404, detail="Preprocessed data not found.")
    file_path = SNAPSHOTS_DIR / "snapshots" / snapshot_id / "preprocessed.csv"
    if not file_path.exists():
        raise HTTPException(status_code=404, detail="Preprocessed data not found.")
    return FileResponse(file_path, media_type="text/csv", filename="preprocessed.csv")
@app.get("/snapshots/{snapshot_id}/column-stats")
async def get_column_stats(snapshot_id: str):
    """Serve the column_stats.json stored for the given snapshot.

    Raises HTTPException 404 when the id is malformed or the file is absent.
    """
    # snapshot_id is untrusted URL input; reject path separators and
    # dot-segments so the join below cannot traverse out of SNAPSHOTS_DIR.
    # Legitimate ids are plain UUID strings.
    if snapshot_id in {".", ".."} or Path(snapshot_id).name != snapshot_id:
        raise HTTPException(status_code=404, detail="Column stats not found.")
    file_path = SNAPSHOTS_DIR / "snapshots" / snapshot_id / "column_stats.json"
    if not file_path.exists():
        raise HTTPException(status_code=404, detail="Column stats not found.")
    return FileResponse(file_path, media_type="application/json")