OmidSakaki's picture
Update app.py
1e8a285 verified
"""
RAG System for Hugging Face Spaces
Optimized for deployment on HF Spaces with GPU support
"""
import os
import re
import time
import json
import gradio as gr
from dataclasses import dataclass
from typing import List, Dict, Tuple, Any, Optional
from collections import defaultdict
import numpy as np
from tqdm.auto import tqdm
# NLP
import nltk
from nltk.tokenize import sent_tokenize, word_tokenize
import langdetect
# Embedding & ranking models
from sentence_transformers import SentenceTransformer
import faiss
from rank_bm25 import BM25Okapi
# Ensure punkt tokenizer is available
try:
nltk.download('punkt', quiet=True)
nltk.download('punkt_tab', quiet=True)
except Exception:
pass
# -------------------------
# Data classes
# -------------------------
@dataclass
class Chunk:
id: str
text: str
meta: Dict[str, Any]
chunk_id: int
embedding: Optional[np.ndarray] = None
language: str = "unknown"
# -------------------------
# Document processing
# -------------------------
class DocumentProcessor:
def __init__(self):
self.supported_languages = ['fa', 'en', 'ar', 'es', 'fr']
def detect_language(self, text: str) -> str:
if not text or not text.strip():
return 'unknown'
try:
lang = langdetect.detect(text[:500])
return lang if lang in self.supported_languages else 'unknown'
except Exception:
return 'unknown'
def clean_text(self, text: str, language: str = 'fa') -> str:
if not text:
return ""
text = str(text)
text = re.sub(r'\s+', ' ', text).strip()
return text
def smart_sent_tokenize(self, text: str, language: str) -> List[str]:
try:
if language == 'fa':
sentences = re.split(r'[.!?؟۔]+', text)
else:
sentences = sent_tokenize(text)
return [s.strip() for s in sentences if len(s.strip()) > 10]
except Exception:
return [text.strip()] if text else []
def semantic_chunking(self, text: str, doc_id: str, meta: Dict, target_chunk_size: int = 300, overlap: int = 50) -> List[Chunk]:
language = self.detect_language(text)
cleaned_text = self.clean_text(text, language)
sentences = self.smart_sent_tokenize(cleaned_text, language)
chunks: List[Chunk] = []
current_chunk: List[str] = []
current_length = 0
chunk_id = 0
for sentence in sentences:
sentence_words = max(1, len(sentence.split()))
if current_length + sentence_words > target_chunk_size and current_chunk:
chunk_text = " ".join(current_chunk)
chunks.append(Chunk(id=doc_id, text=chunk_text, meta=meta, chunk_id=chunk_id, language=language))
chunk_id += 1
overlap_sentences = current_chunk[-2:] if len(current_chunk) > 2 else current_chunk[-1:] if current_chunk else []
current_chunk = overlap_sentences + [sentence]
current_length = sum(len(s.split()) for s in current_chunk)
else:
current_chunk.append(sentence)
current_length += sentence_words
if current_chunk:
chunk_text = " ".join(current_chunk)
chunks.append(Chunk(id=doc_id, text=chunk_text, meta=meta, chunk_id=chunk_id, language=language))
return chunks
# -------------------------
# Hybrid index (BM25 + FAISS)
# -------------------------
class AdvancedHybridIndex:
def __init__(self, embedding_model: str = 'sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2'):
print(f"Loading embedding model: {embedding_model}")
try:
self.embedder = SentenceTransformer(embedding_model)
except Exception as e:
raise RuntimeError(f"Failed to load SentenceTransformer '{embedding_model}': {e}")
self.faiss_index = None
self.id_to_chunk: List[Chunk] = []
self.bm25_indices: Dict[str, BM25Okapi] = {}
self.lang_to_global_indices: Dict[str, List[int]] = defaultdict(list)
self.corpus_by_language: Dict[str, List[str]] = defaultdict(list)
self.embeddings: Optional[np.ndarray] = None
self.doc_processor = DocumentProcessor()
def _tokenize_for_bm25(self, text: str, language: str) -> List[str]:
if not text:
return []
if language == 'fa':
return re.findall(r'[\w\u0600-\u06FF]+', text.lower())
else:
try:
return [t.lower() for t in word_tokenize(text)]
except Exception:
return re.findall(r'\w+', text.lower())
def build_index(self, chunks: List[Chunk], normalize: bool = True):
print(f"Building index for {len(chunks)} chunks...")
self.id_to_chunk = chunks
# Group texts by language and build mapping
for global_idx, chunk in enumerate(chunks):
lang = chunk.language
self.corpus_by_language[lang].append(chunk.text)
self.lang_to_global_indices[lang].append(global_idx)
# BM25 per language
for lang, texts in self.corpus_by_language.items():
tokenized = [self._tokenize_for_bm25(t, lang) for t in texts]
if not tokenized:
continue
try:
self.bm25_indices[lang] = BM25Okapi(tokenized)
print(f" BM25 index built for language '{lang}' with {len(texts)} docs")
except Exception as e:
print(f" Warning: BM25 build failed for lang {lang}: {e}")
# Dense embeddings
texts = [c.text for c in chunks]
print(" Computing dense embeddings...")
try:
embeddings = self.embedder.encode(texts, show_progress_bar=False, convert_to_numpy=True, batch_size=16)
except Exception as e:
print(f" Embedding failed: {e}")
embeddings = np.random.rand(len(texts), 384).astype('float32')
if normalize and embeddings is not None and len(embeddings) > 0:
norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
norms[norms == 0] = 1.0
embeddings = embeddings / norms
self.embeddings = embeddings.astype('float32')
if self.embeddings.size and self.embeddings.shape[0] > 0:
dim = self.embeddings.shape[1]
try:
self.faiss_index = faiss.IndexFlatIP(dim)
self.faiss_index.add(self.embeddings)
print(f" FAISS index created with {self.embeddings.shape[0]} vectors (dim={dim})")
except Exception as e:
print(f" Failed to create FAISS index: {e}")
else:
self.faiss_index = None
print(" Warning: No embeddings to add to FAISS")
def search_bm25(self, query: str, language: str, top_k: int = 50) -> List[Tuple[int, float]]:
if language not in self.bm25_indices:
return []
tokenized = self._tokenize_for_bm25(query, language)
if not tokenized:
return []
try:
scores = self.bm25_indices[language].get_scores(tokenized)
except Exception:
return []
if scores is None or len(scores) == 0:
return []
top_idxs = np.argsort(scores)[::-1][:top_k]
results: List[Tuple[int, float]] = []
for local_idx in top_idxs:
score = float(scores[local_idx])
if score <= 0:
continue
try:
global_idx = self.lang_to_global_indices[language][int(local_idx)]
results.append((int(global_idx), score))
except Exception:
continue
return results
def search_dense(self, query: str, top_k: int = 50) -> List[Tuple[int, float]]:
if self.faiss_index is None or self.embeddings is None or self.embeddings.size == 0:
return []
try:
q_emb = self.embedder.encode([query], convert_to_numpy=True)
except Exception:
return []
qnorm = np.linalg.norm(q_emb, axis=1, keepdims=True)
qnorm[qnorm == 0] = 1.0
q_emb = (q_emb / qnorm).astype('float32')
try:
D, I = self.faiss_index.search(q_emb, top_k)
except Exception:
return []
results: List[Tuple[int, float]] = []
for idx, score in zip(I[0], D[0]):
if idx != -1:
results.append((int(idx), float(score)))
return results
# -------------------------
# Retrieval system with IMPROVED relevance detection
# -------------------------
class AdvancedRetrievalSystem:
def __init__(self, index: AdvancedHybridIndex, relevance_threshold: float = 0.6, semantic_threshold: float = 0.25):
self.index = index
self.relevance_threshold = relevance_threshold
self.semantic_threshold = semantic_threshold
def _calculate_semantic_similarity(self, query: str, chunk_text: str) -> float:
"""Calculate semantic similarity between query and chunk"""
try:
query_emb = self.index.embedder.encode([query], convert_to_numpy=True)
chunk_emb = self.index.embedder.encode([chunk_text], convert_to_numpy=True)
similarity = np.dot(query_emb[0], chunk_emb[0]) / (
np.linalg.norm(query_emb[0]) * np.linalg.norm(chunk_emb[0])
)
return float(similarity)
except Exception:
return 0.0
def _calculate_keyword_overlap(self, query: str, chunk_text: str, language: str) -> float:
"""Calculate keyword overlap between query and chunk"""
if language == 'fa':
query_words = set(re.findall(r'[\w\u0600-\u06FF]+', query.lower()))
chunk_words = set(re.findall(r'[\w\u0600-\u06FF]+', chunk_text.lower()))
else:
query_words = set(re.findall(r'\w+', query.lower()))
chunk_words = set(re.findall(r'\w+', chunk_text.lower()))
if not query_words:
return 0.0
overlap = len(query_words.intersection(chunk_words)) / len(query_words)
return overlap
def hybrid_search(self, query: str, dense_weight: float = 0.7, bm25_weight: float = 0.3) -> Optional[Tuple[Chunk, float]]:
"""
Returns the highest-scoring chunk only if it meets multiple relevance criteria
"""
start = time.time()
language = self.index.doc_processor.detect_language(query)
# Get results from both methods
dense_results = self.index.search_dense(query, top_k=10)
bm25_results = self.index.search_bm25(query, language, top_k=10)
combined = {}
# Process dense results
if dense_results:
dense_scores = np.array([s for _, s in dense_results])
if len(dense_scores) > 0:
if dense_scores.max() - dense_scores.min() == 0:
dense_norm = np.ones_like(dense_scores)
else:
dense_norm = (dense_scores - dense_scores.min()) / (dense_scores.max() - dense_scores.min() + 1e-8)
for (idx, _), norm in zip(dense_results, dense_norm):
combined[idx] = dense_weight * float(norm)
# Process BM25 results
if bm25_results:
bm25_scores = np.array([s for _, s in bm25_results])
if len(bm25_scores) > 0:
if bm25_scores.max() - bm25_scores.min() == 0:
bm25_norm = np.ones_like(bm25_scores)
else:
bm25_norm = (bm25_scores - bm25_scores.min()) / (bm25_scores.max() - bm25_scores.min() + 1e-8)
for (idx, _), norm in zip(bm25_results, bm25_norm):
if idx in combined:
combined[idx] += bm25_weight * float(norm)
else:
combined[idx] = bm25_weight * float(norm)
# Find the single highest-scoring chunk
if not combined:
return None
best_idx, best_score = max(combined.items(), key=lambda x: x[1])
if 0 <= best_idx < len(self.index.id_to_chunk):
best_chunk = self.index.id_to_chunk[best_idx]
# ADDITIONAL RELEVANCE CHECKS
semantic_similarity = self._calculate_semantic_similarity(query, best_chunk.text)
keyword_overlap = self._calculate_keyword_overlap(query, best_chunk.text, language)
# STRICT RELEVANCE CHECK
is_relevant = (
best_score >= self.relevance_threshold and
semantic_similarity >= self.semantic_threshold and
keyword_overlap >= 0.05 # Reduced threshold for better coverage
)
if not is_relevant:
return None
return (best_chunk, best_score)
else:
return None
# -------------------------
# Professional RAG system for HF Spaces
# -------------------------
class HuggingFaceRAGSystem:
def __init__(self):
print("🚀 Initializing RAG System for Hugging Face Spaces...")
self.doc_processor = DocumentProcessor()
self.index = AdvancedHybridIndex('sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2')
self.retrieval_system = AdvancedRetrievalSystem(self.index, relevance_threshold=0.6, semantic_threshold=0.25)
self.is_initialized = False
self.default_documents_loaded = False
def load_default_documents(self):
"""Load default documents for demo"""
if self.default_documents_loaded:
return
default_docs = [
{
"id": "doc1",
"title": "یادگیری عمیق چیست؟",
"text": "یادگیری عمیق (Deep Learning) شاخه‌ای از یادگیری ماشین است که از شبکه‌های عصبی مصنوعی با چندین لایه استفاده می‌کند. این تکنیک برای کارهایی مانند تشخیص تصویر، پردازش زبان طبیعی و تشخیص صوت بسیار مناسب است. شبکه‌های عصبی در یادگیری عمیق می‌توانند ویژگی‌های پیچیده را به طور خودکار از داده‌ها یاد بگیرند.",
"meta": {"source": "ویکی‌پدیا", "category": "هوش مصنوعی"}
},
{
"id": "doc2",
"title": "معماری Transformer",
"text": "معماری Transformer یک مدل برای پردازش زبان طبیعی است که از مکانیزم توجه (attention) استفاده می‌کند. این معماری در مدل‌هایی مانند BERT و GPT استفاده شده و در ترجمه ماشینی و درک متن کاربرد دارد. Transformerها نسبت به مدل‌های قدیمی‌تر سرعت و دقت بیشتری در پردازش متون طولانی دارند.",
"meta": {"source": "مقاله تحقیقاتی", "category": "پردازش زبان"}
},
{
"id": "doc3",
"title": "شبکه‌های عصبی کانولوشنی",
"text": "شبکه‌های عصبی کانولوشنی (CNN) مخصوص پردازش داده‌های شبکه‌ای مانند تصاویر هستند. این شبکه‌ها از لایه‌های کانولوشن برای استخراج ویژگی‌ها استفاده می‌کنند. کاربردهای اصلی CNN شامل تشخیص اشیاء، طبقه‌بندی تصاویر و بینایی کامپیوتر است.",
"meta": {"source": "کتاب آموزشی", "category": "بینایی ماشین"}
},
{
"id": "doc4",
"title": "پردازش زبان طبیعی فارسی",
"text": "پردازش زبان طبیعی برای فارسی با چالش‌هایی مانند کمبود داده‌های برچسب‌دار، پیچیدگی‌های صرفی و نحوی و نویسه‌های خاص روبرو است. با این حال اخیراً مدل‌های زیادی برای زبان فارسی توسعه یافته‌اند.",
"meta": {"source": "مقاله پژوهشی", "category": "پردازش زبان فارسی"}
},
{
"id": "doc5",
"title": "تغذیه سالم",
"text": "تغذیه سالم شامل مصرف متعادل میوه‌ها، سبزیجات، پروتئین‌ها و غلات کامل است. نوشیدن آب کافی و کاهش مصرف قند و نمک برای سلامت بدن بسیار مهم می‌باشد.",
"meta": {"source": "کتاب سلامت", "category": "تغذیه"}
},
{
"id": "doc6",
"title": "ورزش و تناسب اندام",
"text": "ورزش منظم باعث بهبود سلامت قلبی عروقی، تقویت عضلات و کاهش استرس می‌شود. پیاده‌روی، شنا و دوچرخه‌سواری از ورزش‌های مفید هستند.",
"meta": {"source": "مجله ورزشی", "category": "سلامت"}
}
]
self.index_documents(default_docs)
self.default_documents_loaded = True
print("✅ Default documents loaded and indexed!")
def index_documents(self, documents: List[Dict]):
"""Index documents"""
print(f"📚 Indexing {len(documents)} documents...")
all_chunks: List[Chunk] = []
for doc in documents:
chunks = self.doc_processor.semantic_chunking(
doc.get('text', ''),
doc.get('id', 'unknown'),
doc.get('meta', {}),
target_chunk_size=300,
overlap=50
)
all_chunks.extend(chunks)
print(f"Created {len(all_chunks)} chunks from {len(documents)} documents")
self.index.build_index(all_chunks)
self.is_initialized = True
def query(self, question: str) -> Dict[str, Any]:
"""Query the RAG system"""
if not self.is_initialized:
self.load_default_documents()
start = time.time()
# Retrieve only the top chunk (if highly relevant)
result = self.retrieval_system.hybrid_search(question)
if not result:
return {
"answer": "متأسفانه اطلاعات مرتبطی در اسناد موجود برای پاسخ به این سوال یافت نشد.",
"sources": [],
"confidence": 0.0,
"processing_time": round(time.time() - start, 2),
"relevant_content_found": False
}
top_chunk, score = result
# Store score in chunk for reference
top_chunk.score = score
# Generate answer from top chunk
language = self.doc_processor.detect_language(question)
answer_text = top_chunk.text
source = top_chunk.meta.get('source', 'Unknown')
sources = [source] if source else []
confidence = min(1.0, float(score))
return {
"question": question,
"answer": answer_text,
"sources": sources,
"confidence": round(confidence, 2),
"retrieved_score": round(score, 3),
"processing_time": round(time.time() - start, 2),
"language": language,
"chunk_source": source,
"relevant_content_found": True
}
# -------------------------
# Gradio Interface
# -------------------------
class RAGInterface:
def __init__(self):
self.rag_system = HuggingFaceRAGSystem()
self.rag_system.load_default_documents()
def process_query(self, question: str, history):
"""Process query and return formatted response"""
if not question.strip():
return history, "لطفاً یک سوال وارد کنید."
# Add user question to history
history.append([question, ""])
# Get response from RAG system
result = self.rag_system.query(question)
# Format response
if result['relevant_content_found']:
response = f"**🤖 پاسخ:**\n{result['answer']}\n\n"
response += f"**🏷️ منبع:** {result['chunk_source']}\n"
response += f"**🎯 امتیاز اطمینان:** {result['confidence']}\n"
response += f"**⏱️ زمان پردازش:** {result['processing_time']} ثانیه"
else:
response = f"**❌ پاسخ:**\n{result['answer']}\n\n"
response += f"**⏱️ زمان پردازش:** {result['processing_time']} ثانیه"
# Update history
history[-1][1] = response
return history, ""
def clear_chat(self):
"""Clear chat history"""
return [], ""
# -------------------------
# Create and launch Gradio app
# -------------------------
def create_interface():
"""Create Gradio interface"""
# Initialize RAG system
rag_interface = RAGInterface()
# Custom CSS for better styling
css = """
.gradio-container {
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
}
.title {
text-align: center;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
-webkit-background-clip: text;
-webkit-text-fill-color: transparent;
font-weight: bold;
}
"""
with gr.Blocks(css=css, theme=gr.themes.Soft()) as demo:
gr.Markdown(
"""
# 🧠 سیستم هوشمند پاسخگویی (RAG)
**سیستم بازیابی و تولید پاسخ مبتنی بر اسناد**
این سیستم از هوش مصنوعی برای یافتن مرتبط‌ترین اطلاعات از اسناد موجود و ارائه پاسخ دقیق استفاده می‌کند.
"""
)
with gr.Row():
with gr.Column(scale=2):
chatbot = gr.Chatbot(
label="مکالمه",
height=500,
show_copy_button=True,
avatar_images=("👤", "🤖")
)
with gr.Row():
question_input = gr.Textbox(
label="سوال خود را بپرسید",
placeholder="مثلاً: یادگیری عمیق چیست؟ یا یک تمرین ورزشی پیشنهاد بده...",
lines=2,
scale=4
)
submit_btn = gr.Button("ارسال سوال 🚀", scale=1)
with gr.Row():
clear_btn = gr.Button("پاک کردن مکالمه 🗑️")
examples = gr.Examples(
examples=[
"یادگیری عمیق چیست؟",
"Transformer چیست و چه کاربردی دارد؟",
"یک تمرین ورزشی پیشنهاد بده",
"تغذیه سالم چیست؟",
"پردازش زبان فارسی چه مشکلاتی دارد؟"
],
inputs=question_input
)
with gr.Column(scale=1):
gr.Markdown("### 📊 اطلاعات سیستم")
with gr.Accordion("اسناد موجود", open=False):
gr.Markdown("""
**موضوعات پوشش داده شده:**
- 🤖 هوش مصنوعی و یادگیری عمیق
- 🔤 پردازش زبان طبیعی
- 👁️ بینایی کامپیوتر
- 🍎 تغذیه و سلامت
- 🏃‍♂️ ورزش و تناسب اندام
""")
with gr.Accordion("راهنمای استفاده", open=True):
gr.Markdown("""
**نحوه کار سیستم:**
1. سوال خود را به فارسی یا انگلیسی وارد کنید
2. سیستم مرتبط‌ترین سند را پیدا می‌کند
3. در صورت وجود اطلاعات کافی، پاسخ ارائه می‌شود
4. در غیر این صورت، سیستم اطلاع می‌دهد
**محدودیت‌ها:**
- فقط به سوالات مرتبط با اسناد موجود پاسخ می‌دهد
- پاسخ‌ها مستقیماً از اسناد استخراج می‌شوند
- از تولید پاسخ‌های تخیلی خودداری می‌کند
""")
# Event handlers
submit_btn.click(
fn=rag_interface.process_query,
inputs=[question_input, chatbot],
outputs=[chatbot, question_input]
)
question_input.submit(
fn=rag_interface.process_query,
inputs=[question_input, chatbot],
outputs=[chatbot, question_input]
)
clear_btn.click(
fn=rag_interface.clear_chat,
inputs=[],
outputs=[chatbot]
)
return demo
# -------------------------
# Main execution for Hugging Face Spaces
# -------------------------
if __name__ == "__main__":
# For Hugging Face Spaces
demo = create_interface()
demo.launch(
server_name="0.0.0.0",
share=False,
show_error=True
)