Spaces:
Running
Running
# === FILE: app.py (Video Agent - FIXED: static text over a moving image) ===
#
# IMPORTANT: For proper text rendering, make sure the Roboto-Bold.ttf font is installed.
# Install with: apt-get install -y fonts-roboto
# Or download from: https://fonts.google.com/specimen/Roboto
#
# If Roboto is not available, the text overlay tries DejaVuSans-Bold, Arial and
# LiberationSans-Bold in turn, and finally falls back to Pillow's built-in default font.
#
# ✅ UPDATED: Video output dimensions set to YouTube Shorts (1080x1920 - 9:16 aspect ratio)
# ✅ UPDATED: Smart background color extraction from image for letterboxing
| import os | |
| import io | |
| import json | |
| import base64 | |
| import logging | |
| import random | |
| from typing import Optional, Dict, Any, Tuple, List | |
| from datetime import datetime | |
| import tempfile | |
| from collections import Counter | |
| import gradio as gr | |
| import numpy as np | |
| from PIL import Image, ImageDraw, ImageFont | |
| # Fix for Pillow 10.0.0+ compatibility with MoviePy | |
| if not hasattr(Image, 'ANTIALIAS'): | |
| Image.ANTIALIAS = Image.LANCZOS | |
| # استيراد مكتبات معالجة الصوت والفيديو | |
| from kokoro_engine import KokoroEngine | |
| KOKORO_AVAILABLE = True | |
| try: | |
| from moviepy.editor import ImageClip, AudioFileClip, CompositeVideoClip, TextClip | |
| MOVIEPY_AVAILABLE = True | |
| except ImportError: | |
| MOVIEPY_AVAILABLE = False | |
| logging.warning("⚠️ MoviePy not available. Install with: pip install moviepy") | |
# ---------------- Logging Setup ----------------
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
log = logging.getLogger("video_agent")

# ---------------- Environment Variables ----------------
# Each setting is read from the environment; the second argument is the fallback.
VIDEO_HISTORY_DIR = os.getenv("VIDEO_HISTORY_DIR", "video_history")
MAX_HISTORY_COUNT = int(os.getenv("MAX_HISTORY_COUNT", "10"))
VOICE_STATE_FILE = os.getenv("VOICE_STATE_FILE", "voice_rotation_state.json")

# ---------------- YouTube Shorts Dimensions ----------------
YOUTUBE_SHORTS_WIDTH, YOUTUBE_SHORTS_HEIGHT = 1080, 1920

# ---------------- Kokoro Voices List ----------------
# Voice ids are "<prefix>_<name>"; the list order is the rotation order.
_VOICE_GROUPS = (
    # British Male
    ("bm", ("daniel", "fable", "george", "lewis")),
    # American Male
    ("am", ("adam", "echo", "eric", "fenrir", "liam", "michael", "onyx", "puck")),
    # British Female
    ("bf", ("alice", "emma", "isabella", "lily")),
    # American Female
    ("af", ("alloy", "aoede", "bella", "heart", "jessica", "kore",
            "nicole", "nova", "river", "sarah", "sky")),
)
KOKORO_VOICES = [
    f"{prefix}_{name}"
    for prefix, names in _VOICE_GROUPS
    for name in names
]

# ---------------- Initialization Check ----------------
IS_SERVICE_READY = True
| # ---------------- Color Extraction Function ---------------- | |
def get_dominant_color(image: Image.Image, sample_size: int = 100) -> Tuple[int, int, int]:
    """Return the most frequent pixel colour of a down-scaled copy of ``image``.

    Args:
        image: PIL image to analyse; it is copied, not modified.
        sample_size: Maximum width/height of the thumbnail that is analysed
            (a smaller thumbnail means fewer pixels to count).

    Returns:
        The most common pixel value of the RGB thumbnail, or ``(30, 30, 30)``
        (dark grey) if anything goes wrong.
    """
    fallback = (30, 30, 30)  # dark grey
    try:
        # Count colours on a thumbnail instead of the full-size image.
        thumb = image.copy()
        thumb.thumbnail((sample_size, sample_size))
        rgb_thumb = thumb if thumb.mode == 'RGB' else thumb.convert('RGB')

        histogram = Counter(rgb_thumb.getdata())
        dominant_color = histogram.most_common(1)[0][0]
        log.info(f"Dominant color extracted: RGB{dominant_color}")
        return dominant_color
    except Exception as e:
        log.warning(f"Failed to extract dominant color: {e}, using default (30, 30, 30)")
        return fallback
def get_edge_average_color(image: Image.Image, border_width: int = 50) -> Tuple[int, int, int]:
    """Return the average colour of the border region of ``image``.

    Four strips (top, bottom, left, right), each ``border_width`` pixels
    thick, are sampled. Corner pixels lie in two strips and are counted
    twice, which matches the previous per-pixel sampling exactly.

    Args:
        image: PIL image; converted to RGB if it is in another mode.
        border_width: Thickness of each sampled strip in pixels. Values
            larger than the image are clamped; values <= 0 sample nothing.

    Returns:
        ``(R, G, B)`` with each channel truncated to an int, or
        ``(30, 30, 30)`` when no pixel could be sampled or an error occurred.
    """
    fallback = (30, 30, 30)
    try:
        if image.mode != 'RGB':
            image = image.convert('RGB')

        # One bulk conversion replaces hundreds of thousands of getpixel() calls.
        pixels = np.asarray(image)
        height, width = pixels.shape[:2]
        band = max(0, border_width)

        strips = (
            pixels[:min(band, height)],        # top edge
            pixels[max(0, height - band):],    # bottom edge
            pixels[:, :min(band, width)],      # left edge
            pixels[:, max(0, width - band):],  # right edge
        )

        sample_count = sum(strip.shape[0] * strip.shape[1] for strip in strips)
        if sample_count == 0:
            return fallback

        # int64 accumulation: uint8 sums would overflow almost immediately.
        channel_totals = sum(
            strip.reshape(-1, 3).sum(axis=0, dtype=np.int64) for strip in strips
        )
        avg_r, avg_g, avg_b = (int(int(total) / sample_count) for total in channel_totals)

        log.info(f"Edge average color: RGB({avg_r}, {avg_g}, {avg_b})")
        return (avg_r, avg_g, avg_b)
    except Exception as e:
        log.warning(f"Failed to extract edge color: {e}, using default")
        return fallback
