ZU4 / app.py
Factor Studios
Update app.py
4fded7f verified
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.responses import FileResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
import os
import uuid
import shutil
app = FastAPI()
# Configure CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Allows all origins
allow_credentials=True,
allow_methods=["*"], # Allows all methods
allow_headers=["*"], # Allows all headers
)
UPLOAD_DIRECTORY = "./uploads"
os.makedirs(UPLOAD_DIRECTORY, exist_ok=True)
def format_bytes(bytes_value):
"""Convert bytes to human readable format"""
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if bytes_value < 1024.0:
return f"{bytes_value:.2f} {unit}"
bytes_value /= 1024.0
return f"{bytes_value:.2f} PB"
def check_disk_space():
"""Check disk space on startup and log the information"""
try:
# Get disk usage for the current directory
total, used, free = shutil.disk_usage(".")
# Calculate usage percentage
usage_percentage = (used / total) * 100 if total > 0 else 0
print("=" * 50)
print("DISK SPACE CHECK ON STARTUP")
print("=" * 50)
print(f"Total space: {format_bytes(total)}")
print(f"Used space: {format_bytes(used)}")
print(f"Free space: {format_bytes(free)}")
print(f"Usage percentage: {usage_percentage:.2f}%")
print(f"Upload directory: {os.path.abspath(UPLOAD_DIRECTORY)}")
print(f"Upload directory exists: {os.path.exists(UPLOAD_DIRECTORY)}")
# Warning if disk space is low
if usage_percentage > 90:
print("⚠️ WARNING: Disk space is critically low!")
elif usage_percentage > 80:
print("⚠️ WARNING: Disk space is getting low!")
else:
print("✅ Disk space is adequate")
print("=" * 50)
except Exception as e:
print(f"❌ Failed to check disk space: {str(e)}")
@app.on_event("startup")
async def startup_event():
"""Run startup tasks"""
check_disk_space()
@app.post("/upload")
async def upload_mp4(file: UploadFile = File(...)):
if not file.filename.endswith(".mp4"):
raise HTTPException(status_code=400, detail="Invalid file type. Only MP4 files are allowed.")
file_id = str(uuid.uuid4()) + ".mp4"
file_path = os.path.join(UPLOAD_DIRECTORY, file_id)
try:
with open(file_path, "wb") as buffer:
while True:
chunk = await file.read(1024 * 1024) # Read in 1MB chunks
if not chunk:
break
buffer.write(chunk)
return JSONResponse(content={"id": file_id, "message": "File uploaded successfully"}, status_code=200)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to upload file: {str(e)}")
@app.get("/download/{mp4_id}")
async def download_mp4(mp4_id: str):
file_path = os.path.join(UPLOAD_DIRECTORY, mp4_id)
if not os.path.exists(file_path):
raise HTTPException(status_code=404, detail="File not found")
return FileResponse(file_path, media_type="video/mp4", filename=mp4_id)
@app.delete("/delete/{mp4_id}")
async def delete_mp4(mp4_id: str):
file_path = os.path.join(UPLOAD_DIRECTORY, mp4_id)
if not os.path.exists(file_path):
raise HTTPException(status_code=404, detail="File not found")
try:
os.remove(file_path)
return JSONResponse(content={"message": f"File {mp4_id} deleted successfully"}, status_code=200)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to delete file: {str(e)}")