Spaces:
Running
Running
| try: | |
| import os | |
| from typing import Any, Dict, List, Optional | |
| import json | |
| import gradio as gr | |
| import torch | |
| from sentence_transformers import SentenceTransformer | |
| import chromadb | |
| from config import Config | |
| except ImportError as e: | |
| print(f"β Error: Required packages not installed: {e}") | |
| print("π§ Make sure you're in the gemmaembeddings conda environment") | |
| print("π¦ Required packages: torch, sentence-transformers, chromadb") | |
# Module-level state shared by every tool function below. Everything here is
# created eagerly at import time: the embedding model is loaded and the
# ChromaDB Cloud client is built before any tool can be called.
config = Config()
device = Config.get_device()
model = SentenceTransformer(config.MODEL_PATH)
collection = None  # Remains None if the collection lookup below fails.

print("Connecting to ChromaDB from cloud...")
# Credentials are read from the environment. os.environ.get() yields None for
# an unset variable, and that None is handed to CloudClient unchanged.
database = os.environ.get("chromadb_db")
api_key = os.environ.get("chromadb_api_key")
tenant = os.environ.get("chromadb_tenant")
client = chromadb.CloudClient(
    api_key=api_key,
    tenant=tenant,
    database=database,
)
print("Connection to ChromaDB successful...")

# === COLLECTION VALIDATION ===
# Look the collection up once and report its size. A failure is only logged
# (best effort): the module still imports, `collection` stays None, and the
# tool functions surface the problem through their own error handling.
try:
    collection = client.get_collection(config.COLLECTION_NAME)
    doc_count = collection.count()
    if doc_count == 0:
        print(f"Collection '{config.COLLECTION_NAME}' exists but is empty. Run ingest_studies.py to populate it.")
    print(f"Connected to collection '{config.COLLECTION_NAME}' with {doc_count} documents")
except Exception as e:
    print(f"Collection '{config.COLLECTION_NAME}' not found. Run ingest_studies.py first. Error: {str(e)}")
class EmbeddingGemmaPrompts:
    """
    Prompt templates for Google's EmbeddingGemma embedding model.

    EmbeddingGemma expects a short textual prefix in front of every input;
    this class builds those prefixes.

    Reference: https://huggingface.co/google/embeddinggemma-300m#prompt-instructions

    Prompt formats:
        - Query:    'task: {task description} | query: {content}'
        - Document: 'title: {title | "none"} | text: {content}'

    The original author reports the following similarity improvements over
    the baseline "search result" task (figures are not verified here):
        - fact checking        +136%
        - sentence similarity  +112%
        - question answering    +98%
        - classification        +73%
        - clustering            +59%
        - code retrieval        +39%

    All methods are static/class methods, so the class is used without
    instantiation.

    Usage:
        >>> EmbeddingGemmaPrompts.encode_query("How does RS work?", "question_answering")
        'task: question answering | query: How does RS work?'
        >>> EmbeddingGemmaPrompts.encode_document("Content here", "Document Title")
        'title: Document Title | text: Content here'

    Attributes:
        TASKS (Dict[str, str]): Maps task-type keys accepted by this module to
            the task description placed in the prompt.
    """

    # Task-type key -> task description inserted after "task:" in the prompt.
    TASKS: Dict[str, str] = {
        # === RETRIEVAL TASKS ===
        "retrieval_query": "search result",  # Standard retrieval query format
        "retrieval_document": "document",    # Document embedding format
        # === SPECIALIZED TASKS ===
        "fact_checking": "fact checking",
        "semantic_similarity": "sentence similarity",
        "question_answering": "question answering",
        "classification": "classification",
        "clustering": "clustering",
        "code_retrieval": "code retrieval",
        # === LEGACY COMPATIBILITY ===
        # Shorter aliases kept for backward compatibility.
        "search": "search result",        # Default baseline task
        "question": "question answering", # Alias for question_answering
        "fact": "fact checking",          # Alias for fact_checking
    }

    @staticmethod
    def format_query_prompt(content: str, task: str = "search result") -> str:
        """
        Build a query prompt: 'task: {task} | query: {content}'.

        Args:
            content (str): The raw query text to be embedded
            task (str): Task description to place in the prompt.
                Defaults to "search result"

        Returns:
            str: Formatted query string ready for embedding

        Example:
            >>> EmbeddingGemmaPrompts.format_query_prompt("RS trading system", "question answering")
            'task: question answering | query: RS trading system'
        """
        return f"task: {task} | query: {content}"

    @staticmethod
    def format_document_prompt(content: str, title: str = "none") -> str:
        """
        Build a document prompt: 'title: {title | "none"} | text: {content}'.

        Args:
            content (str): The document text content to be embedded
            title (str): Document title. None, empty and whitespace-only
                titles are replaced by the literal "none". Defaults to "none"

        Returns:
            str: Formatted document string ready for embedding

        Example:
            >>> EmbeddingGemmaPrompts.format_document_prompt("Content here", "Risk Management")
            'title: Risk Management | text: Content here'
            >>> EmbeddingGemmaPrompts.format_document_prompt("Content without title")
            'title: none | text: Content without title'
        """
        # The template calls for the literal "none" when there is no title;
        # never emit an empty "title:" field.
        if title is None or not str(title).strip():
            title = "none"
        return f"title: {title} | text: {content}"

    @classmethod
    def get_task_description(cls, task_type: str) -> str:
        """
        Return the task description registered for a task-type key.

        Args:
            task_type (str): A key of TASKS (e.g., "question_answering", "fact_checking")

        Returns:
            str: The matching task description, or "search result" when the
                key is not in TASKS

        Example:
            >>> EmbeddingGemmaPrompts.get_task_description("fact_checking")
            'fact checking'
            >>> EmbeddingGemmaPrompts.get_task_description("unknown_task")
            'search result'
        """
        return cls.TASKS.get(task_type, "search result")

    @classmethod
    def encode_query(cls, content: str, task_type: str = "search") -> str:
        """
        Format a search query for a given task type.

        Resolves task_type through get_task_description() and wraps the query
        with format_query_prompt().

        Args:
            content (str): The raw query text from the user
            task_type (str): A key of TASKS. Unknown keys fall back to the
                "search result" task. Defaults to "search"

        Returns:
            str: Query string formatted for EmbeddingGemma

        Example:
            >>> EmbeddingGemmaPrompts.encode_query("How does risk management work?", "question_answering")
            'task: question answering | query: How does risk management work?'
            >>> EmbeddingGemmaPrompts.encode_query("RS system reduces risk by 30%", "fact_checking")
            'task: fact checking | query: RS system reduces risk by 30%'
        """
        task_desc = cls.get_task_description(task_type)
        return cls.format_query_prompt(content, task_desc)

    @classmethod
    def encode_document(cls, content: str, title: str = "none") -> str:
        """
        Format a document for embedding.

        Thin wrapper around format_document_prompt().

        Args:
            content (str): The document text content to embed
            title (str): Document title (from metadata, filename or content).
                Use "none" if no meaningful title is available

        Returns:
            str: Formatted document string ready for embedding

        Example:
            >>> EmbeddingGemmaPrompts.encode_document("Trading strategy content...", "Momentum Strategy Guide")
            'title: Momentum Strategy Guide | text: Trading strategy content...'
            >>> EmbeddingGemmaPrompts.encode_document("Untitled content here")
            'title: none | text: Untitled content here'
        """
        return cls.format_document_prompt(content, title)
