# Commit header: mobisoft - "Update app.py" (8413f0a, verified)
from fastapi import FastAPI, UploadFile, File, Form
from fastapi.responses import JSONResponse, FileResponse
from fastapi.middleware.cors import CORSMiddleware
import os
import uuid
import shutil
import subprocess
import threading
import time
# =========================================
# FASTAPI APP
# =========================================
# The single ASGI application object; every route below is registered on it.
app = FastAPI()

# =========================================
# CORS
# =========================================
# Fully permissive cross-origin policy: any origin, method and header.
# NOTE(review): wildcard origins are combined with allow_credentials=True;
# the FastAPI CORS docs say wildcards cannot be used together with
# credentials - confirm whether credentialed requests are really needed.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["*"],
    allow_headers=["*"],
    allow_methods=["*"],
)
# =========================================
# FOLDERS
# =========================================
# Uploaded source videos are written here before compression.
UPLOAD_DIR = "uploads"
os.makedirs(UPLOAD_DIR, exist_ok=True)

# Compressed results are written here as "<task_id>.mp4".
OUTPUT_DIR = "outputs"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# =========================================
# TASK STORE
# =========================================
# In-memory map of task_id -> status dict ("status", "progress", ...).
tasks = dict()
# =========================================
# GET VIDEO DURATION
# =========================================
def get_video_duration(filepath):
    """Return the duration of *filepath* in seconds, as reported by ffprobe.

    ffprobe is asked to print only the ``format=duration`` entry, so its
    stdout is expected to contain a single number.

    Args:
        filepath: Path of the media file to probe.

    Returns:
        The duration as a float, or 0 when ffprobe printed nothing that can
        be used as a duration (empty output, non-numeric text such as
        "N/A", or a non-finite value).

    Raises:
        OSError: If the ffprobe executable cannot be started.
    """
    import math

    command = [
        "ffprobe",
        "-v",
        "error",
        "-show_entries",
        "format=duration",
        "-of",
        "default=noprint_wrappers=1:nokey=1",
        filepath,
    ]
    result = subprocess.run(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    try:
        duration = float(result.stdout.strip())
    except ValueError:
        # Empty or non-numeric output: treat as "no usable duration".
        return 0
    # float() also accepts "nan"/"inf"; those would slip past a `<= 0`
    # check in the caller and break the bitrate arithmetic.
    if not math.isfinite(duration):
        return 0
    return duration
# =========================================
# COMPRESS VIDEO WORKER
# =========================================
def _video_bitrate_kbps(target_size_mb, duration, audio_bitrate, floor=250):
    """Return the video bitrate (kbit/s) for a target size, never below *floor*."""
    # MB -> bits, spread over the duration, expressed in kbit/s.
    total_bits = target_size_mb * 1024 * 1024 * 8
    total_bitrate = int(total_bits / duration / 1000)
    return max(total_bitrate - audio_bitrate, floor)


def _build_ffmpeg_command(input_path, output_path, video_bitrate, audio_bitrate):
    """Return the ffmpeg argv for an H.264/AAC encode with progress on stdout."""
    return [
        "ffmpeg",
        "-i",
        input_path,
        # VIDEO CODEC
        "-vcodec",
        "libx264",
        # FAST PRESET
        "-preset",
        "veryfast",
        # VIDEO BITRATE
        "-b:v",
        f"{video_bitrate}k",
        "-maxrate",
        f"{video_bitrate}k",
        "-bufsize",
        f"{video_bitrate * 2}k",
        # AUDIO
        "-acodec",
        "aac",
        "-b:a",
        f"{audio_bitrate}k",
        # WEB OPTIMIZATION
        "-movflags",
        "+faststart",
        # MULTI THREAD
        "-threads",
        "0",
        # PROGRESS
        "-progress",
        "pipe:1",
        # OVERWRITE
        "-y",
        output_path,
    ]


def _progress_percent(line, duration):
    """Return the 0-100 progress encoded in one ffmpeg output line, or None."""
    if "out_time_ms=" not in line:
        return None
    try:
        out_time_ms = int(line.split("=")[1])
    except (IndexError, ValueError):
        # e.g. "out_time_ms=N/A" before the first frame is written.
        return None
    # The value is treated as microseconds: dividing by 1,000,000 gives seconds.
    current_time = out_time_ms / 1000000
    percent = int((current_time / duration) * 100)
    # Clamp both ends so a negative or overshooting timestamp never leaks out.
    return max(0, min(100, percent))


def _discard_partial_output(output_path):
    """Best-effort removal of an output file left behind by a failed encode."""
    try:
        os.remove(output_path)
    except OSError:
        # Nothing was written, or it is already gone - either way nothing to do.
        pass


def compress_worker(
    task_id,
    input_path,
    output_path,
    target_size_mb
):
    """Compress *input_path* to roughly *target_size_mb* and track it in ``tasks``.

    Intended to run in a background thread. The outcome is reported only
    through ``tasks[task_id]``; nothing is returned and no exception escapes.

    Args:
        task_id: Key under which status updates are stored in ``tasks``.
        input_path: Path of the uploaded source video.
        output_path: Path the compressed MP4 is written to.
        target_size_mb: Desired output size in megabytes.

    Status dicts written to ``tasks[task_id]``:
        * ``{"status": "processing", "progress": <0-100>}`` while encoding.
        * ``{"status": "completed", "progress": 100, "download_url": ...,
          "output_size_mb": ...}`` on success.
        * ``{"status": "error", "progress": 0, "message": ...}`` on failure;
          any partially written output file is removed first.
    """
    try:
        # =====================================
        # GET DURATION
        # =====================================
        duration = get_video_duration(input_path)
        if duration <= 0:
            tasks[task_id] = {
                "status": "error",
                "progress": 0,
                "message": "Invalid video"
            }
            return

        # =====================================
        # BITRATE CALCULATION
        # =====================================
        audio_bitrate = 96
        video_bitrate = _video_bitrate_kbps(
            target_size_mb,
            duration,
            audio_bitrate
        )

        # =====================================
        # UPDATE STATUS
        # =====================================
        tasks[task_id] = {
            "status": "processing",
            "progress": 0
        }

        # =====================================
        # RUN FFMPEG, STREAMING REAL PROGRESS
        # =====================================
        command = _build_ffmpeg_command(
            input_path,
            output_path,
            video_bitrate,
            audio_bitrate
        )
        # The context manager closes the pipe and waits for ffmpeg to exit,
        # so returncode is populated once the block is left.
        with subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True
        ) as process:
            for raw_line in process.stdout:
                percent = _progress_percent(raw_line.strip(), duration)
                if percent is not None:
                    tasks[task_id] = {
                        "status": "processing",
                        "progress": percent
                    }

        # =====================================
        # CHECK RESULT
        # =====================================
        if process.returncode != 0:
            # Do not leave a truncated file where the download route can find it.
            _discard_partial_output(output_path)
            tasks[task_id] = {
                "status": "error",
                "progress": 0,
                "message": "Compression failed"
            }
            return
        if not os.path.exists(output_path):
            tasks[task_id] = {
                "status": "error",
                "progress": 0,
                "message": "Output file missing"
            }
            return

        # =====================================
        # FINAL SIZE CHECK
        # =====================================
        output_size_mb = os.path.getsize(output_path) / 1024 / 1024

        # =====================================
        # COMPLETE
        # =====================================
        tasks[task_id] = {
            "status": "completed",
            "progress": 100,
            "download_url": f"/download/{task_id}",
            "output_size_mb": round(output_size_mb, 2)
        }
    except Exception as e:
        # Thread boundary: report the failure through the task store
        # instead of letting the exception die silently with the thread.
        _discard_partial_output(output_path)
        tasks[task_id] = {
            "status": "error",
            "progress": 0,
            "message": str(e)
        }
