Spaces:
Build error
Build error
| import gradio as gr | |
| import json | |
| import numpy as np | |
| from transformers import pipeline | |
| import torch | |
| import os | |
| from typing import List, Dict, Any, Optional | |
| import re | |
| import math | |
| from collections import defaultdict, Counter | |
| from pathlib import Path | |
| import logging | |
# Configure logging: INFO level for the whole process, plus a module-named
# logger that every class and function in this file writes to.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Configure device: "cuda" when torch reports an available GPU, otherwise
# "cpu". VectorSearcher.initialize_model reads this value.
device = "cuda" if torch.cuda.is_available() else "cpu"
logger.info(f"Using device: {device}")
class DocumentProcessor:
    """Load markdown files from a directory and split them into sections.

    Every section whose stripped text is longer than ``MIN_SECTION_CHARS``
    becomes one document dict with ``id``, ``content`` and ``metadata`` keys.
    """

    # Ranking priority per known file name; any other file gets DEFAULT_PRIORITY.
    FILE_PRIORITIES = {
        'about.md': 10,
        'research_details.md': 9,
        'publications_detailed.md': 8,
        'skills_expertise.md': 7,
        'experience_detailed.md': 8,
        'statistics.md': 9,
    }
    DEFAULT_PRIORITY = 5
    # Sections at or below this many (stripped) characters are skipped.
    MIN_SECTION_CHARS = 100
    # Line prefixes that open or close a fenced code block.
    FENCE_MARKERS = ('```', '~~~')

    def __init__(self, knowledge_base_dir: str = "knowledge_base"):
        self.knowledge_base_dir = Path(knowledge_base_dir)

    def load_markdown_files(self) -> List[Dict[str, Any]]:
        """Load and process all markdown files in the knowledge base directory.

        Returns:
            One dict per retained section. Files that cannot be read are
            logged and skipped; a missing directory yields an empty list.
        """
        documents: List[Dict[str, Any]] = []
        if not self.knowledge_base_dir.is_dir():
            logger.warning(f"Knowledge base directory not found: {self.knowledge_base_dir}")
            return documents

        # Sorted so that document order, and therefore the positional suffix
        # embedded in each id, is the same on every run.
        for file_path in sorted(self.knowledge_base_dir.glob("*.md")):
            try:
                content = file_path.read_text(encoding='utf-8')
            except (OSError, UnicodeError) as e:
                logger.error(f"β Error loading {file_path.name}: {e}")
                continue

            file_type = file_path.stem
            priority = self.FILE_PRIORITIES.get(file_path.name, self.DEFAULT_PRIORITY)
            for section in self._split_markdown_into_sections(content):
                if len(section['content'].strip()) <= self.MIN_SECTION_CHARS:
                    continue
                documents.append({
                    "id": f"{file_path.name}_{section['title']}_{len(documents)}",
                    "content": section['content'],
                    "metadata": {
                        "type": file_type,
                        "priority": priority,
                        "section": section['title'],
                        "source": file_path.name,
                    },
                })
            logger.info(f"β Loaded {file_path.name}")
        return documents

    def _split_markdown_into_sections(self, content: str) -> List[Dict[str, str]]:
        """Split markdown content into sections based on headers.

        A line starting with ``#`` begins a new section unless it sits inside
        a fenced code block, where ``#`` is usually a comment rather than a
        heading. Text before the first header is titled ``Introduction``.
        Sections with only whitespace content are dropped.
        """
        sections: List[Dict[str, str]] = []
        current_section = {'title': 'Introduction', 'content': ''}
        open_fence = None  # marker of the code fence we are inside, if any

        for line in content.split('\n'):
            is_header = open_fence is None and line.startswith('#')

            # Track fence state so '#' lines inside code are not headers.
            stripped = line.lstrip()
            if open_fence is None:
                for marker in self.FENCE_MARKERS:
                    if stripped.startswith(marker):
                        open_fence = marker
                        break
            elif stripped.startswith(open_fence):
                open_fence = None

            if is_header:
                if current_section['content'].strip():
                    sections.append(current_section)
                current_section = {
                    'title': line.lstrip('#').strip(),
                    'content': line + '\n',
                }
            else:
                current_section['content'] += line + '\n'

        if current_section['content'].strip():
            sections.append(current_section)
        return sections
