quickdrop / app.py
parthmax's picture
Update app.py
bcb19c3 verified
from fastapi import FastAPI, UploadFile, Request
from fastapi.responses import FileResponse, HTMLResponse
from fastapi.middleware.cors import CORSMiddleware
import os
import uuid
import time
import threading
# Directory that receives uploaded files; created at import time if missing.
UPLOAD_DIR = "uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)

# In-memory bookkeeping for uploads. Both dicts are keyed by the stored
# file's path (UPLOAD_DIR joined with "<id>_<name>"):
#   file_times: path -> upload timestamp, as returned by time.time()
#   file_meta:  path -> filename exactly as the client supplied it
# These are plain dicts, so their contents do not survive a process restart.
file_meta = {}
file_times = {}

app = FastAPI()

# CORS: accept requests from any origin, with any method and any header.
# NOTE(review): wildcard origins combined with allow_credentials=True is
# very permissive -- confirm that credentialed cross-origin requests are
# actually needed.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_credentials=True,
    allow_origins=["*"],
)
# ── Serve frontend ────────────────────────────────────────────────────────────
@app.get("/", response_class=HTMLResponse)
def home():
    """Serve the frontend page.

    Reads ``templates/index.html`` (resolved against the process working
    directory) on every request and returns its contents as HTML.
    """
    # Decode explicitly as UTF-8 so the page is read the same way regardless
    # of the host's locale default encoding.
    with open("templates/index.html", "r", encoding="utf-8") as f:
        return HTMLResponse(content=f.read())
# ── Upload ────────────────────────────────────────────────────────────────────
@app.post("/upload")
async def upload(file: UploadFile):
    """Store an uploaded file and return its temporary download link.

    The file is written to ``UPLOAD_DIR`` as ``<8-char id>_<safe name>`` and
    its upload time and original filename are recorded in ``file_times`` and
    ``file_meta``.

    Returns a dict with:
        link: relative download URL (``/file/<download id>``)
        file_id: the download id (stored filename)
        original_name: the filename as supplied by the client
        expires_in: lifetime of the upload in seconds
    """
    # The client may omit the filename; fall back to a generic one instead
    # of failing on None.
    original_name = file.filename or "file"

    # Sanitize filename — keep only the final path component (clients may
    # send "dir/name" or "C:\dir\name"), then replace spaces with
    # underscores. This keeps the stored file a direct child of UPLOAD_DIR.
    base_name = os.path.basename(original_name.replace("\\", "/"))
    safe_name = base_name.replace(" ", "_") or "file"

    file_id = str(uuid.uuid4())[:8]
    download_id = f"{file_id}_{safe_name}"
    file_path = os.path.join(UPLOAD_DIR, download_id)

    # Copy in fixed-size chunks so a large upload is never held in memory
    # all at once.
    chunk_size = 1024 * 1024
    with open(file_path, "wb") as f:
        while True:
            chunk = await file.read(chunk_size)
            if not chunk:
                break
            f.write(chunk)

    # Save upload time and original name
    file_times[file_path] = time.time()
    file_meta[file_path] = original_name

    return {
        "link": f"/file/{download_id}",
        "file_id": download_id,
        "original_name": original_name,
        "expires_in": 300,
    }
