""" 🤖 نظام RAG كامل للمستندات - نسخة HuggingFace Spaces 🎯 إصدار نظيف بدون أي تبعيات لـ Google Colab 📚 يدعم العربية والإنجليزية - معالجة ملفات PDF كبيرة """ # ==================== 1️⃣ استيراد المكتبات ==================== import os import sys import numpy as np import faiss import nltk from pypdf import PdfReader from sentence_transformers import SentenceTransformer from nltk.tokenize import word_tokenize import pickle import warnings warnings.filterwarnings('ignore') # ==================== 2️⃣ تحميل بيانات NLTK (مرة واحدة) ==================== def download_nltk_resources(): """تحميل موارد NLTK المطلوبة""" try: nltk.download('punkt', quiet=True) nltk.download('punkt_tab', quiet=True) print("✅ موارد NLTK جاهزة") except Exception as e: print(f"⚠️ ملاحظة: بعض موارد NLTK غير متوفرة: {e}") # ==================== 3️⃣ فئات النظام الأساسية ==================== class PDFProcessor: """معالج PDF ذكي""" def __init__(self, chunk_size=350, overlap=70): self.chunk_size = chunk_size self.overlap = overlap def read_pdf(self, pdf_path): """قراءة PDF واستخراج النص""" print(f"📖 جاري قراءة: {os.path.basename(pdf_path)}") try: reader = PdfReader(pdf_path) total_pages = len(reader.pages) pages_data = [] for i in range(total_pages): try: page = reader.pages[i] text = page.extract_text() if text and text.strip(): pages_data.append({ 'page_num': i + 1, 'text': text.strip(), 'char_count': len(text) }) # عرض التقدم if (i + 1) % 100 == 0 or i == total_pages - 1: print(f" 📄 تمت {i + 1}/{total_pages} صفحة") except Exception as page_error: print(f" ⚠️ خطأ في صفحة {i+1}: {page_error}") continue print(f"✅ تم قراءة {len(pages_data)} صفحة تحتوي على نص") if pages_data: total_chars = sum(p['char_count'] for p in pages_data) total_words = sum(len(p['text'].split()) for p in pages_data) print(f" 📊 إجمالي الأحرف: {total_chars:,}") print(f" 📊 إجمالي الكلمات: {total_words:,}") return pages_data except Exception as e: print(f"❌ فشل في قراءة PDF: {e}") return [] def chunk_text(self, pages_data): """تقسيم النص إلى أجزاء ذكية""" print(f"✂️ جاري تقسيم النص إلى أجزاء...") all_chunks = [] chunk_id = 0 for page in pages_data: text = page['text'] page_num = page['page_num'] # استخدام تقسيم بسيط للكلمات words = text.split() if len(words) == 0: continue # تقسيم النص مع التداخل start = 0 while start < len(words): end = start + self.chunk_size chunk_words = words[start:end] if chunk_words: chunk_text = ' '.join(chunk_words) all_chunks.append({ 'chunk_id': chunk_id, 'text': chunk_text, 'page': page_num, 'word_count': len(chunk_words), 'start_word': start, 'end_word': min(end, len(words)) }) chunk_id += 1 start += self.chunk_size - self.overlap print(f"✅ تم إنشاء {len(all_chunks)} جزء نصي") if all_chunks: avg_words = sum(c['word_count'] for c in all_chunks) // len(all_chunks) print(f" 📊 متوسط الكلمات لكل جزء: {avg_words}") return all_chunks class VectorStore: """مخزن المتجهات باستخدام FAISS""" def __init__(self, model_name="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"): self.model_name = model_name self.model = None self.index = None self.chunks = None self.embeddings = None def load_model(self): """تحميل نموذج Embeddings""" print(f"🚀 جاري تحميل نموذج: {self.model_name}") try: self.model = SentenceTransformer(self.model_name) print(f"✅ تم تحميل النموذج بنجاح") print(f" 📏 أبعاد المتجهات: {self.model.get_sentence_embedding_dimension()}") return True except Exception as e: print(f"❌ خطأ في تحميل النموذج: {e}") return False def create_embeddings(self, chunks): """إنشاء Embeddings للنصوص""" print(f"🧠 جاري إنشاء Embeddings لـ {len(chunks)} جزء...") self.chunks = chunks chunk_texts = [chunk['text'] for chunk in chunks] try: self.embeddings = self.model.encode( chunk_texts, show_progress_bar=True, normalize_embeddings=True, batch_size=32, convert_to_numpy=True ) print(f"✅ تم إنشاء {len(self.embeddings)} متجه embedding") return True except Exception as e: print(f"❌ خطأ في إنشاء Embeddings: {e}") return False def build_index(self): """بناء فهرس FAISS""" if self.embeddings is None: print("❌ لا توجد embeddings لبناء الفهرس") return False print("🔧 جاري بناء Vector Store...") try: dimension = self.embeddings.shape[1] self.index = faiss.IndexFlatIP(dimension) # تطبيع وإضافة المتجهات faiss.normalize_L2(self.embeddings) self.index.add(self.embeddings) print(f"✅ تم بناء Vector Store: {self.index.ntotal} متجه") return True except Exception as e: print(f"❌ خطأ في بناء الفهرس: {e}") return False def save_index(self, path="vector_store"): """حفظ الفهرس للاستخدام المستقبلي""" try: # حفظ الفهرس faiss.write_index(self.index, f"{path}.faiss") # حفظ البيانات النصية with open(f"{path}_chunks.pkl", "wb") as f: pickle.dump(self.chunks, f) print(f"💾 تم حفظ الفهرس والبيانات في: {path}") return True except Exception as e: print(f"❌ خطأ في حفظ الفهرس: {e}") return False def load_index(self, path="vector_store"): """تحميل الفهرس المحفوظ""" try: # تحميل الفهرس self.index = faiss.read_index(f"{path}.faiss") # تحميل البيانات النصية with open(f"{path}_chunks.pkl", "rb") as f: self.chunks = pickle.load(f) print(f"📂 تم تحميل الفهرس: {self.index.ntotal} متجه") print(f"📂 تم تحميل البيانات: {len(self.chunks)} جزء") return True except Exception as e: print(f"❌ خطأ في تحميل الفهرس: {e}") return False def search(self, query, top_k=5, similarity_threshold=0.25): """بحث دلالي في المستندات""" if self.index is None or self.model is None or self.chunks is None: print("❌ النظام غير مهيء للبحث") return [] # إنشاء embedding للاستعلام query_embedding = self.model.encode([query], normalize_embeddings=True) # البحث عن عدد أكبر ثم تصفية search_k = top_k * 3 scores, indices = self.index.search(query_embedding, search_k) # تجميع النتائج المؤهلة results = [] for i, (score, idx) in enumerate(zip(scores[0], indices[0])): # التحقق من أن الفهرس صالح والتشابه مقبول if 0 <= idx < len(self.chunks) and score >= similarity_threshold: chunk = self.chunks[idx] results.append({ 'rank': len(results) + 1, 'score': float(score), 'similarity_percent': f"{score * 100:.1f}%", 'similarity_raw': score, 'text': chunk['text'], 'page': chunk['page'], 'word_count': chunk['word_count'], 'preview': chunk['text'][:150] + "..." if len(chunk['text']) > 150 else chunk['text'] }) # التوقف عند الوصول إلى العدد المطلوب if len(results) >= top_k: break return results class RAGSystem: """النظام الرئيسي RAG""" def __init__(self): self.processor = PDFProcessor() self.vector_store = VectorStore() self.is_ready = False def initialize(self): """تهيئة النظام""" print("=" * 60) print("🤖 نظام RAG للمستندات الذكي") print("=" * 60) # تحميل موارد NLTK download_nltk_resources() # تحميل نموذج Embeddings if not self.vector_store.load_model(): return False self.is_ready = True return True def process_pdf(self, pdf_path): """معالجة ملف PDF جديد""" if not self.is_ready: print("❌ النظام غير مهيء") return False # قراءة PDF pages_data = self.processor.read_pdf(pdf_path) if not pages_data: return False # تقسيم النص chunks = self.processor.chunk_text(pages_data) if not chunks: return False # إنشاء embeddings وفهرس if not self.vector_store.create_embeddings(chunks): return False if not self.vector_store.build_index(): return False print("✨ تم معالجة المستند بنجاح!") return True def ask_question(self, question, top_k=3, similarity_threshold=0.25): """طرح سؤال على النظام""" if not self.is_ready or self.vector_store.index is None: return { 'success': False, 'error': 'النظام غير مهيء. يرجى معالجة مستند أولاً.', 'results': [] } print(f"\n🔍 البحث عن: '{question}'") # البحث في المستند results = self.vector_store.search(question, top_k, similarity_threshold) if not results: return { 'success': False, 'error': 'لم أجد نتائج ذات صلة في المستند.', 'results': [] } # تقييم جودة النتائج evaluation = self._evaluate_results(results) return { 'success': True, 'question': question, 'results': results, 'evaluation': evaluation, 'total_results': len(results), 'best_similarity': results[0]['similarity_percent'] if results else "0%" } def _evaluate_results(self, results): """تقييم جودة نتائج البحث""" if not results: return "❌ لا توجد نتائج للتقييم" # حساب متوسط التشابه similarities = [r['similarity_raw'] for r in results] avg_similarity = sum(similarities) / len(similarities) * 100 # تحديد الجودة if avg_similarity >= 50: quality = "ممتازة 🏆" emoji = "✅" elif avg_similarity >= 40: quality = "جيدة 👍" emoji = "✓" elif avg_similarity >= 30: quality = "متوسطة ⚠️" emoji = "~" else: quality = "ضعيفة ❌" emoji = "✗" # حساب تغطية الصفحات unique_pages = len(set(r['page'] for r in results)) evaluation = f""" 📊 **تقرير التقييم:** {emoji} **الجودة:** {quality} 📈 **متوسط التشابه:** {avg_similarity:.1f}% 🔢 **أفضل نتيجة:** {results[0]['similarity_percent']} 📖 **صفحات مختلفة:** {unique_pages} 📝 **إجمالي النتائج:** {len(results)} """ return evaluation def save_state(self, path="rag_system_state"): """حفظ حالة النظام""" return self.vector_store.save_index(path) def load_state(self, path="rag_system_state"): """تحميل حالة النظام""" if not self.vector_store.load_model(): return False if self.vector_store.load_index(path): self.is_ready = True return True return False # ==================== 4️⃣ واجهة Streamlit لـ HuggingFace ==================== def create_streamlit_app(): """إنشاء واجهة ويب باستخدام Streamlit""" try: import streamlit as st from streamlit.runtime.uploaded_file_manager import UploadedFile # إعداد صفحة Streamlit st.set_page_config( page_title="نظام RAG الذكي للمستندات", page_icon="🤖", layout="wide" ) # CSS مخصص st.markdown(""" """, unsafe_allow_html=True) # العنوان الرئيسي st.markdown("""

