| import os |
| import json |
| import uuid |
| from typing import Dict, List, Optional, Any, Tuple |
| from pathlib import Path |
| from datetime import datetime |
| from concurrent.futures import ThreadPoolExecutor |
| import asyncio |
|
|
| from moviepy import VideoFileClip, TextClip, CompositeVideoClip |
| |
| |
| import numpy as np |
|
|
| from core.version_manager import VersionManager, VideoVersion, OriginalVideo, ProcessingType, VersionStatus |
| from schemas import VideoFormat, Dimensions, TranscriptConfig |
|
|
class AdvancedVideoProcessor:
    """Video processor that records every processed output as a tracked version."""

    def __init__(self, version_manager: "VersionManager"):
        """Store the version registry and create a small worker pool.

        Args:
            version_manager: Registry used to create versions and track status.
        """
        self.version_manager = version_manager
        # Four workers: encoding jobs are heavy, so keep the pool small.
        self.executor = ThreadPoolExecutor(max_workers=4)
| |
| async def process_video_with_versioning( |
| self, |
| original_id: str, |
| processing_type: ProcessingType, |
| processing_config: Dict[str, Any], |
| version_name: Optional[str] = None, |
| parent_version_id: Optional[str] = None |
| ) -> str: |
| """ |
| معالجة الفيديو مع إدارة النسخ المتقدمة |
| """ |
| try: |
| |
| if parent_version_id: |
| |
| parent_version = self.version_manager.get_version(parent_version_id) |
| if not parent_version: |
| raise ValueError(f"Parent version {parent_version_id} not found") |
| source_path = parent_version.file_path |
| base_original_id = parent_version.original_id |
| else: |
| |
| source_path = self.version_manager.get_original_path(original_id) |
| if not source_path: |
| raise ValueError(f"Original video {original_id} not found") |
| base_original_id = original_id |
|
|
| |
| version_id = self.version_manager.create_version( |
| original_id=base_original_id, |
| processing_type=processing_type, |
| version_name=version_name, |
| parent_version=parent_version_id, |
| processing_config=processing_config |
| ) |
|
|
| |
| version_info = self.version_manager.get_version(version_id) |
| if not version_info: |
| raise ValueError(f"Failed to retrieve version info for {version_id}") |
|
|
| |
| future = self.executor.submit( |
| self._process_video_sync, |
| source_path, |
| version_info.file_path, |
| processing_type, |
| processing_config, |
| version_id |
| ) |
|
|
| |
| await asyncio.sleep(0.1) |
| return version_id |
|
|
| except Exception as e: |
| raise RuntimeError(f"Failed to start video processing: {str(e)}") |
|
|
| def _process_video_sync( |
| self, |
| source_path: str, |
| output_path: str, |
| processing_type: ProcessingType, |
| processing_config: Dict[str, Any], |
| version_id: str |
| ): |
| """ |
| معالجة الفيديو المتزامنة (تشغيل في ThreadPool) |
| """ |
| try: |
| print(f"🎬 Starting {processing_type.value} processing for version {version_id}") |
| |
| |
| self.version_manager.update_version_status(version_id, VersionStatus.PROCESSING) |
|
|
| |
| if not os.path.exists(source_path): |
| raise FileNotFoundError(f"Source video not found: {source_path}") |
|
|
| |
| if processing_type == ProcessingType.TRANSCRIPT: |
| self._add_transcript_to_video( |
| source_path, output_path, processing_config["transcript_config"] |
| ) |
| elif processing_type == ProcessingType.CROP: |
| self._crop_video( |
| source_path, output_path, processing_config["crop_config"] |
| ) |
| elif processing_type == ProcessingType.EFFECTS: |
| self._apply_effects_to_video( |
| source_path, output_path, processing_config["effects_config"] |
| ) |
| elif processing_type == ProcessingType.AUDIO: |
| self._process_audio_in_video( |
| source_path, output_path, processing_config["audio_config"] |
| ) |
| elif processing_type == ProcessingType.COMBINED: |
| self._combined_processing( |
| source_path, output_path, processing_config |
| ) |
| else: |
| raise ValueError(f"Unsupported processing type: {processing_type}") |
|
|
| |
| self.version_manager.update_version_status(version_id, VersionStatus.COMPLETED) |
| print(f"✅ Processing completed for version {version_id}") |
|
|
| except Exception as e: |
| print(f"❌ Processing failed for version {version_id}: {str(e)}") |
| |
| self.version_manager.update_version_status(version_id, VersionStatus.FAILED) |
| |
| raise |
|
|
| def _add_transcript_to_video(self, source_path: str, output_path: str, transcript_config: Dict[str, Any]): |
| """إضافة ترانسكريبت إلى الفيديو""" |
| try: |
| print("📝 Adding transcript to video...") |
| |
| |
| video = VideoFileClip(source_path) |
| |
| |
| font_size = transcript_config.get("font_size", 24) |
| font_color = transcript_config.get("font_color", "white") |
| font_family = transcript_config.get("font_family", "Arial") |
| |
| |
| bg_color = transcript_config.get("background_color", "black") |
| bg_alpha = transcript_config.get("background_alpha", 0.8) |
| |
| |
| position = transcript_config.get("position", "bottom") |
| margin = transcript_config.get("margin", 20) |
| |
| |
| shadow = transcript_config.get("shadow", True) |
| outline = transcript_config.get("outline", True) |
| opacity = transcript_config.get("opacity", 1.0) |
| |
| |
| text_clips = [] |
| segments = transcript_config.get("segments", []) |
| |
| for segment in segments: |
| start_time = segment.get("start", 0) |
| end_time = segment.get("end", video.duration) |
| text = segment.get("text", "") |
| segment_position = segment.get("position", position) |
| segment_font_size = segment.get("font_size", font_size) |
| segment_font_color = segment.get("font_color", font_color) |
| |
| if not text: |
| continue |
| |
| |
| txt_clip = TextClip( |
| text, |
| fontsize=segment_font_size, |
| color=segment_font_color, |
| font=font_family, |
| stroke_color="black" if outline else None, |
| stroke_width=2 if outline else 0 |
| ) |
| |
| |
| txt_clip = txt_clip.set_duration(end_time - start_time) |
| txt_clip = txt_clip.set_start(start_time) |
| |
| |
| if segment_position == "top": |
| txt_clip = txt_clip.set_position(("center", margin)) |
| elif segment_position == "center": |
| txt_clip = txt_clip.set_position("center") |
| else: |
| txt_clip = txt_clip.set_position(("center", video.h - margin - segment_font_size)) |
| |
| |
| if opacity < 1.0: |
| txt_clip = txt_clip.set_opacity(opacity) |
| |
| text_clips.append(txt_clip) |
| |
| |
| if bg_alpha > 0 and text_clips: |
| for i, txt_clip in enumerate(text_clips): |
| |
| bg_clip = TextClip( |
| " " * 50, |
| fontsize=font_size, |
| color=bg_color, |
| bg_color=bg_color, |
| font=font_family |
| ) |
| bg_clip = bg_clip.set_duration(txt_clip.duration) |
| bg_clip = bg_clip.set_start(txt_clip.start) |
| bg_clip = bg_clip.set_position(txt_clip.pos) |
| bg_clip = bg_clip.set_opacity(bg_alpha) |
| |
| |
| text_clips[i] = CompositeVideoClip([bg_clip, txt_clip]) |
| |
| |
| if text_clips: |
| final_video = CompositeVideoClip([video] + text_clips) |
| else: |
| final_video = video |
| |
| |
| final_video.write_videofile( |
| output_path, |
| codec="libx264", |
| audio_codec="aac", |
| temp_audiofile="temp-audio.m4a", |
| remove_temp=True, |
| logger=None |
| ) |
| |
| |
| video.close() |
| if text_clips: |
| for clip in text_clips: |
| clip.close() |
| if 'final_video' in locals(): |
| final_video.close() |
| |
| print("✅ Transcript added successfully") |
| |
| except Exception as e: |
| print(f"❌ Error adding transcript: {str(e)}") |
| raise |
|
|
| def _crop_video(self, source_path: str, output_path: str, crop_config: Dict[str, Any]): |
| """قص الفيديو""" |
| try: |
| print("✂️ Cropping video...") |
| |
| video = VideoFileClip(source_path) |
| |
| if crop_config.get("center_crop", False): |
| |
| target_width = crop_config.get("width", video.w) |
| target_height = crop_config.get("height", video.h) |
| aspect_ratio = crop_config.get("aspect_ratio") |
| |
| if aspect_ratio: |
| |
| if aspect_ratio == "9:16": |
| target_width = min(video.w, video.h * 9 / 16) |
| target_height = min(video.h, video.w * 16 / 9) |
| elif aspect_ratio == "1:1": |
| size = min(video.w, video.h) |
| target_width = size |
| target_height = size |
| elif aspect_ratio == "16:9": |
| target_width = video.w |
| target_height = video.w * 9 / 16 |
| |
| |
| x_center = video.w / 2 |
| y_center = video.h / 2 |
| x1 = max(0, x_center - target_width / 2) |
| y1 = max(0, y_center - target_height / 2) |
| x2 = min(video.w, x_center + target_width / 2) |
| y2 = min(video.h, y_center + target_height / 2) |
| |
| else: |
| |
| x1 = crop_config.get("x1", 0) |
| y1 = crop_config.get("y1", 0) |
| x2 = crop_config.get("x2", video.w) |
| y2 = crop_config.get("y2", video.h) |
| target_width = crop_config.get("width", x2 - x1) |
| target_height = crop_config.get("height", y2 - y1) |
| |
| |
| cropped_video = video.crop(x1=x1, y1=y1, x2=x2, y2=y2) |
| |
| |
| if cropped_video.w != target_width or cropped_video.h != target_height: |
| cropped_video = cropped_video.resize((target_width, target_height)) |
| |
| |
| cropped_video.write_videofile( |
| output_path, |
| codec="libx264", |
| audio_codec="aac", |
| temp_audiofile="temp-audio.m4a", |
| remove_temp=True, |
| logger=None |
| ) |
| |
| |
| video.close() |
| cropped_video.close() |
| |
| print("✅ Video cropped successfully") |
| |
| except Exception as e: |
| print(f"❌ Error cropping video: {str(e)}") |
| raise |
|
|
| def _apply_effects_to_video(self, source_path: str, output_path: str, effects_config: Dict[str, Any]): |
| """تطبيق تأثيرات على الفيديو""" |
| try: |
| print("🎨 Applying effects to video...") |
| |
| video = VideoFileClip(source_path) |
| |
| |
| if "brightness" in effects_config: |
| video = video.fx(vfx.colorx, effects_config["brightness"]) |
| |
| if "contrast" in effects_config: |
| video = video.fx(vfx.lum_contrast, contrast=effects_config["contrast"]) |
| |
| if "saturation" in effects_config: |
| |
| saturation_factor = effects_config["saturation"] |
| if saturation_factor != 1.0: |
| |
| def saturation_effect(frame): |
| |
| import cv2 |
| hsv = cv2.cvtColor(frame, cv2.COLOR_RGB2HSV).astype("float32") |
| hsv[:, :, 1] = hsv[:, :, 1] * saturation_factor |
| hsv[:, :, 1] = np.clip(hsv[:, :, 1], 0, 255) |
| result = cv2.cvtColor(hsv.astype("uint8"), cv2.COLOR_HSV2RGB) |
| return result |
| |
| video = video.fl_image(saturation_effect) |
| |
| |
| if effects_config.get("sepia", False): |
| video = video.fx(vfx.sepia) |
| |
| if effects_config.get("black_white", False): |
| video = video.fx(vfx.blackwhite) |
| |
| if effects_config.get("vintage", False): |
| |
| video = video.fx(vfx.colorx, 0.8) |
| video = video.fx(vfx.gamma_corr, 0.8) |
| |
| if effects_config.get("vignette", 0) > 0: |
| strength = effects_config["vignette"] |
| video = video.fx(vfx.vignette, intensity=strength) |
| |
| if effects_config.get("blur", 0) > 0: |
| strength = effects_config["blur"] |
| video = video.fx(vfx.blur, strength) |
| |
| if effects_config.get("noise", 0) > 0: |
| strength = effects_config["noise"] |
| |
| def noise_effect(frame): |
| noise = np.random.normal(0, strength * 25, frame.shape).astype(np.uint8) |
| result = cv2.add(frame.astype(np.uint8), noise) |
| return result |
| |
| video = video.fl_image(noise_effect) |
| |
| |
| if "fade_in" in effects_config: |
| duration = effects_config["fade_in"] |
| video = video.fx(vfx.fadein, duration) |
| |
| if "fade_out" in effects_config: |
| duration = effects_config["fade_out"] |
| video = video.fx(vfx.fadeout, duration) |
| |
| |
| video.write_videofile( |
| output_path, |
| codec="libx264", |
| audio_codec="aac", |
| temp_audiofile="temp-audio.m4a", |
| remove_temp=True, |
| logger=None |
| ) |
| |
| |
| video.close() |
| |
| print("✅ Effects applied successfully") |
| |
| except Exception as e: |
| print(f"❌ Error applying effects: {str(e)}") |
| raise |
|
|
| def _process_audio_in_video(self, source_path: str, output_path: str, audio_config: Dict[str, Any]): |
| """معالجة الصوت في الفيديو""" |
| try: |
| print("🔊 Processing audio in video...") |
| |
| video = VideoFileClip(source_path) |
| audio = video.audio |
| |
| if audio is None: |
| print("⚠️ No audio track found, creating silent audio") |
| |
| import numpy as np |
| duration = video.duration |
| fps = 44100 |
| silent_audio = np.zeros(int(duration * fps)) |
| from moviepy import AudioArrayClip |
| audio = AudioArrayClip(silent_audio, fps=fps) |
| |
| |
| |
| if "volume" in audio_config: |
| volume_factor = audio_config["volume"] |
| audio = audio.fx(volumex, volume_factor) |
| |
| if audio_config.get("normalize", False): |
| |
| audio = audio.fx(afx.normalize) |
| |
| if audio_config.get("remove_noise", False): |
| |
| audio = audio.fx(afx.audio_fadein, 0.1) |
| audio = audio.fx(afx.audio_fadeout, 0.1) |
| |
| if "bass_boost" in audio_config: |
| bass_factor = audio_config["bass_boost"] |
| |
| def bass_boost(audio_clip): |
| def bass_boost_frame(frame): |
| |
| return frame * (1 + bass_factor * 0.5) |
| return audio_clip.fl(bass_boost_frame) |
| |
| audio = bass_boost(audio) |
| |
| if "treble_boost" in audio_config: |
| treble_factor = audio_config["treble_boost"] |
| |
| def treble_boost(audio_clip): |
| def treble_boost_frame(frame): |
| |
| return frame * (1 + treble_factor * 0.3) |
| return audio_clip.fl(treble_boost_frame) |
| |
| audio = treble_boost(audio) |
| |
| if "speed" in audio_config: |
| speed_factor = audio_config["speed"] |
| |
| audio = audio.fx(afx.speedx, speed_factor) |
| |
| if "pitch_shift" in audio_config: |
| pitch_shift = audio_config["pitch_shift"] |
| |
| audio = audio.fx(afx.speedx, 1.0) |
| |
| if "fade_in" in audio_config: |
| duration = audio_config["fade_in"] |
| audio = audio.fx(afx.audio_fadein, duration) |
| |
| if "fade_out" in audio_config: |
| duration = audio_config["fade_out"] |
| audio = audio.fx(afx.audio_fadeout, duration) |
| |
| |
| final_video = video.set_audio(audio) |
| |
| |
| final_video.write_videofile( |
| output_path, |
| codec="libx264", |
| audio_codec="aac", |
| temp_audiofile="temp-audio.m4a", |
| remove_temp=True, |
| logger=None |
| ) |
| |
| |
| video.close() |
| audio.close() |
| final_video.close() |
| |
| print("✅ Audio processing completed") |
| |
| except Exception as e: |
| print(f"❌ Error processing audio: {str(e)}") |
| raise |
|
|
| def _combined_processing(self, source_path: str, output_path: str, processing_config: Dict[str, Any]): |
| """معالجة مركبة متعددة""" |
| try: |
| print("🔄 Starting combined processing...") |
| |
| |
| temp_files = [] |
| current_path = source_path |
| |
| |
| if "crop_config" in processing_config: |
| print("📐 Step 1: Cropping...") |
| temp_crop = output_path.replace(".mp4", "_crop_temp.mp4") |
| temp_files.append(temp_crop) |
| self._crop_video(current_path, temp_crop, processing_config["crop_config"]) |
| current_path = temp_crop |
| |
| |
| if "effects_config" in processing_config: |
| print("🎨 Step 2: Applying effects...") |
| temp_effects = output_path.replace(".mp4", "_effects_temp.mp4") |
| temp_files.append(temp_effects) |
| self._apply_effects_to_video(current_path, temp_effects, processing_config["effects_config"]) |
| current_path = temp_effects |
| |
| |
| if "transcript_config" in processing_config: |
| print("📝 Step 3: Adding transcript...") |
| temp_transcript = output_path.replace(".mp4", "_transcript_temp.mp4") |
| temp_files.append(temp_transcript) |
| self._add_transcript_to_video(current_path, temp_transcript, processing_config["transcript_config"]) |
| current_path = temp_transcript |
| |
| |
| if "audio_config" in processing_config: |
| print("🔊 Step 4: Processing audio...") |
| temp_audio = output_path.replace(".mp4", "_audio_temp.mp4") |
| temp_files.append(temp_audio) |
| self._process_audio_in_video(current_path, temp_audio, processing_config["audio_config"]) |
| current_path = temp_audio |
| |
| |
| if current_path != output_path: |
| import shutil |
| shutil.copy2(current_path, output_path) |
| |
| |
| for temp_file in temp_files: |
| if os.path.exists(temp_file) and temp_file != output_path: |
| os.remove(temp_file) |
| |
| print("✅ Combined processing completed") |
| |
| except Exception as e: |
| print(f"❌ Error in combined processing: {str(e)}") |
| |
| temp_files = [f for f in temp_files if os.path.exists(f) and f != output_path] |
| for temp_file in temp_files: |
| try: |
| os.remove(temp_file) |
| except: |
| pass |
| raise |
|
|
| def get_original_info(self, original_id: str) -> Optional[Dict[str, Any]]: |
| """الحصول على معلومات الفيديو الأصلي""" |
| try: |
| original_data = self.version_manager.registry["originals"].get(original_id) |
| if not original_data: |
| return None |
| |
| return { |
| "original_id": original_id, |
| "file_name": original_data["file_name"], |
| "file_size": original_data["file_size"], |
| "duration": original_data["duration"], |
| "resolution": original_data["resolution"], |
| "upload_date": original_data["upload_date"], |
| "metadata": original_data.get("metadata", {}) |
| } |
| except Exception as e: |
| print(f"❌ Error getting original info: {str(e)}") |
| return None |
|
|
| def get_version_info(self, version_id: str) -> Optional[Dict[str, Any]]: |
| """الحصول على معلومات نسخة معينة""" |
| try: |
| version = self.version_manager.get_version(version_id) |
| if not version: |
| return None |
| |
| return { |
| "version_id": version_id, |
| "version_name": version.version_name, |
| "original_id": version.original_id, |
| "processing_type": version.processing_type.value, |
| "status": version.status.value, |
| "file_size": version.file_size, |
| "duration": version.duration, |
| "resolution": version.resolution, |
| "created_at": version.created_at, |
| "parent_version": version.parent_version, |
| "file_path": version.file_path |
| } |
| except Exception as e: |
| print(f"❌ Error getting version info: {str(e)}") |
| return None |