chatbox / simple_rag.py
anaspro
🔧 إصلاحات مهمة للـ deployment
df14f5f
raw
history blame
13.2 kB
"""
نظام RAG بسيط لشركة NBTEL
يحول ملف PDF إلى قاعدة معرفة قابلة للبحث
"""
import os
import json
import re
from typing import List, Dict, Any
from sentence_transformers import SentenceTransformer
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
import pickle
from pathlib import Path
class SimpleRAG:
"""نظام RAG بسيط بدون مكتبات معقدة"""
def __init__(self):
"""تهيئة النظام"""
print("🔄 جاري تحميل نموذج التضمين...")
# استخدام نموذج صغير وسريع
self.model = SentenceTransformer('sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2')
self.documents = []
self.embeddings = None
# مجلدات البيانات
os.makedirs("./data", exist_ok=True)
os.makedirs("./rag_index", exist_ok=True)
print("✅ تم تحميل النظام بنجاح")
def load_markdown_file(self, file_path: str) -> List[Dict[str, Any]]:
"""
تحميل ملف Markdown وتقسيمه إلى أقسام
Args:
file_path: مسار الملف
Returns:
قائمة الأقسام
"""
documents = []
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
print(f"📄 قراءة الملف، الحجم: {len(content)} حرف")
# تقسيم بسيط حسب العناوين
sections = re.split(r'\n(#{1,3}\s+.*?)\n', content)
current_title = "مقدمة"
current_content = ""
for i, section in enumerate(sections):
section = section.strip()
if not section:
continue
# إذا كان عنوان (يبدأ بـ #)
if section.startswith('#'):
# حفظ القسم السابق
if current_content.strip():
documents.append({
'title': current_title,
'content': current_content.strip(),
'source': 'nbtel_profile',
'section_type': 'main'
})
# بدء قسم جديد
current_title = section.replace('#', '').strip()
current_content = ""
else:
# إضافة المحتوى للقسم الحالي
current_content += section + "\n"
# إضافة القسم الأخير
if current_content.strip():
documents.append({
'title': current_title,
'content': current_content.strip(),
'source': 'nbtel_profile',
'section_type': 'main'
})
# تقسيم إضافي للأقسام الطويلة جداً
final_docs = []
for doc in documents:
if len(doc['content']) > 1500:
chunks = self._split_long_text(doc['content'], max_length=1200)
for i, chunk in enumerate(chunks):
if chunk.strip(): # تأكد من وجود محتوى
final_docs.append({
'title': f"{doc['title']} - جزء {i+1}",
'content': chunk,
'source': doc['source'],
'section_type': 'chunk'
})
else:
if doc['content'].strip(): # تأكد من وجود محتوى
final_docs.append(doc)
# إزالة المستندات الفارغة
final_docs = [doc for doc in final_docs if len(doc['content'].strip()) > 50]
print(f"✅ تم تحميل {len(final_docs)} قسم من الملف")
return final_docs
except Exception as e:
print(f"❌ خطأ في تحميل الملف: {str(e)}")
return []
def _split_long_text(self, text: str, max_length: int = 800) -> List[str]:
"""تقسيم النص الطويل إلى أجزاء"""
# تقسيم حسب الفقرات أولاً
paragraphs = text.split('\n\n')
chunks = []
current_chunk = ""
for para in paragraphs:
if len(current_chunk + para) < max_length:
current_chunk += para + "\n\n"
else:
if current_chunk.strip():
chunks.append(current_chunk.strip())
current_chunk = para + "\n\n"
if current_chunk.strip():
chunks.append(current_chunk.strip())
return chunks
def add_documents(self, documents: List[Dict[str, Any]]):
"""إضافة مستندات جديدة"""
print(f"🔄 جاري إضافة {len(documents)} مستند...")
for doc in documents:
# إضافة معرف فريد
doc['id'] = len(self.documents)
# تنظيف وتحسين المحتوى
content = doc['content']
content = re.sub(r'\s+', ' ', content) # إزالة المسافات الزائدة
content = content.strip()
doc['content'] = content
doc['content_length'] = len(content)
self.documents.append(doc)
print(f"✅ تم إضافة {len(documents)} مستند")
def build_index(self):
"""بناء فهرس البحث"""
if not self.documents:
print("⚠️ لا توجد مستندات")
return
print(f"🔄 جاري بناء فهرس لـ {len(self.documents)} مستند...")
# استخراج النصوص
texts = []
for doc in self.documents:
# دمج العنوان والمحتوى للبحث الأفضل
search_text = f"{doc['title']}\n{doc['content']}"
texts.append(search_text)
# حساب التضمينات
self.embeddings = self.model.encode(texts, show_progress_bar=True)
print(f"✅ تم بناء الفهرس - {len(self.documents)} مستند")
def search(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]:
"""
البحث في المستندات
Args:
query: الاستعلام
top_k: عدد النتائج
Returns:
النتائج مرتبة حسب الصلة
"""
if self.embeddings is None:
print("⚠️ لم يتم بناء الفهرس")
return []
# تحويل الاستعلام إلى تضمين
query_embedding = self.model.encode([query])
# حساب التشابه
similarities = cosine_similarity(query_embedding, self.embeddings)[0]
# ترتيب النتائج
top_indices = np.argsort(similarities)[::-1][:top_k]
results = []
for i, idx in enumerate(top_indices):
if similarities[idx] > 0.1: # عتبة التشابه الدنيا
doc = self.documents[idx].copy()
doc['similarity_score'] = float(similarities[idx])
doc['rank'] = i + 1
results.append(doc)
return results
def get_context_for_query(self, query: str, max_results: int = 3) -> str:
"""
الحصول على السياق للاستعلام
Args:
query: الاستعلام
max_results: أقصى عدد نتائج
Returns:
النص المنسق للسياق
"""
results = self.search(query, top_k=max_results)
if not results:
return "لم أجد معلومات ذات صلة في قاعدة المعرفة."
context = "معلومات من قاعدة المعرفة:\n\n"
for result in results:
score = result['similarity_score']
title = result['title']
content = result['content']
# قطع النص إذا كان طويلاً جداً
if len(content) > 500:
content = content[:500] + "..."
context += f"📄 **{title}** (درجة الصلة: {score:.2f}):\n"
context += f"{content}\n\n"
return context
def save_index(self, path: str = "./rag_index"):
"""حفظ الفهرس"""
path = Path(path)
path.mkdir(exist_ok=True)
# حفظ المستندات
with open(path / "documents.pkl", "wb") as f:
pickle.dump(self.documents, f)
# حفظ التضمينات
if self.embeddings is not None:
np.save(path / "embeddings.npy", self.embeddings)
# حفظ معلومات النظام
info = {
"num_documents": len(self.documents),
"embedding_dim": self.embeddings.shape[1] if self.embeddings is not None else 0,
"model_name": self.model.get_sentence_embedding_dimension()
}
with open(path / "info.json", "w", encoding="utf-8") as f:
json.dump(info, f, ensure_ascii=False, indent=2)
print(f"✅ تم حفظ الفهرس في {path}")
def load_index(self, path: str = "./rag_index") -> bool:
"""تحميل فهرس محفوظ"""
path = Path(path)
if not path.exists():
print(f"⚠️ لم يتم العثور على فهرس في {path}")
return False
try:
# تحميل المستندات
with open(path / "documents.pkl", "rb") as f:
self.documents = pickle.load(f)
# تحميل التضمينات
embeddings_path = path / "embeddings.npy"
if embeddings_path.exists():
self.embeddings = np.load(embeddings_path)
print(f"✅ تم تحميل الفهرس - {len(self.documents)} مستند")
return True
except Exception as e:
print(f"❌ خطأ في تحميل الفهرس: {str(e)}")
return False
def get_stats(self) -> Dict[str, Any]:
"""إحصائيات النظام"""
if not self.documents:
return {"message": "لا توجد مستندات"}
total_chars = sum(doc['content_length'] for doc in self.documents)
avg_length = total_chars / len(self.documents)
return {
"total_documents": len(self.documents),
"total_characters": total_chars,
"average_document_length": round(avg_length, 1),
"index_built": self.embeddings is not None,
"embedding_dimension": self.embeddings.shape[1] if self.embeddings is not None else 0
}
def main():
"""الدالة الرئيسية لاختبار النظام"""
# إنشاء نظام RAG
rag = SimpleRAG()
# محاولة تحميل فهرس موجود
if not rag.load_index():
print("🔄 إنشاء فهرس جديد...")
# تحميل ملف الشركة
file_path = "./data/nbtel_company_profile.md"
if os.path.exists(file_path):
documents = rag.load_markdown_file(file_path)
rag.add_documents(documents)
rag.build_index()
rag.save_index()
else:
print(f"❌ لم يتم العثور على الملف: {file_path}")
return
# عرض الإحصائيات
stats = rag.get_stats()
print(f"\n📊 إحصائيات النظام:")
for key, value in stats.items():
print(f" {key}: {value}")
# اختبار البحث
print(f"\n🔍 اختبار البحث:")
test_queries = [
"ما هي خدمات الشركة؟",
"كيف أتواصل مع الدعم الفني؟",
"ما هي أسعار الباقات؟",
"مشكلة في الواي فاي",
"معلومات عن الشركة"
]
for query in test_queries:
print(f"\n❓ السؤال: {query}")
context = rag.get_context_for_query(query, max_results=2)
print(f"📋 السياق:")
print(context[:300] + "..." if len(context) > 300 else context)
print("-" * 50)
if __name__ == "__main__":
main()