"""Temporary file-sharing service.

Files are uploaded to ``UPLOAD_DIR``, served back under ``/file/<id>`` and
deleted once they are older than ``EXPIRY_SECONDS``.
"""

import contextlib
import logging
import os
import threading
import time
import uuid
from typing import Optional

from fastapi import FastAPI, Request, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse

logger = logging.getLogger(__name__)

app = FastAPI()

# CORS
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive; tighten this if cookies or authentication are ever introduced.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

UPLOAD_DIR = "uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)

# Lifetime of an uploaded file, in seconds (5 minutes).
EXPIRY_SECONDS = 300
# Pause between two passes of the background cleanup loop, in seconds.
CLEANUP_INTERVAL_SECONDS = 60
# Uploads are copied to disk in pieces of this many bytes.
CHUNK_SIZE = 1024 * 1024

# In-memory metadata, both keyed by the stored file path:
#   file_times: path -> upload timestamp (time.time())
#   file_meta:  path -> original file name as sent by the client
file_times = {}
file_meta = {}


# ── Helpers ───────────────────────────────────────────────────────────────────
def _sanitize_filename(name: str) -> str:
    """Return a flat, path-free version of a client-supplied file name.

    Directory components (with either ``/`` or ``\\`` separators) and NUL
    bytes are dropped and spaces become underscores, so the result can be
    joined onto ``UPLOAD_DIR`` without leaving that directory.
    """
    base = os.path.basename(name.replace("\\", "/"))
    base = base.replace("\x00", "").replace(" ", "_")
    if base in {"", ".", ".."}:
        return "file"
    return base


def _upload_path(file_id: str) -> Optional[str]:
    """Map a download id to its path inside ``UPLOAD_DIR``.

    Returns ``None`` for ids that are empty or contain a path separator, a
    NUL byte, or a dot-only segment. Stored names are always flat, so any
    such id can only be an attempt to reach outside the upload directory
    (``os.path.join`` would even discard ``UPLOAD_DIR`` for an absolute id).
    """
    if file_id in {"", ".", ".."}:
        return None
    if any(ch in file_id for ch in ("/", "\\", "\x00")):
        return None
    return os.path.join(UPLOAD_DIR, file_id)


def _forget_upload(path: str) -> None:
    """Delete the stored file at *path* and drop its metadata.

    A file that is already gone is not an error: a request handler and the
    cleanup thread may both try to remove the same upload. Any other
    ``OSError`` propagates and leaves the metadata in place so the removal
    can be retried.
    """
    with contextlib.suppress(FileNotFoundError):
        os.remove(path)
    file_times.pop(path, None)
    file_meta.pop(path, None)


# ── Serve frontend ────────────────────────────────────────────────────────────
@app.get("/", response_class=HTMLResponse)
def home():
    """Serve the single-page frontend from ``templates/index.html``."""
    with open("templates/index.html", "r", encoding="utf-8") as f:
        return HTMLResponse(content=f.read())


# ── Upload ────────────────────────────────────────────────────────────────────
@app.post("/upload")
async def upload(file: UploadFile):
    """Store an uploaded file and return its download link.

    Returns a JSON object with ``link``, ``file_id``, ``original_name`` and
    ``expires_in`` (seconds).
    """
    original_name = file.filename or "file"
    safe_name = _sanitize_filename(original_name)

    file_id = str(uuid.uuid4())[:8]
    download_id = f"{file_id}_{safe_name}"
    file_path = os.path.join(UPLOAD_DIR, download_id)

    # Copy in chunks so a large upload is never held in memory all at once.
    with open(file_path, "wb") as out:
        while True:
            chunk = await file.read(CHUNK_SIZE)
            if not chunk:
                break
            out.write(chunk)

    # Save upload time and original name
    file_times[file_path] = time.time()
    file_meta[file_path] = original_name

    return {
        "link": f"/file/{download_id}",
        "file_id": download_id,
        "original_name": original_name,
        "expires_in": EXPIRY_SECONDS,
    }


# ── Download ──────────────────────────────────────────────────────────────────
@app.get("/file/{file_id:path}")
def get_file(file_id: str):
    """Send a stored file as an attachment.

    Responds with 404 when the id is invalid or no such file exists, and
    with 410 (after deleting the file) when its lifetime has elapsed.
    """
    file_path = _upload_path(file_id)
    if file_path is None or not os.path.isfile(file_path):
        return JSONResponse(status_code=404, content={"error": "File expired or not found"})

    # Extra safety check: the cleanup thread only runs periodically, so an
    # expired file may still be on disk. ``.get`` avoids a KeyError if the
    # cleanup thread removes the entry concurrently.
    uploaded_at = file_times.get(file_path)
    if uploaded_at is not None and time.time() - uploaded_at > EXPIRY_SECONDS:
        _forget_upload(file_path)
        return JSONResponse(status_code=410, content={"error": "File has expired"})

    original_name = file_meta.get(file_path, os.path.basename(file_path))
    # FileResponse builds the Content-Disposition header itself (attachment,
    # with proper encoding of non-ASCII names); a hand-formatted header would
    # break on quotes or non-latin-1 characters in the name.
    return FileResponse(file_path, filename=original_name)


# ── File status endpoint (for countdown UI) ───────────────────────────────────
@app.get("/status/{file_id:path}")
def file_status(file_id: str):
    """Report whether an upload is known and how many seconds it has left.

    Returns ``{"exists": False}`` for unknown or invalid ids; otherwise
    ``exists``, ``remaining_seconds`` and ``original_name``.
    """
    file_path = _upload_path(file_id)
    uploaded_at = file_times.get(file_path) if file_path is not None else None
    if uploaded_at is None:
        return {"exists": False}

    elapsed = time.time() - uploaded_at
    remaining = max(0, EXPIRY_SECONDS - elapsed)
    return {
        "exists": os.path.exists(file_path),
        "remaining_seconds": int(remaining),
        "original_name": file_meta.get(file_path, ""),
    }


# ── Auto-delete thread ────────────────────────────────────────────────────────
def cleanup_files():
    """Delete expired uploads forever, one pass per cleanup interval."""
    while True:
        now = time.time()
        # Snapshot the items: request handlers may add or remove entries
        # while this loop runs, and a per-key lookup could raise KeyError.
        for path, uploaded_at in list(file_times.items()):
            if now - uploaded_at > EXPIRY_SECONDS:
                try:
                    _forget_upload(path)
                except OSError:
                    # Keep the thread alive; the entry stays registered and
                    # is retried on the next pass.
                    logger.exception("Could not delete expired upload %s", path)
        time.sleep(CLEANUP_INTERVAL_SECONDS)


threading.Thread(target=cleanup_files, daemon=True).start()


# ── Run ───────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    import uvicorn

    uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=True)