Der11 / qwen_omni_utils.py
Derr11's picture
Create qwen_omni_utils.py
2fdaf8f verified
raw
history blame
5.45 kB
import copy
import os
import requests
import io
import numpy as np
import soundfile as sf
from PIL import Image
# محاولة استيراد decord لمعالجة الفيديو، إذا لم يكن موجوداً لن يتوقف الكود بالكامل
try:
import decord
decord.bridge.set_bridge('torch')
except ImportError:
decord = None
print("Warning: 'decord' module not found. Video processing will not work.")
# محاولة استيراد librosa لمعالجة الصوت المتقدمة
try:
import librosa
except ImportError:
librosa = None
def _load_image(image_path):
"""تحميل الصورة من رابط أو مسار محلي وتحويلها إلى RGB"""
if image_path.startswith("http://") or image_path.startswith("https://"):
response = requests.get(image_path, timeout=10)
image = Image.open(io.BytesIO(response.content))
else:
image = Image.open(image_path)
return image.convert("RGB")
def _load_audio(audio_path, target_sr=16000):
"""تحميل الصوت وإعادة تعيين معدل الترميز (Sampling Rate)"""
if audio_path.startswith("http://") or audio_path.startswith("https://"):
response = requests.get(audio_path, timeout=10)
# استخدام io.BytesIO للقراءة من الذاكرة
audio_data, sr = sf.read(io.BytesIO(response.content))
else:
audio_data, sr = sf.read(audio_path)
# تحويل إلى Mono إذا كان Stereo
if len(audio_data.shape) > 1:
audio_data = audio_data.mean(axis=1)
# إعادة تشكيل التردد (Resampling) إذا توفر librosa وكان التردد مختلفاً
if librosa and sr != target_sr:
audio_data = librosa.resample(audio_data, orig_sr=sr, target_sr=target_sr)
return audio_data
def _load_video(video_path, n_frames=8, use_audio=True):
"""معالجة الفيديو: استخراج الإطارات والصوت"""
if decord is None:
raise ImportError("Please install 'decord' to support video processing.")
# تحميل الفيديو (يدعم الروابط المباشرة في بعض إصدارات decord، ولكن يفضل تحميله مؤقتاً)
if video_path.startswith("http"):
# تحميل الملف مؤقتاً
response = requests.get(video_path, stream=True)
temp_filename = "temp_video.mp4"
with open(temp_filename, 'wb') as f:
for chunk in response.iter_content(chunk_size=1024):
if chunk:
f.write(chunk)
vr = decord.VideoReader(temp_filename)
else:
vr = decord.VideoReader(video_path)
# استخراج الإطارات (Sampling Frames)
total_frames = len(vr)
# اختيار إطارات موزعة بانتظام
frame_indices = np.linspace(0, total_frames - 1, n_frames, dtype=int)
frames = vr.get_batch(frame_indices).asnumpy()
# تحويل الإطارات إلى قائمة من صور PIL
pil_frames = [Image.fromarray(frame) for frame in frames]
audio_data = None
if use_audio:
# ملاحظة: استخراج الصوت من الفيديو يتطلب معالجة إضافية (عادة عبر ffmpeg)
# هنا سنضع قيمة فارغة لأن decord يركز على الصور،
# في التطبيقات الفعلية يتم استخدام moviepy أو ffmpeg لاستخراج المسار الصوتي
pass
return pil_frames, audio_data
def process_mm_info(conversation, use_audio_in_video=True):
"""
الدالة الرئيسية لمعالجة الوسائط المتعددة.
تقوم بتحويل الروابط النصية إلى كائنات بيانات (Tensors/Images) يفهمها النموذج.
"""
conversation = copy.deepcopy(conversation)
audios = []
images = []
videos = []
for message in conversation:
if "content" in message and isinstance(message["content"], list):
for item in message["content"]:
try:
if item["type"] == "audio":
# تحميل ومعالجة الصوت
audio_data = _load_audio(item["audio"])
audios.append(audio_data)
elif item["type"] == "image":
# تحميل ومعالجة الصورة
image_data = _load_image(item["image"])
images.append(image_data)
elif item["type"] == "video":
# تحميل ومعالجة الفيديو
video_frames, video_audio = _load_video(
item["video"],
use_audio=use_audio_in_video
)
videos.append(video_frames)
if use_audio_in_video and video_audio is not None:
audios.append(video_audio)
except Exception as e:
print(f"Error processing {item['type']}: {e}")
# في حالة الخطأ، يمكن تجاهل العنصر أو إضافة عنصر فارغ لتجنب انهيار الكود
pass
return audios, images, videos