🤖 نظام RAG الذكي للمستندات

بحث ذكي في ملفات PDF - يدعم العربية والإنجليزية

""", unsafe_allow_html=True) # تهيئة النظام في حالة الجلسة if 'rag_system' not in st.session_state: with st.spinner("🚀 جاري تهيئة النظام..."): st.session_state.rag_system = RAGSystem() if st.session_state.rag_system.initialize(): st.success("✅ تم تهيئة النظام بنجاح!") else: st.error("❌ فشل في تهيئة النظام") return rag_system = st.session_state.rag_system # الشريط الجانبي with st.sidebar: st.header("⚙️ الإعدادات") # تحميل ملف PDF st.subheader("📁 رفع ملف PDF") uploaded_file = st.file_uploader( "اختر ملف PDF", type=["pdf"], help="يمكنك رفع أي ملف PDF للبحث فيه" ) if uploaded_file is not None: # حفظ الملف المؤقت temp_path = f"temp_{uploaded_file.name}" with open(temp_path, "wb") as f: f.write(uploaded_file.getbuffer()) # معالجة الملف if st.button("🚀 معالجة المستند", type="primary"): with st.spinner("جاري معالجة المستند..."): if rag_system.process_pdf(temp_path): st.success(f"✅ تم معالجة: {uploaded_file.name}") st.session_state.processed_file = uploaded_file.name else: st.error("❌ فشل في معالجة الملف") # إعدادات البحث st.subheader("🔍 إعدادات البحث") top_k = st.slider("عدد النتائج", 1, 10, 3) similarity_threshold = st.slider("عتبة التشابه", 0.0, 1.0, 0.25, 0.05) # معلومات النظام st.subheader("📊 معلومات النظام") if rag_system.is_ready and rag_system.vector_store.chunks: st.info(f"📄 الأجزاء النصية: {len(rag_system.vector_store.chunks)}") st.info(f"🧮 المتجهات: {rag_system.vector_store.index.ntotal if rag_system.vector_store.index else 0}") # المنطقة الرئيسية col1, col2 = st.columns([2, 1]) with col1: st.header("💬 اسأل عن المستند") # حقل إدخال السؤال question = st.text_area( "اكتب سؤالك هنا", placeholder="مثال: ما هي حالة التدفق؟ أو What is flow state?", height=100 ) # أزرار الأسئلة السريعة st.subheader("💡 أسئلة سريعة") quick_questions = [ "ما هي حالة التدفق؟", "What is flow state?", "ما هي عناصر التجربة المثلى؟", "كيف يحقق الإنسان السعادة في العمل؟" ] cols = st.columns(4) for idx, q in enumerate(quick_questions): if cols[idx].button(q, use_container_width=True): question = q with col2: st.header("🎯 نصائح البحث") st.info(""" **للحصول على أفضل النتائج:** 1. استخدم مصطلحات محددة 2. جرب اللغتين (عربي/إنجليزي) 3. اطرح أسئلة واضحة 4. استخدم مصطلحات الكتاب **مثال:** ✅ "ما هي خصائص flow state؟" ❌ "شرح لي" """) # زر البحث if st.button("🔍 ابحث في المستند", type="primary", use_container_width=True): if not question: st.warning("⚠️ يرجى إدخال سؤال") elif not (rag_system.is_ready and rag_system.vector_store.chunks): st.error("❌ يرجى معالجة مستند أولاً") else: with st.spinner("جاري البحث..."): result = rag_system.ask_question( question, top_k=top_k, similarity_threshold=similarity_threshold ) if result['success']: # عرض التقييم with st.expander("📊 تقرير التقييم", expanded=True): st.markdown(result['evaluation']) # عرض النتائج st.subheader(f"📄 النتائج ({len(result['results'])})") for r in result['results']: # تحديد لون التشابه similarity = r['similarity_raw'] if similarity >= 0.5: sim_class = "similarity-high" elif similarity >= 0.3: sim_class = "similarity-medium" else: sim_class = "similarity-low" # عرض البطاقة with st.container(): st.markdown(f"""

