|
|
import os |
|
|
import shutil |
|
|
import aiofiles |
|
|
from fastapi import FastAPI, UploadFile, File, HTTPException, Request |
|
|
from fastapi.responses import HTMLResponse, FileResponse, JSONResponse |
|
|
from fastapi.staticfiles import StaticFiles |
|
|
from fastapi.templating import Jinja2Templates |
|
|
from pathlib import Path |
|
|
|
|
|
|
|
|
BASE_DIR = Path(__file__).resolve().parent |
|
|
FILES_DIR = BASE_DIR / "uploaded_files" |
|
|
STATIC_DIR = BASE_DIR / "static" |
|
|
TEMPLATES_DIR = BASE_DIR / "templates" |
|
|
|
|
|
|
|
|
FILES_DIR.mkdir(parents=True, exist_ok=True) |
|
|
STATIC_DIR.mkdir(parents=True, exist_ok=True) |
|
|
TEMPLATES_DIR.mkdir(parents=True, exist_ok=True) |
|
|
|
|
|
app = FastAPI(title="File Uploader/Downloader") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
templates = Jinja2Templates(directory=TEMPLATES_DIR) |
|
|
|
|
|
|
|
|
def sanitize_filename(filename: str) -> str: |
|
|
"""Basic filename sanitization.""" |
|
|
return os.path.basename(filename).strip() |
|
|
|
|
|
|
|
|
@app.get("/", response_class=HTMLResponse) |
|
|
async def read_root(request: Request): |
|
|
"""Serves the main HTML page.""" |
|
|
return templates.TemplateResponse("index.html", {"request": request}) |
|
|
|
|
|
@app.post("/upload/") |
|
|
async def upload_file(file: UploadFile = File(...)): |
|
|
"""Handles file uploads.""" |
|
|
original_filename = file.filename |
|
|
if not original_filename: |
|
|
raise HTTPException(status_code=400, detail="No filename provided.") |
|
|
|
|
|
filename = sanitize_filename(original_filename) |
|
|
if not filename: |
|
|
raise HTTPException(status_code=400, detail="Invalid filename after sanitization.") |
|
|
|
|
|
file_path = FILES_DIR / filename |
|
|
|
|
|
|
|
|
|
|
|
if not file_path.resolve().is_relative_to(FILES_DIR.resolve()): |
|
|
raise HTTPException(status_code=400, detail="Invalid file path (attempted directory traversal).") |
|
|
|
|
|
try: |
|
|
|
|
|
|
|
|
async with aiofiles.open(file_path, "wb") as buffer: |
|
|
while content := await file.read(1024 * 1024): |
|
|
await buffer.write(content) |
|
|
except Exception as e: |
|
|
|
|
|
if file_path.exists(): |
|
|
file_path.unlink() |
|
|
raise HTTPException(status_code=500, detail=f"Could not save file: {str(e)}") |
|
|
|
|
|
return JSONResponse( |
|
|
content={ |
|
|
"message": "File uploaded successfully", |
|
|
"filename": filename, |
|
|
"download_url": f"/download/{filename}" |
|
|
} |
|
|
) |
|
|
|
|
|
@app.get("/download/{filename}") |
|
|
async def download_file(filename: str): |
|
|
"""Handles file downloads.""" |
|
|
clean_filename = sanitize_filename(filename) |
|
|
if not clean_filename: |
|
|
raise HTTPException(status_code=400, detail="Invalid filename.") |
|
|
|
|
|
file_path = FILES_DIR / clean_filename |
|
|
|
|
|
|
|
|
if not file_path.resolve().is_relative_to(FILES_DIR.resolve()) or not file_path.is_file(): |
|
|
raise HTTPException(status_code=404, detail="File not found or access denied.") |
|
|
|
|
|
return FileResponse(path=file_path, filename=clean_filename, media_type='application/octet-stream') |
|
|
|
|
|
if __name__ == "__main__": |
|
|
import uvicorn |
|
|
|
|
|
uvicorn.run(app, host="0.0.0.0", port=7860) |