Spaces:
Running
Running
| from telegram_imports import * | |
| # analyzer.py | |
| # -*- coding: utf-8 -*- | |
| """ | |
| نظام تحليل الوسائط المتعدد - محسّن ومطوّر | |
| يدعم تحليل الصور، الفيديو، الصوت، PDF، والروابط | |
| """ | |
| import os | |
| import tempfile | |
| import requests | |
| import logging | |
| from PIL import Image | |
| from io import BytesIO | |
| import PyPDF2 | |
| from urllib.parse import urlparse | |
| from typing import Dict, Optional, Tuple, Any, List | |
| import time | |
| import re | |
| # إعداد التسجيل | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # ============= إدارة التبعيات ============= | |
| class DependencyManager: | |
| """إدارة التبعيات الاختيارية مع fallbacks""" | |
| def __init__(self): | |
| self.dependencies = {} | |
| self._check_dependencies() | |
| def _check_dependencies(self): | |
| """فحص توفر المكتبات""" | |
| # moviepy للفيديو | |
| try: | |
| import moviepy.editor as mp | |
| self.dependencies['moviepy'] = True | |
| self.moviepy = mp | |
| except ImportError: | |
| self.dependencies['moviepy'] = False | |
| logger.warning("moviepy غير متوفر - تحليل الفيديو محدود") | |
| # OpenCV | |
| try: | |
| import cv2 | |
| self.dependencies['opencv'] = True | |
| self.cv2 = cv2 | |
| except ImportError: | |
| self.dependencies['opencv'] = False | |
| logger.warning("OpenCV غير متوفر - معالجة الفيديو محدودة") | |
| # pydub للصوت | |
| try: | |
| from pydub import AudioSegment | |
| self.dependencies['pydub'] = True | |
| self.AudioSegment = AudioSegment | |
| except ImportError: | |
| self.dependencies['pydub'] = False | |
| logger.warning("pydub غير متوفر - معالجة الصوت محدودة") | |
| # PyAV | |
| try: | |
| import av | |
| self.dependencies['pyav'] = True | |
| self.av = av | |
| except ImportError: | |
| self.dependencies['pyav'] = False | |
| # speech_recognition | |
| try: | |
| import speech_recognition as sr | |
| self.dependencies['speech_recognition'] = True | |
| self.sr = sr | |
| except ImportError: | |
| self.dependencies['speech_recognition'] = False | |
| logger.warning("speech_recognition غير متوفر - التعرف على الصوت غير متاح") | |
| def is_available(self, lib_name: str) -> bool: | |
| """التحقق من توفر مكتبة""" | |
| return self.dependencies.get(lib_name, False) | |
| # تهيئة مدير التبعيات | |
| deps = DependencyManager() | |
| # ============= أدوات مساعدة ============= | |
| class URLValidator: | |
| """التحقق من صحة وإصلاح الروابط""" | |
| def is_valid_url(url: str) -> bool: | |
| """التحقق من صحة الرابط""" | |
| try: | |
| result = urlparse(url) | |
| return all([result.scheme, result.netloc]) | |
| except Exception: | |
| return False | |
| def fix_url(url: str) -> str: | |
| """إصلاح الروابط الناقصة""" | |
| if not url or not isinstance(url, str): | |
| return "" | |
| url = url.strip() | |
| if not url: | |
| return "" | |
| if not url.startswith(("http://", "https://")): | |
| url = "https://" + url.lstrip(":/") | |
| if not URLValidator.is_valid_url(url): | |
| return url # إرجاع الرابط كما هو حتى لو كان غير صالح | |
| return url | |
| class FileManager: | |
| """إدارة آمنة للملفات المؤقتة""" | |
| def download_file(url: str, timeout: int = 30) -> Tuple[str, str]: | |
| """تحميل ملف مؤقت مع إدارة الآمنة""" | |
| response = requests.get(url, timeout=timeout, stream=True) | |
| response.raise_for_status() | |
| # تحديد الامتداد من نوع المحتوى أو الرابط | |
| content_type = response.headers.get('content-type', '') | |
| extension = FileManager._get_extension_from_content_type(content_type, url) | |
| # إنشاء ملف مؤقت | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=extension) as temp_file: | |
| for chunk in response.iter_content(chunk_size=8192): | |
| temp_file.write(chunk) | |
| temp_path = temp_file.name | |
| return temp_path, content_type | |
| def _get_extension_from_content_type(content_type: str, url: str) -> str: | |
| """استخراج الامتداد المناسب""" | |
| extensions = { | |
| 'image/jpeg': '.jpg', | |
| 'image/png': '.png', | |
| 'image/gif': '.gif', | |
| 'image/webp': '.webp', | |
| 'application/pdf': '.pdf', | |
| 'video/mp4': '.mp4', | |
| 'audio/mpeg': '.mp3', | |
| 'audio/wav': '.wav' | |
| } | |
| # محاولة من نوع المحتوى أولاً | |
| for ct, ext in extensions.items(): | |
| if ct in content_type: | |
| return ext | |
| # fallback إلى الرابط | |
| url_lower = url.lower() | |
| for ext in ['.jpg', '.jpeg', '.png', '.gif', '.webp', '.pdf', '.mp4', '.mp3', '.wav']: | |
| if url_lower.endswith(ext): | |
| return ext | |
| return '.tmp' | |
| def safe_cleanup(file_path: str): | |
| """تنظيف آمن للملفات""" | |
| try: | |
| if file_path and os.path.exists(file_path): | |
| os.remove(file_path) | |
| except Exception as e: | |
| logger.warning(f"فشل في حذف الملف {file_path}: {e}") | |
| # ============= محلل الوسائط الأساسي ============= | |
| class MediaAnalyzer: | |
| """الفئة الرئيسية لتحليل الوسائط""" | |
| def __init__(self, timeout: int = 30, max_file_size: int = 100 * 1024 * 1024): # 100MB | |
| self.timeout = timeout | |
| self.max_file_size = max_file_size | |
| def analyze(self, url: str) -> Dict[str, Any]: | |
| """ | |
| تحليل الوسائط من رابط - الوظيفة الرئيسية | |
| Args: | |
| url: رابط الملف أو الوسائط | |
| Returns: | |
| dict: معلومات التحليل | |
| """ | |
| try: | |
| # التحقق من صحة الرابط | |
| fixed_url = URLValidator.fix_url(url) | |
| # تحديد نوع الوسائط | |
| media_type = self._detect_media_type(fixed_url) | |
| logger.info(f"تحليل {media_type} من: {fixed_url}") | |
| # التحليل حسب النوع | |
| if media_type == 'image': | |
| return self._analyze_image(fixed_url) | |
| elif media_type == 'pdf': | |
| return self._analyze_pdf(fixed_url) | |
| elif media_type == 'audio': | |
| return self._analyze_audio(fixed_url) | |
| elif media_type == 'video': | |
| return self._analyze_video(fixed_url) | |
| else: | |
| return self._analyze_general_link(fixed_url) | |
| except Exception as e: | |
| logger.error(f"خطأ في تحليل {url}: {e}") | |
| return { | |
| 'success': False, | |
| 'error': str(e), | |
| 'type': 'error', | |
| 'url': url | |
| } | |
| def _detect_media_type(self, url: str) -> str: | |
| """تحديد نوع الوسائط""" | |
| if not url: | |
| return 'unknown' | |
| url_lower = url.lower() | |
| # الصور | |
| image_extensions = ['.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp', '.svg'] | |
| if any(url_lower.endswith(ext) for ext in image_extensions): | |
| return 'image' | |
| if url_lower.endswith('.pdf'): | |
| return 'pdf' | |
| # الفيديو | |
| video_extensions = ['.mp4', '.mov', '.avi', '.webm', '.mkv', '.flv', '.wmv'] | |
| if any(url_lower.endswith(ext) for ext in video_extensions): | |
| return 'video' | |
| # الصوت | |
| audio_extensions = ['.mp3', '.wav', '.ogg', '.m4a', '.flac', '.aac'] | |
| if any(url_lower.endswith(ext) for ext in audio_extensions): | |
| return 'audio' | |
| return 'link' | |
| def _analyze_image(self, url: str) -> Dict[str, Any]: | |
| """تحليل الصور""" | |
| temp_path = None | |
| try: | |
| temp_path, content_type = FileManager.download_file(url, self.timeout) | |
| with Image.open(temp_path) as img: | |
| info = { | |
| 'success': True, | |
| 'type': 'image', | |
| 'url': url, | |
| 'format': img.format, | |
| 'size': img.size, | |
| 'mode': img.mode, | |
| 'content_type': content_type | |
| } | |
| return info | |
| except Exception as e: | |
| return { | |
| 'success': False, | |
| 'type': 'image', | |
| 'url': url, | |
| 'error': f"خطأ في تحليل الصورة: {e}" | |
| } | |
| finally: | |
| if temp_path: | |
| FileManager.safe_cleanup(temp_path) | |
| def _analyze_pdf(self, url: str) -> Dict[str, Any]: | |
| """تحليل ملفات PDF""" | |
| temp_path = None | |
| try: | |
| temp_path, content_type = FileManager.download_file(url, self.timeout) | |
| with open(temp_path, "rb") as f: | |
| reader = PyPDF2.PdfReader(f) | |
| text = "" | |
| for page in reader.pages: | |
| page_text = page.extract_text() | |
| if page_text: | |
| text += page_text + "\n" | |
| info = { | |
| 'success': True, | |
| 'type': 'pdf', | |
| 'url': url, | |
| 'page_count': len(reader.pages), | |
| 'text_preview': text[:1000] + "..." if len(text) > 1000 else text, | |
| 'content_type': content_type | |
| } | |
| return info | |
| except Exception as e: | |
| return { | |
| 'success': False, | |
| 'type': 'pdf', | |
| 'url': url, | |
| 'error': f"خطأ في تحليل PDF: {e}" | |
| } | |
| finally: | |
| if temp_path: | |
| FileManager.safe_cleanup(temp_path) | |
| def _analyze_audio(self, url: str) -> Dict[str, Any]: | |
| """تحليل الملفات الصوتية""" | |
| if not deps.is_available('speech_recognition'): | |
| return { | |
| 'success': False, | |
| 'type': 'audio', | |
| 'url': url, | |
| 'error': "مكتبة التعرف على الصوت غير متوفرة" | |
| } | |
| temp_path = None | |
| try: | |
| temp_path, content_type = FileManager.download_file(url, self.timeout) | |
| # التحويل إلى WAV إذا لزم الأمر | |
| if deps.is_available('pydub') and not temp_path.endswith('.wav'): | |
| audio = deps.AudioSegment.from_file(temp_path) | |
| wav_path = temp_path + '.wav' | |
| audio.export(wav_path, format='wav') | |
| FileManager.safe_cleanup(temp_path) | |
| temp_path = wav_path | |
| # التعرف على الكلام | |
| recognizer = deps.sr.Recognizer() | |
| with deps.sr.AudioFile(temp_path) as source: | |
| audio_data = recognizer.record(source) | |
| text = recognizer.recognize_google(audio_data, language="ar-SA") | |
| return { | |
| 'success': True, | |
| 'type': 'audio', | |
| 'url': url, | |
| 'transcript': text, | |
| 'content_type': content_type | |
| } | |
| except deps.sr.UnknownValueError: | |
| return { | |
| 'success': True, | |
| 'type': 'audio', | |
| 'url': url, | |
| 'transcript': "لم يتم التعرف على كلام واضح", | |
| 'content_type': content_type | |
| } | |
| except Exception as e: | |
| return { | |
| 'success': False, | |
| 'type': 'audio', | |
| 'url': url, | |
| 'error': f"خطأ في تحليل الصوت: {e}" | |
| } | |
| finally: | |
| if temp_path: | |
| FileManager.safe_cleanup(temp_path) | |
| def _analyze_video(self, url: str) -> Dict[str, Any]: | |
| """تحليل الفيديو""" | |
| temp_path = None | |
| audio_path = None | |
| try: | |
| temp_path, content_type = FileManager.download_file(url, self.timeout) | |
| info = { | |
| 'success': True, | |
| 'type': 'video', | |
| 'url': url, | |
| 'content_type': content_type, | |
| 'analysis_methods': [] | |
| } | |
| # التحليل باستخدام moviepy إذا متوفر | |
| if deps.is_available('moviepy'): | |
| try: | |
| video = deps.moviepy.VideoFileClip(temp_path) | |
| info['duration'] = video.duration | |
| info['resolution'] = video.size | |
| info['fps'] = video.fps | |
| info['analysis_methods'].append('moviepy') | |
| # استخراج الصوت إذا أمكن | |
| if deps.is_available('speech_recognition') and video.audio: | |
| audio_path = temp_path + '.wav' | |
| video.audio.write_audiofile(audio_path, verbose=False, logger=None) | |
| recognizer = deps.sr.Recognizer() | |
| with deps.sr.AudioFile(audio_path) as source: | |
| audio_data = recognizer.record(source) | |
| text = recognizer.recognize_google(audio_data, language="ar-SA") | |
| info['audio_transcript'] = text | |
| video.close() | |
| except Exception as e: | |
| logger.warning(f"فشل تحليل moviepy: {e}") | |
| info['moviepy_error'] = str(e) | |
| # التحليل باستخدام OpenCV إذا متوفر | |
| if deps.is_available('opencv'): | |
| try: | |
| cap = deps.cv2.VideoCapture(temp_path) | |
| frame_count = int(cap.get(deps.cv2.CAP_PROP_FRAME_COUNT)) | |
| fps = cap.get(deps.cv2.CAP_PROP_FPS) | |
| info['frame_count'] = frame_count | |
| info['opencv_fps'] = fps | |
| info['analysis_methods'].append('opencv') | |
| cap.release() | |
| except Exception as e: | |
| logger.warning(f"فشل تحليل OpenCV: {e}") | |
| info['opencv_error'] = str(e) | |
| return info | |
| except Exception as e: | |
| return { | |
| 'success': False, | |
| 'type': 'video', | |
| 'url': url, | |
| 'error': f"خطأ في تحليل الفيديو: {e}" | |
| } | |
| finally: | |
| if temp_path: | |
| FileManager.safe_cleanup(temp_path) | |
| if audio_path and os.path.exists(audio_path): | |
| FileManager.safe_cleanup(audio_path) | |
| def _analyze_general_link(self, url: str) -> Dict[str, Any]: | |
| """تحليل الروابط العامة""" | |
| try: | |
| response = requests.head(url, timeout=self.timeout, allow_redirects=True) | |
| domain = urlparse(url).netloc | |
| link_type = self._classify_link_type(domain, response.headers) | |
| return { | |
| 'success': True, | |
| 'type': 'link', | |
| 'url': url, | |
| 'link_type': link_type, | |
| 'domain': domain, | |
| 'content_type': response.headers.get('content-type', ''), | |
| 'status_code': response.status_code | |
| } | |
| except Exception as e: | |
| return { | |
| 'success': False, | |
| 'type': 'link', | |
| 'url': url, | |
| 'error': f"خطأ في تحليل الرابط: {e}" | |
| } | |
| def _classify_link_type(self, domain: str, headers: Dict) -> str: | |
| """تصنيف نوع الرابط""" | |
| domain_lower = domain.lower() | |
| content_type = headers.get('content-type', '').lower() | |
| if any(platform in domain_lower for platform in ['youtube', 'youtu.be']): | |
| return 'youtube' | |
| elif 'github.com' in domain_lower: | |
| return 'github' | |
| elif any(platform in domain_lower for platform in ['twitter', 'x.com']): | |
| return 'twitter' | |
| elif 'instagram.com' in domain_lower: | |
| return 'instagram' | |
| elif 'tiktok.com' in domain_lower: | |
| return 'tiktok' | |
| elif 'text/html' in content_type: | |
| return 'webpage' | |
| else: | |
| return 'general_link' | |
| # ============= الدوال المطلوبة لـ learner.py ============= | |
| def fix_url(url: str) -> str: | |
| """ | |
| إصلاح الروابط وإكمالها إذا كانت ناقصة | |
| هذه الدالة مطلوبة لـ learner.py | |
| """ | |
| return URLValidator.fix_url(url) | |
| def detect_media_type(url: str) -> str: | |
| """ | |
| كشف نوع الوسائط من الرابط | |
| هذه الدالة مطلوبة لـ learner.py | |
| """ | |
| if not url: | |
| return "unknown" | |
| analyzer = MediaAnalyzer() | |
| return analyzer._detect_media_type(url) | |
| def is_valid_url(url: str) -> bool: | |
| """ | |
| التحقق من صحة الرابط | |
| هذه الدالة مطلوبة للتكامل مع النظام | |
| """ | |
| return URLValidator.is_valid_url(url) | |
| def extract_domain(url: str) -> str: | |
| """ | |
| استخراج النطاق من الرابط | |
| """ | |
| try: | |
| return urlparse(url).netloc | |
| except: | |
| return "" | |
| def analyze_text_content(text: str) -> Dict[str, Any]: | |
| """ | |
| تحليل المحتوى النصي | |
| """ | |
| analysis = { | |
| "length": len(text), | |
| "word_count": len(text.split()), | |
| "has_questions": any(char in text for char in '؟?'), | |
| "has_links": 'http' in text.lower(), | |
| "language": "arabic" if any(char in text for char in 'ء-ي') else "unknown", | |
| "has_media_indicators": any(keyword in text.lower() for keyword in [ | |
| 'صورة', 'فيديو', 'صوت', 'ملف', 'رابط', 'صور', 'فيديوهات' | |
| ]) | |
| } | |
| return analysis | |
| def extract_urls_from_text(text: str) -> List[str]: | |
| """ | |
| استخراج جميع الروابط من النص | |
| """ | |
| url_pattern = r'https?://[^\s]+|www\.[^\s]+' | |
| urls = re.findall(url_pattern, text) | |
| # إصلاح الروابط التي تبدأ بـ www | |
| fixed_urls = [] | |
| for url in urls: | |
| if url.startswith('www.'): | |
| fixed_urls.append('https://' + url) | |
| else: | |
| fixed_urls.append(url) | |
| return fixed_urls | |
| # ============= واجهة مبسطة ============= | |
| def analyze_media(url: str) -> Dict[str, Any]: | |
| """ | |
| واجهة مبسطة لتحليل الوسائط | |
| Args: | |
| url: رابط الملف أو الوسائط | |
| Returns: | |
| dict: نتائج التحليل | |
| """ | |
| analyzer = MediaAnalyzer() | |
| return analyzer.analyze(url) | |
| def get_media_summary(url: str) -> str: | |
| """ | |
| الحصول على ملخص نصي لتحليل الوسائط | |
| Args: | |
| url: رابط الوسائط | |
| Returns: | |
| str: ملوصف نصي | |
| """ | |
| result = analyze_media(url) | |
| if not result.get('success', False): | |
| return f"❌ فشل التحليل: {result.get('error', 'خطأ غير معروف')}" | |
| media_type = result.get('type', 'unknown') | |
| if media_type == 'image': | |
| return f"🖼️ صورة: {result.get('format', '')} - الحجم: {result.get('size', '')}" | |
| elif media_type == 'pdf': | |
| return f"📄 PDF: {result.get('page_count', 0)} صفحة - النص: {result.get('text_preview', '')[:100]}..." | |
| elif media_type == 'audio': | |
| return f"🎵 صوت: {result.get('transcript', 'لا يوجد نص معروف')}" | |
| elif media_type == 'video': | |
| return f"🎥 فيديو: {result.get('duration', 'مجهول')} ثانية - الدقة: {result.get('resolution', 'مجهول')}" | |
| elif media_type == 'link': | |
| return f"🔗 رابط: {result.get('link_type', 'عام')} - النطاق: {result.get('domain', '')}" | |
| return f"📎 وسائط: {media_type}" | |
| # ============= فئة مساعدة للتحليل السريع ============= | |
| class QuickAnalyzer: | |
| """محلل سريع للاستخدام العام""" | |
| def quick_analyze(url: str) -> Dict[str, Any]: | |
| """تحليل سريع بدون تحميل الملف""" | |
| try: | |
| fixed_url = fix_url(url) | |
| media_type = detect_media_type(fixed_url) | |
| return { | |
| 'success': True, | |
| 'url': fixed_url, | |
| 'type': media_type, | |
| 'domain': extract_domain(fixed_url), | |
| 'is_valid': is_valid_url(fixed_url) | |
| } | |
| except Exception as e: | |
| return { | |
| 'success': False, | |
| 'url': url, | |
| 'error': str(e) | |
| } | |
| # ============= إنشاء نسخ عالمية ============= | |
| media_analyzer = MediaAnalyzer() | |
| quick_analyzer = QuickAnalyzer() | |
| url_validator = URLValidator() | |
| # ============= اختبارات ============= | |
| # analyzer.py - أضف في نهاية الملف | |
| def analyze_media_async(media_data): | |
| """نسخة غير متزامنة من تحليل الوسائط""" | |
| return analyze_media(media_data) | |
| def get_media_summary(analysis_result): | |
| """إنشاء ملخص للوسائط""" | |
| return {"summary": "ملخص الوسائط"} | |
| def extract_urls_from_text(text): | |
| """استخراج الروابط من النص""" | |
| import re | |
| return re.findall(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', text) | |
| def analyze_url_type(url): | |
| """تحليل نوع الرابط""" | |
| return {"type": "unknown", "status": "not_analyzed"} | |
| def fix_url(url): | |
| """إصلاح الرابط""" | |
| return url | |
| def is_valid_url(url): | |
| """التحقق من صحة الرابط""" | |
| import re | |
| return bool(re.match(r'^https?://', url)) | |
| if __name__ == "__main__": | |
| # اختبارات متنوعة | |
| test_urls = [ | |
| "https://example.com/image.jpg", | |
| "https://example.com/document.pdf", | |
| "https://www.youtube.com/watch?v=example", | |
| "github.com/example/repo", # رابط ناقص | |
| "example.com/video.mp4" # رابط ناقص | |
| ] | |
| print("=== اختبار نظام تحليل الوسائط ===") | |
| print(f"✅ الدوال المتاحة: fix_url, detect_media_type, is_valid_url") | |
| for url in test_urls: | |
| print(f"\nتحليل: {url}") | |
| try: | |
| # اختبار الدوال الجديدة | |
| fixed = fix_url(url) | |
| media_type = detect_media_type(fixed) | |
| valid = is_valid_url(fixed) | |
| print(f"📝 الرابط المصلح: {fixed}") | |
| print(f"🔍 نوع الوسائط: {media_type}") | |
| print(f"✅ الرابط صالح: {valid}") | |
| # تحليل مفصل إذا كان الرابط صالحاً | |
| if valid: | |
| result = analyze_media(fixed) | |
| summary = get_media_summary(fixed) | |
| print(f"📊 النتيجة: {summary}") | |
| except Exception as e: | |
| print(f"❌ خطأ: {e}") | |
| # اختبار استخراج الروابط من النص | |
| test_text = "راجع هذا الموقع https://example.com وصفحة www.github.com للمزيد" | |
| print(f"\n📝 اختبار استخراج الروابط من النص:") | |
| print(f"النص: {test_text}") | |
| urls = extract_urls_from_text(test_text) | |
| print(f"الروابط المستخرجة: {urls}") | |
| print("\n✅ جميع الدوال المطلوبة جاهزة للاستخدام!") |