import uuid from pathlib import Path import socketio from fastapi import FastAPI, Request, UploadFile, File from fastapi.responses import HTMLResponse, JSONResponse, FileResponse from starlette.middleware.sessions import SessionMiddleware # Create Socket.IO async server with ASGI mode sio = socketio.AsyncServer(async_mode='asgi') app = FastAPI() app.add_middleware(SessionMiddleware, secret_key="your-secret-key") # Mount the Socket.IO app with FastAPI socket_app = socketio.ASGIApp(sio, other_asgi_app=app) # Temporary directory for uploaded files UPLOAD_DIR = Path("temp_uploads") UPLOAD_DIR.mkdir(exist_ok=True) @app.get("/", response_class=HTMLResponse) async def index(request: Request): html_content = """ File Upload with Local History

Upload File

Your Upload History (Stored in Local Storage)

    """ return HTMLResponse(content=html_content) @app.post("/upload") async def upload_file(request: Request, file: UploadFile = File(...)): # Create or retrieve a unique session ID for the user (used for Socket.IO isolation) session_id = request.session.get("id") if not session_id: session_id = str(uuid.uuid4()) request.session["id"] = session_id # Generate a unique file ID and save the file in chunks file_id = str(uuid.uuid4()) file_path = UPLOAD_DIR / file_id CHUNK_SIZE = 1024 * 1024 # 1 MB total = 0 content = await file.read() total_size = len(content) with file_path.open("wb") as f: for i in range(0, total_size, CHUNK_SIZE): chunk = content[i:i+CHUNK_SIZE] f.write(chunk) total += len(chunk) percent = int((total / total_size) * 100) await sio.emit('upload_progress', {'percent': percent}, to=session_id) # Construct the download URL (using base URL and file_id) base_url = request.url_for("index").rstrip("/") download_url = f"{base_url}/download/{file_id}" # Emit completion event with the download URL await sio.emit('upload_complete', {'download_url': download_url}, to=session_id) return JSONResponse({"download_url": download_url}) @app.get("/download/{file_id}") async def download_file(file_id: str): file_path = UPLOAD_DIR / file_id if file_path.exists(): return FileResponse(file_path, filename="downloaded_file") return JSONResponse({"error": "File not found"}, status_code=404) @sio.event async def connect(sid, environ): # Use the socket id as the room identifier await sio.save_session(sid, {"room": sid}) await sio.enter_room(sid, sid) print(f"Client connected: {sid}") @sio.event async def disconnect(sid): print(f"Client disconnected: {sid}")