class BM25Searcher:
    """Implements BM25 search algorithm for keyword-based document retrieval."""

    # Words ignored during tokenization. Built once instead of per token.
    STOP_WORDS = frozenset({
        'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by',
        'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'do', 'does', 'did',
        'will', 'would', 'could', 'should', 'may', 'might', 'can', 'this', 'that', 'these', 'those',
    })
    # Anything that is neither a word character nor whitespace.
    _NON_WORD_RE = re.compile(r'[^\w\s]')

    def __init__(self, k1: float = 1.5, b: float = 0.75):
        self.k1 = k1  # term-frequency saturation
        self.b = b    # strength of document-length normalisation
        self.term_frequencies = {}                  # doc_id -> {term: count}
        self.document_frequency = defaultdict(int)  # term -> number of docs containing it
        self.document_lengths = {}                  # doc_id -> token count
        self.average_doc_length = 0
        self.total_documents = 0

    def build_index(self, documents: List[Dict[str, Any]]):
        """Build BM25 index from documents, replacing any previous index."""
        logger.info("Building BM25 index...")
        self.term_frequencies = {}
        self.document_frequency = defaultdict(int)
        self.document_lengths = {}
        total_length = 0
        for doc in documents:
            doc_id = doc['id']
            terms = self._tokenize(doc['content'])
            term_freq = Counter(terms)
            self.term_frequencies[doc_id] = dict(term_freq)
            self.document_lengths[doc_id] = len(terms)
            total_length += len(terms)
            # Counter keys are the distinct terms of this document.
            for term in term_freq:
                self.document_frequency[term] += 1
        self.total_documents = len(documents)
        self.average_doc_length = total_length / self.total_documents if self.total_documents > 0 else 0
        logger.info(f"β BM25 index built: {len(self.document_frequency)} unique terms")

    def search(self, query: str, documents: List[Dict[str, Any]], top_k: int = 10) -> List[Dict[str, Any]]:
        """Perform BM25 search.

        Args:
            query: Free-text query; tokenized the same way as the documents.
            documents: Documents to score; their ids must match the index.
            top_k: Maximum number of results.

        Returns:
            Result dicts (``document``, ``score``, ``search_type``) sorted by
            descending score. Documents scoring zero are omitted.
        """
        query_terms = self._tokenize(query)
        if not query_terms:
            return []
        scores = {}
        for doc in documents:
            doc_id = doc['id']
            score = 0.0
            for term in query_terms:
                score += self._calculate_bm25_score(term, doc_id)
            if score > 0:
                # Higher-priority sources get a mild multiplicative boost.
                priority_boost = 1 + (doc['metadata']['priority'] / 50)
                scores[doc_id] = {
                    'document': doc,
                    'score': score * priority_boost,
                    'search_type': 'bm25',
                }
        sorted_results = sorted(scores.values(), key=lambda x: x['score'], reverse=True)
        return sorted_results[:top_k]

    def _tokenize(self, text: str) -> List[str]:
        """Lower-case, strip punctuation, drop short words and stop words."""
        text = self._NON_WORD_RE.sub(' ', text.lower())
        return [word for word in text.split() if len(word) > 2 and not self._is_stop_word(word)]

    def _is_stop_word(self, word: str) -> bool:
        """Check if word is a stop word."""
        return word in self.STOP_WORDS

    def _calculate_bm25_score(self, term: str, doc_id: str) -> float:
        """Calculate BM25 score for a term in a document.

        Uses the non-negative IDF variant ``log(1 + (N - df + 0.5) / (df + 0.5))``.
        The plain ``log((N - df + 0.5) / (df + 0.5))`` form turns negative for
        terms found in more than half of the documents, which made common
        terms lower a document's score and made a one-document corpus
        unsearchable.
        """
        tf = self.term_frequencies.get(doc_id, {}).get(term, 0)
        if tf == 0:
            return 0.0
        df = self.document_frequency.get(term, 1)
        doc_length = self.document_lengths.get(doc_id, 0)
        idf = math.log(1 + (self.total_documents - df + 0.5) / (df + 0.5))
        # Guard against an index whose documents are all empty.
        length_ratio = doc_length / self.average_doc_length if self.average_doc_length else 0.0
        numerator = tf * (self.k1 + 1)
        denominator = tf + self.k1 * (1 - self.b + self.b * length_ratio)
        return idf * (numerator / denominator)
