# Spaces: Sleeping
import os
import shutil
import uuid
from typing import List, Dict, Optional

from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.responses import FileResponse
app = FastAPI()

# Directory (relative to the working directory) where uploaded files are saved.
UPLOAD_DIR = "uploads"
# exist_ok=True creates the directory if needed without the check-then-create
# race of testing os.path.exists() first.
os.makedirs(UPLOAD_DIR, exist_ok=True)

# In-memory file mappings with user info:
# {unique_code: {"filename": filename, "user_id": user_id}}
file_codes: Dict[str, dict] = {}

# In-memory reverse mapping: {filename: {"code": code, "user_id": user_id}}
filename_codes: Dict[str, dict] = {}
async def upload_file(file: UploadFile = File(...), user_id: Optional[str] = None):
    """Save one uploaded file into UPLOAD_DIR and register an access code for it.

    Args:
        file: The uploaded file. Its client-supplied name is reduced to a bare
            file name before it is used on disk or stored in the mappings.
        user_id: Optional owner identifier stored alongside the mappings.

    Returns:
        A dict with the stored ``filename``, the generated ``access_code`` and
        a confirmation ``message``.

    Raises:
        HTTPException: 400 if the upload carries no usable file name; 500 if
            saving the file or recording the mapping fails.
    """
    # The file name is untrusted input: drop every directory component (both
    # "/" and "\\" separators) so a name such as "../../app.py" cannot be
    # written outside UPLOAD_DIR.
    safe_name = os.path.basename((file.filename or "").replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        raise HTTPException(status_code=400, detail="Invalid filename")

    try:
        # Access code is the first 8 characters of a random UUID4 string.
        unique_code = str(uuid.uuid4())[:8]

        # Save the uploaded file
        file_path = os.path.join(UPLOAD_DIR, safe_name)
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        # Store the forward and reverse mappings with user info
        file_codes[unique_code] = {"filename": safe_name, "user_id": user_id}
        filename_codes[safe_name] = {"code": unique_code, "user_id": user_id}

        return {
            "filename": safe_name,
            "access_code": unique_code,
            "message": "File uploaded successfully"
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
async def upload_multiple_files(files: List[UploadFile] = File(...)):
    """Save several uploaded files into UPLOAD_DIR, one access code per file.

    Files uploaded through this function are recorded with ``user_id`` None.

    Args:
        files: The uploaded files. Each client-supplied name is reduced to a
            bare file name before it is used on disk or stored in the mappings.

    Returns:
        A dict with ``files`` (a list of ``filename`` / ``access_code`` dicts)
        and a confirmation ``message``.

    Raises:
        HTTPException: 400 if any upload carries no usable file name; 500 if
            saving a file or recording a mapping fails.
    """
    # File names are untrusted input: drop directory components so nothing is
    # written outside UPLOAD_DIR. All names are validated before any file is
    # written so that a bad entry does not leave a partial batch on disk.
    named_uploads = []
    for file in files:
        safe_name = os.path.basename((file.filename or "").replace("\\", "/"))
        if safe_name in ("", ".", ".."):
            raise HTTPException(status_code=400, detail="Invalid filename")
        named_uploads.append((safe_name, file))

    try:
        uploaded_files = []
        for safe_name, file in named_uploads:
            # Generate unique code for each file
            unique_code = str(uuid.uuid4())[:8]

            file_path = os.path.join(UPLOAD_DIR, safe_name)
            with open(file_path, "wb") as buffer:
                shutil.copyfileobj(file.file, buffer)

            # Store the forward and reverse mappings
            file_codes[unique_code] = {"filename": safe_name, "user_id": None}
            filename_codes[safe_name] = {"code": unique_code, "user_id": None}

            uploaded_files.append({
                "filename": safe_name,
                "access_code": unique_code
            })
        return {"files": uploaded_files, "message": "Files uploaded successfully"}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
async def list_files(user_id: str):
    """Return the files in UPLOAD_DIR whose recorded owner is ``user_id``.

    Args:
        user_id: Owner identifier to match against the reverse mapping.

    Returns:
        A dict ``{"files": [...]}`` where each entry holds the ``filename``
        and its ``access_code``.

    Raises:
        HTTPException: 500 if listing the directory or reading the mapping
            fails.
    """
    try:
        owned = []
        for name in os.listdir(UPLOAD_DIR):
            # Files on disk without a reverse-mapping entry yield an empty
            # record, whose owner lookup gives None.
            record = filename_codes.get(name, {})
            if record.get("user_id") != user_id:
                continue
            owned.append({"filename": name, "access_code": record["code"]})
        return {"files": owned}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
async def download_file(access_code: str):
    """Return the file registered under ``access_code`` as a download.

    Args:
        access_code: Code previously issued for an uploaded file.

    Returns:
        A ``FileResponse`` for the stored file, named after the original file.

    Raises:
        HTTPException: 404 if the code is unknown or the file is no longer on
            disk; 500 on any other failure.
    """
    try:
        if access_code not in file_codes:
            raise HTTPException(status_code=404, detail="Invalid access code")

        filename = file_codes[access_code]["filename"]
        file_path = os.path.join(UPLOAD_DIR, filename)
        if not os.path.exists(file_path):
            raise HTTPException(status_code=404, detail="File not found")

        return FileResponse(file_path, filename=filename)
    except HTTPException:
        # Let the deliberate 404s through; the generic handler below would
        # otherwise catch them and turn them into 500 responses.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
async def delete_file(access_code: str):
    """Delete the file registered under ``access_code`` and forget its mappings.

    Args:
        access_code: Code previously issued for an uploaded file.

    Returns:
        A dict with a confirmation ``message`` naming the deleted file.

    Raises:
        HTTPException: 404 if the code is unknown or the file is no longer on
            disk; 500 on any other failure.
    """
    try:
        if access_code not in file_codes:
            raise HTTPException(status_code=404, detail="Invalid access code")

        filename = file_codes[access_code]["filename"]
        file_path = os.path.join(UPLOAD_DIR, filename)
        if not os.path.exists(file_path):
            raise HTTPException(status_code=404, detail="File not found")

        os.remove(file_path)
        # Remove from both mappings. pop() tolerates a reverse entry that is
        # already gone, so the forward entry is still cleaned up.
        filename_codes.pop(filename, None)
        del file_codes[access_code]
        return {"message": f"File {filename} deleted successfully"}
    except HTTPException:
        # Let the deliberate 404s through; the generic handler below would
        # otherwise catch them and turn them into 500 responses.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
async def direct_download(access_code: str):
    """Serve the file registered under ``access_code``, inline when viewable.

    Files ending in .png, .jpg, .jpeg, .gif, .pdf or .txt are sent with an
    ``inline`` content disposition; every other file is sent as an
    ``attachment``.

    Args:
        access_code: Code previously issued for an uploaded file.

    Returns:
        A ``FileResponse`` for the stored file.

    Raises:
        HTTPException: 404 if the code is unknown or the file is no longer on
            disk; 500 on any other failure.
    """
    try:
        if access_code not in file_codes:
            raise HTTPException(status_code=404, detail="Invalid access code")

        filename = file_codes[access_code]["filename"]
        file_path = os.path.join(UPLOAD_DIR, filename)
        if not os.path.exists(file_path):
            raise HTTPException(status_code=404, detail="File not found")

        # Viewable types open in the browser; everything else is downloaded.
        viewable = ('.png', '.jpg', '.jpeg', '.gif', '.pdf', '.txt')
        if filename.lower().endswith(viewable):
            content_disposition = "inline"
        else:
            content_disposition = "attachment"

        # Let FileResponse build the Content-Disposition header: it quotes and
        # encodes the file name, which a hand-formatted "filename=<name>"
        # header does not (spaces, quotes and non-ASCII names break it).
        return FileResponse(
            file_path,
            filename=filename,
            content_disposition_type=content_disposition,
        )
    except HTTPException:
        # Let the deliberate 404s through; the generic handler below would
        # otherwise catch them and turn them into 500 responses.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
    import uvicorn

    # Listen on all interfaces, port 7860. Switch host to "127.0.0.1" to
    # accept connections from the local machine only.
    server_options = {"host": "0.0.0.0", "port": 7860}
    uvicorn.run(app, **server_options)