"""
Simple RAG system for NBTEL.

Converts a company profile (PDF exported to Markdown) into a searchable
knowledge base.
"""
import os
import json
import pickle
import re
from pathlib import Path
from typing import List, Dict, Any

import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity


class SimpleRAG:
    """A simple RAG system without complicated libraries."""

    def __init__(self):
        """Initialize the system."""
        print("🔄 Loading embedding model...")

        # Keep the model name so save_index() can record it alongside the index.
        self.model_name = 'sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2'
        self.model = SentenceTransformer(self.model_name)

        self.documents = []
        self.embeddings = None

        # Make sure the data and index directories exist.
        os.makedirs("./data", exist_ok=True)
        os.makedirs("./rag_index", exist_ok=True)

        print("✅ System loaded successfully")

    def load_markdown_file(self, file_path: str) -> List[Dict[str, Any]]:
        """
        Load a Markdown file and split it into sections.

        Args:
            file_path: path to the file

        Returns:
            list of section dicts
        """
        documents = []

        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()

            print(f"📄 Read file, size: {len(content)} characters")

            # Split on level 1-3 headings; the capture group keeps the headings
            # in the result list.
            sections = re.split(r'\n(#{1,3}\s+.*?)\n', content)

            current_title = "Introduction"
            current_content = ""

            for section in sections:
                section = section.strip()
                if not section:
                    continue

                if section.startswith('#'):
                    # New heading: flush the section accumulated so far.
                    if current_content.strip():
                        documents.append({
                            'title': current_title,
                            'content': current_content.strip(),
                            'source': 'nbtel_profile',
                            'section_type': 'main'
                        })

                    current_title = section.replace('#', '').strip()
                    current_content = ""
                else:
                    current_content += section + "\n"

            # Flush the last section.
            if current_content.strip():
                documents.append({
                    'title': current_title,
                    'content': current_content.strip(),
                    'source': 'nbtel_profile',
                    'section_type': 'main'
                })

            # Break overly long sections into smaller chunks.
            final_docs = []
            for doc in documents:
                if len(doc['content']) > 1500:
                    chunks = self._split_long_text(doc['content'], max_length=1200)
                    for i, chunk in enumerate(chunks):
                        if chunk.strip():
                            final_docs.append({
                                'title': f"{doc['title']} - part {i + 1}",
                                'content': chunk,
                                'source': doc['source'],
                                'section_type': 'chunk'
                            })
                elif doc['content'].strip():
                    final_docs.append(doc)

            # Drop near-empty sections.
            final_docs = [doc for doc in final_docs if len(doc['content'].strip()) > 50]

            print(f"✅ Loaded {len(final_docs)} sections from the file")
            return final_docs

        except Exception as e:
            print(f"❌ Error loading file: {str(e)}")
            return []
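
    # How the heading split behaves on a small, hypothetical input:
    #
    #     re.split(r'\n(#{1,3}\s+.*?)\n', "intro\n# Services\nFiber plans.\n")
    #     -> ['intro', '# Services', 'Fiber plans.\n']
    #
    # Because the heading pattern is a capture group, re.split keeps each
    # heading in the result, so the loop above can pair it with the body
    # text that follows.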

    def _split_long_text(self, text: str, max_length: int = 800) -> List[str]:
        """Split long text into chunks at paragraph boundaries."""

        paragraphs = text.split('\n\n')
        chunks = []
        current_chunk = ""

        for para in paragraphs:
            if len(current_chunk + para) < max_length:
                current_chunk += para + "\n\n"
            else:
                if current_chunk.strip():
                    chunks.append(current_chunk.strip())
                current_chunk = para + "\n\n"

        if current_chunk.strip():
            chunks.append(current_chunk.strip())

        return chunks
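
    # Chunking behavior on a hypothetical input with max_length=20:
    #
    #     _split_long_text("aaaa\n\nbbbb\n\n" + "c" * 30, max_length=20)
    #     -> ["aaaa\n\nbbbb", "cccccccccccccccccccccccccccccc"]
    #
    # Paragraphs are packed greedily; a single paragraph longer than
    # max_length still becomes one chunk (it is never split mid-paragraph).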

    def add_documents(self, documents: List[Dict[str, Any]]):
        """Add new documents to the store."""

        print(f"🔄 Adding {len(documents)} documents...")

        for doc in documents:
            # Sequential id based on the current store size.
            doc['id'] = len(self.documents)

            # Normalize whitespace so embeddings reflect wording, not layout.
            content = doc['content']
            content = re.sub(r'\s+', ' ', content)
            content = content.strip()

            doc['content'] = content
            doc['content_length'] = len(content)

            self.documents.append(doc)

        print(f"✅ Added {len(documents)} documents")

    def build_index(self):
        """Build the search index."""

        if not self.documents:
            print("⚠️ No documents to index")
            return

        print(f"🔄 Building index for {len(self.documents)} documents...")

        # Embed title + content together so headings contribute to matching.
        texts = []
        for doc in self.documents:
            search_text = f"{doc['title']}\n{doc['content']}"
            texts.append(search_text)

        self.embeddings = self.model.encode(texts, show_progress_bar=True)

        print(f"✅ Index built - {len(self.documents)} documents")

    def search(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
        """
        Search the documents.

        Args:
            query: the query string
            top_k: number of results

        Returns:
            results sorted by relevance
        """
        if self.embeddings is None:
            print("⚠️ Index has not been built")
            return []

        # Embed the query into the same vector space as the documents.
        query_embedding = self.model.encode([query])

        # Cosine similarity between the query and every document.
        similarities = cosine_similarity(query_embedding, self.embeddings)[0]

        # Indices of the top_k most similar documents, best first.
        top_indices = np.argsort(similarities)[::-1][:top_k]

        results = []
        for i, idx in enumerate(top_indices):
            # Skip matches below a minimal relevance threshold.
            if similarities[idx] > 0.1:
                doc = self.documents[idx].copy()
                doc['similarity_score'] = float(similarities[idx])
                doc['rank'] = i + 1
                results.append(doc)

        return results
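
    # A hypothetical search result, for reference; exact titles and scores
    # depend on the indexed content:
    #
    #     rag.search("أسعار الباقات", top_k=1)
    #     -> [{'title': 'الباقات والأسعار', 'content': '...',
    #          'source': 'nbtel_profile', 'section_type': 'main',
    #          'id': 3, 'content_length': 412,
    #          'similarity_score': 0.74, 'rank': 1}]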

    def get_context_for_query(self, query: str, max_results: int = 3) -> str:
        """
        Build the retrieval context for a query.

        Args:
            query: the query string
            max_results: maximum number of results

        Returns:
            formatted context text
        """
        results = self.search(query, top_k=max_results)

        if not results:
            return "No relevant information found in the knowledge base."

        context = "Information from the knowledge base:\n\n"

        for result in results:
            score = result['similarity_score']
            title = result['title']
            content = result['content']

            # Truncate long passages to keep the context compact.
            if len(content) > 500:
                content = content[:500] + "..."

            context += f"📄 **{title}** (relevance: {score:.2f}):\n"
            context += f"{content}\n\n"

        return context
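
    # Shape of the returned context, with hypothetical data:
    #
    #     Information from the knowledge base:
    #
    #     📄 **About the company** (relevance: 0.81):
    #     NBTEL is an internet service provider ...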

    def save_index(self, path: str = "./rag_index"):
        """Save the index to disk."""

        path = Path(path)
        path.mkdir(exist_ok=True)

        # Documents as a pickle.
        with open(path / "documents.pkl", "wb") as f:
            pickle.dump(self.documents, f)

        # Embedding matrix as .npy.
        if self.embeddings is not None:
            np.save(path / "embeddings.npy", self.embeddings)

        # Human-readable summary of the index.
        info = {
            "num_documents": len(self.documents),
            "embedding_dim": self.embeddings.shape[1] if self.embeddings is not None else 0,
            "model_name": self.model_name
        }

        with open(path / "info.json", "w", encoding="utf-8") as f:
            json.dump(info, f, ensure_ascii=False, indent=2)

        print(f"✅ Index saved to {path}")

    def load_index(self, path: str = "./rag_index") -> bool:
        """Load a saved index."""

        path = Path(path)

        if not path.exists():
            print(f"⚠️ No index found at {path}")
            return False

        try:
            # Documents.
            with open(path / "documents.pkl", "rb") as f:
                self.documents = pickle.load(f)

            # Embedding matrix, if it was saved.
            embeddings_path = path / "embeddings.npy"
            if embeddings_path.exists():
                self.embeddings = np.load(embeddings_path)

            print(f"✅ Index loaded - {len(self.documents)} documents")
            return True

        except Exception as e:
            print(f"❌ Error loading index: {str(e)}")
            return False

    def get_stats(self) -> Dict[str, Any]:
        """System statistics."""

        if not self.documents:
            return {"message": "No documents"}

        total_chars = sum(doc['content_length'] for doc in self.documents)
        avg_length = total_chars / len(self.documents)

        return {
            "total_documents": len(self.documents),
            "total_characters": total_chars,
            "average_document_length": round(avg_length, 1),
            "index_built": self.embeddings is not None,
            "embedding_dimension": self.embeddings.shape[1] if self.embeddings is not None else 0
        }
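

# A minimal usage sketch with in-memory documents (hypothetical content),
# independent of the Markdown loader; not called by main():
def demo_inline_documents():
    rag = SimpleRAG()
    rag.add_documents([
        {'title': 'Support', 'content': 'Technical support is available 24/7 via phone and chat.',
         'source': 'demo', 'section_type': 'main'},
        {'title': 'Plans', 'content': 'Fiber plans range from 50 Mbps to 1 Gbps.',
         'source': 'demo', 'section_type': 'main'},
    ])
    rag.build_index()
    print(rag.get_context_for_query("How do I reach support?", max_results=1))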


def main():
    """Main function for testing the system."""

    rag = SimpleRAG()

    # Reuse a saved index if one exists; otherwise build it from the profile.
    if not rag.load_index():
        print("🔄 Building a new index...")

        file_path = "./data/nbtel_company_profile.md"
        if os.path.exists(file_path):
            documents = rag.load_markdown_file(file_path)
            rag.add_documents(documents)
            rag.build_index()
            rag.save_index()
        else:
            print(f"❌ File not found: {file_path}")
            return

    stats = rag.get_stats()
    print("\n📊 System statistics:")
    for key, value in stats.items():
        print(f"  {key}: {value}")

    print("\n🔍 Search test:")

    # Queries are kept in Arabic to match the Arabic knowledge base;
    # English glosses in the comments.
    test_queries = [
        "ما هي خدمات الشركة؟",  # What services does the company offer?
        "كيف أتواصل مع الدعم الفني؟",  # How do I contact technical support?
        "ما هي أسعار الباقات؟",  # What are the plan prices?
        "مشكلة في الواي فاي",  # A problem with the Wi-Fi
        "معلومات عن الشركة"  # Information about the company
    ]

    for query in test_queries:
        print(f"\n❓ Question: {query}")
        context = rag.get_context_for_query(query, max_results=2)
        print("📋 Context:")
        print(context[:300] + "..." if len(context) > 300 else context)
        print("-" * 50)


if __name__ == "__main__":
    main()