Clipping / routers /video_versions.py
aliSaac510's picture
versions and transcript
ce16ace
from fastapi import APIRouter, File, UploadFile, Form, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse, FileResponse
from typing import List, Optional, Dict, Any
import os
import uuid
import shutil
from datetime import datetime
import json
from core.version_manager import VersionManager, ProcessingType, VersionStatus
from schemas import VideoFormat, Timestamp, TranscriptConfig, CropConfig, EffectsConfig
# Create the router; every endpoint in this module is registered on it.
router = APIRouter(tags=["Video Version Management"])
# Initialize the manager only (a single shared instance used by all endpoints below).
version_manager = VersionManager()
# Temporary folders. Both paths are relative, so they resolve against the
# process's current working directory.
UPLOAD_DIR = "temp_uploads"
PROCESSED_DIR = "processed_videos"
# Created at import time; exist_ok=True makes this a no-op when they already exist.
os.makedirs(UPLOAD_DIR, exist_ok=True)
os.makedirs(PROCESSED_DIR, exist_ok=True)
# ============================================
# Original video management endpoints
# ============================================
@router.post("/upload-original")
async def upload_original_video(
    background_tasks: BackgroundTasks,
    video_file: UploadFile = File(...),
    metadata: Optional[str] = Form(None)
) -> Dict[str, Any]:
    """
    Upload an original video (protected from modification) and register it.

    The upload is first written to a uniquely named file in ``UPLOAD_DIR`` and
    then handed to ``version_manager.register_original``. The temporary file
    is removed afterwards, whether registration succeeds or fails.

    Args:
        background_tasks: FastAPI background task queue, used to delete the
            temporary file after a successful registration.
        video_file: The uploaded file; its content type must start with
            ``video/``.
        metadata: Optional JSON string with extra metadata. Malformed JSON is
            ignored and treated as "no metadata".

    Returns:
        A dict with ``original_id``, ``message`` and ``status``.

    Raises:
        HTTPException: 400 if the upload is not a video; 500 if the file
            cannot be saved or registration fails.
    """
    # Validate the file type.
    if not video_file.content_type or not video_file.content_type.startswith('video/'):
        raise HTTPException(status_code=400, detail="File must be a video")

    # Build a unique temp file name, keeping the client's extension.
    file_extension = os.path.splitext(video_file.filename)[1] if video_file.filename else ".mp4"
    temp_filename = f"original_{uuid.uuid4()}{file_extension}"
    temp_path = os.path.join(UPLOAD_DIR, temp_filename)

    # Save the upload to the temp location.
    try:
        with open(temp_path, "wb") as buffer:
            shutil.copyfileobj(video_file.file, buffer)
    except Exception as e:
        # A failed copy can leave a partially written file behind; remove it
        # so aborted uploads do not accumulate in UPLOAD_DIR.
        if os.path.exists(temp_path):
            os.remove(temp_path)
        raise HTTPException(status_code=500, detail=f"Error saving file: {str(e)}") from e

    # Parse the optional metadata. This is deliberately best-effort: invalid
    # JSON must not reject an otherwise valid upload.
    # NOTE(review): valid JSON that is not an object (e.g. a list) is passed
    # through unchanged — confirm register_original tolerates that.
    metadata_dict = {}
    if metadata:
        try:
            metadata_dict = json.loads(metadata)
        except json.JSONDecodeError:
            pass

    # Register the original video.
    try:
        original_id = version_manager.register_original(
            source_path=temp_path,
            file_name=video_file.filename or "unknown_video.mp4",
            metadata=metadata_dict
        )
    except Exception as e:
        # Clean up the temp file on failure.
        if os.path.exists(temp_path):
            os.remove(temp_path)
        raise HTTPException(status_code=500, detail=f"Error registering video: {str(e)}") from e

    # Delete the temp file once the response has been sent.
    # NOTE(review): assumes register_original copies (not moves) the file — confirm.
    background_tasks.add_task(os.remove, temp_path)
    return {
        "original_id": original_id,
        "message": "Original video uploaded successfully",
        "status": "protected"
    }
@router.get("/originals")
async def list_original_videos() -> Dict[str, Any]:
    """List all registered original videos.

    Returns:
        A dict with ``originals`` (one summary per original, including the
        number of versions whose ``original_id`` points at it) and
        ``total_count``.
    """
    registry = version_manager.registry

    # Tally versions per original in a single pass instead of rescanning the
    # whole version table once for every original.
    versions_per_original: Dict[str, int] = {}
    for version_data in registry["versions"].values():
        owner_id = version_data["original_id"]
        versions_per_original[owner_id] = versions_per_original.get(owner_id, 0) + 1

    originals = [
        {
            "original_id": original_id,
            "file_name": original_data["file_name"],
            "file_size": original_data["file_size"],
            "duration": original_data["duration"],
            "resolution": original_data["resolution"],
            "upload_date": original_data["upload_date"],
            "versions_count": versions_per_original.get(original_id, 0),
        }
        for original_id, original_data in registry["originals"].items()
    ]
    return {
        "originals": originals,
        "total_count": len(originals)
    }
