import copy import os import requests import io import numpy as np import soundfile as sf from PIL import Image # محاولة استيراد decord لمعالجة الفيديو، إذا لم يكن موجوداً لن يتوقف الكود بالكامل try: import decord decord.bridge.set_bridge('torch') except ImportError: decord = None print("Warning: 'decord' module not found. Video processing will not work.") # محاولة استيراد librosa لمعالجة الصوت المتقدمة try: import librosa except ImportError: librosa = None def _load_image(image_path): """تحميل الصورة من رابط أو مسار محلي وتحويلها إلى RGB""" if image_path.startswith("http://") or image_path.startswith("https://"): response = requests.get(image_path, timeout=10) image = Image.open(io.BytesIO(response.content)) else: image = Image.open(image_path) return image.convert("RGB") def _load_audio(audio_path, target_sr=16000): """تحميل الصوت وإعادة تعيين معدل الترميز (Sampling Rate)""" if audio_path.startswith("http://") or audio_path.startswith("https://"): response = requests.get(audio_path, timeout=10) # استخدام io.BytesIO للقراءة من الذاكرة audio_data, sr = sf.read(io.BytesIO(response.content)) else: audio_data, sr = sf.read(audio_path) # تحويل إلى Mono إذا كان Stereo if len(audio_data.shape) > 1: audio_data = audio_data.mean(axis=1) # إعادة تشكيل التردد (Resampling) إذا توفر librosa وكان التردد مختلفاً if librosa and sr != target_sr: audio_data = librosa.resample(audio_data, orig_sr=sr, target_sr=target_sr) return audio_data def _load_video(video_path, n_frames=8, use_audio=True): """معالجة الفيديو: استخراج الإطارات والصوت""" if decord is None: raise ImportError("Please install 'decord' to support video processing.") # تحميل الفيديو (يدعم الروابط المباشرة في بعض إصدارات decord، ولكن يفضل تحميله مؤقتاً) if video_path.startswith("http"): # تحميل الملف مؤقتاً response = requests.get(video_path, stream=True) temp_filename = "temp_video.mp4" with open(temp_filename, 'wb') as f: for chunk in response.iter_content(chunk_size=1024): if chunk: f.write(chunk) vr = decord.VideoReader(temp_filename) else: vr = decord.VideoReader(video_path) # استخراج الإطارات (Sampling Frames) total_frames = len(vr) # اختيار إطارات موزعة بانتظام frame_indices = np.linspace(0, total_frames - 1, n_frames, dtype=int) frames = vr.get_batch(frame_indices).asnumpy() # تحويل الإطارات إلى قائمة من صور PIL pil_frames = [Image.fromarray(frame) for frame in frames] audio_data = None if use_audio: # ملاحظة: استخراج الصوت من الفيديو يتطلب معالجة إضافية (عادة عبر ffmpeg) # هنا سنضع قيمة فارغة لأن decord يركز على الصور، # في التطبيقات الفعلية يتم استخدام moviepy أو ffmpeg لاستخراج المسار الصوتي pass return pil_frames, audio_data def process_mm_info(conversation, use_audio_in_video=True): """ الدالة الرئيسية لمعالجة الوسائط المتعددة. تقوم بتحويل الروابط النصية إلى كائنات بيانات (Tensors/Images) يفهمها النموذج. """ conversation = copy.deepcopy(conversation) audios = [] images = [] videos = [] for message in conversation: if "content" in message and isinstance(message["content"], list): for item in message["content"]: try: if item["type"] == "audio": # تحميل ومعالجة الصوت audio_data = _load_audio(item["audio"]) audios.append(audio_data) elif item["type"] == "image": # تحميل ومعالجة الصورة image_data = _load_image(item["image"]) images.append(image_data) elif item["type"] == "video": # تحميل ومعالجة الفيديو video_frames, video_audio = _load_video( item["video"], use_audio=use_audio_in_video ) videos.append(video_frames) if use_audio_in_video and video_audio is not None: audios.append(video_audio) except Exception as e: print(f"Error processing {item['type']}: {e}") # في حالة الخطأ، يمكن تجاهل العنصر أو إضافة عنصر فارغ لتجنب انهيار الكود pass return audios, images, videos