""" RAG System for Hugging Face Spaces Optimized for deployment on HF Spaces with GPU support """ import os import re import time import json import gradio as gr from dataclasses import dataclass from typing import List, Dict, Tuple, Any, Optional from collections import defaultdict import numpy as np from tqdm.auto import tqdm # NLP import nltk from nltk.tokenize import sent_tokenize, word_tokenize import langdetect # Embedding & ranking models from sentence_transformers import SentenceTransformer import faiss from rank_bm25 import BM25Okapi # Ensure punkt tokenizer is available try: nltk.download('punkt', quiet=True) nltk.download('punkt_tab', quiet=True) except Exception: pass # ------------------------- # Data classes # ------------------------- @dataclass class Chunk: id: str text: str meta: Dict[str, Any] chunk_id: int embedding: Optional[np.ndarray] = None language: str = "unknown" # ------------------------- # Document processing # ------------------------- class DocumentProcessor: def __init__(self): self.supported_languages = ['fa', 'en', 'ar', 'es', 'fr'] def detect_language(self, text: str) -> str: if not text or not text.strip(): return 'unknown' try: lang = langdetect.detect(text[:500]) return lang if lang in self.supported_languages else 'unknown' except Exception: return 'unknown' def clean_text(self, text: str, language: str = 'fa') -> str: if not text: return "" text = str(text) text = re.sub(r'\s+', ' ', text).strip() return text def smart_sent_tokenize(self, text: str, language: str) -> List[str]: try: if language == 'fa': sentences = re.split(r'[.!?؟۔]+', text) else: sentences = sent_tokenize(text) return [s.strip() for s in sentences if len(s.strip()) > 10] except Exception: return [text.strip()] if text else [] def semantic_chunking(self, text: str, doc_id: str, meta: Dict, target_chunk_size: int = 300, overlap: int = 50) -> List[Chunk]: language = self.detect_language(text) cleaned_text = self.clean_text(text, language) sentences = self.smart_sent_tokenize(cleaned_text, language) chunks: List[Chunk] = [] current_chunk: List[str] = [] current_length = 0 chunk_id = 0 for sentence in sentences: sentence_words = max(1, len(sentence.split())) if current_length + sentence_words > target_chunk_size and current_chunk: chunk_text = " ".join(current_chunk) chunks.append(Chunk(id=doc_id, text=chunk_text, meta=meta, chunk_id=chunk_id, language=language)) chunk_id += 1 overlap_sentences = current_chunk[-2:] if len(current_chunk) > 2 else current_chunk[-1:] if current_chunk else [] current_chunk = overlap_sentences + [sentence] current_length = sum(len(s.split()) for s in current_chunk) else: current_chunk.append(sentence) current_length += sentence_words if current_chunk: chunk_text = " ".join(current_chunk) chunks.append(Chunk(id=doc_id, text=chunk_text, meta=meta, chunk_id=chunk_id, language=language)) return chunks # ------------------------- # Hybrid index (BM25 + FAISS) # ------------------------- class AdvancedHybridIndex: def __init__(self, embedding_model: str = 'sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2'): print(f"Loading embedding model: {embedding_model}") try: self.embedder = SentenceTransformer(embedding_model) except Exception as e: raise RuntimeError(f"Failed to load SentenceTransformer '{embedding_model}': {e}") self.faiss_index = None self.id_to_chunk: List[Chunk] = [] self.bm25_indices: Dict[str, BM25Okapi] = {} self.lang_to_global_indices: Dict[str, List[int]] = defaultdict(list) self.corpus_by_language: Dict[str, List[str]] = defaultdict(list) self.embeddings: Optional[np.ndarray] = None self.doc_processor = DocumentProcessor() def _tokenize_for_bm25(self, text: str, language: str) -> List[str]: if not text: return [] if language == 'fa': return re.findall(r'[\w\u0600-\u06FF]+', text.lower()) else: try: return [t.lower() for t in word_tokenize(text)] except Exception: return re.findall(r'\w+', text.lower()) def build_index(self, chunks: List[Chunk], normalize: bool = True): print(f"Building index for {len(chunks)} chunks...") self.id_to_chunk = chunks # Group texts by language and build mapping for global_idx, chunk in enumerate(chunks): lang = chunk.language self.corpus_by_language[lang].append(chunk.text) self.lang_to_global_indices[lang].append(global_idx) # BM25 per language for lang, texts in self.corpus_by_language.items(): tokenized = [self._tokenize_for_bm25(t, lang) for t in texts] if not tokenized: continue try: self.bm25_indices[lang] = BM25Okapi(tokenized) print(f" BM25 index built for language '{lang}' with {len(texts)} docs") except Exception as e: print(f" Warning: BM25 build failed for lang {lang}: {e}") # Dense embeddings texts = [c.text for c in chunks] print(" Computing dense embeddings...") try: embeddings = self.embedder.encode(texts, show_progress_bar=False, convert_to_numpy=True, batch_size=16) except Exception as e: print(f" Embedding failed: {e}") embeddings = np.random.rand(len(texts), 384).astype('float32') if normalize and embeddings is not None and len(embeddings) > 0: norms = np.linalg.norm(embeddings, axis=1, keepdims=True) norms[norms == 0] = 1.0 embeddings = embeddings / norms self.embeddings = embeddings.astype('float32') if self.embeddings.size and self.embeddings.shape[0] > 0: dim = self.embeddings.shape[1] try: self.faiss_index = faiss.IndexFlatIP(dim) self.faiss_index.add(self.embeddings) print(f" FAISS index created with {self.embeddings.shape[0]} vectors (dim={dim})") except Exception as e: print(f" Failed to create FAISS index: {e}") else: self.faiss_index = None print(" Warning: No embeddings to add to FAISS") def search_bm25(self, query: str, language: str, top_k: int = 50) -> List[Tuple[int, float]]: if language not in self.bm25_indices: return [] tokenized = self._tokenize_for_bm25(query, language) if not tokenized: return [] try: scores = self.bm25_indices[language].get_scores(tokenized) except Exception: return [] if scores is None or len(scores) == 0: return [] top_idxs = np.argsort(scores)[::-1][:top_k] results: List[Tuple[int, float]] = [] for local_idx in top_idxs: score = float(scores[local_idx]) if score <= 0: continue try: global_idx = self.lang_to_global_indices[language][int(local_idx)] results.append((int(global_idx), score)) except Exception: continue return results def search_dense(self, query: str, top_k: int = 50) -> List[Tuple[int, float]]: if self.faiss_index is None or self.embeddings is None or self.embeddings.size == 0: return [] try: q_emb = self.embedder.encode([query], convert_to_numpy=True) except Exception: return [] qnorm = np.linalg.norm(q_emb, axis=1, keepdims=True) qnorm[qnorm == 0] = 1.0 q_emb = (q_emb / qnorm).astype('float32') try: D, I = self.faiss_index.search(q_emb, top_k) except Exception: return [] results: List[Tuple[int, float]] = [] for idx, score in zip(I[0], D[0]): if idx != -1: results.append((int(idx), float(score))) return results # ------------------------- # Retrieval system with IMPROVED relevance detection # ------------------------- class AdvancedRetrievalSystem: def __init__(self, index: AdvancedHybridIndex, relevance_threshold: float = 0.6, semantic_threshold: float = 0.25): self.index = index self.relevance_threshold = relevance_threshold self.semantic_threshold = semantic_threshold def _calculate_semantic_similarity(self, query: str, chunk_text: str) -> float: """Calculate semantic similarity between query and chunk""" try: query_emb = self.index.embedder.encode([query], convert_to_numpy=True) chunk_emb = self.index.embedder.encode([chunk_text], convert_to_numpy=True) similarity = np.dot(query_emb[0], chunk_emb[0]) / ( np.linalg.norm(query_emb[0]) * np.linalg.norm(chunk_emb[0]) ) return float(similarity) except Exception: return 0.0 def _calculate_keyword_overlap(self, query: str, chunk_text: str, language: str) -> float: """Calculate keyword overlap between query and chunk""" if language == 'fa': query_words = set(re.findall(r'[\w\u0600-\u06FF]+', query.lower())) chunk_words = set(re.findall(r'[\w\u0600-\u06FF]+', chunk_text.lower())) else: query_words = set(re.findall(r'\w+', query.lower())) chunk_words = set(re.findall(r'\w+', chunk_text.lower())) if not query_words: return 0.0 overlap = len(query_words.intersection(chunk_words)) / len(query_words) return overlap def hybrid_search(self, query: str, dense_weight: float = 0.7, bm25_weight: float = 0.3) -> Optional[Tuple[Chunk, float]]: """ Returns the highest-scoring chunk only if it meets multiple relevance criteria """ start = time.time() language = self.index.doc_processor.detect_language(query) # Get results from both methods dense_results = self.index.search_dense(query, top_k=10) bm25_results = self.index.search_bm25(query, language, top_k=10) combined = {} # Process dense results if dense_results: dense_scores = np.array([s for _, s in dense_results]) if len(dense_scores) > 0: if dense_scores.max() - dense_scores.min() == 0: dense_norm = np.ones_like(dense_scores) else: dense_norm = (dense_scores - dense_scores.min()) / (dense_scores.max() - dense_scores.min() + 1e-8) for (idx, _), norm in zip(dense_results, dense_norm): combined[idx] = dense_weight * float(norm) # Process BM25 results if bm25_results: bm25_scores = np.array([s for _, s in bm25_results]) if len(bm25_scores) > 0: if bm25_scores.max() - bm25_scores.min() == 0: bm25_norm = np.ones_like(bm25_scores) else: bm25_norm = (bm25_scores - bm25_scores.min()) / (bm25_scores.max() - bm25_scores.min() + 1e-8) for (idx, _), norm in zip(bm25_results, bm25_norm): if idx in combined: combined[idx] += bm25_weight * float(norm) else: combined[idx] = bm25_weight * float(norm) # Find the single highest-scoring chunk if not combined: return None best_idx, best_score = max(combined.items(), key=lambda x: x[1]) if 0 <= best_idx < len(self.index.id_to_chunk): best_chunk = self.index.id_to_chunk[best_idx] # ADDITIONAL RELEVANCE CHECKS semantic_similarity = self._calculate_semantic_similarity(query, best_chunk.text) keyword_overlap = self._calculate_keyword_overlap(query, best_chunk.text, language) # STRICT RELEVANCE CHECK is_relevant = ( best_score >= self.relevance_threshold and semantic_similarity >= self.semantic_threshold and keyword_overlap >= 0.05 # Reduced threshold for better coverage ) if not is_relevant: return None return (best_chunk, best_score) else: return None # ------------------------- # Professional RAG system for HF Spaces # ------------------------- class HuggingFaceRAGSystem: def __init__(self): print("🚀 Initializing RAG System for Hugging Face Spaces...") self.doc_processor = DocumentProcessor() self.index = AdvancedHybridIndex('sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2') self.retrieval_system = AdvancedRetrievalSystem(self.index, relevance_threshold=0.6, semantic_threshold=0.25) self.is_initialized = False self.default_documents_loaded = False def load_default_documents(self): """Load default documents for demo""" if self.default_documents_loaded: return default_docs = [ { "id": "doc1", "title": "یادگیری عمیق چیست؟", "text": "یادگیری عمیق (Deep Learning) شاخه‌ای از یادگیری ماشین است که از شبکه‌های عصبی مصنوعی با چندین لایه استفاده می‌کند. این تکنیک برای کارهایی مانند تشخیص تصویر، پردازش زبان طبیعی و تشخیص صوت بسیار مناسب است. شبکه‌های عصبی در یادگیری عمیق می‌توانند ویژگی‌های پیچیده را به طور خودکار از داده‌ها یاد بگیرند.", "meta": {"source": "ویکی‌پدیا", "category": "هوش مصنوعی"} }, { "id": "doc2", "title": "معماری Transformer", "text": "معماری Transformer یک مدل برای پردازش زبان طبیعی است که از مکانیزم توجه (attention) استفاده می‌کند. این معماری در مدل‌هایی مانند BERT و GPT استفاده شده و در ترجمه ماشینی و درک متن کاربرد دارد. Transformerها نسبت به مدل‌های قدیمی‌تر سرعت و دقت بیشتری در پردازش متون طولانی دارند.", "meta": {"source": "مقاله تحقیقاتی", "category": "پردازش زبان"} }, { "id": "doc3", "title": "شبکه‌های عصبی کانولوشنی", "text": "شبکه‌های عصبی کانولوشنی (CNN) مخصوص پردازش داده‌های شبکه‌ای مانند تصاویر هستند. این شبکه‌ها از لایه‌های کانولوشن برای استخراج ویژگی‌ها استفاده می‌کنند. کاربردهای اصلی CNN شامل تشخیص اشیاء، طبقه‌بندی تصاویر و بینایی کامپیوتر است.", "meta": {"source": "کتاب آموزشی", "category": "بینایی ماشین"} }, { "id": "doc4", "title": "پردازش زبان طبیعی فارسی", "text": "پردازش زبان طبیعی برای فارسی با چالش‌هایی مانند کمبود داده‌های برچسب‌دار، پیچیدگی‌های صرفی و نحوی و نویسه‌های خاص روبرو است. با این حال اخیراً مدل‌های زیادی برای زبان فارسی توسعه یافته‌اند.", "meta": {"source": "مقاله پژوهشی", "category": "پردازش زبان فارسی"} }, { "id": "doc5", "title": "تغذیه سالم", "text": "تغذیه سالم شامل مصرف متعادل میوه‌ها، سبزیجات، پروتئین‌ها و غلات کامل است. نوشیدن آب کافی و کاهش مصرف قند و نمک برای سلامت بدن بسیار مهم می‌باشد.", "meta": {"source": "کتاب سلامت", "category": "تغذیه"} }, { "id": "doc6", "title": "ورزش و تناسب اندام", "text": "ورزش منظم باعث بهبود سلامت قلبی عروقی، تقویت عضلات و کاهش استرس می‌شود. پیاده‌روی، شنا و دوچرخه‌سواری از ورزش‌های مفید هستند.", "meta": {"source": "مجله ورزشی", "category": "سلامت"} } ] self.index_documents(default_docs) self.default_documents_loaded = True print("✅ Default documents loaded and indexed!") def index_documents(self, documents: List[Dict]): """Index documents""" print(f"📚 Indexing {len(documents)} documents...") all_chunks: List[Chunk] = [] for doc in documents: chunks = self.doc_processor.semantic_chunking( doc.get('text', ''), doc.get('id', 'unknown'), doc.get('meta', {}), target_chunk_size=300, overlap=50 ) all_chunks.extend(chunks) print(f"Created {len(all_chunks)} chunks from {len(documents)} documents") self.index.build_index(all_chunks) self.is_initialized = True def query(self, question: str) -> Dict[str, Any]: """Query the RAG system""" if not self.is_initialized: self.load_default_documents() start = time.time() # Retrieve only the top chunk (if highly relevant) result = self.retrieval_system.hybrid_search(question) if not result: return { "answer": "متأسفانه اطلاعات مرتبطی در اسناد موجود برای پاسخ به این سوال یافت نشد.", "sources": [], "confidence": 0.0, "processing_time": round(time.time() - start, 2), "relevant_content_found": False } top_chunk, score = result # Store score in chunk for reference top_chunk.score = score # Generate answer from top chunk language = self.doc_processor.detect_language(question) answer_text = top_chunk.text source = top_chunk.meta.get('source', 'Unknown') sources = [source] if source else [] confidence = min(1.0, float(score)) return { "question": question, "answer": answer_text, "sources": sources, "confidence": round(confidence, 2), "retrieved_score": round(score, 3), "processing_time": round(time.time() - start, 2), "language": language, "chunk_source": source, "relevant_content_found": True } # ------------------------- # Gradio Interface # ------------------------- class RAGInterface: def __init__(self): self.rag_system = HuggingFaceRAGSystem() self.rag_system.load_default_documents() def process_query(self, question: str, history): """Process query and return formatted response""" if not question.strip(): return history, "لطفاً یک سوال وارد کنید." # Add user question to history history.append([question, ""]) # Get response from RAG system result = self.rag_system.query(question) # Format response if result['relevant_content_found']: response = f"**🤖 پاسخ:**\n{result['answer']}\n\n" response += f"**🏷️ منبع:** {result['chunk_source']}\n" response += f"**🎯 امتیاز اطمینان:** {result['confidence']}\n" response += f"**⏱️ زمان پردازش:** {result['processing_time']} ثانیه" else: response = f"**❌ پاسخ:**\n{result['answer']}\n\n" response += f"**⏱️ زمان پردازش:** {result['processing_time']} ثانیه" # Update history history[-1][1] = response return history, "" def clear_chat(self): """Clear chat history""" return [], "" # ------------------------- # Create and launch Gradio app # ------------------------- def create_interface(): """Create Gradio interface""" # Initialize RAG system rag_interface = RAGInterface() # Custom CSS for better styling css = """ .gradio-container { font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif; } .title { text-align: center; background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); -webkit-background-clip: text; -webkit-text-fill-color: transparent; font-weight: bold; } """ with gr.Blocks(css=css, theme=gr.themes.Soft()) as demo: gr.Markdown( """ # 🧠 سیستم هوشمند پاسخگویی (RAG) **سیستم بازیابی و تولید پاسخ مبتنی بر اسناد** این سیستم از هوش مصنوعی برای یافتن مرتبط‌ترین اطلاعات از اسناد موجود و ارائه پاسخ دقیق استفاده می‌کند. """ ) with gr.Row(): with gr.Column(scale=2): chatbot = gr.Chatbot( label="مکالمه", height=500, show_copy_button=True, avatar_images=("👤", "🤖") ) with gr.Row(): question_input = gr.Textbox( label="سوال خود را بپرسید", placeholder="مثلاً: یادگیری عمیق چیست؟ یا یک تمرین ورزشی پیشنهاد بده...", lines=2, scale=4 ) submit_btn = gr.Button("ارسال سوال 🚀", scale=1) with gr.Row(): clear_btn = gr.Button("پاک کردن مکالمه 🗑️") examples = gr.Examples( examples=[ "یادگیری عمیق چیست؟", "Transformer چیست و چه کاربردی دارد؟", "یک تمرین ورزشی پیشنهاد بده", "تغذیه سالم چیست؟", "پردازش زبان فارسی چه مشکلاتی دارد؟" ], inputs=question_input ) with gr.Column(scale=1): gr.Markdown("### 📊 اطلاعات سیستم") with gr.Accordion("اسناد موجود", open=False): gr.Markdown(""" **موضوعات پوشش داده شده:** - 🤖 هوش مصنوعی و یادگیری عمیق - 🔤 پردازش زبان طبیعی - 👁️ بینایی کامپیوتر - 🍎 تغذیه و سلامت - 🏃‍♂️ ورزش و تناسب اندام """) with gr.Accordion("راهنمای استفاده", open=True): gr.Markdown(""" **نحوه کار سیستم:** 1. سوال خود را به فارسی یا انگلیسی وارد کنید 2. سیستم مرتبط‌ترین سند را پیدا می‌کند 3. در صورت وجود اطلاعات کافی، پاسخ ارائه می‌شود 4. در غیر این صورت، سیستم اطلاع می‌دهد **محدودیت‌ها:** - فقط به سوالات مرتبط با اسناد موجود پاسخ می‌دهد - پاسخ‌ها مستقیماً از اسناد استخراج می‌شوند - از تولید پاسخ‌های تخیلی خودداری می‌کند """) # Event handlers submit_btn.click( fn=rag_interface.process_query, inputs=[question_input, chatbot], outputs=[chatbot, question_input] ) question_input.submit( fn=rag_interface.process_query, inputs=[question_input, chatbot], outputs=[chatbot, question_input] ) clear_btn.click( fn=rag_interface.clear_chat, inputs=[], outputs=[chatbot] ) return demo # ------------------------- # Main execution for Hugging Face Spaces # ------------------------- if __name__ == "__main__": # For Hugging Face Spaces demo = create_interface() demo.launch( server_name="0.0.0.0", share=False, show_error=True )