def search_knowledge_base(
    query: str,
    num_results: int = 5,
    source_filter: Optional[str] = None,
    task_type: str = "search"
) -> Dict[str, Any]:
    """
    Run a semantic search against the module-level ChromaDB collection.

    The query is wrapped in an EmbeddingGemma task prompt, embedded with the
    module-level model, and sent to `collection.query`.

    Args:
        query: The search query
        num_results: Number of results to request; clamped to
            1..config.MAX_NUM_RESULTS
        source_filter: Optional source folder filter. It is applied only when
            it is one of config.VALID_SOURCES; any other value is ignored
        task_type: Key of EmbeddingGemmaPrompts.TASKS used to format the query

    Returns:
        On success, a dict with "query", "task_type", "num_results",
        "source_filter", "results" (list of per-hit dicts) and "success": True.
        On any exception, {"error": ..., "results": [], "success": False};
        this function does not raise.
    """
    try:
        # Create query embedding with task-specific formatting
        query_formatted = EmbeddingGemmaPrompts.encode_query(query, task_type)
        query_embedding = model.encode([query_formatted], device=device)

        search_params = {
            "query_embeddings": query_embedding.tolist(),
            # Clamp on both sides so a zero or negative request still asks
            # for at least one hit.
            "n_results": max(1, min(num_results, config.MAX_NUM_RESULTS)),
            "include": ["documents", "metadatas", "distances"],
        }

        # Add source filter if specified
        if source_filter and source_filter in config.VALID_SOURCES:
            search_params["where"] = {"source_folder": {"$eq": source_filter}}

        results = collection.query(**search_params)

        # Results are nested one list per query embedding; exactly one
        # embedding was sent, so only index 0 is read.
        formatted_results = []
        if results["documents"] and len(results["documents"]) > 0:
            hits = zip(
                results["documents"][0],
                results["metadatas"][0],
                results["distances"][0],
            )
            for rank, (document, metadata, distance) in enumerate(hits, start=1):
                # A record stored without metadata must not abort the search.
                metadata = metadata or {}
                formatted_results.append({
                    "rank": rank,
                    "content": document,
                    "source_folder": metadata.get("source_folder", "unknown"),
                    "chunk_file": metadata.get("chunk_file", "unknown"),
                    "chunk_number": metadata.get("chunk_number", "unknown"),
                    # NOTE(review): 1 - distance reads as a similarity only if
                    # the collection uses cosine distance - confirm.
                    "similarity_score": float(1 - distance),
                    "distance": float(distance),
                    "chunk_length": metadata.get("chunk_length", 0),
                    "metadata": metadata,
                })

        return {
            "query": query,
            "task_type": task_type,
            "num_results": len(formatted_results),
            "source_filter": source_filter,
            "results": formatted_results,
            "success": True
        }
    except Exception as e:
        # Best-effort boundary: callers inspect "success" instead of catching.
        return {"error": f"Search failed: {str(e)}", "results": [], "success": False}
