ffmpeg_demo / app.py
Kingoteam's picture
Update app.py
ee35bb6 verified
from fastapi import FastAPI, File, UploadFile, Form, HTTPException
from fastapi.responses import FileResponse
import subprocess
import os
import uuid
import logging
import json
# تنظیم لاگ برای دیباگ
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI()
def get_video_resolution(input_path: str) -> tuple[int, int]:
"""دریافت رزولوشن ویدیوی ورودی با استفاده از ffprobe"""
try:
command = [
"ffprobe", "-v", "error", "-select_streams", "v:0",
"-show_entries", "stream=width,height", "-of", "json",
input_path
]
result = subprocess.run(command, capture_output=True, text=True, check=True)
data = json.loads(result.stdout)
width = data["streams"][0]["width"]
height = data["streams"][0]["height"]
logger.info(f"Input resolution: {width}x{height}")
return width, height
except subprocess.CalledProcessError as e:
logger.error(f"ffprobe error: {e.stderr}")
raise ValueError("خطا در دریافت رزولوشن ویدیو")
def compress_video(input_path: str, crf: int, resolution: str) -> str:
logger.info(f"Starting video compression for {input_path}")
try:
# بررسی وجود فایل ورودی
if not os.path.exists(input_path):
raise ValueError("فایل ویدیویی یافت نشد!")
# تولید نام یکتا برای فایل خروجی
output_path = f"compressed_{uuid.uuid4().hex}.mp4"
# دریافت رزولوشن ورودی
input_width, input_height = get_video_resolution(input_path)
# تنظیم رزولوشن خروجی
scale_filter = None
target_resolutions = {
"720p": (1280, 720),
"480p": (854, 480),
"360p": (640, 360),
"240p": (426, 240),
"original": (input_width, input_height)
}
if resolution not in target_resolutions:
raise ValueError("رزولوشن نامعتبر!")
target_width, target_height = target_resolutions[resolution]
# محدود کردن رزولوشن خروجی به حداکثر رزولوشن ورودی
if target_width > input_width or target_height > input_height:
logger.info(f"Target resolution {target_width}x{target_height} is larger than input {input_width}x{input_height}, using input resolution")
target_width, target_height = input_width, input_height
scale_filter = f"scale={target_width}:{target_height}:force_original_aspect_ratio=decrease"
# ساخت دستور FFmpeg
command = [
"ffmpeg", "-y", "-i", input_path,
"-vcodec", "libx264", "-crf", str(crf), "-preset", "medium",
"-acodec", "aac", "-b:a", "128k"
]
if scale_filter:
command.extend(["-vf", scale_filter])
command.append(output_path)
# اجرای دستور FFmpeg
result = subprocess.run(command, check=True, capture_output=True, text=True)
logger.info(f"Video compressed successfully: {output_path}")
# بررسی وجود فایل خروجی
if not os.path.exists(output_path):
raise ValueError(f"فایل خروجی {output_path} تولید نشد!")
return output_path
except subprocess.CalledProcessError as e:
logger.error(f"FFmpeg error: {e.stderr}")
raise HTTPException(status_code=500, detail=f"خطا در فشرده‌سازی: {e.stderr}")
except Exception as e:
logger.error(f"Error in compression: {str(e)}")
raise HTTPException(status_code=500, detail=f"خطا: {str(e)}")
@app.get("/")
async def root():
return {"message": "Welcome to Video Compression API. Use POST /compress for video processing."}
@app.post("/compress")
async def compress_video_api(
file: UploadFile = File(...),
crf: int = Form(default=23, ge=10, le=28),
resolution: str = Form(default="original")
):
logger.info("Received API request for video compression")
input_path = None
output_path = None
try:
# ذخیره فایل ورودی به صورت موقت
input_path = f"temp_{uuid.uuid4().hex}.mp4"
with open(input_path, "wb") as f:
f.write(await file.read())
# فشرده‌سازی ویدیو
output_path = compress_video(input_path, crf, resolution)
# ارسال فایل خروجی
logger.info("Video processed successfully")
return FileResponse(
output_path,
media_type="video/mp4",
headers={"Content-Disposition": f"attachment; filename={os.path.basename(output_path)}"}
)
except Exception as e:
logger.error(f"API error: {str(e)}")
raise HTTPException(status_code=500, detail=f"خطا در پردازش: {str(e)}")
finally:
# فقط فایل ورودی رو حذف کن
if input_path and os.path.exists(input_path):
os.remove(input_path)
if __name__ == "__main__":
import uvicorn
logger.info("Starting Uvicorn server")
uvicorn.run(app, host="0.0.0.0", port=7860)