File size: 4,552 Bytes
81f731d c43f678 e595855 9de8bc6 e595855 9de8bc6 c43f678 4ab541b c43f678 e595855 4ab541b 9de8bc6 4ab541b 9de8bc6 4ab541b 81f731d 4ab541b 304a5a4 4ab541b e595855 4ab541b 9de8bc6 4ab541b 9de8bc6 4ab541b bcb19c3 4ab541b | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 | from fastapi import FastAPI, UploadFile, Request
from fastapi.responses import FileResponse, HTMLResponse
from fastapi.middleware.cors import CORSMiddleware
import os
import uuid
import time
import threading
app = FastAPI()
# CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
UPLOAD_DIR = "uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)
# Store file metadata: path -> {time, original_name}
file_times = {}
file_meta = {}
# ββ Serve frontend ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
@app.get("/", response_class=HTMLResponse)
def home():
with open("templates/index.html", "r") as f:
return HTMLResponse(content=f.read())
# ββ Upload ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
@app.post("/upload")
async def upload(file: UploadFile):
file_id = str(uuid.uuid4())[:8]
# Sanitize filename β replace spaces with underscores, keep extension safe
safe_name = file.filename.replace(" ", "_")
file_path = os.path.join(UPLOAD_DIR, f"{file_id}_{safe_name}")
with open(file_path, "wb") as f:
f.write(await file.read())
# Save upload time and original name
file_times[file_path] = time.time()
file_meta[file_path] = file.filename
download_id = f"{file_id}_{safe_name}"
return {
"link": f"/file/{download_id}",
"file_id": download_id,
"original_name": file.filename,
"expires_in": 300,
}
# ββ Download ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
@app.get("/file/{file_id:path}")
def get_file(file_id: str):
file_path = os.path.join(UPLOAD_DIR, file_id)
if not os.path.exists(file_path):
from fastapi.responses import JSONResponse
return JSONResponse(status_code=404, content={"error": "File expired or not found"})
# Check if expired (extra safety check)
if file_path in file_times:
if time.time() - file_times[file_path] > 300:
if os.path.exists(file_path):
os.remove(file_path)
file_times.pop(file_path, None)
file_meta.pop(file_path, None)
from fastapi.responses import JSONResponse
return JSONResponse(status_code=410, content={"error": "File has expired"})
original_name = file_meta.get(file_path, os.path.basename(file_path))
return FileResponse(
file_path,
filename=original_name,
headers={"Content-Disposition": f'attachment; filename="{original_name}"'},
)
# ββ File status endpoint (for countdown UI) βββββββββββββββββββββββββββββββββββ
@app.get("/status/{file_id:path}")
def file_status(file_id: str):
file_path = os.path.join(UPLOAD_DIR, file_id)
if file_path not in file_times:
return {"exists": False}
elapsed = time.time() - file_times[file_path]
remaining = max(0, 300 - elapsed)
return {
"exists": os.path.exists(file_path),
"remaining_seconds": int(remaining),
"original_name": file_meta.get(file_path, ""),
}
# ββ Auto-delete thread ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def cleanup_files():
while True:
now = time.time()
for path in list(file_times.keys()):
if now - file_times[path] > 300: # 5 minutes
if os.path.exists(path):
os.remove(path)
file_times.pop(path, None)
file_meta.pop(path, None)
time.sleep(60) # check every 1 minute
threading.Thread(target=cleanup_files, daemon=True).start()
# ββ Run βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
if __name__ == "__main__":
import uvicorn
uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=True) |