def get_available_sources() -> Dict[str, Any]:
    """
    List the source folders present in the collection, with chunk counts.

    All metadata is fetched once and the "source_folder" values are tallied
    locally, so no per-source query is needed.

    Returns:
        On success, a dict with "sources" (sorted names), "source_stats"
        (name -> number of chunks), "total_sources", "total_chunks"
        (collection.count()) and "success": True.
        On any exception, {"error": ..., "sources": [], "success": False}.
    """
    from collections import Counter

    try:
        all_results = collection.get(include=["metadatas"])

        # Tally chunks per source in a single pass; entries with no metadata
        # or no (or empty) source_folder are skipped.
        chunk_counts = Counter()
        for metadata in all_results["metadatas"] or []:
            source = (metadata or {}).get("source_folder")
            if source:
                chunk_counts[source] += 1

        sources = sorted(chunk_counts)
        return {
            "sources": sources,
            "source_stats": {source: chunk_counts[source] for source in sources},
            "total_sources": len(sources),
            "total_chunks": collection.count(),
            "success": True
        }
    except Exception as e:
        return {"error": f"Failed to get sources: {str(e)}", "sources": [], "success": False}
| # MCP Tool Definitions | |
def search_rs_studies(
    query: str,
    num_results: int = 5,
    source_filter: Optional[str] = None,
    task_type: str = "search"
) -> str:
    """
    Search the RS Studies knowledge base for relevant information.

    Validates the arguments, then delegates to search_knowledge_base and
    returns its result serialized as JSON.

    Args:
        query: Your search question or topic (required)
        num_results: Number of results to return (clamped to 1..config.MAX_NUM_RESULTS, default: 5)
        source_filter: Limit search to one source; must be one of config.VALID_SOURCES.
            Sources named by the original documentation:
            - 'rs_stkege_01': RS trading system documentation
            - 'cheenai_meet_full': Chennai meetup transcripts
            - 'QnAYoutubeChannel': Q&A discussions
            - None: Search all sources (default)
        task_type: EmbeddingGemma task prompt to use (a key of EmbeddingGemmaPrompts.TASKS):
            - 'search'/'retrieval_query': General search (default)
            - 'question'/'question_answering': Question answering format
            - 'fact'/'fact_checking': Fact checking format
            - 'classification': Text classification tasks
            - 'clustering': Document clustering and grouping
            - 'semantic_similarity': Semantic similarity assessment
            - 'code_retrieval': Code search and retrieval

    Returns:
        JSON string with search results including content, sources, and similarity scores;
        on invalid input, a JSON object with "error", empty "results" and "success": false
    """
    def _reject(message: str) -> str:
        # Every validation failure shares this payload shape.
        return json.dumps({"error": message, "results": [], "success": False})

    query_is_blank = not query or not query.strip()
    if query_is_blank:
        return _reject("Query cannot be empty")

    num_results = max(1, min(num_results, config.MAX_NUM_RESULTS))

    if source_filter and source_filter not in config.VALID_SOURCES:
        return _reject(f"Invalid source_filter. Must be one of: {config.VALID_SOURCES}")

    valid_task_types = list(EmbeddingGemmaPrompts.TASKS.keys())
    if task_type not in valid_task_types:
        return _reject(f"Invalid task_type. Must be one of: {valid_task_types}")

    outcome = search_knowledge_base(query, num_results, source_filter, task_type)
    return json.dumps(outcome, indent=2)
def get_rs_sources() -> str:
    """
    Report the data sources available in the RS Studies knowledge base.

    Serializes the dictionary produced by get_available_sources().

    Returns:
        JSON string (2-space indented) with the available sources, their statistics, and collection info
    """
    return json.dumps(get_available_sources(), indent=2)
def ask_rs_question(question: str, context_size: int = 3) -> str:
    """
    Retrieve ranked context chunks for a question about RS trading systems.

    Runs search_knowledge_base with the "question_answering" task type and
    reshapes the hits into a question/context payload. No answer text is
    generated; the caller receives the supporting chunks only.

    Args:
        question: Your question about RS systems, trading, or related topics
        context_size: Number of relevant chunks to include in context
            (clamped to 1..config.MAX_CONTEXT_SIZE, default: 3)

    Returns:
        JSON string with the question, the ranked context chunks and the sources they came from;
        on failure, a JSON object with "error" and "success": false
    """
    if not (question and question.strip()):
        return json.dumps({
            "error": "Question cannot be empty",
            "context": [],
            "success": False
        })

    context_size = max(1, min(context_size, config.MAX_CONTEXT_SIZE))

    # Search for relevant information using question task type
    search_results = search_knowledge_base(question, context_size, task_type="question_answering")
    if not search_results.get("success", False):
        # Pass the search error payload through unchanged.
        return json.dumps(search_results)

    hits = search_results.get("results", [])
    selected = hits[:context_size]

    relevant_context = [
        {
            "rank": position,
            "content": hit["content"],
            "source": f"{hit['source_folder']} (chunk {hit['chunk_number']})",
            "relevance_score": f"{hit['similarity_score']:.3f}",
            "chunk_file": hit["chunk_file"]
        }
        for position, hit in enumerate(selected, start=1)
    ]
    # Distinct source folders, sorted so the output is stable and JSON-safe.
    sources_used = sorted({hit["source_folder"] for hit in selected})

    return json.dumps(
        {
            "question": question,
            "context_chunks": len(hits),
            "relevant_context": relevant_context,
            "sources_used": sources_used,
            "success": True
        },
        indent=2,
    )