# ── Download ──────────────────────────────────────────────────────────────────
@app.get("/file/{file_id:path}")
def get_file(file_id: str):
    """Serve a previously uploaded file as an attachment.

    Responses:
        200: the file, offered for download under its original filename.
        404: ``file_id`` does not name a regular file directly inside
             ``UPLOAD_DIR`` (unknown, already deleted, or an attempt to
             reach a path outside the upload directory).
        410: the file is tracked but older than 5 minutes; it is deleted
             and its metadata dropped before responding.
    """
    from fastapi.responses import JSONResponse

    not_found = JSONResponse(
        status_code=404, content={"error": "File expired or not found"}
    )

    # file_id comes straight from the URL and may contain "..", extra
    # slashes, or an absolute path. Resolve it and only accept regular files
    # that sit directly inside the upload directory.
    upload_root = os.path.realpath(UPLOAD_DIR)
    try:
        requested = os.path.realpath(os.path.join(UPLOAD_DIR, file_id))
    except ValueError:
        # e.g. an embedded NUL byte in the requested path
        return not_found
    if os.path.dirname(requested) != upload_root or not os.path.isfile(requested):
        return not_found

    # Rebuild the path in the same form upload() uses as the dictionary key,
    # so equivalent spellings such as "./name" find the same metadata.
    file_path = os.path.join(UPLOAD_DIR, os.path.basename(requested))

    # Check if expired (extra safety check; the cleanup thread only sweeps
    # once a minute).
    uploaded_at = file_times.get(file_path)
    if uploaded_at is not None and time.time() - uploaded_at > 300:
        try:
            os.remove(file_path)
        except FileNotFoundError:
            # The cleanup thread removed it first.
            pass
        file_times.pop(file_path, None)
        file_meta.pop(file_path, None)
        return JSONResponse(status_code=410, content={"error": "File has expired"})

    original_name = file_meta.get(file_path, os.path.basename(file_path))
    # Let FileResponse build Content-Disposition from `filename`: it emits an
    # attachment header and encodes names that are not plain ASCII, which a
    # hand-formatted header value cannot do safely.
    return FileResponse(file_path, filename=original_name)
# ── File status endpoint (for countdown UI) ───────────────────────────────────
@app.get("/status/{file_id:path}")
def file_status(file_id: str):
    """Report whether an upload is still available and for how long.

    Returns ``{"exists": False}`` when the file is not tracked. Otherwise
    returns a dict with:
        exists: True only if the file is on disk and not yet expired
        remaining_seconds: whole seconds left of the 5-minute lifetime
        original_name: the filename supplied at upload ("" if unknown)
    """
    file_path = os.path.join(UPLOAD_DIR, file_id)

    # Single lookup: the cleanup thread may drop the entry at any moment, so
    # a separate membership test followed by indexing could raise KeyError.
    uploaded_at = file_times.get(file_path)
    if uploaded_at is None:
        return {"exists": False}

    elapsed = time.time() - uploaded_at
    remaining = max(0, 300 - elapsed)
    # An expired file may stay on disk until the next cleanup sweep; the
    # download endpoint already refuses it, so report it as gone here too.
    expired = elapsed > 300
    return {
        "exists": not expired and os.path.exists(file_path),
        "remaining_seconds": int(remaining),
        "original_name": file_meta.get(file_path, ""),
    }
# ── Auto-delete thread ────────────────────────────────────────────────────────
def _purge_expired(now):
    """Delete every tracked upload older than 5 minutes as of *now*."""
    # Iterate over a snapshot: entries are removed during the loop and may
    # also be removed concurrently by request handlers.
    for path in list(file_times):
        uploaded_at = file_times.get(path)
        if uploaded_at is None or now - uploaded_at <= 300:  # 5 minutes
            continue
        try:
            os.remove(path)
        except FileNotFoundError:
            # Already deleted elsewhere; just drop the metadata below.
            pass
        except OSError:
            # Could not delete right now; keep the entry so the next sweep
            # retries instead of letting the exception end the thread.
            continue
        file_times.pop(path, None)
        file_meta.pop(path, None)


def cleanup_files():
    """Run forever, removing expired uploads once a minute.

    Intended as the target of a background thread; never returns.
    """
    while True:
        _purge_expired(time.time())
        time.sleep(60)  # check every 1 minute
# Launch the expiry sweeper at import time. It is a daemon thread, so it
# does not keep the process alive at shutdown.
_cleanup_worker = threading.Thread(target=cleanup_files, daemon=True)
_cleanup_worker.start()


# ── Run ───────────────────────────────────────────────────────────────────────
# When executed as a script, serve the app with uvicorn on all interfaces,
# port 7860, with auto-reload enabled.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=True)