def prepare_image_for_shorts(image: Image.Image) -> Image.Image:
    """Fit ``image`` onto a YouTube Shorts canvas (1080x1920) without cropping.

    The image is scaled to fit inside the canvas while keeping its aspect
    ratio, centred, and the remaining area is filled with the average colour
    of the image's own edges.

    Args:
        image: Source image in any mode/size.

    Returns:
        A new RGB image of size
        ``(YOUTUBE_SHORTS_WIDTH, YOUTUBE_SHORTS_HEIGHT)``.

    Raises:
        Exception: Any error raised while converting/resizing is logged and
            re-raised unchanged.
    """
    try:
        if image.mode != 'RGB':
            image = image.convert('RGB')

        # Background colour sampled from the image's own border.
        bg_color = get_edge_average_color(image, border_width=30)
        canvas = Image.new('RGB', (YOUTUBE_SHORTS_WIDTH, YOUTUBE_SHORTS_HEIGHT), bg_color)

        img_width, img_height = image.size
        target_ratio = YOUTUBE_SHORTS_WIDTH / YOUTUBE_SHORTS_HEIGHT
        img_ratio = img_width / img_height

        # max(1, ...) below: for extreme aspect ratios the scaled side would
        # truncate to 0 and Image.resize would reject the size.
        if img_ratio > target_ratio:
            # Image is wider than the canvas ratio: width is the limiting side.
            new_width = YOUTUBE_SHORTS_WIDTH
            new_height = max(1, int(new_width / img_ratio))
        else:
            # Image is taller than the canvas ratio: height is the limiting side.
            new_height = YOUTUBE_SHORTS_HEIGHT
            new_width = max(1, int(new_height * img_ratio))

        resized_image = image.resize((new_width, new_height), Image.LANCZOS)

        # Centre the resized image on the canvas.
        paste_x = (YOUTUBE_SHORTS_WIDTH - new_width) // 2
        paste_y = (YOUTUBE_SHORTS_HEIGHT - new_height) // 2
        canvas.paste(resized_image, (paste_x, paste_y))

        log.info(f"✅ Image prepared for YouTube Shorts: {YOUTUBE_SHORTS_WIDTH}x{YOUTUBE_SHORTS_HEIGHT} with background color {bg_color}")
        return canvas
    except Exception as e:
        log.error(f"Failed to prepare image for Shorts: {e}")
        raise