def get_collection_info() -> str:
    """
    Describe the RS Studies knowledge base collection.

    Reports the document count, the configured model path and device, the
    metadata keys seen in a sample of up to 10 records, and selected config
    values.

    Returns:
        JSON string with collection statistics, configuration, and metadata structure;
        on any exception, a JSON object with "error" and "success": false
    """
    try:
        total_count = collection.count()

        # Sample a few records to discover which metadata keys are in use.
        sample_results = collection.get(limit=10, include=["metadatas"])
        metadata_keys = set()
        for metadata in sample_results["metadatas"] or []:
            # Records stored without metadata contribute no keys.
            metadata_keys.update(metadata or {})

        info = {
            "collection_name": config.COLLECTION_NAME,
            "total_documents": total_count,
            "model_path": config.MODEL_PATH,
            # Config.get_device() may return a non-string device object
            # (e.g. torch.device), which json.dumps cannot serialize; str()
            # is a no-op for plain strings.
            "device": str(device),
            "metadata_structure": sorted(metadata_keys),
            "config": {
                "max_results": config.MAX_NUM_RESULTS,
                "valid_sources": config.VALID_SOURCES
            },
            "success": True
        }
        return json.dumps(info, indent=2)
    except Exception as e:
        return json.dumps({"error": f"Failed to get collection info: {str(e)}", "success": False})
def search_by_source(source_name: str, query: str = "", num_results: int = 10) -> str:
    """
    Browse or search within a specific data source.

    With a non-blank query this delegates to search_knowledge_base restricted
    to the source. With a blank query it lists up to num_results chunks of the
    source via collection.get (no ordering is requested from the database).

    Args:
        source_name: Name of the source to search within; must be one of config.VALID_SOURCES
            (use get_rs_sources to see available sources)
        query: Optional search query (if blank, chunks from the source are listed instead)
        num_results: Number of results to return (clamped to 1..config.MAX_NUM_RESULTS, default: 10)

    Returns:
        JSON string with results from the specified source
    """
    if source_name not in config.VALID_SOURCES:
        return json.dumps({
            "error": f"Invalid source_name. Must be one of: {config.VALID_SOURCES}",
            "results": [],
            "success": False
        })

    num_results = max(1, min(num_results, config.MAX_NUM_RESULTS))

    # Search mode: a non-blank query goes through the semantic search path.
    if query.strip():
        return json.dumps(search_knowledge_base(query, num_results, source_name), indent=2)

    # Browse mode.
    # NOTE(review): ensure_initialized is not defined in the visible part of
    # this file - confirm it exists elsewhere in the module.
    if not ensure_initialized():
        return json.dumps({
            "error": "Server not properly initialized",
            "results": [],
            "success": False
        })

    try:
        source_results = collection.get(
            where={"source_folder": {"$eq": source_name}},
            limit=num_results,
            include=["documents", "metadatas"]
        )
        paired = zip(source_results["documents"], source_results["metadatas"])
        listing = [
            {
                "rank": position,
                "content": document,
                "source_folder": meta.get("source_folder", "unknown"),
                "chunk_file": meta.get("chunk_file", "unknown"),
                "chunk_number": meta.get("chunk_number", "unknown"),
                "chunk_length": meta.get("chunk_length", 0),
                "metadata": meta
            }
            for position, (document, meta) in enumerate(paired, start=1)
        ]
        payload = {
            "source_name": source_name,
            "query": query or "(browsing mode)",
            "num_results": len(listing),
            "results": listing,
            "success": True
        }
    except Exception as e:
        payload = {
            "error": f"Failed to browse source: {str(e)}",
            "results": [],
            "success": False
        }
    return json.dumps(payload, indent=2)
def verify_fact_rs(statement: str, num_evidence: int = 3) -> str:
    """
    Collect evidence for a statement from the RS Studies knowledge base.

    Runs search_knowledge_base with the "fact_checking" task type and returns
    the hits as ranked evidence. The function does not itself decide whether
    the statement is supported or contradicted.

    Args:
        statement: The statement or claim to verify
        num_evidence: Number of evidence chunks to return
            (clamped to 1..config.MAX_CONTEXT_SIZE, default: 3)

    Returns:
        JSON string with evidence chunks ranked by relevance to the fact claim;
        on failure, a JSON object with "error" and "success": false
    """
    if not (statement and statement.strip()):
        return json.dumps({
            "error": "Statement cannot be empty",
            "evidence": [],
            "success": False
        })

    num_evidence = max(1, min(num_evidence, config.MAX_CONTEXT_SIZE))

    # Search for evidence using fact checking optimization
    search_results = search_knowledge_base(statement, num_evidence, task_type="fact_checking")
    if not search_results.get("success", False):
        # Pass the search error payload through unchanged.
        return json.dumps(search_results)

    hits = search_results.get("results", [])
    evidence = [
        {
            "rank": position,
            "content": hit["content"],
            "source": f"{hit['source_folder']} (chunk {hit['chunk_number']})",
            "relevance_score": f"{hit['similarity_score']:.3f}",
            "chunk_file": hit["chunk_file"]
        }
        for position, hit in enumerate(hits, start=1)
    ]
    # Distinct source folders, sorted so the output is stable and JSON-safe.
    sources_consulted = sorted({hit["source_folder"] for hit in hits})

    return json.dumps(
        {
            "statement": statement,
            "evidence_count": len(hits),
            "evidence": evidence,
            "sources_consulted": sources_consulted,
            "success": True
        },
        indent=2,
    )