# =========================================
# COMPRESS API
# =========================================
@app.post("/compress")
async def compress_video(
    file: UploadFile = File(...),
    target_size: int = Form(...)
):
    """Accept an uploaded video and start compressing it in the background.

    Args:
        file: The uploaded video (multipart form field ``file``).
        target_size: Desired output size in megabytes (form field).

    Returns:
        ``{"success": True, "task_id": <id>}`` once the upload is saved and
        the worker thread is started. A 400 JSON error is returned when no
        file name was sent or ``target_size`` is not positive, and a 500
        JSON error for any unexpected failure.
    """
    try:
        if not file.filename:
            return JSONResponse(
                {"error": "No file uploaded"},
                status_code=400
            )
        if target_size <= 0:
            # A non-positive size cannot be turned into a meaningful bitrate.
            return JSONResponse(
                {"error": "target_size must be a positive number of megabytes"},
                status_code=400
            )

        # =====================================
        # TASK ID
        # =====================================
        task_id = str(uuid.uuid4())
        # The file name is client-controlled: keep only its last path
        # component so separators cannot redirect or break the save path.
        safe_name = os.path.basename(file.filename.replace("\\", "/"))
        input_path = os.path.join(
            UPLOAD_DIR,
            f"{task_id}_{safe_name}"
        )
        output_path = os.path.join(
            OUTPUT_DIR,
            f"{task_id}.mp4"
        )

        # =====================================
        # SAVE FILE
        # =====================================
        with open(input_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        # =====================================
        # START THREAD
        # =====================================
        # Register the task before the worker starts, so a client polling
        # /progress right away does not get "Task not found".
        tasks[task_id] = {
            "status": "processing",
            "progress": 0
        }
        thread = threading.Thread(
            target=compress_worker,
            args=(
                task_id,
                input_path,
                output_path,
                target_size
            )
        )
        thread.start()
        return {
            "success": True,
            "task_id": task_id
        }
    except Exception as e:
        return JSONResponse(
            {"error": str(e)},
            status_code=500
        )
# =========================================
# PROGRESS API
# =========================================
@app.get("/progress/{task_id}")
async def get_progress(task_id: str):
    """Return the stored status dict for *task_id*, or a 404 JSON error."""
    try:
        return tasks[task_id]
    except KeyError:
        # Unknown id: nothing has been recorded under this key.
        return JSONResponse(
            {"error": "Task not found"},
            status_code=404
        )
# =========================================
# DOWNLOAD API
# =========================================
@app.get("/download/{task_id}")
async def download_video(task_id: str):
    """Send the compressed MP4 for *task_id* as a file download.

    Returns:
        A ``FileResponse`` for ``<OUTPUT_DIR>/<task_id>.mp4``, or a 404 JSON
        error when that file does not exist or the task is known but has
        not completed.
    """
    filepath = os.path.join(
        OUTPUT_DIR,
        f"{task_id}.mp4"
    )
    # The output file exists on disk while ffmpeg is still writing it, so a
    # known task that is not "completed" must not be served. Files with no
    # task entry are still served, as before.
    task = tasks.get(task_id)
    not_finished = task is not None and task.get("status") != "completed"
    if not_finished or not os.path.isfile(filepath):
        return JSONResponse(
            {"error": "File not found"},
            status_code=404
        )
    return FileResponse(
        filepath,
        media_type="video/mp4",
        filename="compressed-video.mp4"
    )
# =========================================
# AUTO CLEANUP
# =========================================
def _remove_stale_files(folder, max_age_seconds, now):
    """Delete entries in *folder* whose mtime is older than *max_age_seconds*."""
    try:
        filenames = os.listdir(folder)
    except OSError:
        # Folder missing or unreadable: skip this pass instead of letting
        # the exception end the cleanup thread.
        return
    for filename in filenames:
        filepath = os.path.join(folder, filename)
        try:
            if now - os.path.getmtime(filepath) > max_age_seconds:
                os.remove(filepath)
        except OSError:
            # Entry vanished, is a directory, or is not removable: best effort.
            continue


def cleanup_old_files(max_age_seconds=3600, interval_seconds=1800):
    """Periodically delete old files from the upload and output folders.

    Loops forever, so it is meant to be the target of a background thread.

    Args:
        max_age_seconds: Files whose modification time is older than this
            are removed. Defaults to one hour.
        interval_seconds: Pause between two sweeps. Defaults to 30 minutes.
    """
    while True:
        now = time.time()
        for folder in (UPLOAD_DIR, OUTPUT_DIR):
            _remove_stale_files(folder, max_age_seconds, now)
        time.sleep(interval_seconds)
# Start the periodic file cleanup in the background at import time.
# daemon=True so this endless loop does not keep the interpreter alive.
cleanup_thread = threading.Thread(daemon=True, target=cleanup_old_files)
cleanup_thread.start()
# =========================================
# ROOT
# =========================================
@app.get("/")
async def home():
    """Root route: return a fixed message showing the API is running."""
    payload = {"message": "Video Compressor API Running"}
    return payload
# =========================================
# RUN APP
# =========================================
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when run as a script.
    import uvicorn

    # Listen on $PORT when set, otherwise on 7860, on all interfaces.
    listen_port = int(os.environ.get("PORT", 7860))
    uvicorn.run("app:app", host="0.0.0.0", port=listen_port)