# uploadkro / app.py
# triflix's picture
# Create app.py
# 3fcdea6 verified
import os
import shutil
from pathlib import Path
from urllib.parse import quote

import aiofiles # For async file operations, good practice with FastAPI
from fastapi import FastAPI, UploadFile, File, HTTPException, Request
from fastapi.responses import HTMLResponse, FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
# --- Configuration ---
# Every working directory lives next to this module.
BASE_DIR = Path(__file__).resolve().parent
FILES_DIR = BASE_DIR / "uploaded_files"
STATIC_DIR = BASE_DIR / "static"  # For CSS/JS if not using CDN
TEMPLATES_DIR = BASE_DIR / "templates"

# Create the working directories up front if they are missing.
for _directory in (FILES_DIR, STATIC_DIR, TEMPLATES_DIR):
    _directory.mkdir(parents=True, exist_ok=True)

app = FastAPI(title="File Uploader/Downloader")

# Mount static files (if you have local CSS/JS)
# app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")

# Jinja2 templates are loaded from TEMPLATES_DIR.
templates = Jinja2Templates(directory=TEMPLATES_DIR)
# --- Helper Functions ---
def sanitize_filename(filename: str) -> str:
    """Reduce a client-supplied name to a bare file name.

    Directory components are dropped (both ``/`` and ``\\`` count as
    separators, so Windows-style client paths are handled on any host) and
    surrounding whitespace is stripped.

    Args:
        filename: The raw name as received from the client.

    Returns:
        The final path component, or ``""`` when nothing usable remains:
        empty input, a name containing a NUL byte, or a name that reduces
        to ``"."`` or ``".."``. Callers treat ``""`` as invalid.
    """
    # NUL can never be part of a real file name; refuse instead of guessing.
    if not filename or "\x00" in filename:
        return ""

    # Normalise backslashes first so "C:\\dir\\name" yields "name" on POSIX too.
    name = os.path.basename(filename.replace("\\", "/")).strip()

    # "." and ".." are directory references, not file names. The check runs
    # after strip() because stripping can produce them (e.g. "dir/ .. ").
    if name in {".", ".."}:
        return ""
    return name
# --- Routes ---
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    """Render the ``index.html`` template for the landing page."""
    # The template context carries the current request object.
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
@app.post("/upload/")
async def upload_file(file: UploadFile = File(...)):
    """Store an uploaded file in ``FILES_DIR`` and report its download URL.

    The client-supplied name is reduced with ``sanitize_filename`` and the
    resulting path must resolve inside ``FILES_DIR``. The body is streamed to
    disk in 1 MB chunks so large uploads are never held in memory at once.
    An existing file with the same name is overwritten.

    Args:
        file: The multipart upload.

    Returns:
        A ``JSONResponse`` with a status message, the stored filename and a
        relative, percent-encoded download URL.

    Raises:
        HTTPException: 400 when the filename is missing, empty after
            sanitization, resolves outside ``FILES_DIR`` or names a
            directory; 500 when the file cannot be written.
    """
    original_filename = file.filename
    if not original_filename:
        raise HTTPException(status_code=400, detail="No filename provided.")

    filename = sanitize_filename(original_filename)
    if not filename:  # After sanitization, if it's empty
        raise HTTPException(status_code=400, detail="Invalid filename after sanitization.")

    file_path = FILES_DIR / filename

    # Security: Prevent writing outside the designated FILES_DIR
    # (Pathlib helps, but an explicit check is good for clarity/robustness)
    if not file_path.resolve().is_relative_to(FILES_DIR.resolve()):
        raise HTTPException(status_code=400, detail="Invalid file path (attempted directory traversal).")

    # A name such as "." passes the containment check but points at a
    # directory; reject it here rather than failing inside the write and
    # then trying to unlink a directory during cleanup.
    if file_path.is_dir():
        raise HTTPException(status_code=400, detail="Invalid filename (refers to a directory).")

    try:
        # Stream the file to disk in chunks to handle large files efficiently
        # and avoid loading the whole file into memory at once.
        async with aiofiles.open(file_path, "wb") as buffer:
            while content := await file.read(1024 * 1024):  # Read 1MB chunks
                await buffer.write(content)
    except Exception as e:
        # Best-effort removal of the partial file. missing_ok avoids the
        # exists()/unlink() race, and a cleanup failure is ignored on purpose
        # so it cannot replace the error that actually caused the failure.
        try:
            file_path.unlink(missing_ok=True)
        except OSError:
            pass
        raise HTTPException(status_code=500, detail=f"Could not save file: {str(e)}") from e
    finally:
        # Release the upload's underlying file whether or not the copy succeeded.
        await file.close()

    return JSONResponse(
        content={
            "message": "File uploaded successfully",
            "filename": filename,
            # Percent-encode so names containing '#', '?', '%' or spaces
            # still form a working relative URL.
            "download_url": f"/download/{quote(filename)}",
        }
    )
@app.get("/download/{filename}")
async def download_file(filename: str):
    """Serve a stored file from ``FILES_DIR`` by name.

    The requested name is passed through ``sanitize_filename``; an empty
    result yields a 400. A path that resolves outside ``FILES_DIR`` or is not
    a regular file yields a 404. Otherwise the file is returned under its
    sanitized name with media type ``application/octet-stream``.
    """
    safe_name = sanitize_filename(filename)
    if not safe_name:
        raise HTTPException(status_code=400, detail="Invalid filename.")

    target = FILES_DIR / safe_name

    # Security: the target must live inside FILES_DIR and be an existing file.
    # The containment test runs first; is_file() is only consulted if it passes.
    inside_storage = target.resolve().is_relative_to(FILES_DIR.resolve())
    if not (inside_storage and target.is_file()):
        raise HTTPException(status_code=404, detail="File not found or access denied.")

    return FileResponse(path=target, filename=safe_name, media_type='application/octet-stream')
if __name__ == "__main__":
    # Local execution only; the Docker CMD does not go through this branch.
    import uvicorn

    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
    )