def compare_similarity_rs(text1: str, text2: str, context_size: int = 5) -> str:
    """
    Compare two concepts by the knowledge-base content each one retrieves.

    Each text is searched separately with the "semantic_similarity" task type.
    The response lists the (truncated) hits for both and which source folders
    they share or have to themselves. No direct similarity score between
    text1 and text2 is computed.

    Args:
        text1: First concept, topic, or text to compare
        text2: Second concept, topic, or text to compare
        context_size: Number of relevant chunks to retrieve for each concept
            (clamped to 1..config.MAX_CONTEXT_SIZE, default: 5)

    Returns:
        JSON string with related content for both concepts and their source overlap;
        on failure, a JSON object with "error", empty "analysis" and "success": false
    """
    def _is_blank(text) -> bool:
        return not text or not text.strip()

    def _preview(body: str) -> str:
        # Long chunks are cut to 200 characters and marked with an ellipsis.
        if len(body) > 200:
            return body[:200] + "..."
        return body

    def _summarize(hits):
        return [
            {
                "rank": position,
                "content": _preview(hit["content"]),
                "source": f"{hit['source_folder']} (chunk {hit['chunk_number']})",
                "relevance": f"{hit['similarity_score']:.3f}"
            }
            for position, hit in enumerate(hits, start=1)
        ]

    if _is_blank(text1) or _is_blank(text2):
        return json.dumps({
            "error": "Both text1 and text2 must be provided",
            "analysis": {},
            "success": False
        })

    context_size = max(1, min(context_size, config.MAX_CONTEXT_SIZE))

    # Search for content related to each concept using semantic similarity optimization
    results1 = search_knowledge_base(text1, context_size, task_type="semantic_similarity")
    results2 = search_knowledge_base(text2, context_size, task_type="semantic_similarity")

    both_succeeded = results1.get("success", False) and results2.get("success", False)
    if not both_succeeded:
        return json.dumps({
            "error": "Failed to search for one or both concepts",
            "analysis": {},
            "success": False
        })

    hits1 = results1.get("results", [])
    hits2 = results2.get("results", [])

    # Analyze overlap and differences between the source folders hit
    sources1 = {hit["source_folder"] for hit in hits1}
    sources2 = {hit["source_folder"] for hit in hits2}

    response = {
        "concept1": text1,
        "concept2": text2,
        "concept1_results": len(hits1),
        "concept2_results": len(hits2),
        "shared_sources": sorted(sources1 & sources2),
        "concept1_unique_sources": sorted(sources1 - sources2),
        "concept2_unique_sources": sorted(sources2 - sources1),
        "concept1_context": _summarize(hits1),
        "concept2_context": _summarize(hits2),
        "success": True
    }
    return json.dumps(response, indent=2)
def classify_content_rs(content: str, categories: List[str] = None) -> str:
    """
    Relate a piece of content to the RS Studies sources it most resembles.

    Searches for the 8 nearest chunks with the "classification" task type,
    groups them by source folder, and reports the top 5 matches. The
    categories list is echoed back as "available_categories"; it is not used
    to score the content.

    Args:
        content: Text content to classify
        categories: Optional list of category labels to echo in the response
            (defaults to a built-in list of major RS topics)

    Returns:
        JSON string with the per-source grouping and the top matches;
        on failure, a JSON object with "error" and "success": false
    """
    def _clip(text: str, limit: int) -> str:
        # Texts longer than `limit` are cut and marked with an ellipsis.
        if len(text) > limit:
            return text[:limit] + "..."
        return text

    if not (content and content.strip()):
        return json.dumps({
            "error": "Content cannot be empty",
            "classification": {},
            "success": False
        })

    # Default categories based on RS Studies sources
    if categories is None:
        categories = [
            "trading systems",
            "market analysis",
            "Chennai meetup discussions",
            "Q&A topics",
            "technical strategies"
        ]

    # Search for similar content using classification optimization
    search_results = search_knowledge_base(content, 8, task_type="classification")
    if not search_results.get("success", False):
        # Pass the search error payload through unchanged.
        return json.dumps(search_results)

    hits = search_results.get("results", [])

    # Group hit previews by the source folder they came from.
    source_distribution = {}
    for hit in hits:
        bucket = source_distribution.setdefault(hit["source_folder"], [])
        bucket.append({
            "content": _clip(hit["content"], 150),
            "similarity": hit["similarity_score"]
        })

    top_matches = [
        {
            "rank": position,
            "content": _clip(hit["content"], 150),
            "source_category": hit["source_folder"],
            "similarity_score": f"{hit['similarity_score']:.3f}"
        }
        for position, hit in enumerate(hits[:5], start=1)
    ]

    response = {
        "content": _clip(content, 200),
        "available_categories": categories,
        "source_distribution": source_distribution,
        "top_matches": top_matches,
        "success": True
    }
    return json.dumps(response, indent=2)
