import os
import threading
import time
import uuid
from urllib.parse import quote

from fastapi import FastAPI, Request, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
|
|
app = FastAPI()

# Open CORS policy: any origin, method and header may call this API.
# Credentials are disabled because a wildcard origin must not be combined
# with allow_credentials=True (see the FastAPI CORS documentation), and this
# service does not set cookies or use authentication.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Directory where uploads are stored (relative to the current working
# directory); created at import time if it does not exist yet.
UPLOAD_DIR = "uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)

# In-memory registries, both keyed by the stored file path:
#   file_times -> upload timestamp (time.time()) used for the expiry check
#   file_meta  -> original filename as sent by the client
# They live only in this process, so they are empty after a restart.
file_times = {}
file_meta = {}
|
|
| |
@app.get("/", response_class=HTMLResponse)
def home():
    """Serve the landing page from ``templates/index.html``.

    The path is relative to the current working directory. The file is read
    as UTF-8 explicitly so the result does not depend on the platform's
    default locale encoding.
    """
    with open("templates/index.html", "r", encoding="utf-8") as f:
        return HTMLResponse(content=f.read())
|
|
| |
@app.post("/upload")
async def upload(file: UploadFile):
    """Store an uploaded file under UPLOAD_DIR and return its download info.

    The stored name is ``<8-char id>_<sanitised filename>``. The upload time
    and the original filename are recorded in ``file_times`` / ``file_meta``
    so the file can later be expired and served under its original name.

    Returns a dict with:
        link: URL path for downloading the file (percent-encoded).
        file_id: the stored name, as expected by ``/file`` and ``/status``.
        original_name: the filename sent by the client.
        expires_in: lifetime of the upload in seconds.
    """
    # The multipart part may carry no filename at all.
    original_name = file.filename or "file"

    # Keep only the last path component (treating both "/" and "\" as
    # separators) so a client-supplied name can never point outside
    # UPLOAD_DIR or into a sub-directory that does not exist.
    safe_name = os.path.basename(original_name.replace("\\", "/")).replace(" ", "_")
    if safe_name in ("", ".", ".."):
        safe_name = "file"

    file_id = str(uuid.uuid4())[:8]
    download_id = f"{file_id}_{safe_name}"
    file_path = os.path.join(UPLOAD_DIR, download_id)

    # Copy in chunks instead of loading the whole upload into memory.
    with open(file_path, "wb") as out:
        while True:
            chunk = await file.read(1024 * 1024)
            if not chunk:
                break
            out.write(chunk)

    file_times[file_path] = time.time()
    file_meta[file_path] = original_name

    return {
        # Percent-encode so names containing "?", "#" or "%" still resolve.
        "link": f"/file/{quote(download_id)}",
        "file_id": download_id,
        "original_name": original_name,
        "expires_in": 300,
    }
|
|
| |
@app.get("/file/{file_id:path}")
def get_file(file_id: str):
    """Send a previously uploaded file as an attachment.

    Responses:
        404 JSON - the id is not a plain file name or no such file exists.
        410 JSON - the file is registered and older than 300 seconds; it is
                   deleted and forgotten before responding.
        200      - the file itself, offered under its original filename
                   (or the stored name when no original name is recorded).
    """
    # Uploads are stored flat inside UPLOAD_DIR, so a valid id is always a
    # bare file name. Rejecting anything else blocks "../" traversal and
    # absolute paths, which os.path.join would otherwise honour.
    if file_id in ("", ".", "..") or os.path.basename(file_id) != file_id:
        return JSONResponse(status_code=404, content={"error": "File expired or not found"})

    file_path = os.path.join(UPLOAD_DIR, file_id)

    # isfile (not exists) so a directory is never handed to FileResponse.
    if not os.path.isfile(file_path):
        return JSONResponse(status_code=404, content={"error": "File expired or not found"})

    uploaded_at = file_times.get(file_path)
    if uploaded_at is not None and time.time() - uploaded_at > 300:
        try:
            os.remove(file_path)
        except FileNotFoundError:
            # The background cleanup thread removed it first; nothing to do.
            pass
        file_times.pop(file_path, None)
        file_meta.pop(file_path, None)
        return JSONResponse(status_code=410, content={"error": "File has expired"})

    original_name = file_meta.get(file_path, os.path.basename(file_path))
    # Let FileResponse build the Content-Disposition header from `filename`
    # (attachment by default). A hand-formatted header breaks on names that
    # contain quotes or characters outside Latin-1.
    return FileResponse(file_path, filename=original_name)
|
|
| |
@app.get("/status/{file_id:path}")
def file_status(file_id: str):
    """Report whether an upload is still available and how long it has left.

    Returns ``{"exists": False}`` when the id is not registered in
    ``file_times``. Otherwise returns whether the file is present on disk,
    the whole seconds remaining of its 300-second lifetime (never negative),
    and the recorded original filename ("" when none is recorded).
    """
    target = os.path.join(UPLOAD_DIR, file_id)

    uploaded_at = file_times.get(target)
    if uploaded_at is None:
        return {"exists": False}

    seconds_left = 300 - (time.time() - uploaded_at)
    if seconds_left < 0:
        seconds_left = 0

    return {
        "exists": os.path.exists(target),
        "remaining_seconds": int(seconds_left),
        "original_name": file_meta.get(target, ""),
    }
|
|
| |
def cleanup_files():
    """Delete expired uploads forever, sweeping once every 60 seconds.

    An upload is expired when it was registered in ``file_times`` more than
    300 seconds ago. Its file is removed from disk and its entries are
    dropped from ``file_times`` and ``file_meta``. This function never
    returns; it is meant to run in a background thread.
    """
    while True:
        now = time.time()
        # Work on a snapshot of (path, timestamp) pairs: request handlers add
        # and remove entries while this loop runs, and taking the timestamp
        # from the snapshot avoids a KeyError if an entry is popped mid-sweep.
        for path, uploaded_at in list(file_times.items()):
            if now - uploaded_at <= 300:
                continue
            try:
                os.remove(path)
            except FileNotFoundError:
                # Already gone (e.g. removed by the download handler).
                pass
            except OSError:
                # Could not delete right now; keep the entry so the next
                # sweep retries, and keep this thread alive instead of
                # letting one failure end all future cleanup.
                continue
            file_times.pop(path, None)
            file_meta.pop(path, None)
        time.sleep(60)
|
|
# Start the expiry sweeper as soon as the module is imported. It is a daemon
# thread, so it does not keep the interpreter alive at shutdown.
threading.Thread(daemon=True, target=cleanup_files).start()


# Development entry point: run the app with uvicorn when executed directly.
# NOTE(review): the "app:app" import string presumes this module is importable
# as `app` (i.e. the file is app.py) - confirm.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        "app:app",
        port=7860,
        host="0.0.0.0",
        reload=True,
    )