"""Minimal FastAPI application for uploading and downloading files.

Uploaded files are stored flat (no sub-directories) inside ``FILES_DIR`` and
served back via ``/download/{filename}``.
"""

import contextlib
import os
import shutil
from pathlib import Path
from urllib.parse import quote

import aiofiles  # For async file operations, good practice with FastAPI
from fastapi import FastAPI, UploadFile, File, HTTPException, Request
from fastapi.responses import HTMLResponse, FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates

# --- Configuration ---
BASE_DIR = Path(__file__).resolve().parent
FILES_DIR = BASE_DIR / "uploaded_files"
STATIC_DIR = BASE_DIR / "static"
TEMPLATES_DIR = BASE_DIR / "templates"

# Size of each chunk read from the upload and written to disk (1 MiB).
UPLOAD_CHUNK_SIZE = 1024 * 1024

# Create directories if they don't exist
FILES_DIR.mkdir(parents=True, exist_ok=True)
STATIC_DIR.mkdir(parents=True, exist_ok=True)  # For CSS/JS if not using CDN
TEMPLATES_DIR.mkdir(parents=True, exist_ok=True)

app = FastAPI(title="File Uploader/Downloader")

# Mount static files (if you have local CSS/JS)
# app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static")

# Setup Jinja2 templates
templates = Jinja2Templates(directory=TEMPLATES_DIR)


# --- Helper Functions ---
def sanitize_filename(filename: str) -> str:
    """Reduce a client-supplied name to a bare, safe file name.

    Directory components are dropped (both ``/`` and ``\\`` are treated as
    separators so Windows-style client paths are reduced on POSIX hosts too)
    and surrounding whitespace is stripped.

    Args:
        filename: The raw name received from the client.

    Returns:
        The sanitized base name, or an empty string when nothing usable is
        left. Callers treat the empty string as "invalid filename".
    """
    name = os.path.basename(filename.replace("\\", "/")).strip()
    # "." and ".." name directories rather than files, and an embedded NUL
    # makes path resolution raise ValueError; reject them up front so callers
    # answer with a clean 400 instead of an unhandled error.
    if name in {".", ".."} or "\x00" in name:
        return ""
    return name


def _is_within_files_dir(path: Path) -> bool:
    """Return True when *path*, fully resolved, lies inside ``FILES_DIR``."""
    return path.resolve().is_relative_to(FILES_DIR.resolve())


# --- Routes ---
@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    """Serve the main HTML page rendered from ``index.html``."""
    return templates.TemplateResponse("index.html", {"request": request})


@app.post("/upload/")
async def upload_file(file: UploadFile = File(...)):
    """Store an uploaded file in ``FILES_DIR``.

    The upload is streamed to disk in ``UPLOAD_CHUNK_SIZE`` chunks so large
    files are never held in memory in full. An existing file with the same
    name is overwritten.

    Args:
        file: The multipart upload.

    Returns:
        A JSON response with a status message, the stored file name and a
        relative, percent-encoded download URL.

    Raises:
        HTTPException: 400 when the file name is missing, invalid, or would
            resolve outside ``FILES_DIR``; 500 when the file cannot be saved.
    """
    original_filename = file.filename
    if not original_filename:
        raise HTTPException(status_code=400, detail="No filename provided.")

    filename = sanitize_filename(original_filename)
    if not filename:  # After sanitization, if it's empty
        raise HTTPException(status_code=400, detail="Invalid filename after sanitization.")

    file_path = FILES_DIR / filename

    # Security: Prevent writing outside the designated FILES_DIR
    # (Pathlib helps, but an explicit check is good for clarity/robustness)
    if not _is_within_files_dir(file_path):
        raise HTTPException(
            status_code=400,
            detail="Invalid file path (attempted directory traversal).",
        )

    # Tracks whether the destination was actually opened for writing, so a
    # failed open never deletes a pre-existing file that was left untouched.
    destination_opened = False
    try:
        async with aiofiles.open(file_path, "wb") as buffer:
            destination_opened = True
            while content := await file.read(UPLOAD_CHUNK_SIZE):
                await buffer.write(content)
    except Exception as e:
        if destination_opened:
            # Best-effort removal of the partial file; a cleanup failure must
            # not mask the original error.
            with contextlib.suppress(OSError):
                file_path.unlink()
        raise HTTPException(status_code=500, detail=f"Could not save file: {e}") from e

    return JSONResponse(
        content={
            "message": "File uploaded successfully",
            "filename": filename,
            # Relative URL; percent-encoded so names containing spaces,
            # "#", "?" or "%" still form a valid path.
            "download_url": f"/download/{quote(filename)}",
        }
    )


@app.get("/download/{filename}")
async def download_file(filename: str):
    """Send a previously uploaded file back to the client.

    Args:
        filename: Name of the file to download, taken from the URL path.

    Returns:
        A ``FileResponse`` delivering the file as ``application/octet-stream``.

    Raises:
        HTTPException: 400 when the name is invalid; 404 when the file does
            not exist or would resolve outside ``FILES_DIR``.
    """
    clean_filename = sanitize_filename(filename)
    if not clean_filename:
        raise HTTPException(status_code=400, detail="Invalid filename.")

    file_path = FILES_DIR / clean_filename

    # Security: Ensure the file is within the FILES_DIR and exists
    if not _is_within_files_dir(file_path) or not file_path.is_file():
        raise HTTPException(status_code=404, detail="File not found or access denied.")

    return FileResponse(
        path=file_path,
        filename=clean_filename,
        media_type="application/octet-stream",
    )


if __name__ == "__main__":
    import uvicorn

    # This part is for local execution, not used by Docker CMD
    uvicorn.run(app, host="0.0.0.0", port=7860)