| # ================================================== | |
| # QnA-ENHANCED EMBEDDING TOOLS | |
| # ================================================== | |
def search_by_embedding_type(
    query: str,
    embedding_type: str = "content",
    num_results: int = 5,
    source_filter: Optional[str] = None
) -> str:
    """
    Search the knowledge base restricted to one embedding type.

    Records are filtered on their "embedding_type" metadata (and optionally
    "source_folder"), and the query prompt is chosen to match the type:
    "questions" uses the question-answering task, "answers" the fact-checking
    task, everything else the default search task.

    Args:
        query: Your search question or topic (required)
        embedding_type: Type of embedding to search:
            - 'content': Original chunk content (default)
            - 'enhanced_content': Content enhanced with QnA context
            - 'questions': Questions-only embeddings for question matching
            - 'answers': Answers-only embeddings for factual retrieval
        num_results: Number of results to return (clamped to 1..config.MAX_NUM_RESULTS, default: 5)
        source_filter: Limit to specific source folder (optional)

    Returns:
        JSON string with the ranked results for the specified embedding type;
        on invalid input or any exception, a JSON object with "error",
        empty "results" and "success": false
    """
    # Validate parameters
    if not query or not query.strip():
        return json.dumps({"error": "Query cannot be empty", "results": [], "success": False})

    valid_embedding_types = ["content", "enhanced_content", "questions", "answers"]
    if embedding_type not in valid_embedding_types:
        return json.dumps({
            "error": f"Invalid embedding_type. Must be one of: {valid_embedding_types}",
            "results": [],
            "success": False
        })

    num_results = max(1, min(num_results, config.MAX_NUM_RESULTS))

    try:
        # Format query appropriately based on embedding type
        if embedding_type == "questions":
            formatted_query = EmbeddingGemmaPrompts.encode_query(query, "question_answering")
        elif embedding_type == "answers":
            formatted_query = EmbeddingGemmaPrompts.encode_query(query, "fact_checking")
        else:
            formatted_query = EmbeddingGemmaPrompts.encode_query(query, "search")

        query_embedding = model.encode([formatted_query], device=device)

        # Chroma accepts exactly one top-level condition in `where`; several
        # conditions must be combined with $and, otherwise the query is
        # rejected.
        type_condition = {"embedding_type": {"$eq": embedding_type}}
        if source_filter:
            where_clause = {
                "$and": [
                    type_condition,
                    {"source_folder": {"$eq": source_filter}},
                ]
            }
        else:
            where_clause = type_condition

        search_results = collection.query(
            query_embeddings=query_embedding.tolist(),
            n_results=num_results,
            where=where_clause
        )

        # Results are nested one list per query embedding; only one was sent.
        results = []
        hits = zip(
            search_results['documents'][0],
            search_results['metadatas'][0],
            search_results['distances'][0]
        )
        for rank, (doc, metadata, distance) in enumerate(hits, start=1):
            # A record stored without metadata must not abort the search.
            metadata = metadata or {}
            results.append({
                "rank": rank,
                "content": doc,
                # float() keeps the value JSON-serializable whatever numeric
                # type the client returns.
                "similarity_score": float(1 - distance),
                "embedding_type": metadata.get("embedding_type", "unknown"),
                "enhanced": metadata.get("enhanced", False),
                "qna_count": metadata.get("qna_count", 0),
                "source_folder": metadata.get("source_folder", "unknown"),
                "chunk_number": metadata.get("chunk_number", "unknown"),
                "chunk_file": metadata.get("chunk_file", "unknown")
            })

        return json.dumps({
            "query": query,
            "embedding_type": embedding_type,
            "results_found": len(results),
            "source_filter": source_filter,
            "results": results,
            "success": True
        }, indent=2)
    except Exception as e:
        return json.dumps({
            "error": f"Search failed: {str(e)}",
            "query": query,
            "embedding_type": embedding_type,
            "results": [],
            "success": False
        })
def _embedding_type_weights(query: str) -> Dict[str, float]:
    """Pick per-embedding-type score multipliers from the wording of *query*."""
    query_lower = query.lower()
    # Tokenise on non-letters and compare whole words; plain substring tests
    # would treat "this" as "is" or "show" as "how" and skew the weights.
    words = set("".join(ch if ch.isalpha() else " " for ch in query_lower).split())
    looks_like_question = "?" in query_lower or not words.isdisjoint(
        ("what", "how", "why", "when", "where")
    )
    # Stems still match their inflections ("explains", "defined", "meanings").
    wants_definition = "is" in words or any(
        word.startswith(("define", "explain", "meaning")) for word in words
    )
    return {
        "content": 1.0,
        "enhanced_content": 1.2,  # Slightly favor enhanced
        "questions": 1.5 if looks_like_question else 0.8,
        "answers": 1.3 if wants_definition else 0.9,
    }


