Because / analyzer.py
mrwabnalas40's picture
Upload 134 files
6661bb6 verified
from telegram_imports import *
# analyzer.py
# -*- coding: utf-8 -*-
"""
نظام تحليل الوسائط المتعدد - محسّن ومطوّر
يدعم تحليل الصور، الفيديو، الصوت، PDF، والروابط
"""
import os
import tempfile
import requests
import logging
from PIL import Image
from io import BytesIO
import PyPDF2
from urllib.parse import urlparse
from typing import Dict, Optional, Tuple, Any, List
import time
import re
# إعداد التسجيل
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ============= إدارة التبعيات =============
class DependencyManager:
"""إدارة التبعيات الاختيارية مع fallbacks"""
def __init__(self):
self.dependencies = {}
self._check_dependencies()
def _check_dependencies(self):
"""فحص توفر المكتبات"""
# moviepy للفيديو
try:
import moviepy.editor as mp
self.dependencies['moviepy'] = True
self.moviepy = mp
except ImportError:
self.dependencies['moviepy'] = False
logger.warning("moviepy غير متوفر - تحليل الفيديو محدود")
# OpenCV
try:
import cv2
self.dependencies['opencv'] = True
self.cv2 = cv2
except ImportError:
self.dependencies['opencv'] = False
logger.warning("OpenCV غير متوفر - معالجة الفيديو محدودة")
# pydub للصوت
try:
from pydub import AudioSegment
self.dependencies['pydub'] = True
self.AudioSegment = AudioSegment
except ImportError:
self.dependencies['pydub'] = False
logger.warning("pydub غير متوفر - معالجة الصوت محدودة")
# PyAV
try:
import av
self.dependencies['pyav'] = True
self.av = av
except ImportError:
self.dependencies['pyav'] = False
# speech_recognition
try:
import speech_recognition as sr
self.dependencies['speech_recognition'] = True
self.sr = sr
except ImportError:
self.dependencies['speech_recognition'] = False
logger.warning("speech_recognition غير متوفر - التعرف على الصوت غير متاح")
def is_available(self, lib_name: str) -> bool:
"""التحقق من توفر مكتبة"""
return self.dependencies.get(lib_name, False)
# تهيئة مدير التبعيات
deps = DependencyManager()
# ============= أدوات مساعدة =============
class URLValidator:
"""التحقق من صحة وإصلاح الروابط"""
@staticmethod
def is_valid_url(url: str) -> bool:
"""التحقق من صحة الرابط"""
try:
result = urlparse(url)
return all([result.scheme, result.netloc])
except Exception:
return False
@staticmethod
def fix_url(url: str) -> str:
"""إصلاح الروابط الناقصة"""
if not url or not isinstance(url, str):
return ""
url = url.strip()
if not url:
return ""
if not url.startswith(("http://", "https://")):
url = "https://" + url.lstrip(":/")
if not URLValidator.is_valid_url(url):
return url # إرجاع الرابط كما هو حتى لو كان غير صالح
return url
class FileManager:
"""إدارة آمنة للملفات المؤقتة"""
@staticmethod
def download_file(url: str, timeout: int = 30) -> Tuple[str, str]:
"""تحميل ملف مؤقت مع إدارة الآمنة"""
response = requests.get(url, timeout=timeout, stream=True)
response.raise_for_status()
# تحديد الامتداد من نوع المحتوى أو الرابط
content_type = response.headers.get('content-type', '')
extension = FileManager._get_extension_from_content_type(content_type, url)
# إنشاء ملف مؤقت
with tempfile.NamedTemporaryFile(delete=False, suffix=extension) as temp_file:
for chunk in response.iter_content(chunk_size=8192):
temp_file.write(chunk)
temp_path = temp_file.name
return temp_path, content_type
@staticmethod
def _get_extension_from_content_type(content_type: str, url: str) -> str:
"""استخراج الامتداد المناسب"""
extensions = {
'image/jpeg': '.jpg',
'image/png': '.png',
'image/gif': '.gif',
'image/webp': '.webp',
'application/pdf': '.pdf',
'video/mp4': '.mp4',
'audio/mpeg': '.mp3',
'audio/wav': '.wav'
}
# محاولة من نوع المحتوى أولاً
for ct, ext in extensions.items():
if ct in content_type:
return ext
# fallback إلى الرابط
url_lower = url.lower()
for ext in ['.jpg', '.jpeg', '.png', '.gif', '.webp', '.pdf', '.mp4', '.mp3', '.wav']:
if url_lower.endswith(ext):
return ext
return '.tmp'
@staticmethod
def safe_cleanup(file_path: str):
"""تنظيف آمن للملفات"""
try:
if file_path and os.path.exists(file_path):
os.remove(file_path)
except Exception as e:
logger.warning(f"فشل في حذف الملف {file_path}: {e}")
# ============= محلل الوسائط الأساسي =============
class MediaAnalyzer:
"""الفئة الرئيسية لتحليل الوسائط"""
def __init__(self, timeout: int = 30, max_file_size: int = 100 * 1024 * 1024): # 100MB
self.timeout = timeout
self.max_file_size = max_file_size
def analyze(self, url: str) -> Dict[str, Any]:
"""
تحليل الوسائط من رابط - الوظيفة الرئيسية
Args:
url: رابط الملف أو الوسائط
Returns:
dict: معلومات التحليل
"""
try:
# التحقق من صحة الرابط
fixed_url = URLValidator.fix_url(url)
# تحديد نوع الوسائط
media_type = self._detect_media_type(fixed_url)
logger.info(f"تحليل {media_type} من: {fixed_url}")
# التحليل حسب النوع
if media_type == 'image':
return self._analyze_image(fixed_url)
elif media_type == 'pdf':
return self._analyze_pdf(fixed_url)
elif media_type == 'audio':
return self._analyze_audio(fixed_url)
elif media_type == 'video':
return self._analyze_video(fixed_url)
else:
return self._analyze_general_link(fixed_url)
except Exception as e:
logger.error(f"خطأ في تحليل {url}: {e}")
return {
'success': False,
'error': str(e),
'type': 'error',
'url': url
}
def _detect_media_type(self, url: str) -> str:
"""تحديد نوع الوسائط"""
if not url:
return 'unknown'
url_lower = url.lower()
# الصور
image_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp', '.svg']
if any(url_lower.endswith(ext) for ext in image_extensions):
return 'image'
# PDF
if url_lower.endswith('.pdf'):
return 'pdf'
# الفيديو
video_extensions = ['.mp4', '.mov', '.avi', '.webm', '.mkv', '.flv', '.wmv']
if any(url_lower.endswith(ext) for ext in video_extensions):
return 'video'
# الصوت
audio_extensions = ['.mp3', '.wav', '.ogg', '.m4a', '.flac', '.aac']
if any(url_lower.endswith(ext) for ext in audio_extensions):
return 'audio'
return 'link'
def _analyze_image(self, url: str) -> Dict[str, Any]:
"""تحليل الصور"""
temp_path = None
try:
temp_path, content_type = FileManager.download_file(url, self.timeout)
with Image.open(temp_path) as img:
info = {
'success': True,
'type': 'image',
'url': url,
'format': img.format,
'size': img.size,
'mode': img.mode,
'content_type': content_type
}
return info
except Exception as e:
return {
'success': False,
'type': 'image',
'url': url,
'error': f"خطأ في تحليل الصورة: {e}"
}
finally:
if temp_path:
FileManager.safe_cleanup(temp_path)
def _analyze_pdf(self, url: str) -> Dict[str, Any]:
"""تحليل ملفات PDF"""
temp_path = None
try:
temp_path, content_type = FileManager.download_file(url, self.timeout)
with open(temp_path, "rb") as f:
reader = PyPDF2.PdfReader(f)
text = ""
for page in reader.pages:
page_text = page.extract_text()
if page_text:
text += page_text + "\n"
info = {
'success': True,
'type': 'pdf',
'url': url,
'page_count': len(reader.pages),
'text_preview': text[:1000] + "..." if len(text) > 1000 else text,
'content_type': content_type
}
return info
except Exception as e:
return {
'success': False,
'type': 'pdf',
'url': url,
'error': f"خطأ في تحليل PDF: {e}"
}
finally:
if temp_path:
FileManager.safe_cleanup(temp_path)
def _analyze_audio(self, url: str) -> Dict[str, Any]:
"""تحليل الملفات الصوتية"""
if not deps.is_available('speech_recognition'):
return {
'success': False,
'type': 'audio',
'url': url,
'error': "مكتبة التعرف على الصوت غير متوفرة"
}
temp_path = None
try:
temp_path, content_type = FileManager.download_file(url, self.timeout)
# التحويل إلى WAV إذا لزم الأمر
if deps.is_available('pydub') and not temp_path.endswith('.wav'):
audio = deps.AudioSegment.from_file(temp_path)
wav_path = temp_path + '.wav'
audio.export(wav_path, format='wav')
FileManager.safe_cleanup(temp_path)
temp_path = wav_path
# التعرف على الكلام
recognizer = deps.sr.Recognizer()
with deps.sr.AudioFile(temp_path) as source:
audio_data = recognizer.record(source)
text = recognizer.recognize_google(audio_data, language="ar-SA")
return {
'success': True,
'type': 'audio',
'url': url,
'transcript': text,
'content_type': content_type
}
except deps.sr.UnknownValueError:
return {
'success': True,
'type': 'audio',
'url': url,
'transcript': "لم يتم التعرف على كلام واضح",
'content_type': content_type
}
except Exception as e:
return {
'success': False,
'type': 'audio',
'url': url,
'error': f"خطأ في تحليل الصوت: {e}"
}
finally:
if temp_path:
FileManager.safe_cleanup(temp_path)
def _analyze_video(self, url: str) -> Dict[str, Any]:
"""تحليل الفيديو"""
temp_path = None
audio_path = None
try:
temp_path, content_type = FileManager.download_file(url, self.timeout)
info = {
'success': True,
'type': 'video',
'url': url,
'content_type': content_type,
'analysis_methods': []
}
# التحليل باستخدام moviepy إذا متوفر
if deps.is_available('moviepy'):
try:
video = deps.moviepy.VideoFileClip(temp_path)
info['duration'] = video.duration
info['resolution'] = video.size
info['fps'] = video.fps
info['analysis_methods'].append('moviepy')
# استخراج الصوت إذا أمكن
if deps.is_available('speech_recognition') and video.audio:
audio_path = temp_path + '.wav'
video.audio.write_audiofile(audio_path, verbose=False, logger=None)
recognizer = deps.sr.Recognizer()
with deps.sr.AudioFile(audio_path) as source:
audio_data = recognizer.record(source)
text = recognizer.recognize_google(audio_data, language="ar-SA")
info['audio_transcript'] = text
video.close()
except Exception as e:
logger.warning(f"فشل تحليل moviepy: {e}")
info['moviepy_error'] = str(e)
# التحليل باستخدام OpenCV إذا متوفر
if deps.is_available('opencv'):
try:
cap = deps.cv2.VideoCapture(temp_path)
frame_count = int(cap.get(deps.cv2.CAP_PROP_FRAME_COUNT))
fps = cap.get(deps.cv2.CAP_PROP_FPS)
info['frame_count'] = frame_count
info['opencv_fps'] = fps
info['analysis_methods'].append('opencv')
cap.release()
except Exception as e:
logger.warning(f"فشل تحليل OpenCV: {e}")
info['opencv_error'] = str(e)
return info
except Exception as e:
return {
'success': False,
'type': 'video',
'url': url,
'error': f"خطأ في تحليل الفيديو: {e}"
}
finally:
if temp_path:
FileManager.safe_cleanup(temp_path)
if audio_path and os.path.exists(audio_path):
FileManager.safe_cleanup(audio_path)
def _analyze_general_link(self, url: str) -> Dict[str, Any]:
"""تحليل الروابط العامة"""
try:
response = requests.head(url, timeout=self.timeout, allow_redirects=True)
domain = urlparse(url).netloc
link_type = self._classify_link_type(domain, response.headers)
return {
'success': True,
'type': 'link',
'url': url,
'link_type': link_type,
'domain': domain,
'content_type': response.headers.get('content-type', ''),
'status_code': response.status_code
}
except Exception as e:
return {
'success': False,
'type': 'link',
'url': url,
'error': f"خطأ في تحليل الرابط: {e}"
}
def _classify_link_type(self, domain: str, headers: Dict) -> str:
"""تصنيف نوع الرابط"""
domain_lower = domain.lower()
content_type = headers.get('content-type', '').lower()
if any(platform in domain_lower for platform in ['youtube', 'youtu.be']):
return 'youtube'
elif 'github.com' in domain_lower:
return 'github'
elif any(platform in domain_lower for platform in ['twitter', 'x.com']):
return 'twitter'
elif 'instagram.com' in domain_lower:
return 'instagram'
elif 'tiktok.com' in domain_lower:
return 'tiktok'
elif 'text/html' in content_type:
return 'webpage'
else:
return 'general_link'
# ============= الدوال المطلوبة لـ learner.py =============
def fix_url(url: str) -> str:
"""
إصلاح الروابط وإكمالها إذا كانت ناقصة
هذه الدالة مطلوبة لـ learner.py
"""
return URLValidator.fix_url(url)
def detect_media_type(url: str) -> str:
"""
كشف نوع الوسائط من الرابط
هذه الدالة مطلوبة لـ learner.py
"""
if not url:
return "unknown"
analyzer = MediaAnalyzer()
return analyzer._detect_media_type(url)
def is_valid_url(url: str) -> bool:
"""
التحقق من صحة الرابط
هذه الدالة مطلوبة للتكامل مع النظام
"""
return URLValidator.is_valid_url(url)
def extract_domain(url: str) -> str:
"""
استخراج النطاق من الرابط
"""
try:
return urlparse(url).netloc
except:
return ""
def analyze_text_content(text: str) -> Dict[str, Any]:
"""
تحليل المحتوى النصي
"""
analysis = {
"length": len(text),
"word_count": len(text.split()),
"has_questions": any(char in text for char in '؟?'),
"has_links": 'http' in text.lower(),
"language": "arabic" if any(char in text for char in 'ء-ي') else "unknown",
"has_media_indicators": any(keyword in text.lower() for keyword in [
'صورة', 'فيديو', 'صوت', 'ملف', 'رابط', 'صور', 'فيديوهات'
])
}
return analysis
def extract_urls_from_text(text: str) -> List[str]:
"""
استخراج جميع الروابط من النص
"""
url_pattern = r'https?://[^\s]+|www\.[^\s]+'
urls = re.findall(url_pattern, text)
# إصلاح الروابط التي تبدأ بـ www
fixed_urls = []
for url in urls:
if url.startswith('www.'):
fixed_urls.append('https://' + url)
else:
fixed_urls.append(url)
return fixed_urls
# ============= واجهة مبسطة =============
def analyze_media(url: str) -> Dict[str, Any]:
"""
واجهة مبسطة لتحليل الوسائط
Args:
url: رابط الملف أو الوسائط
Returns:
dict: نتائج التحليل
"""
analyzer = MediaAnalyzer()
return analyzer.analyze(url)
def get_media_summary(url: str) -> str:
"""
الحصول على ملخص نصي لتحليل الوسائط
Args:
url: رابط الوسائط
Returns:
str: ملوصف نصي
"""
result = analyze_media(url)
if not result.get('success', False):
return f"❌ فشل التحليل: {result.get('error', 'خطأ غير معروف')}"
media_type = result.get('type', 'unknown')
if media_type == 'image':
return f"🖼️ صورة: {result.get('format', '')} - الحجم: {result.get('size', '')}"
elif media_type == 'pdf':
return f"📄 PDF: {result.get('page_count', 0)} صفحة - النص: {result.get('text_preview', '')[:100]}..."
elif media_type == 'audio':
return f"🎵 صوت: {result.get('transcript', 'لا يوجد نص معروف')}"
elif media_type == 'video':
return f"🎥 فيديو: {result.get('duration', 'مجهول')} ثانية - الدقة: {result.get('resolution', 'مجهول')}"
elif media_type == 'link':
return f"🔗 رابط: {result.get('link_type', 'عام')} - النطاق: {result.get('domain', '')}"
return f"📎 وسائط: {media_type}"
# ============= فئة مساعدة للتحليل السريع =============
class QuickAnalyzer:
"""محلل سريع للاستخدام العام"""
@staticmethod
def quick_analyze(url: str) -> Dict[str, Any]:
"""تحليل سريع بدون تحميل الملف"""
try:
fixed_url = fix_url(url)
media_type = detect_media_type(fixed_url)
return {
'success': True,
'url': fixed_url,
'type': media_type,
'domain': extract_domain(fixed_url),
'is_valid': is_valid_url(fixed_url)
}
except Exception as e:
return {
'success': False,
'url': url,
'error': str(e)
}
# ============= إنشاء نسخ عالمية =============
media_analyzer = MediaAnalyzer()
quick_analyzer = QuickAnalyzer()
url_validator = URLValidator()
# ============= اختبارات =============
# analyzer.py - أضف في نهاية الملف
def analyze_media_async(media_data):
"""نسخة غير متزامنة من تحليل الوسائط"""
return analyze_media(media_data)
def get_media_summary(analysis_result):
"""إنشاء ملخص للوسائط"""
return {"summary": "ملخص الوسائط"}
def extract_urls_from_text(text):
"""استخراج الروابط من النص"""
import re
return re.findall(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', text)
def analyze_url_type(url):
"""تحليل نوع الرابط"""
return {"type": "unknown", "status": "not_analyzed"}
def fix_url(url):
"""إصلاح الرابط"""
return url
def is_valid_url(url):
"""التحقق من صحة الرابط"""
import re
return bool(re.match(r'^https?://', url))
if __name__ == "__main__":
# اختبارات متنوعة
test_urls = [
"https://example.com/image.jpg",
"https://example.com/document.pdf",
"https://www.youtube.com/watch?v=example",
"github.com/example/repo", # رابط ناقص
"example.com/video.mp4" # رابط ناقص
]
print("=== اختبار نظام تحليل الوسائط ===")
print(f"✅ الدوال المتاحة: fix_url, detect_media_type, is_valid_url")
for url in test_urls:
print(f"\nتحليل: {url}")
try:
# اختبار الدوال الجديدة
fixed = fix_url(url)
media_type = detect_media_type(fixed)
valid = is_valid_url(fixed)
print(f"📝 الرابط المصلح: {fixed}")
print(f"🔍 نوع الوسائط: {media_type}")
print(f"✅ الرابط صالح: {valid}")
# تحليل مفصل إذا كان الرابط صالحاً
if valid:
result = analyze_media(fixed)
summary = get_media_summary(fixed)
print(f"📊 النتيجة: {summary}")
except Exception as e:
print(f"❌ خطأ: {e}")
# اختبار استخراج الروابط من النص
test_text = "راجع هذا الموقع https://example.com وصفحة www.github.com للمزيد"
print(f"\n📝 اختبار استخراج الروابط من النص:")
print(f"النص: {test_text}")
urls = extract_urls_from_text(test_text)
print(f"الروابط المستخرجة: {urls}")
print("\n✅ جميع الدوال المطلوبة جاهزة للاستخدام!")