Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, UploadFile, File, HTTPException | |
| from fastapi.responses import FileResponse, JSONResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| import os | |
| import uuid | |
| import shutil | |
| app = FastAPI() | |
| # Configure CORS | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], # Allows all origins | |
| allow_credentials=True, | |
| allow_methods=["*"], # Allows all methods | |
| allow_headers=["*"], # Allows all headers | |
| ) | |
| UPLOAD_DIRECTORY = "./uploads" | |
| os.makedirs(UPLOAD_DIRECTORY, exist_ok=True) | |
| def format_bytes(bytes_value): | |
| """Convert bytes to human readable format""" | |
| for unit in ['B', 'KB', 'MB', 'GB', 'TB']: | |
| if bytes_value < 1024.0: | |
| return f"{bytes_value:.2f} {unit}" | |
| bytes_value /= 1024.0 | |
| return f"{bytes_value:.2f} PB" | |
| def check_disk_space(): | |
| """Check disk space on startup and log the information""" | |
| try: | |
| # Get disk usage for the current directory | |
| total, used, free = shutil.disk_usage(".") | |
| # Calculate usage percentage | |
| usage_percentage = (used / total) * 100 if total > 0 else 0 | |
| print("=" * 50) | |
| print("DISK SPACE CHECK ON STARTUP") | |
| print("=" * 50) | |
| print(f"Total space: {format_bytes(total)}") | |
| print(f"Used space: {format_bytes(used)}") | |
| print(f"Free space: {format_bytes(free)}") | |
| print(f"Usage percentage: {usage_percentage:.2f}%") | |
| print(f"Upload directory: {os.path.abspath(UPLOAD_DIRECTORY)}") | |
| print(f"Upload directory exists: {os.path.exists(UPLOAD_DIRECTORY)}") | |
| # Warning if disk space is low | |
| if usage_percentage > 90: | |
| print("⚠️ WARNING: Disk space is critically low!") | |
| elif usage_percentage > 80: | |
| print("⚠️ WARNING: Disk space is getting low!") | |
| else: | |
| print("✅ Disk space is adequate") | |
| print("=" * 50) | |
| except Exception as e: | |
| print(f"❌ Failed to check disk space: {str(e)}") | |
| async def startup_event(): | |
| """Run startup tasks""" | |
| check_disk_space() | |
| async def upload_mp4(file: UploadFile = File(...)): | |
| if not file.filename.endswith(".mp4"): | |
| raise HTTPException(status_code=400, detail="Invalid file type. Only MP4 files are allowed.") | |
| file_id = str(uuid.uuid4()) + ".mp4" | |
| file_path = os.path.join(UPLOAD_DIRECTORY, file_id) | |
| try: | |
| with open(file_path, "wb") as buffer: | |
| while True: | |
| chunk = await file.read(1024 * 1024) # Read in 1MB chunks | |
| if not chunk: | |
| break | |
| buffer.write(chunk) | |
| return JSONResponse(content={"id": file_id, "message": "File uploaded successfully"}, status_code=200) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Failed to upload file: {str(e)}") | |
| async def download_mp4(mp4_id: str): | |
| file_path = os.path.join(UPLOAD_DIRECTORY, mp4_id) | |
| if not os.path.exists(file_path): | |
| raise HTTPException(status_code=404, detail="File not found") | |
| return FileResponse(file_path, media_type="video/mp4", filename=mp4_id) | |
| async def delete_mp4(mp4_id: str): | |
| file_path = os.path.join(UPLOAD_DIRECTORY, mp4_id) | |
| if not os.path.exists(file_path): | |
| raise HTTPException(status_code=404, detail="File not found") | |
| try: | |
| os.remove(file_path) | |
| return JSONResponse(content={"message": f"File {mp4_id} deleted successfully"}, status_code=200) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Failed to delete file: {str(e)}") | |