def smart_multi_search(
    query: str,
    num_results_per_type: int = 3,
    source_filter: Optional[str] = None,
    combine_strategy: str = "best_of_each"
) -> str:
    """
    Perform intelligent multi-type search across different embedding types.

    This tool searches across multiple embedding types and combines results
    to provide comprehensive coverage of relevant information. Hits that point
    at the same source chunk are collapsed into the best-scoring one, and at
    most num_results_per_type * 2 results are returned.

    Args:
        query: Your search question or topic (required)
        num_results_per_type: Results per embedding type (1-10, default: 3)
        source_filter: Limit to specific source folder (optional)
        combine_strategy: How to combine results:
            - 'best_of_each': Top results from each type, ranked by similarity
            - 'relevance_ranked': All results ranked by similarity
            - 'type_weighted': Weighted by embedding type appropriateness

    Returns:
        JSON string with combined search results and analysis. On failure the
        payload carries "success": false and an "error" message; the function
        does not raise.
    """
    if not query or not query.strip():
        return json.dumps({"error": "Query cannot be empty", "results": [], "success": False})

    # Reject unknown strategies up front instead of running four searches and
    # then reporting an empty but "successful" result.
    valid_strategies = ("best_of_each", "relevance_ranked", "type_weighted")
    if combine_strategy not in valid_strategies:
        return json.dumps({
            "error": (
                f"Unknown combine_strategy '{combine_strategy}'. "
                f"Expected one of: {', '.join(valid_strategies)}"
            ),
            "query": query,
            "results": [],
            "success": False
        })

    try:
        # int() keeps the final slice valid when a client sends 3.0 instead of 3.
        num_results_per_type = max(1, min(int(num_results_per_type), 10))

        embedding_types = ["content", "enhanced_content", "questions", "answers"]
        all_results = {}
        search_errors = {}

        # Search each embedding type; one failing type must not sink the rest,
        # but its error is reported instead of being dropped.
        for emb_type in embedding_types:
            result_data = json.loads(
                search_by_embedding_type(query, emb_type, num_results_per_type, source_filter)
            )
            if result_data.get("success", False):
                all_results[emb_type] = result_data["results"]
            else:
                all_results[emb_type] = []
                search_errors[emb_type] = result_data.get("error", "unknown error")

        weights = (
            _embedding_type_weights(query) if combine_strategy == "type_weighted" else None
        )

        # Tag every hit with the search that produced it (and its weighted
        # score when the strategy asks for one).
        combined_results = []
        for emb_type, results in all_results.items():
            for result in results:
                result["search_type"] = emb_type
                if weights is not None:
                    result["weighted_score"] = result["similarity_score"] * weights[emb_type]
                combined_results.append(result)

        def ranking_score(result):
            # The score the final ordering is based on.
            return result.get("weighted_score", result["similarity_score"])

        # Deduplicate by chunk, keeping the version that ranks best under the
        # active strategy (comparing raw similarity here would let a
        # lower-ranked duplicate win under 'type_weighted').
        best_by_chunk = {}
        for result in combined_results:
            chunk_key = f"{result['source_folder']}_chunk_{result['chunk_number']}"
            current = best_by_chunk.get(chunk_key)
            if current is None or ranking_score(result) > ranking_score(current):
                best_by_chunk[chunk_key] = result

        # Every strategy ends with a global sort on its ranking score, so
        # 'best_of_each' and 'relevance_ranked' yield the same ordering.
        final_results = sorted(best_by_chunk.values(), key=ranking_score, reverse=True)

        # Add ranking
        for rank, result in enumerate(final_results, start=1):
            result["final_rank"] = rank

        return json.dumps({
            "query": query,
            "combine_strategy": combine_strategy,
            "total_results": len(final_results),
            "embedding_types_searched": embedding_types,
            "results_per_type": {emb_type: len(results) for emb_type, results in all_results.items()},
            "source_filter": source_filter,
            "search_errors": search_errors,
            "results": final_results[:num_results_per_type * 2],  # Limit final output
            "success": True
        }, indent=2)
    except Exception as e:
        return json.dumps({
            "error": f"Multi-search failed: {str(e)}",
            "query": query,
            "results": [],
            "success": False
        })