| # ---------------- Voice Rotation Manager ---------------- | |
class VoiceRotationManager:
    """Cycle through a list of voices, persisting the position in a JSON file.

    The state file holds ``{"current_voice_index": <int>}``. The index always
    points at the voice that will be handed out next.
    """

    def __init__(self, state_file: str, voices: List[str]):
        """Remember the voices and restore the rotation position if one was saved.

        Args:
            state_file: Path of the JSON file used to persist the position.
            voices: Voice ids, in rotation order.
        """
        self.state_file = state_file
        self.voices = voices
        self.current_index = 0
        self.load_state()

    def load_state(self):
        """Load the rotation index from ``state_file``.

        A missing, unreadable or malformed file, or an index that is not an
        integer inside ``range(len(voices))``, resets the index to 0.
        """
        if not os.path.exists(self.state_file):
            log.info("No voice rotation state file found, starting from index 0")
            return

        try:
            with open(self.state_file, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers both invalid JSON and undecodable bytes.
            log.warning(f"Could not load voice rotation state: {e}")
            self.current_index = 0
            return

        if not isinstance(data, dict):
            log.warning(f"Could not load voice rotation state: expected a JSON object, got {type(data).__name__}")
            self.current_index = 0
            return

        index = data.get("current_voice_index", 0)
        # A hand-edited or stale file may carry a negative, fractional or
        # out-of-range index; any of those would break list indexing later.
        is_valid = (
            isinstance(index, int)
            and not isinstance(index, bool)
            and 0 <= index < len(self.voices)
        )
        self.current_index = index if is_valid else 0
        log.info(f"Voice rotation state loaded: index={self.current_index}")

    def save_state(self):
        """Write the current index to ``state_file`` (best effort; errors are logged)."""
        try:
            with open(self.state_file, "w", encoding="utf-8") as f:
                json.dump({"current_voice_index": self.current_index}, f)
            log.info(f"Voice rotation state saved: index={self.current_index}")
        except OSError as e:
            log.error(f"Failed to save voice rotation state: {e}")

    def get_next_voice(self) -> str:
        """Return the current voice, then advance (wrapping around) and persist."""
        voice = self.voices[self.current_index]
        log.info(f"Selected voice: {voice} (index: {self.current_index}/{len(self.voices)-1})")
        self.current_index = (self.current_index + 1) % len(self.voices)
        self.save_state()
        return voice

    def get_current_voice(self) -> str:
        """Return the voice that would be handed out next, without advancing."""
        return self.voices[self.current_index]

    def reset(self):
        """Move the rotation back to the first voice and persist that."""
        self.current_index = 0
        self.save_state()
        log.info("Voice rotation reset to index 0")
| # ---------------- Motion Effects Functions ---------------- | |
def _animate_zoom_crop(clip, duration, zoom_at, origin_at):
    """Return ``clip`` with every frame scaled up and cropped back to its size.

    All motion effects below share this: a frame is enlarged by a
    progress-dependent factor and a window of the original size is cut out of
    the enlarged frame at a progress-dependent position.

    Args:
        clip: Source clip; must expose ``size`` as ``(w, h)`` and ``fl``.
        duration: Seconds over which progress runs from 0.0 to 1.0.
        zoom_at: ``progress -> scale factor`` for the frame.
        origin_at: ``(progress, slack_w, slack_h) -> (left, top)`` crop origin,
            where ``slack_*`` is half of the extra size gained by scaling
            (i.e. the origin that keeps the crop centred).

    Returns:
        Whatever ``clip.fl`` returns for the per-frame filter.
    """
    w, h = clip.size

    def effect(gf, t):
        from PIL import Image as PILImage

        frame = gf(t)
        # A zero-length clip has no timeline to interpolate over; treat it as
        # already finished instead of dividing by zero.
        progress = min(t / duration, 1.0) if duration > 0 else 1.0

        zoom_factor = zoom_at(progress)
        new_w = int(w * zoom_factor)
        new_h = int(h * zoom_factor)

        img = PILImage.fromarray(frame.astype('uint8'))
        img_resized = img.resize((new_w, new_h), PILImage.LANCZOS)

        left, top = origin_at(progress, (new_w - w) // 2, (new_h - h) // 2)
        img_cropped = img_resized.crop((left, top, left + w, top + h))
        return np.array(img_cropped)

    return clip.fl(effect)


def apply_zoom_in_effect(clip, duration):
    """Gradual zoom in, from 100% to 120%, keeping the frame centred."""
    return _animate_zoom_crop(
        clip, duration,
        zoom_at=lambda p: 1.0 + (p * 0.2),
        origin_at=lambda p, slack_w, slack_h: (slack_w, slack_h),
    )


def apply_zoom_out_effect(clip, duration):
    """Gradual zoom out, from 120% to 100%, keeping the frame centred."""
    return _animate_zoom_crop(
        clip, duration,
        zoom_at=lambda p: 1.2 - (p * 0.2),
        origin_at=lambda p, slack_w, slack_h: (slack_w, slack_h),
    )


def apply_pan_right_effect(clip, duration):
    """Pan right: at 120% zoom the crop window slides from centre to the left edge."""
    return _animate_zoom_crop(
        clip, duration,
        zoom_at=lambda p: 1.2,
        origin_at=lambda p, slack_w, slack_h: (int(slack_w * (1 - p)), slack_h),
    )


def apply_pan_left_effect(clip, duration):
    """Pan left: at 120% zoom the crop window slides from the left edge to centre."""
    return _animate_zoom_crop(
        clip, duration,
        zoom_at=lambda p: 1.2,
        origin_at=lambda p, slack_w, slack_h: (int(slack_w * p), slack_h),
    )


def apply_pan_down_effect(clip, duration):
    """Pan down: at 120% zoom the crop window slides from centre to the top edge."""
    return _animate_zoom_crop(
        clip, duration,
        zoom_at=lambda p: 1.2,
        origin_at=lambda p, slack_w, slack_h: (slack_w, int(slack_h * (1 - p))),
    )


def apply_pan_up_effect(clip, duration):
    """Pan up: at 120% zoom the crop window slides from the top edge to centre."""
    return _animate_zoom_crop(
        clip, duration,
        zoom_at=lambda p: 1.2,
        origin_at=lambda p, slack_w, slack_h: (slack_w, int(slack_h * p)),
    )


def apply_ken_burns_effect(clip, duration):
    """Ken Burns: zoom from 100% to 130% while drifting diagonally."""
    return _animate_zoom_crop(
        clip, duration,
        zoom_at=lambda p: 1.0 + (p * 0.3),
        origin_at=lambda p, slack_w, slack_h: (
            int(slack_w * (1 - p * 0.5)),
            int(slack_h * (1 - p * 0.5)),
        ),
    )
def get_random_motion_effect():
    """Pick one of the seven motion effects uniformly at random.

    Returns:
        ``(effect_name, effect_func)`` where ``effect_func(clip, duration)``
        applies the effect to a clip.
    """
    catalogue = {
        'zoom_in': apply_zoom_in_effect,
        'zoom_out': apply_zoom_out_effect,
        'pan_right': apply_pan_right_effect,
        'pan_left': apply_pan_left_effect,
        'pan_down': apply_pan_down_effect,
        'pan_up': apply_pan_up_effect,
        'ken_burns': apply_ken_burns_effect,
    }
    chosen = random.choice(list(catalogue.items()))
    effect_name, effect_func = chosen
    log.info(f"Selected motion effect: {effect_name}")
    return effect_name, effect_func
| # ---------------- FIXED: Text Overlay Function ---------------- | |
def _load_overlay_font(fontsize):
    """Return the first available bold TrueType font at ``fontsize``, else Pillow's default."""
    for font_name in ["Roboto-Bold.ttf", "DejaVuSans-Bold.ttf", "Arial.ttf", "LiberationSans-Bold.ttf"]:
        try:
            font = ImageFont.truetype(font_name, fontsize)
        except (OSError, ValueError, ImportError):
            # Font file missing/unreadable, invalid size, or FreeType support
            # absent: try the next candidate.
            continue
        log.info(f"✅ Font loaded: {font_name}")
        return font

    log.warning("⚠️ Using default font")
    try:
        # Newer Pillow releases can scale the built-in font; older ones do
        # not accept the argument at all.
        return ImageFont.load_default(size=fontsize)
    except (TypeError, ValueError, OSError):
        return ImageFont.load_default()


def _wrap_overlay_text(draw, text, font, max_width):
    """Greedily split ``text`` into lines whose rendered width stays below ``max_width``.

    A single word that is wider than ``max_width`` is placed on a line of its own.
    """
    lines = []
    words = text.split()
    pos = 0
    while pos < len(words):
        line = ''
        while pos < len(words) and draw.textlength(line + words[pos] + ' ', font=font) < max_width:
            line += words[pos] + ' '
            pos += 1
        if not line:
            # Nothing fitted: emit the oversized word alone so the loop advances.
            line = words[pos]
            pos += 1
        lines.append(line.strip())
    return lines


def create_text_overlay(text: str, video_size: Tuple[int, int], duration: float) -> "Optional[ImageClip]":
    """Create a transparent, screen-fixed text layer to composite over the video.

    The text is drawn once onto a fully transparent RGBA image, centred on the
    screen, so it stays put while the image underneath moves.

    The return annotation is a string on purpose: ``ImageClip`` is undefined
    when MoviePy failed to import, and an unquoted annotation would raise
    ``NameError`` while the module is being loaded.

    Args:
        text: Text to display ("..." is removed and "—" becomes "-").
        video_size: ``(width, height)`` of the video in pixels.
        duration: Duration of the overlay clip in seconds.

    Returns:
        An ``ImageClip`` holding the text layer, or ``None`` when the cleaned
        text is empty, MoviePy is unavailable, or rendering failed.
    """
    clean_text = text.replace("...", "").replace("—", "-").strip()
    if not clean_text:
        log.warning("Empty text after cleaning, skipping text overlay")
        return None
    if not MOVIEPY_AVAILABLE:
        log.warning("MoviePy not available, skipping text overlay")
        return None

    log.info(f"Creating fixed text overlay: '{clean_text[:50]}...'")
    try:
        width, height = video_size

        # Fully transparent canvas the size of the video.
        img = Image.new('RGBA', (width, height), (0, 0, 0, 0))
        draw = ImageDraw.Draw(img)

        # Font size and outline thickness scale with the video width.
        fontsize = int(width * 0.07)
        stroke_width = max(3, int(fontsize / 18))
        font = _load_overlay_font(fontsize)

        wrapped_lines = _wrap_overlay_text(draw, clean_text, font, width * 0.9)

        # Vertically centre the whole paragraph.
        line_height = fontsize + 12
        total_height = len(wrapped_lines) * line_height
        current_y = (height - total_height) // 2

        for line in wrapped_lines:
            # Horizontally centre each line individually.
            bbox = draw.textbbox((0, 0), line, font=font)
            line_width = bbox[2] - bbox[0]
            line_x = (width - line_width) // 2
            draw.text(
                (line_x, current_y),
                line,
                font=font,
                fill=(255, 255, 255, 255),       # opaque white text
                stroke_width=stroke_width,
                stroke_fill=(0, 0, 0, 255),      # opaque black outline
            )
            current_y += line_height

        # The alpha channel is carried along in the array.
        img_array = np.array(img)
        text_clip = ImageClip(img_array, duration=duration, ismask=False, transparent=True)
        log.info(f"✅ Fixed text overlay created successfully (transparent layer)")
        return text_clip
    except Exception as e:
        # Best effort: the caller renders the video without text on failure.
        log.exception(f"❌ Failed to create text overlay: {e}")
        return None
| # ---------------- Video Agent Class ---------------- | |
class VideoAgent:
    """Turn a still image plus narration text into a YouTube Shorts style video.

    Pipeline: synthesize speech with Kokoro TTS, fit the image onto a
    1080x1920 canvas, animate it with a random motion effect, composite a
    fixed text overlay, and mux audio and video into an MP4 stored under
    ``VIDEO_HISTORY_DIR``. The most recent results are kept in ``history``.
    """

    def __init__(self):
        """Create the history directory, the voice rotation and the TTS engine.

        A TTS initialisation failure is logged, not raised; it leaves
        ``tts`` as ``None`` and ``kokoro_available`` as ``False``.
        """
        self.log = logging.getLogger("video_agent")
        self.history = []
        self._setup_history_dir()

        self.voice_manager = VoiceRotationManager(VOICE_STATE_FILE, KOKORO_VOICES)

        self.tts = None
        self.kokoro_available = False
        self.log.info("Initializing Kokoro TTS (ONNX - NeuML model)...")
        try:
            self.tts = KokoroEngine()
            self.kokoro_available = True
            self.log.info("✅ Kokoro-ONNX initialized successfully (NeuML model)")
        except Exception as e:
            self.tts = None
            self.kokoro_available = False
            self.log.error("❌ Kokoro TTS initialization failed")
            self.log.exception(str(e))

    def _setup_history_dir(self):
        """Create the video history directory if it does not exist yet."""
        if not os.path.exists(VIDEO_HISTORY_DIR):
            # exist_ok: another process may create it between the check and here.
            os.makedirs(VIDEO_HISTORY_DIR, exist_ok=True)
            self.log.info(f"Created video history directory: {VIDEO_HISTORY_DIR}")

    @staticmethod
    def _safe_filename_fragment(text: str, max_length: int = 30) -> str:
        """Reduce free text to a short fragment that is safe inside a file name.

        Only letters, digits, ``-`` and ``_`` survive; everything else
        (separators, dots, control characters, wildcards, ...) becomes ``_``.
        Returns ``"video"`` when nothing usable is left.
        """
        cleaned = "".join(
            ch if ch.isalnum() or ch in "-_" else "_"
            for ch in text[:max_length]
        )
        return cleaned.strip("_") or "video"

    def _unique_video_path(self, stem: str) -> str:
        """Return a path in the history directory for ``stem`` that is not taken yet."""
        candidate = os.path.join(VIDEO_HISTORY_DIR, f"{stem}.mp4")
        counter = 1
        # Timestamps have one-second resolution, so identical requests in the
        # same second would otherwise overwrite each other's output.
        while os.path.exists(candidate):
            candidate = os.path.join(VIDEO_HISTORY_DIR, f"{stem}_{counter}.mp4")
            counter += 1
        return candidate

    def generate_audio(self, text: str, output_path: str, voice: Optional[str] = None, speed: float = 0.8) -> Tuple[bool, str]:
        """Synthesize ``text`` with Kokoro TTS and write it as a 16-bit WAV file.

        Args:
            text: Text to speak.
            output_path: Destination WAV path.
            voice: Voice id; ``None`` or an unknown id takes the next voice
                from the rotation.
            speed: Speech speed passed to the engine.

        Returns:
            ``(True, voice_used)`` on success, ``(False, "")`` on any failure.
        """
        if not self.kokoro_available or self.tts is None:
            self.log.error("Kokoro TTS not available.")
            return False, ""
        try:
            if voice is None or voice not in KOKORO_VOICES:
                if voice is not None:
                    self.log.warning(f"Voice '{voice}' not found, using next voice from rotation")
                voice = self.voice_manager.get_next_voice()

            self.log.info(f"Generating audio with voice: {voice}, speed: {speed}")
            self.log.info(f"Text: {text[:50]}...")

            self.tts.set_voice(voice)
            # asarray: scaling below must be element-wise even if the engine
            # hands back a plain sequence rather than an ndarray.
            audio_data = np.asarray(self.tts.synthesize(text, speed=speed))

            import scipy.io.wavfile as wavfile
            # NOTE(review): assumes the engine outputs 24 kHz samples in
            # [-1.0, 1.0] - confirm against KokoroEngine.
            sample_rate = 24000
            audio_int16 = np.clip(audio_data * 32767, -32768, 32767).astype(np.int16)
            wavfile.write(output_path, sample_rate, audio_int16)

            self.log.info(f"✅ Audio generated successfully: {output_path}")
            return True, voice
        except Exception as e:
            self.log.exception(f"Audio generation failed: {e}")
            return False, ""

    def create_video(self, image_bytes: bytes, audio_path: str, output_path: str, display_text: str) -> bool:
        """Render a Shorts-sized video from an image, an audio file and overlay text.

        Args:
            image_bytes: Encoded image data (any format Pillow can open).
            audio_path: Path of the narration audio; its length sets the video length.
            output_path: Destination MP4 path.
            display_text: Text shown as a fixed overlay on top of the moving image.

        Returns:
            ``True`` if the file was written, ``False`` on any failure.
        """
        if not MOVIEPY_AVAILABLE:
            self.log.error("MoviePy not available.")
            return False

        audio = None
        video = None
        base_clip = None
        animated_clip = None
        text_clip = None
        try:
            with Image.open(io.BytesIO(image_bytes)) as image:
                self.log.info(f"Preparing image for YouTube Shorts ({YOUTUBE_SHORTS_WIDTH}x{YOUTUBE_SHORTS_HEIGHT})...")
                shorts_image = prepare_image_for_shorts(image)
            img_array = np.array(shorts_image)

            if not os.path.exists(audio_path):
                raise FileNotFoundError(f"Audio file not found: {audio_path}")
            self.log.info(f"Loading audio from: {audio_path}")
            audio = AudioFileClip(audio_path)
            audio_duration = audio.duration
            self.log.info(f"Creating YouTube Shorts video ({YOUTUBE_SHORTS_WIDTH}x{YOUTUBE_SHORTS_HEIGHT}) with duration: {audio_duration:.2f}s")

            effect_name, effect_func = get_random_motion_effect()
            self.log.info(f"Applying effect: {effect_name}")

            base_clip = ImageClip(img_array, duration=audio_duration)
            animated_clip = effect_func(base_clip, audio_duration)

            self.log.info(f"Creating fixed text overlay for: '{display_text[:50]}...'")
            text_clip = create_text_overlay(display_text, animated_clip.size, audio_duration)

            if text_clip:
                self.log.info("Compositing video: moving image + fixed text overlay...")
                video = CompositeVideoClip([animated_clip, text_clip])
                self.log.info("✅ Text overlay composited successfully (fixed position)")
            else:
                self.log.warning("⚠️ Text overlay creation failed, using video without text")
                video = animated_clip

            video = video.set_audio(audio)

            # The encoder's intermediate audio file is derived from the output
            # path; a fixed name in the working directory would be shared by
            # every render running at the same time.
            temp_audiofile = f"{os.path.splitext(output_path)[0]}.temp-audio.m4a"

            self.log.info(f"Writing YouTube Shorts video to: {output_path}")
            video.write_videofile(
                output_path,
                fps=24,
                codec='libx264',
                audio_codec='aac',
                temp_audiofile=temp_audiofile,
                remove_temp=True,
                verbose=False,
                logger=None,
                preset='ultrafast',
                threads=4
            )
            self.log.info(f"✅ YouTube Shorts video created successfully ({YOUTUBE_SHORTS_WIDTH}x{YOUTUBE_SHORTS_HEIGHT}) with {effect_name} effect + fixed text: {output_path}")
            return True
        except Exception as e:
            self.log.exception(f"Video creation failed: {e}")
            return False
        finally:
            # Release every clip independently so one failing close() does not
            # keep the others open.
            resources = (
                ("text clip", text_clip),
                ("animated clip", animated_clip),
                ("base clip", base_clip),
                ("audio", audio),
                ("video", video),
            )
            for label, resource in resources:
                if resource is None:
                    continue
                try:
                    resource.close()
                    self.log.debug(f"{label.capitalize()} closed")
                except Exception as e:
                    self.log.warning(f"Error closing {label}: {e}")

    def process_request_custom(
        self,
        image_base64: str,
        tts_text: str,
        voice: Optional[str] = None,
        speed: float = 0.8
    ) -> Dict[str, Any]:
        """Generate a video for one request and record it in the history.

        Args:
            image_base64: Base64-encoded image (without text drawn on it).
            tts_text: Text to speak; also used as the on-screen overlay.
            voice: Voice id, or ``None`` to use the rotation.
            speed: Speech speed.

        Returns:
            Dict with ``video_base64``, ``video_path``, ``tts_text``,
            ``voice_used``, ``duration``, ``status`` and ``message``.

        Raises:
            RuntimeError: If TTS is unavailable or any pipeline step fails
                (the original exception is chained as the cause).
        """
        if not self.kokoro_available or self.tts is None:
            raise RuntimeError("Kokoro TTS not available")

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        safe_text = self._safe_filename_fragment(tts_text)
        audio_path = None
        try:
            # mkstemp gives every request its own scratch file, even within
            # the same second.
            fd, audio_path = tempfile.mkstemp(prefix=f"audio_{timestamp}_", suffix=".wav")
            os.close(fd)
            video_path = self._unique_video_path(f"{timestamp}_{safe_text}")
            video_filename = os.path.basename(video_path)

            image_bytes = base64.b64decode(image_base64)

            success, voice_used = self.generate_audio(tts_text, audio_path, voice, speed)
            if not success:
                raise RuntimeError("Failed to generate audio.")

            if not self.create_video(image_bytes, audio_path, video_path, tts_text):
                raise RuntimeError("Failed to create video.")

            with open(video_path, "rb") as f:
                video_bytes = f.read()
            video_base64 = base64.b64encode(video_bytes).decode('utf-8')

            duration = self._get_video_duration(video_path)
            entry = {
                "timestamp": timestamp,
                "tts_text": tts_text,
                "voice": voice_used,
                "video_path": video_path,
                "duration": duration
            }
            self.history.insert(0, entry)
            self.history = self.history[:MAX_HISTORY_COUNT]

            self.log.info(f"✅ Video processing completed: {video_filename}")
            return {
                "video_base64": video_base64,
                "video_path": video_path,
                "tts_text": tts_text,
                "voice_used": voice_used,
                "duration": duration,
                "status": "success",
                "message": f"YouTube Shorts video ({YOUTUBE_SHORTS_WIDTH}x{YOUTUBE_SHORTS_HEIGHT}) generated successfully with voice: {voice_used} at speed: {speed}"
            }
        except Exception as e:
            self.log.error(f"Video processing failed: {e}")
            raise RuntimeError(f"Video generation failed: {str(e)}") from e
        finally:
            # The intermediate audio is scratch data on success and failure
            # alike; failing to delete it must not fail the request.
            if audio_path and os.path.exists(audio_path):
                try:
                    os.remove(audio_path)
                except OSError as cleanup_error:
                    self.log.warning(f"Could not remove temporary audio file {audio_path}: {cleanup_error}")

    def _get_video_duration(self, video_path: str) -> float:
        """Return the video's duration in seconds, or 0.0 if it cannot be read."""
        if not MOVIEPY_AVAILABLE:
            return 0.0
        try:
            from moviepy.editor import VideoFileClip
            clip = VideoFileClip(video_path)
            try:
                return clip.duration
            finally:
                clip.close()
        except Exception as e:
            self.log.warning(f"Could not get video duration: {e}")
            return 0.0

    def get_history(self) -> List[Dict[str, Any]]:
        """Return the recorded video entries, newest first."""
        return self.history

    def get_voice_rotation_info(self) -> Dict[str, Any]:
        """Return the current state of the voice rotation."""
        return {
            "current_voice": self.voice_manager.get_current_voice(),
            "current_index": self.voice_manager.current_index,
            "total_voices": len(KOKORO_VOICES),
            "all_voices": KOKORO_VOICES
        }
| # ---------------- Global Agent Instance ---------------- | |
# Single shared agent for every Gradio handler in this module.
agent = VideoAgent()

# One-line health summary shown in the UI and embedded in "not ready" errors.
if not IS_SERVICE_READY:
    STATUS_MESSAGE = "❌ Service not ready"
elif agent.tts is None:
    STATUS_MESSAGE = "⚠️ Video Agent ready but Kokoro-ONNX failed to initialize."
else:
    current_voice = agent.voice_manager.get_current_voice()
    STATUS_MESSAGE = f"✅ Video Agent ready | Format: YouTube Shorts ({YOUTUBE_SHORTS_WIDTH}x{YOUTUBE_SHORTS_HEIGHT}) | Voice: {current_voice} | Speed: 0.8"
| # ---------------- Gradio Functions ---------------- | |
def gradio_generate_video_custom(
    image_input,
    tts_text: str,
    voice_override: str,
    speed: float
) -> Tuple[Optional[str], str, str]:
    """Gradio handler for the interactive "Generate Video" tab.

    Args:
        image_input: Either a file path (``str``) or an object with a
            PIL-style ``save``/``mode``/``convert`` interface.
        tts_text: Text to speak and to overlay on the video.
        voice_override: Voice id, or ``"Auto"``/empty to use the rotation.
        speed: Speech speed.

    Returns:
        ``(video_path, status_message, voice_rotation_message)``; on failure
        ``video_path`` is ``None`` and the status message explains why.
    """
    if not IS_SERVICE_READY:
        return None, f"❌ Service not ready: {STATUS_MESSAGE}", ""
    # Explicit checks instead of truthiness: only a missing image or an empty
    # path counts as "no image".
    if image_input is None or (isinstance(image_input, str) and not image_input):
        return None, "❌ Please provide an image.", ""
    if not tts_text:
        return None, "❌ Please provide TTS text.", ""

    try:
        if isinstance(image_input, str):
            with open(image_input, "rb") as f:
                image_bytes = f.read()
        else:
            # JPEG cannot store alpha or palette modes; uploads such as
            # transparent PNGs must be flattened to RGB before encoding.
            if image_input.mode not in ("RGB", "L"):
                image_input = image_input.convert("RGB")
            buffered = io.BytesIO()
            image_input.save(buffered, format="JPEG")
            image_bytes = buffered.getvalue()

        image_base64 = base64.b64encode(image_bytes).decode('utf-8')
        voice = None if voice_override == "Auto" or not voice_override else voice_override

        result = agent.process_request_custom(
            image_base64=image_base64,
            tts_text=tts_text,
            voice=voice,
            speed=speed
        )
        video_path = result["video_path"]
        voice_used = result["voice_used"]

        # The duration may not be part of the result payload; the history
        # entry recorded for this video always carries it.
        duration = result.get("duration")
        if duration is None:
            duration = next(
                (
                    entry.get("duration", 0)
                    for entry in agent.get_history()
                    if entry.get("video_path") == video_path
                ),
                0,
            )
        duration = duration or 0

        voice_info = agent.get_voice_rotation_info()
        next_voice = voice_info["current_voice"]
        current_index = voice_info["current_index"]
        total_voices = voice_info["total_voices"]

        status_msg = (
            f"✅ {result['message']}\n\n"
            f"**Format:** YouTube Shorts ({YOUTUBE_SHORTS_WIDTH}x{YOUTUBE_SHORTS_HEIGHT})\n"
            f"**TTS Text:** {result['tts_text'][:100]}...\n"
            f"**Duration:** {duration:.2f}s\n"
            f"**Effect:** Moving image + fixed text overlay"
        )
        voice_rotation_msg = (
            f"**Voice Used:** {voice_used}\n"
            f"**Next Voice:** {next_voice}\n"
            f"**Progress:** {current_index}/{total_voices}"
        )
        return video_path, status_msg, voice_rotation_msg
    except Exception as e:
        # UI boundary: report the failure in the status box instead of raising.
        log.exception(f"Video generation failed: {e}")
        return None, f"❌ Error: {str(e)}", ""
def gradio_api_endpoint_custom(
    image_base64: str,
    tts_text: str,
    voice: Optional[str] = None,
    speed: float = 0.8
) -> Dict[str, Any]:
    """API endpoint: generate a video with optional voice and speed overrides.

    Args:
        image_base64: Base64-encoded source image.
        tts_text: Text to speak and overlay.
        voice: Voice id, or ``None`` for the automatic rotation.
        speed: Speech speed.

    Returns:
        The result dict produced by ``agent.process_request_custom``.

    Raises:
        RuntimeError: If the service is not ready, or generation fails.
    """
    if IS_SERVICE_READY:
        log.info(f"API request received for TTS text: {tts_text[:50]}... with speed: {speed}")
        request = {
            "image_base64": image_base64,
            "tts_text": tts_text,
            "voice": voice,
            "speed": speed,
        }
        return agent.process_request_custom(**request)
    raise RuntimeError(f"Service not ready: {STATUS_MESSAGE}")
def gradio_api_simple(
    image_base64: str,
    tts_text: str
) -> Dict[str, Any]:
    """Simplified API endpoint meant to be called by the publishing agent.

    Takes only the image and the narration text; the voice comes from the
    automatic rotation and the speed is fixed at the default 0.8.

    Raises:
        RuntimeError: If the service is not ready, or generation fails.
    """
    if IS_SERVICE_READY:
        log.info(f"🎬 Simple API request received for TTS text: {tts_text[:50]}...")
        defaults = {
            "voice": None,  # automatic rotation
            "speed": 0.8,   # default speed
        }
        return agent.process_request_custom(
            image_base64=image_base64,
            tts_text=tts_text,
            **defaults
        )
    raise RuntimeError(f"Service not ready: {STATUS_MESSAGE}")
def format_history_for_gallery() -> List[Tuple[str, str]]:
    """Build ``(video_path, caption)`` pairs for the Gallery component.

    Entries whose video file is missing from disk are left out.
    """
    def _caption(entry):
        headline = f'{entry["tts_text"][:80]}...\n'
        details = f'Voice: {entry.get("voice", "N/A")} | Duration: {entry.get("duration", 0):.1f}s'
        return headline + details

    return [
        (entry["video_path"], _caption(entry))
        for entry in agent.get_history()
        if entry.get("video_path") and os.path.exists(entry["video_path"])
    ]
def gradio_refresh_history():
    """Gradio callback: return the history formatted for the gallery."""
    gallery_items = format_history_for_gallery()
    return gallery_items
def gradio_reset_voice_rotation():
    """Gradio callback: reset the voice rotation and report the next voice.

    Returns:
        A confirmation message containing the ``current_voice`` reported by
        the agent after the reset.
    """
    agent.voice_manager.reset()
    # Query the rotation state after the reset so the message reflects it.
    upcoming_voice = agent.get_voice_rotation_info()['current_voice']
    return f"✅ Voice rotation reset! Next voice: {upcoming_voice}"
def gradio_get_voice_info():
    """Gradio callback: describe the current voice rotation state.

    Returns:
        A three-line Markdown-style string with the current voice, its index
        out of the total, and the number of entries in ``all_voices``.
    """
    info = agent.get_voice_rotation_info()
    current_line = f"**Current Voice:** {info['current_voice']}\n"
    index_line = f"**Index:** {info['current_index']}/{info['total_voices']}\n"
    total_line = f"**Total Voices:** {len(info['all_voices'])}"
    return current_line + index_line + total_line
# ---------------- Gradio Interface ----------------
# Main UI: four tabs (generation, history, voice management, API docs).
# NOTE(review): nesting was reconstructed from the flattened source; confirm
# that only the two feature banners belong under `if IS_SERVICE_READY`.
with gr.Blocks(title="Video Agent - YouTube Shorts") as demo:
    gr.Markdown("# 🎬 Video Agent - YouTube Shorts Format (1080x1920)")
    gr.Markdown(f"**Status:** {STATUS_MESSAGE}")

    if IS_SERVICE_READY:
        gr.Markdown(f"**Features:** YouTube Shorts ({YOUTUBE_SHORTS_WIDTH}x{YOUTUBE_SHORTS_HEIGHT}) | {len(KOKORO_VOICES)} voices | Auto rotation | Smart background color | Fixed text overlay")
        gr.Markdown("✅ **NEW:** *Videos optimized for YouTube Shorts with intelligent background filling*")

    gr.Markdown("---")

    with gr.Tab("Generate Video"):
        with gr.Row():
            with gr.Column(scale=1):
                image_input = gr.Image(label="Upload Image (any size - will be adapted)", type="pil")
                tts_text_input = gr.Textbox(
                    label="TTS Text (will appear as fixed overlay)",
                    lines=4,
                    placeholder="Enter the text to display on screen..."
                )
                voice_dropdown = gr.Dropdown(
                    choices=["Auto"] + KOKORO_VOICES,
                    value="Auto",
                    label="Voice (Auto = rotation)"
                )
                speed_slider = gr.Slider(
                    minimum=0.5,
                    maximum=2.0,
                    value=0.8,
                    step=0.1,
                    label="Speech Speed"
                )
                generate_btn = gr.Button("🎬 Generate YouTube Shorts Video", variant="primary")
                status_output = gr.Textbox(label="Status", lines=5)
                voice_rotation_output = gr.Textbox(label="Voice Info", lines=3)
            with gr.Column(scale=1):
                video_output = gr.Video(label="Generated YouTube Shorts Video (1080x1920)")

        # The callback returns (video, status message, voice rotation message).
        generate_btn.click(
            fn=gradio_generate_video_custom,
            inputs=[image_input, tts_text_input, voice_dropdown, speed_slider],
            outputs=[video_output, status_output, voice_rotation_output]
        )

    with gr.Tab("History"):
        refresh_btn = gr.Button("🔄 Refresh")
        history_gallery = gr.Gallery(label="Recent Videos", columns=2)
        refresh_btn.click(fn=gradio_refresh_history, outputs=[history_gallery])

    with gr.Tab("Voice Management"):
        with gr.Row():
            voice_info_btn = gr.Button("📊 Get Info")
            reset_btn = gr.Button("🔄 Reset Rotation")
        voice_mgmt_output = gr.Textbox(label="Voice Info", lines=5)
        voice_info_btn.click(fn=gradio_get_voice_info, outputs=[voice_mgmt_output])
        reset_btn.click(fn=gradio_reset_voice_rotation, outputs=[voice_mgmt_output])

        gr.Markdown(f"### {len(KOKORO_VOICES)} Voices Available")
        # Count each group from KOKORO_VOICES instead of hard-coding the
        # numbers, so the headings cannot drift from the actual voice list.
        for _group_label, _voice_prefix in (
            ("British Female", "bf_"),
            ("American Female", "af_"),
            ("British Male", "bm_"),
            ("American Male", "am_"),
        ):
            _group_voices = [v for v in KOKORO_VOICES if v.startswith(_voice_prefix)]
            gr.Markdown(f"**{_group_label} ({len(_group_voices)}):** " + ", ".join(_group_voices))

    # Static documentation for the two API endpoints.
    with gr.Tab("API Endpoints") as api_tab:
        gr.Markdown("### 🔌 API Endpoints for External Integration")
        gr.Markdown("Use these endpoints to integrate with other agents (e.g., Publisher Agent)")
        gr.Markdown("---")
        gr.Markdown("#### 📡 Endpoint 1: Simple API (Recommended for Publisher Agent)")
        gr.Markdown("- **API Name**: `generate_video_custom`")
        gr.Markdown("- **Parameters**: `image_base64` (str), `tts_text` (str)")
        gr.Markdown("- **Returns**: JSON with video_base64, video_path, status")
        gr.Markdown(f"- **Format**: YouTube Shorts ({YOUTUBE_SHORTS_WIDTH}x{YOUTUBE_SHORTS_HEIGHT})")
        gr.Markdown("- **Auto-settings**: Voice rotation enabled, Speed = 0.8, Smart background filling")
        gr.Markdown("---")
        gr.Markdown("#### 📡 Endpoint 2: Full API (Advanced)")
        gr.Markdown("- **API Name**: `generate_video_full`")
        gr.Markdown("- **Parameters**: `image_base64` (str), `tts_text` (str), `voice` (str, optional), `speed` (float, optional)")
        gr.Markdown("- **Returns**: JSON with video_base64, video_path, status")
        gr.Markdown(f"- **Format**: YouTube Shorts ({YOUTUBE_SHORTS_WIDTH}x{YOUTUBE_SHORTS_HEIGHT})")
# The API endpoints are declared as standalone gr.Interface objects, outside
# the gr.Blocks context, each with its own explicit api_name.

# API endpoint 1: simple API (for the Publisher Agent) - image and text only.
simple_api = gr.Interface(
    fn=gradio_api_simple,
    inputs=[
        gr.Textbox(
            label="image_base64",
            placeholder="Base64 encoded image (any size - will be adapted to 1080x1920)",
        ),
        gr.Textbox(
            label="tts_text",
            placeholder="Text for TTS audio synthesis",
        ),
    ],
    outputs=gr.JSON(label="API Response"),
    title="Simple YouTube Shorts Video API",
    description=f"Generate YouTube Shorts video ({YOUTUBE_SHORTS_WIDTH}x{YOUTUBE_SHORTS_HEIGHT}) with automatic voice rotation and speed=0.8",
    api_name="generate_video_custom",
)
# API endpoint 2: full API (voice and speed selectable).
def gradio_api_full_wrapper(image_base64: str, tts_text: str, voice: str = "Auto", speed: float = 0.8):
    """Adapt the UI-style optional parameters for the full API endpoint.

    The values ``"Auto"`` and ``""`` for ``voice`` are translated to ``None``
    before delegating to ``gradio_api_endpoint_custom``; any other value is
    passed through unchanged.
    """
    if voice == "Auto" or voice == "":
        selected_voice = None
    else:
        selected_voice = voice
    return gradio_api_endpoint_custom(image_base64, tts_text, selected_voice, speed)
# Interface for the full API: same two text inputs as the simple API, plus a
# voice dropdown ("Auto" or any entry of KOKORO_VOICES) and a speed slider.
full_api = gr.Interface(
    fn=gradio_api_full_wrapper,
    inputs=[
        gr.Textbox(
            label="image_base64",
            placeholder="Base64 encoded image (any size)",
        ),
        gr.Textbox(
            label="tts_text",
            placeholder="Text for TTS",
        ),
        gr.Dropdown(
            choices=["Auto"] + KOKORO_VOICES,
            value="Auto",
            label="voice",
        ),
        gr.Slider(
            minimum=0.5,
            maximum=2.0,
            value=0.8,
            step=0.1,
            label="speed",
        ),
    ],
    outputs=gr.JSON(label="API Response"),
    title="Full YouTube Shorts Video API",
    description=f"Generate YouTube Shorts video ({YOUTUBE_SHORTS_WIDTH}x{YOUTUBE_SHORTS_HEIGHT}) with custom voice and speed settings",
    api_name="generate_video_full",
)
# Combine the main UI and both API interfaces into one tabbed application;
# this combined object is what gets launched.
combined_demo = gr.TabbedInterface(
    interface_list=[demo, simple_api, full_api],
    tab_names=["Main Interface", "API: Simple", "API: Full"],
    title=f"🎬 Video Agent - YouTube Shorts ({YOUTUBE_SHORTS_WIDTH}x{YOUTUBE_SHORTS_HEIGHT})",
)
if __name__ == "__main__":
    # Listening port comes from the PORT environment variable (default 7860).
    PORT = int(os.environ.get("PORT", "7860"))
    log.info(f"Starting Video Agent with YouTube Shorts format ({YOUTUBE_SHORTS_WIDTH}x{YOUTUBE_SHORTS_HEIGHT})...")
    # Bind to all network interfaces.
    combined_demo.launch(
        server_name="0.0.0.0",
        server_port=PORT,
    )