🏆 النتيجة #{r['rank']}

التشابه: {r['similarity_percent']} | 📖 الصفحة: {r['page']} | 🔢 الكلمات: {r['word_count']}


{r['text']}

""", unsafe_allow_html=True) else: st.error(result['error']) # قسم الأمثلة التوضيحية with st.expander("📖 أمثلة توضيحية", expanded=False): st.markdown(""" **مستند كتاب Flow:** - "ما هي حالة التدفق flow state؟" - "What are the characteristics of optimal experience?" - "كيف يرتبط التحدي بالمهارة في نظرية التدفق؟" **مستندات أخرى:** - "ما هو الموضوع الرئيسي؟" - "ما هي النقاط المهمة؟" - "هل هناك أمثلة عملية؟" """) # تذييل الصفحة st.markdown("---") st.markdown("""

🤖 نظام RAG للمستندات | إصدار HuggingFace

تقنية: FAISS + Sentence Transformers + Streamlit

""", unsafe_allow_html=True) except ImportError: print("⚠️ Streamlit غير مثبت. لتشغيل الواجهة:") print(" pip install streamlit") print(" streamlit run app.py") # ==================== 5️⃣ التشغيل الرئيسي ==================== def main_cli(): """واجهة سطر الأوامر""" print("=" * 60) print("🤖 نظام RAG للمستندات - واجهة سطر الأوامر") print("=" * 60) # إنشاء النظام rag_system = RAGSystem() # تهيئة النظام if not rag_system.initialize(): print("❌ فشل في تهيئة النظام") return # قائمة الأوامر commands = """ 🎮 الأوامر المتاحة: 1. معالجة - معالجة ملف PDF جديد 2. بحث - البحث في المستند 3. حفظ - حفظ حالة النظام 4. تحميل - تحميل حالة محفوظة 5. خروج - إنهاء البرنامج 6. ويب - تشغيل واجهة الويب (يتطلب Streamlit) """ while True: print("\n" + commands) command = input("\n📝 أدخل الأمر: ").strip().lower() if command in ['خروج', 'exit', '5']: print("👋 مع السلامة!") break elif command in ['معالجة', '1']: pdf_path = input("📁 أدخل مسار ملف PDF: ").strip() if os.path.exists(pdf_path): rag_system.process_pdf(pdf_path) else: print(f"❌ الملف غير موجود: {pdf_path}") elif command in ['بحث', '2']: if not rag_system.is_ready or rag_system.vector_store.index is None: print("❌ يرجى معالجة مستند أولاً") continue question = input("🧠 أدخل سؤالك: ").strip() if question: result = rag_system.ask_question(question) if result['success']: print(result['evaluation']) for r in result['results']: print(f"\n🏆 النتيجة #{r['rank']}") print(f" 📈 التشابه: {r['similarity_percent']}") print(f" 📖 الصفحة: {r['page']}") print(f" 📝 المحتوى: {r['text'][:200]}...") else: print(result['error']) elif command in ['حفظ', '3']: path = input("💾 أدخل اسم الملف للحفظ (دون امتداد): ").strip() if path: rag_system.save_state(path) elif command in ['تحميل', '4']: path = input("📂 أدخل اسم الملف للتحميل (دون امتداد): ").strip() if path: rag_system.load_state(path) elif command in ['ويب', '6']: print("🌐 جاري تشغيل واجهة الويب...") print(" تأكد من تثبيت Streamlit أولاً: pip install streamlit") print(" ثم شغل: streamlit run app.py") break else: print("⚠️ أمر غير معروف") # ==================== 6️⃣ نقطة الدخول الرئيسية ==================== if __name__ == "__main__": # اختيار وضع التشغيل print("=" * 60) print("🎮 اختر وضع التشغيل:") print("1. واجهة سطر الأوامر (CLI)") print("2. واجهة الويب (يتطلب Streamlit)") print("=" * 60) try: choice = input("📝 أدخل رقم الخيار: ").strip() if choice == "1": main_cli() elif choice == "2": create_streamlit_app() else: print("⚠️ خيار غير صحيح، تشغيل CLI...") main_cli() except KeyboardInterrupt: print("\n\n👋 تم إيقاف البرنامج") except Exception as e: print(f"\n❌ حدث خطأ: {e}")