class VectorSearcher:
    """Implements vector-based semantic search using transformer embeddings."""

    # Length of the zero vector stored when embedding a document fails.
    FALLBACK_EMBEDDING_DIM = 384
    # Only this many leading characters of a text are embedded.
    MAX_CHARS = 500

    def __init__(self, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
        self.model_name = model_name
        self.embedder = None   # set by initialize_model()
        self.embeddings = []   # one vector per document, in document order

    def initialize_model(self):
        """Initialize the embedding model.

        Raises:
            Exception: whatever the pipeline constructor raised, after logging.
        """
        try:
            logger.info("Loading embedding model...")
            self.embedder = pipeline(
                'feature-extraction',
                self.model_name,
                device=0 if device == "cuda" else -1
            )
            logger.info("β Embedding model loaded successfully")
        except Exception as e:
            logger.error(f"β Error loading embedding model: {e}")
            raise

    def _embed(self, text: str) -> np.ndarray:
        """Return one pooled vector for the first ``MAX_CHARS`` characters of text."""
        output = self.embedder(text[:self.MAX_CHARS], return_tensors="pt")
        # NOTE(review): presumably output[0] is (tokens, hidden), so the mean
        # over dim 0 pools the token vectors — confirm against the pipeline.
        return output[0].mean(dim=0).detach().cpu().numpy()

    def build_embeddings(self, documents: List[Dict[str, Any]]):
        """Build embeddings for all documents.

        A document that fails to embed gets a zero vector so that
        ``self.embeddings`` stays index-aligned with ``documents``.
        """
        logger.info("Generating embeddings for knowledge base...")
        self.embeddings = []
        for doc in documents:
            try:
                self.embeddings.append(self._embed(doc["content"]))
            except Exception as e:
                logger.error(f"Error generating embedding for doc {doc['id']}: {e}")
                self.embeddings.append(np.zeros(self.FALLBACK_EMBEDDING_DIM))
        logger.info(f"β Generated {len(self.embeddings)} embeddings")

    def search(self, query: str, documents: List[Dict[str, Any]], top_k: int = 10) -> List[Dict[str, Any]]:
        """Perform vector similarity search.

        Args:
            query: Free-text query.
            documents: Documents in the same order used for build_embeddings().
            top_k: Maximum number of results.

        Returns:
            Result dicts (``document``, ``score``, ``search_type``) sorted by
            descending score, or an empty list if anything fails.
        """
        try:
            query_vector = self._embed(query)
            similarities = []
            for i, doc_embedding in enumerate(self.embeddings):
                if doc_embedding is not None and len(doc_embedding) > 0:
                    similarity = self._cosine_similarity(query_vector, doc_embedding)
                    # Higher-priority sources get a mild multiplicative boost.
                    priority_boost = 1 + (documents[i]['metadata']['priority'] / 100)
                    similarities.append({
                        'document': documents[i],
                        'score': float(similarity * priority_boost),
                        'search_type': 'vector'
                    })
            similarities.sort(key=lambda x: x['score'], reverse=True)
            return similarities[:top_k]
        except Exception as e:
            logger.error(f"Error in vector search: {e}")
            return []

    def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
        """Calculate cosine similarity between two vectors.

        Returns 0.0 when either vector has zero norm. The zero vectors stored
        for failed embeddings would otherwise produce NaN scores, which
        cannot be ranked meaningfully.
        """
        norm_product = np.linalg.norm(a) * np.linalg.norm(b)
        if norm_product == 0:
            return 0.0
        return float(np.dot(a, b) / norm_product)
class HybridSearchSystem:
    """Main hybrid search system combining BM25 and vector search."""

    def __init__(self):
        self.doc_processor = DocumentProcessor()
        self.bm25_searcher = BM25Searcher()
        self.vector_searcher = VectorSearcher()
        self.documents = []

    def initialize(self):
        """Initialize the entire search system.

        Loads the documents, loads the embedding model, then builds the
        embeddings and the BM25 index over those documents.
        """
        logger.info("Initializing Hybrid Search RAGtim Bot...")
        # Load documents
        self.documents = self.doc_processor.load_markdown_files()
        # Initialize models and build indices
        self.vector_searcher.initialize_model()
        self.vector_searcher.build_embeddings(self.documents)
        self.bm25_searcher.build_index(self.documents)
        logger.info(f"β System initialized with {len(self.documents)} documents")

    def search(self, query: str, search_type: str = "hybrid", top_k: int = 5,
               vector_weight: float = 0.6, bm25_weight: float = 0.4) -> List[Dict[str, Any]]:
        """Perform search based on specified method.

        Args:
            query: Free-text query.
            search_type: ``"vector"``, ``"bm25"``; any other value runs hybrid.
            top_k: Maximum number of results. Coerced to ``int`` because UI
                sliders may deliver a float, which cannot be used for slicing.
            vector_weight: Weight of the normalised vector score (hybrid only).
            bm25_weight: Weight of the normalised BM25 score (hybrid only).
        """
        top_k = int(top_k)
        if search_type == "vector":
            return self.vector_searcher.search(query, self.documents, top_k)
        if search_type == "bm25":
            return self.bm25_searcher.search(query, self.documents, top_k)
        return self._hybrid_search(query, top_k, vector_weight, bm25_weight)

    @staticmethod
    def _normalize_scores(results: List[Dict[str, Any]]) -> None:
        """Add ``normalized_score`` (score / best score) to each result in place.

        When the best score is not positive every normalised score is 0.
        """
        if not results:
            return
        best = max(r['score'] for r in results)
        for r in results:
            r['normalized_score'] = r['score'] / best if best > 0 else 0

    def _hybrid_search(self, query: str, top_k: int = 10,
                       vector_weight: float = 0.6, bm25_weight: float = 0.4) -> List[Dict[str, Any]]:
        """Perform hybrid search combining vector and BM25 results.

        Each searcher is asked for ``2 * top_k`` candidates, their scores are
        normalised to the best score of that searcher, and the weighted sum
        ranks the union. On any error the plain vector search is returned.
        """
        top_k = int(top_k)
        try:
            vector_results = self.vector_searcher.search(query, self.documents, top_k * 2)
            bm25_results = self.bm25_searcher.search(query, self.documents, top_k * 2)

            # Normalize scores
            self._normalize_scores(vector_results)
            self._normalize_scores(bm25_results)

            # Combine results, keyed by document id
            combined_scores = {}
            for result in vector_results:
                combined_scores[result['document']['id']] = {
                    'document': result['document'],
                    'vector_score': result['normalized_score'],
                    'bm25_score': 0.0,
                    'search_type': 'vector'
                }
            for result in bm25_results:
                doc_id = result['document']['id']
                if doc_id in combined_scores:
                    # Found by both searchers.
                    combined_scores[doc_id]['bm25_score'] = result['normalized_score']
                    combined_scores[doc_id]['search_type'] = 'hybrid'
                else:
                    combined_scores[doc_id] = {
                        'document': result['document'],
                        'vector_score': 0.0,
                        'bm25_score': result['normalized_score'],
                        'search_type': 'bm25'
                    }

            # Calculate final hybrid scores
            final_results = [
                {
                    'document': data['document'],
                    'score': (vector_weight * data['vector_score']) + (bm25_weight * data['bm25_score']),
                    'vector_score': data['vector_score'],
                    'bm25_score': data['bm25_score'],
                    'search_type': data['search_type']
                }
                for data in combined_scores.values()
            ]
            final_results.sort(key=lambda x: x['score'], reverse=True)
            return final_results[:top_k]
        except Exception as e:
            logger.error(f"Error in hybrid search: {e}")
            return self.vector_searcher.search(query, self.documents, top_k)
# Initialize the search system
# This runs at import time: initialize() loads the documents, loads the
# embedding model and builds both indices, so everything defined below can
# use `search_system` directly.
search_system = HybridSearchSystem()
search_system.initialize()
| # API Functions | |
def search_api(query: str, top_k: int = 5, search_type: str = "hybrid",
               vector_weight: float = 0.6, bm25_weight: float = 0.4) -> Dict[str, Any]:
    """API endpoint for search functionality.

    Args:
        query: Free-text query.
        top_k: Maximum number of results. Coerced to ``int`` because the UI
            slider may deliver a float, which cannot be used for slicing.
        search_type: ``"hybrid"``, ``"vector"`` or ``"bm25"``.
        vector_weight: Weight of the vector score in hybrid mode.
        bm25_weight: Weight of the BM25 score in hybrid mode.

    Returns:
        A dict with the results and the parameters used, or
        ``{"error": ..., "results": []}`` if anything fails.
    """
    try:
        top_k = int(top_k)
        results = search_system.search(query, search_type, top_k, vector_weight, bm25_weight)
        is_hybrid = search_type == "hybrid"
        return {
            "results": results,
            "query": query,
            "top_k": top_k,
            "search_type": search_type,
            "total_documents": len(search_system.documents),
            "search_parameters": {
                # The weights only apply to hybrid mode.
                "vector_weight": vector_weight if is_hybrid else None,
                "bm25_weight": bm25_weight if is_hybrid else None,
                "bm25_k1": search_system.bm25_searcher.k1,
                "bm25_b": search_system.bm25_searcher.b
            }
        }
    except Exception as e:
        logger.error(f"Error in search API: {e}")
        return {"error": str(e), "results": []}
def get_stats_api() -> Dict[str, Any]:
    """API endpoint for system statistics.

    Returns:
        Counts of documents per type and per source file, model and BM25
        parameters, and a ``status`` of ``"healthy"``; or a reduced dict with
        ``status`` ``"error"`` if collecting the statistics fails.
    """
    try:
        documents = search_system.documents
        doc_types = Counter(doc["metadata"]["type"] for doc in documents)
        sections_by_file = Counter(doc["metadata"]["source"] for doc in documents)

        # Report the dimension of the vectors actually stored; 384 is only
        # the fallback when nothing has been embedded yet.
        embeddings = search_system.vector_searcher.embeddings
        embedding_dimension = len(embeddings[0]) if len(embeddings) > 0 else 384

        bm25 = search_system.bm25_searcher
        return {
            "total_documents": len(documents),
            "document_types": dict(doc_types),
            "sections_by_file": dict(sections_by_file),
            "model_name": search_system.vector_searcher.model_name,
            "embedding_dimension": embedding_dimension,
            "search_capabilities": [
                "Hybrid Search (Vector + BM25)",
                "Semantic Vector Search",
                "BM25 Keyword Search",
                "GPU Accelerated",
                "Transformer Embeddings"
            ],
            "bm25_parameters": {
                "k1": bm25.k1,
                "b": bm25.b,
                "unique_terms": len(bm25.document_frequency),
                "average_doc_length": bm25.average_doc_length
            },
            "backend_type": "Hugging Face Space with Hybrid Search",
            "knowledge_sources": list(sections_by_file.keys()),
            "status": "healthy"
        }
    except Exception as e:
        logger.error(f"Error in get_stats_api: {e}")
        return {
            "error": str(e),
            "status": "error",
            "total_documents": 0,
            "search_capabilities": ["Error"]
        }
def chat_interface(message: str) -> str:
    """Answer a chat message with formatted hybrid-search results.

    Args:
        message: The user's question. ``None``, empty and whitespace-only
            messages get a prompt to ask something instead of a search.

    Returns:
        Markdown text: the best match in full, up to two further matches as
        200-character excerpts, and a footer; or a fixed apology if nothing
        matched or the search raised.
    """
    # Guard against None as well as blank text: .strip() on None would raise
    # before the try block below could turn it into a friendly message.
    if not message or not message.strip():
        return "Please ask me something about Raktim Mondol! I use hybrid search combining semantic similarity and keyword matching for the best results."
    try:
        search_results = search_system.search(message, "hybrid", 6)
        if not search_results:
            return "I don't have specific information about that topic in my knowledge base. Could you please ask something else about Raktim Mondol?"

        best_match = search_results[0]
        best_meta = best_match['document']['metadata']
        response_parts = [
            f"π **Found {len(search_results)} relevant results using hybrid search**\n",
            f"**Primary Answer** (Score: {best_match['score']:.3f})",
            f"π Source: {best_meta['source']} - {best_meta['section']}",
            f"π Search Type: {best_match['search_type'].upper()}",
        ]
        # The component scores are only present on hybrid results.
        if 'vector_score' in best_match and 'bm25_score' in best_match:
            response_parts.append(f"π Vector: {best_match['vector_score']:.3f} | BM25: {best_match['bm25_score']:.3f}")
        response_parts.append(f"\n{best_match['document']['content']}\n")

        if len(search_results) > 1:
            response_parts.append("**Additional Context:**")
            for i, result in enumerate(search_results[1:3], 1):
                meta = result['document']['metadata']
                content = result['document']['content']
                section_info = f"{meta['source']} - {meta['section']}"
                search_info = f"({result['search_type'].upper()}, Score: {result['score']:.3f})"
                response_parts.append(f"{i}. {section_info} {search_info}")
                excerpt = content[:200] + "..." if len(content) > 200 else content
                response_parts.append(f" {excerpt}\n")

        response_parts.extend([
            "\nπ€ **Powered by Hybrid Search Technology**",
            "β’ Vector Search: Semantic understanding with transformers",
            "β’ BM25 Search: Advanced keyword ranking",
            "β’ Smart Fusion: Optimal relevance through weighted combination",
        ])
        return "\n".join(response_parts)
    except Exception as e:
        logger.error(f"Error in chat interface: {e}")
        return "I'm sorry, I encountered an error while processing your question. Please try again."
# Create Gradio Interface with modern Gradio 5 features
# Three tabs: a chat front end over chat_interface(), a raw search form over
# search_api(), and a statistics view over get_stats_api().
with gr.Blocks(
    title="π₯ Hybrid Search RAGtim Bot",
    theme=gr.themes.Soft(),
    css="""
    .gradio-container {
        max-width: 1200px !important;
    }
    .chat-container {
        height: 600px;
    }
    """
) as demo:
    gr.Markdown("""
    # π₯ Hybrid Search RAGtim Bot
    **Advanced AI-powered search system combining semantic understanding with keyword precision**
    π§ **Semantic Vector Search** + π **BM25 Keyword Search** = β‘ **Optimal Results**
    Built with Gradio 5, featuring modern UI components and enhanced performance
    """)
    with gr.Tabs():
        with gr.Tab("π¬ Chat Interface"):
            gr.Markdown("### Ask anything about Raktim Mondol's research, skills, or experience")
            chatbot = gr.Chatbot(
                value=[],
                label="RAGtim Bot",
                height=400,
                show_copy_button=True,
                bubble_full_width=False
            )
            with gr.Row():
                msg = gr.Textbox(
                    label="Your Question",
                    placeholder="What would you like to know about Raktim's research or expertise?",
                    scale=4,
                    lines=2
                )
                submit_btn = gr.Button("Ask", variant="primary", scale=1)
            gr.Examples(
                examples=[
                    "What is Raktim's research in LLMs and RAG?",
                    "Tell me about BioFusionNet and statistical methods",
                    "What are his multimodal AI capabilities?",
                    "Describe his biostatistics expertise"
                ],
                inputs=msg
            )

            def respond(message, history):
                """Append the (question, answer) pair and clear the textbox."""
                response = chat_interface(message)
                # Build a new list rather than mutating the component value,
                # and tolerate a missing history.
                updated_history = list(history or []) + [(message, response)]
                return updated_history, ""

            # The button and the Enter key trigger the same handler.
            submit_btn.click(respond, [msg, chatbot], [chatbot, msg])
            msg.submit(respond, [msg, chatbot], [chatbot, msg])
        with gr.Tab("π Advanced Search API"):
            gr.Markdown("### Direct access to the hybrid search engine")
            with gr.Row():
                with gr.Column(scale=2):
                    search_query = gr.Textbox(
                        label="Search Query",
                        placeholder="Enter your search query here..."
                    )
                    with gr.Row():
                        search_type = gr.Radio(
                            choices=["hybrid", "vector", "bm25"],
                            value="hybrid",
                            label="Search Method"
                        )
                        top_k = gr.Slider(
                            minimum=1, maximum=20, value=5, step=1,
                            label="Number of Results"
                        )
                    with gr.Row():
                        vector_weight = gr.Slider(
                            minimum=0.0, maximum=1.0, value=0.6, step=0.1,
                            label="Vector Weight"
                        )
                        bm25_weight = gr.Slider(
                            minimum=0.0, maximum=1.0, value=0.4, step=0.1,
                            label="BM25 Weight"
                        )
                    search_btn = gr.Button("π Search", variant="primary")
                with gr.Column(scale=3):
                    search_results = gr.JSON(
                        label="Search Results",
                        show_label=True
                    )
            # Input order matches the positional parameters of search_api.
            search_btn.click(
                search_api,
                inputs=[search_query, top_k, search_type, vector_weight, bm25_weight],
                outputs=search_results
            )
        with gr.Tab("π System Statistics"):
            gr.Markdown("### Knowledge base and system information")
            stats_btn = gr.Button("π Get Statistics", variant="secondary")
            stats_output = gr.JSON(
                label="System Statistics",
                show_label=True
            )
            stats_btn.click(get_stats_api, outputs=stats_output)
            # Auto-load stats when the page first loads in the browser
            # (demo.load is a page-load event, not a tab-select event).
            demo.load(get_stats_api, outputs=stats_output)
if __name__ == "__main__":
    # Startup summary: what was loaded and indexed at import time.
    startup_messages = (
        "π Launching Hybrid Search RAGtim Bot...",
        f"π Loaded {len(search_system.documents)} documents",
        f"π BM25 index: {len(search_system.bm25_searcher.document_frequency)} unique terms",
        f"π§ Vector embeddings: {len(search_system.vector_searcher.embeddings)} documents",
        "π₯ Hybrid search ready!",
    )
    for startup_message in startup_messages:
        logger.info(startup_message)

    # Listen on all interfaces on port 7860, without a public share link,
    # with errors and the API page shown.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
        "show_error": True,
        "show_api": True,
    }
    demo.launch(**launch_options)