@router.get("/originals/{original_id}")
async def get_original_details(original_id: str) -> Dict[str, Any]:
    """Return the details of one original video.

    Whatever ``version_manager.get_original`` returns is passed straight back
    to the client.

    Raises:
        HTTPException: 404 when the manager returns nothing (or a falsy value)
            for ``original_id``.
    """
    details = version_manager.get_original(original_id)
    if details:
        return details
    raise HTTPException(status_code=404, detail="Original video not found")
@router.get("/originals/{original_id}/download")
async def download_original(original_id: str):
    """Download the original video (read-only).

    The response's media type is guessed from the stored file name, because
    uploads accept any ``video/*`` type and not only MP4. When the guess fails
    or is not a video type, ``video/mp4`` is used.

    Raises:
        HTTPException: 404 when the manager has no path for ``original_id`` or
            the file no longer exists on disk.
    """
    # Local import: only this endpoint needs it.
    import mimetypes

    original_path = version_manager.get_original_path(original_id)
    if not original_path or not os.path.exists(original_path):
        raise HTTPException(status_code=404, detail="Original video not found")

    original_data = version_manager.registry["originals"][original_id]
    file_name = original_data["file_name"]

    # Only trust the guess when it is a video type; the file name comes from
    # the client, so an odd extension must not turn the download into e.g.
    # text/plain.
    media_type, _ = mimetypes.guess_type(file_name)
    if not media_type or not media_type.startswith("video/"):
        media_type = "video/mp4"

    return FileResponse(
        original_path,
        media_type=media_type,
        filename=file_name
    )
# ============================================
# Video processing endpoints
# ============================================
@router.post("/{original_id}/process")
async def process_video(
    original_id: str,
    processing_type: str = Form(...),  # transcript, crop, effects, audio, combined
    version_name: Optional[str] = Form(None),
    transcript_config: Optional[str] = Form(None),
    crop_config: Optional[str] = Form(None),
    effects_config: Optional[str] = Form(None),
    audio_config: Optional[str] = Form(None)
) -> Dict[str, Any]:
    """
    Create a new version of an original video for a given processing type.

    Processing types:
    - transcript: add a transcript (requires ``transcript_config``)
    - crop: crop the video (requires ``crop_config``)
    - effects: apply effects (requires ``effects_config``)
    - audio: process the audio (requires ``audio_config``)
    - combined: several of the above; at least one config must be supplied

    Each ``*_config`` is a JSON string. No actual processing is performed
    here: the version is only created and then marked as completed.

    Returns:
        A dict with ``version_id``, ``processing_type``, ``status`` and
        ``message``.

    Raises:
        HTTPException: 404 if the original is unknown; 400 for an unknown
            processing type, a missing required config, or invalid JSON; 500
            if the version manager fails.
    """
    # Make sure the original video exists.
    if original_id not in version_manager.registry["originals"]:
        raise HTTPException(status_code=404, detail="Original video not found")

    # processing type -> (config key, raw JSON from the form, enum value)
    single_types = {
        "transcript": ("transcript_config", transcript_config, ProcessingType.TRANSCRIPT),
        "crop": ("crop_config", crop_config, ProcessingType.CROP),
        "effects": ("effects_config", effects_config, ProcessingType.EFFECTS),
        "audio": ("audio_config", audio_config, ProcessingType.AUDIO),
    }

    if processing_type == "combined":
        # Use every config that was actually supplied.
        selected = [(key, raw) for key, raw, _ in single_types.values() if raw]
        if not selected:
            raise HTTPException(status_code=400, detail="At least one config must be provided for combined processing")
        processing_type_enum = ProcessingType.COMBINED
    elif processing_type in single_types:
        config_key, raw_config, processing_type_enum = single_types[processing_type]
        if not raw_config:
            # The type is valid but its config is absent; say so explicitly
            # instead of reporting the type itself as invalid.
            raise HTTPException(
                status_code=400,
                detail=f"Missing {config_key} for processing type: {processing_type}"
            )
        selected = [(config_key, raw_config)]
    else:
        raise HTTPException(status_code=400, detail=f"Invalid processing type: {processing_type}")

    # Build the processing settings from the selected JSON strings.
    try:
        processing_config = {key: json.loads(raw) for key, raw in selected}
    except json.JSONDecodeError as e:
        raise HTTPException(status_code=400, detail=f"Invalid JSON config: {str(e)}") from e

    try:
        # Create the new version only (no actual processing).
        version_id = version_manager.create_version(
            original_id=original_id,
            processing_type=processing_type_enum,
            version_name=version_name or f"{processing_type}_version",
            processing_config=processing_config
        )
        # Temporarily mark the version as completed.
        version_manager.update_version_status(version_id, VersionStatus.COMPLETED)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Processing error: {str(e)}") from e

    return {
        "version_id": version_id,
        "processing_type": processing_type,
        "status": "completed",
        "message": f"{processing_type} version created successfully"
    }