def analyze_embedding_coverage(source_filter: Optional[str] = None) -> str:
    """
    Analyze the distribution and coverage of different embedding types in the knowledge base.

    Counts records per embedding type, per source folder, by enhancement
    flag and by presence of Q&A pairs, using only the records' metadata.

    Args:
        source_filter: Limit analysis to specific source folder (optional)

    Returns:
        JSON string with embedding type statistics and coverage analysis. On
        failure the payload carries "success": false and an "error" message;
        the function does not raise.
    """
    try:
        if collection is None:
            # Fail with a readable message rather than an AttributeError on None.
            raise RuntimeError("ChromaDB collection is not available")

        # Only metadata is inspected below, so do not fetch document bodies.
        get_kwargs = {"include": ["metadatas"]}
        if source_filter:
            get_kwargs["where"] = {"source_folder": source_filter}
        all_docs = collection.get(**get_kwargs)
        metadatas = all_docs["metadatas"] or []

        # Analyze embedding types
        type_counts = {}
        enhanced_counts = {"enhanced": 0, "original": 0}
        source_breakdown = {}
        qna_stats = {"with_qna": 0, "without_qna": 0}

        for metadata in metadatas:
            # A record without metadata is counted under the 'unknown' buckets
            # instead of aborting the whole analysis.
            metadata = metadata or {}

            emb_type = metadata.get('embedding_type', 'unknown')
            type_counts[emb_type] = type_counts.get(emb_type, 0) + 1

            # Enhanced vs original
            if metadata.get('enhanced', False):
                enhanced_counts["enhanced"] += 1
            else:
                enhanced_counts["original"] += 1

            # Source breakdown
            source = metadata.get('source_folder', 'unknown')
            per_source = source_breakdown.setdefault(source, {})
            per_source[emb_type] = per_source.get(emb_type, 0) + 1

            # QnA statistics ("or 0" tolerates an explicit null count)
            if (metadata.get('qna_count', 0) or 0) > 0:
                qna_stats["with_qna"] += 1
            else:
                qna_stats["without_qna"] += 1

        total_embeddings = len(metadatas)

        return json.dumps({
            "total_embeddings": total_embeddings,
            "source_filter": source_filter,
            "embedding_type_distribution": type_counts,
            "enhancement_status": enhanced_counts,
            "qna_coverage": qna_stats,
            "source_breakdown": source_breakdown,
            # type_counts is empty whenever total_embeddings is 0, so the
            # division below can never see a zero denominator.
            "coverage_percentage": {
                emb_type: round((count / total_embeddings) * 100, 2)
                for emb_type, count in type_counts.items()
            },
            "success": True
        }, indent=2)
    except Exception as e:
        return json.dumps({
            "error": f"Analysis failed: {str(e)}",
            "analysis": {},
            "success": False
        })
def find_related_questions(
    topic: str,
    num_questions: int = 5,
    source_filter: Optional[str] = None
) -> str:
    """
    Find questions related to a specific topic using the questions-only embeddings.

    Handy for exploring which questions the knowledge base holds about a
    topic and how well that topic is covered.

    Args:
        topic: Topic or concept to find questions about (required)
        num_questions: Number of related questions to return (1-20, default: 5)
        source_filter: Limit to specific source folder (optional)

    Returns:
        JSON string with related questions and their context
    """
    if not (topic and topic.strip()):
        return json.dumps({"error": "Topic cannot be empty", "questions": [], "success": False})

    # Keep the requested count inside the documented 1-20 window.
    num_questions = max(1, min(num_questions, 20))

    try:
        # Delegate retrieval to the "questions" embedding search.
        raw_response = search_by_embedding_type(topic, "questions", num_questions, source_filter)
        payload = json.loads(raw_response)

        if not payload.get("success", False):
            failure = {
                "error": "Failed to search questions",
                "topic": topic,
                "questions": [],
                "success": False
            }
            return json.dumps(failure)

        # Each hit stores its questions as "Q1 | Q2 | Q3"; emit one entry per
        # non-empty question, carrying over the hit's score and provenance.
        candidates = [
            {
                "question": text,
                "relevance_score": hit["similarity_score"],
                "source": f"{hit['source_folder']} (chunk {hit['chunk_number']})",
                "chunk_file": hit["chunk_file"],
                "qna_count": hit.get("qna_count", 0)
            }
            for hit in payload["results"]
            for text in [piece.strip() for piece in hit["content"].split("|")]
            if text
        ]

        # Most relevant first (stable for ties), then cut to the requested size.
        ranked = sorted(
            candidates, key=lambda entry: entry["relevance_score"], reverse=True
        )[:num_questions]

        for position, entry in enumerate(ranked, start=1):
            entry["rank"] = position

        return json.dumps({
            "topic": topic,
            "total_questions_found": len(ranked),
            "source_filter": source_filter,
            "questions": ranked,
            "success": True
        }, indent=2)
    except Exception as e:
        return json.dumps({
            "error": f"Question search failed: {str(e)}",
            "topic": topic,
            "questions": [],
            "success": False
        })
# Assemble the Gradio app: a short notice plus one API endpoint per tool
# function, then start it with the MCP server enabled.
with gr.Blocks() as demo:
    gr.Markdown(
        """
        This is a MCP only tool for RS Studies
        This connects to a remote chromadb instance.
        This tool is MCP-only, so it does not have a UI.
        """
    )

    # Registered in the same order as the original one-call-per-tool listing.
    for tool_fn in (
        search_rs_studies,
        get_rs_sources,
        ask_rs_question,
        search_by_source,
        verify_fact_rs,
        compare_similarity_rs,
        classify_content_rs,
        search_by_embedding_type,
        smart_multi_search,
        analyze_embedding_coverage,
        find_related_questions,
    ):
        gr.api(tool_fn)

# launch() returns a 3-tuple; only its middle element is kept, as `url`.
_, url, _ = demo.launch(mcp_server=True)