# ============================================
# Version management endpoints
# ============================================
@router.get("/versions/{original_id}")
async def list_video_versions(original_id: str) -> Dict[str, Any]:
    """List every version linked to an original video.

    Returns:
        A dict with the ``original_id``, a ``versions`` list of per-version
        summaries, and ``total_count``.

    Raises:
        HTTPException: 404 when ``original_id`` is not in the registry.
    """
    known_originals = version_manager.registry["originals"]
    if original_id not in known_originals:
        raise HTTPException(status_code=404, detail="Original video not found")

    # One summary dict per version object returned by the manager.
    summaries = [
        {
            "version_id": item.version_id,
            "version_name": item.version_name,
            "processing_type": item.processing_type,
            "status": item.status,
            "file_size": item.file_size,
            "duration": item.duration,
            "resolution": item.resolution,
            "created_at": item.created_at,
            "parent_version": item.parent_version,
        }
        for item in version_manager.get_all_versions(original_id)
    ]
    return {
        "original_id": original_id,
        "versions": summaries,
        "total_count": len(summaries),
    }
@router.get("/versions/details/{version_id}")
async def get_version_details(version_id: str) -> Dict[str, Any]:
    """Return the details of a single version.

    The value returned by ``version_manager.get_version`` is passed back
    unchanged.

    Raises:
        HTTPException: 404 when the manager returns nothing (or a falsy value)
            for ``version_id``.
    """
    details = version_manager.get_version(version_id)
    if details:
        return details
    raise HTTPException(status_code=404, detail="Version not found")
@router.get("/versions/download/{version_id}")
async def download_version(version_id: str):
    """Download a processed version.

    The file is served as ``video/mp4`` under the name
    ``<version_name>.mp4``.

    Raises:
        HTTPException: 404 when the version is unknown or its file is missing
            on disk; 400 when its status is not ``VersionStatus.COMPLETED``.
    """
    record = version_manager.get_version(version_id)
    if not record:
        raise HTTPException(status_code=404, detail="Version not found")

    # Only completed versions may be downloaded.
    if record.status != VersionStatus.COMPLETED:
        raise HTTPException(status_code=400, detail="Version processing not completed")

    stored_path = record.file_path
    if not os.path.exists(stored_path):
        raise HTTPException(status_code=404, detail="Version file not found")

    download_name = f"{record.version_name}.mp4"
    return FileResponse(stored_path, media_type="video/mp4", filename=download_name)
@router.delete("/versions/{version_id}")
async def delete_version(version_id: str) -> Dict[str, Any]:
    """Delete a single version.

    Returns:
        A dict echoing the ``version_id`` with a confirmation ``message``.

    Raises:
        HTTPException: 404 when ``version_manager.delete_version`` reports
            failure (falsy result).
    """
    if version_manager.delete_version(version_id):
        return {
            "version_id": version_id,
            "message": "Version deleted successfully",
        }
    raise HTTPException(status_code=404, detail="Version not found or could not be deleted")
# ============================================
# Statistics and information endpoints
# ============================================
@router.get("/stats")
async def get_system_stats() -> Dict[str, Any]:
    """Return system statistics.

    Returns:
        A dict with the manager's storage statistics under ``storage``, a
        fixed ``system_status`` of ``"operational"``, and the current local
        time as an ISO-8601 ``timestamp``.
    """
    report: Dict[str, Any] = {"storage": version_manager.get_storage_stats()}
    report["system_status"] = "operational"
    # Taken after the stats call, as a naive (timezone-less) local timestamp.
    report["timestamp"] = datetime.now().isoformat()
    return report
@router.get("/version-tree/{original_id}")
async def get_version_tree(original_id: str) -> Dict[str, Any]:
    """Return the version tree of an original video.

    Returns:
        A dict with the ``original_id`` and the manager's ``version_tree``
        for it.

    Raises:
        HTTPException: 404 when ``original_id`` is not in the registry.
    """
    if original_id in version_manager.registry["originals"]:
        tree = version_manager.get_version_tree(original_id)
        return {"original_id": original_id, "version_tree": tree}
    raise HTTPException(status_code=404, detail="Original video not found")