try:
import os
from typing import Any, Dict, List, Optional
import json
import gradio as gr
import torch
from sentence_transformers import SentenceTransformer
import chromadb
from config import Config
except ImportError as e:
    print(f"❌ Error: Required packages not installed: {e}")
    print("🔧 Make sure you're in the gemmaembeddings conda environment")
    print("📦 Required packages: torch, sentence-transformers, chromadb")
    raise
# Global model and ChromaDB handles (initialized eagerly at import time)
config = Config()
device = Config.get_device()
model = SentenceTransformer(config.MODEL_PATH)
collection = None
print("🔄 Connecting to ChromaDB Cloud...")
database = os.environ.get("chromadb_db")
api_key = os.environ.get("chromadb_api_key")
tenant = os.environ.get("chromadb_tenant")
client = chromadb.CloudClient(
    api_key=api_key,
    tenant=tenant,
    database=database
)
print("✅ Connection to ChromaDB successful")
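# Local-run sketch (illustrative): the three values read above normally come
# from the Space's secrets. For local testing you could set them before this
# module runs, e.g.:
#
#     os.environ["chromadb_api_key"] = "<your ChromaDB Cloud API key>"
#     os.environ["chromadb_tenant"] = "<your tenant id>"
#     os.environ["chromadb_db"] = "<your database name>"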
# === COLLECTION VALIDATION ===
# Ensure the required collection exists and has data
try:
    collection = client.get_collection(config.COLLECTION_NAME)
    doc_count = collection.count()
    if doc_count == 0:
        print(f"⚠️ Collection '{config.COLLECTION_NAME}' exists but is empty. Run ingest_studies.py to populate it.")
    else:
        print(f"✅ Connected to collection '{config.COLLECTION_NAME}' with {doc_count} documents")
except Exception as e:
    print(f"❌ Collection '{config.COLLECTION_NAME}' not found. Run ingest_studies.py first. Error: {e}")
class EmbeddingGemmaPrompts:
"""
Optimized prompt templates for Google's EmbeddingGemma model.
This class implements the official EmbeddingGemma prompt instructions as specified
in the HuggingFace model documentation. It provides task-specific formatting to
achieve optimal embedding quality and search relevance.
Reference: https://huggingface.co/google/embeddinggemma-300m#prompt-instructions
The prompt format follows these official patterns:
- Query: 'task: {task description} | query: {content}'
- Document: 'title: {title | "none"} | text: {content}'
Performance Impact:
- task: fact checking → +136% similarity improvement
- task: semantic similarity → +112% similarity improvement
- task: question answering → +98% similarity improvement
- task: classification → +73% similarity improvement
Usage:
# Format a search query
formatted = EmbeddingGemmaPrompts.encode_query("How does RS work?", "question_answering")
# Result: "task: question answering | query: How does RS work?"
# Format a document for embedding
formatted = EmbeddingGemmaPrompts.encode_document("Content here", "Document Title")
# Result: "title: Document Title | text: Content here"
Attributes:
TASKS (Dict[str, str]): Mapping of task types to official task descriptions
"""
@staticmethod
def format_query_prompt(content: str, task: str = "search result") -> str:
"""
Format query using official EmbeddingGemma query prompt template.
Applies the official query format: 'task: {task description} | query: {content}'
This format is critical for achieving optimal embedding quality with EmbeddingGemma.
Args:
content (str): The raw query text to be embedded
task (str): Official EmbeddingGemma task description. Defaults to "search result"
Returns:
str: Formatted query string ready for embedding
Example:
>>> EmbeddingGemmaPrompts.format_query_prompt("RS trading system", "question answering")
'task: question answering | query: RS trading system'
"""
return f"task: {task} | query: {content}"
@staticmethod
def format_document_prompt(content: str, title: str = "none") -> str:
"""
Format document using official EmbeddingGemma document prompt template.
Applies the official document format: 'title: {title | "none"} | text: {content}'
Including meaningful titles significantly improves embedding quality and search relevance.
Args:
content (str): The document text content to be embedded
title (str): Document title or "none" if no title available. Defaults to "none"
Returns:
str: Formatted document string ready for embedding
Example:
>>> EmbeddingGemmaPrompts.format_document_prompt("Content here", "Risk Management")
'title: Risk Management | text: Content here'
>>> EmbeddingGemmaPrompts.format_document_prompt("Content without title")
'title: none | text: Content without title'
"""
return f'title: {title} | text: {content}'
# Official EmbeddingGemma task descriptions with performance rankings
# Based on testing results showing similarity score improvements
TASKS = {
# === RETRIEVAL TASKS ===
# General-purpose retrieval (baseline performance)
"retrieval_query": "search result", # Standard retrieval query format
"retrieval_document": "document", # Document embedding format
# === HIGH-PERFORMANCE SPECIALIZED TASKS ===
# Best for verifying claims and finding evidence (+136% performance)
"fact_checking": "fact checking",
# Excellent for concept comparison and relationship analysis (+112% performance)
"semantic_similarity": "sentence similarity",
# Optimized for Q&A scenarios with contextual responses (+98% performance)
"question_answering": "question answering",
# Effective for content categorization and topic analysis (+73% performance)
"classification": "classification",
# === MODERATE PERFORMANCE TASKS ===
# Good for document grouping and clustering (+59% performance)
"clustering": "clustering",
# Specialized for finding code examples and implementations (+39% performance)
"code_retrieval": "code retrieval",
# === LEGACY COMPATIBILITY ===
# Shorter aliases for backward compatibility
"search": "search result", # Default baseline task
"question": "question answering", # Alias for question_answering
"fact": "fact checking" # Alias for fact_checking
}
@classmethod
def get_task_description(cls, task_type: str) -> str:
"""
Get the official EmbeddingGemma task description for a given task type.
Validates the task type and returns the corresponding official task description
used in EmbeddingGemma prompt formatting. Falls back to "search result" for
unknown task types to ensure compatibility.
Args:
task_type (str): The task type key (e.g., "question_answering", "fact_checking")
Returns:
str: Official EmbeddingGemma task description (e.g., "question answering", "fact checking")
Example:
>>> EmbeddingGemmaPrompts.get_task_description("fact_checking")
'fact checking'
>>> EmbeddingGemmaPrompts.get_task_description("unknown_task")
'search result' # Fallback for unknown tasks
"""
return cls.TASKS.get(task_type, "search result")
@classmethod
def encode_query(cls, content: str, task_type: str = "search") -> str:
"""
Encode a query with task-specific EmbeddingGemma prompt optimization.
This is the primary method for formatting search queries. It combines the
user's query with the appropriate task-specific prompt template to achieve
optimal embedding quality and search relevance.
Args:
content (str): The raw query text from the user
task_type (str): Task type for optimization. Defaults to "search"
Valid options: "search", "question_answering", "fact_checking",
"semantic_similarity", "classification", "clustering", "code_retrieval"
Returns:
str: Optimized query string formatted for EmbeddingGemma
Performance Impact:
Using appropriate task types can improve similarity scores by 39-136%
compared to the baseline "search" task type.
Example:
>>> cls.encode_query("How does risk management work?", "question_answering")
'task: question answering | query: How does risk management work?'
>>> cls.encode_query("RS system reduces risk by 30%", "fact_checking")
'task: fact checking | query: RS system reduces risk by 30%'
"""
task_desc = cls.get_task_description(task_type)
return cls.format_query_prompt(content, task_desc)
@classmethod
def encode_document(cls, content: str, title: str = "none") -> str:
"""
Encode a document with proper EmbeddingGemma document formatting.
Formats documents for embedding using the official EmbeddingGemma document
template. Including meaningful titles significantly improves search relevance
and helps the model understand document structure.
Args:
content (str): The document text content to embed
title (str): Document title extracted from metadata, filename, or content.
Use "none" if no meaningful title is available
Returns:
str: Formatted document string ready for embedding
Best Practices:
- Extract titles from filenames, headers, or metadata when possible
- Use "none" rather than empty string when no title is available
- Keep titles concise and descriptive (< 100 characters)
Example:
>>> cls.encode_document("Trading strategy content...", "Momentum Strategy Guide")
'title: Momentum Strategy Guide | text: Trading strategy content...'
>>> cls.encode_document("Untitled content here")
'title: none | text: Untitled content here'
"""
return cls.format_document_prompt(content, title)
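# Quick reference (mirrors the docstring examples above; safe to paste into a REPL):
#
#     EmbeddingGemmaPrompts.encode_query("How does RS work?", "question_answering")
#     # -> 'task: question answering | query: How does RS work?'
#     EmbeddingGemmaPrompts.encode_document("Content here", "Risk Management")
#     # -> 'title: Risk Management | text: Content here'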
def search_knowledge_base(
query: str,
num_results: int = 5,
source_filter: Optional[str] = None,
task_type: str = "search"
) -> Dict[str, Any]:
"""
Search the RS Studies knowledge base using semantic similarity
Args:
query: The search query
num_results: Number of results to return
source_filter: Optional source folder filter
task_type: Type of task for query formatting
Returns:
Dictionary with search results and metadata
"""
try:
# Create query embedding with task-specific formatting using EmbeddingGemmaPrompts
query_formatted = EmbeddingGemmaPrompts.encode_query(query, task_type)
query_embedding = model.encode([query_formatted], device=device)
# Prepare search parameters
search_params = {
"query_embeddings": query_embedding.tolist(),
"n_results": min(num_results, config.MAX_NUM_RESULTS),
"include": ["documents", "metadatas", "distances"]
}
# Add source filter if specified
if source_filter and source_filter in config.VALID_SOURCES:
search_params["where"] = {"source_folder": {"$eq": source_filter}}
# Perform search
results = collection.query(**search_params)
# Format results
formatted_results = []
if results["documents"] and len(results["documents"]) > 0:
for i in range(len(results["documents"][0])):
result = {
"rank": i + 1,
"content": results["documents"][0][i],
"source_folder": results["metadatas"][0][i].get("source_folder", "unknown"),
"chunk_file": results["metadatas"][0][i].get("chunk_file", "unknown"),
"chunk_number": results["metadatas"][0][i].get("chunk_number", "unknown"),
"similarity_score": float(1 - results["distances"][0][i]),
"distance": float(results["distances"][0][i]),
"chunk_length": results["metadatas"][0][i].get("chunk_length", 0),
"metadata": results["metadatas"][0][i]
}
formatted_results.append(result)
return {
"query": query,
"task_type": task_type,
"num_results": len(formatted_results),
"source_filter": source_filter,
"results": formatted_results,
"success": True
}
except Exception as e:
return {"error": f"Search failed: {str(e)}", "results": [], "success": False}
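# Usage sketch (query text is illustrative; assumes the collection was
# populated by ingest_studies.py):
#
#     hits = search_knowledge_base("position sizing", num_results=3,
#                                  task_type="question_answering")
#     if hits["success"]:
#         for r in hits["results"]:
#             print(r["rank"], f"{r['similarity_score']:.3f}", r["source_folder"])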
def get_available_sources() -> Dict[str, Any]:
"""Get list of available source folders in the knowledge base"""
try:
# Get all metadata to find unique source folders
all_results = collection.get(include=["metadatas"])
sources = set()
for metadata in all_results["metadatas"]:
source = metadata.get("source_folder")
if source:
sources.add(source)
# Get statistics for each source
source_stats = {}
for source in sources:
source_results = collection.get(
where={"source_folder": {"$eq": source}},
include=["metadatas"]
)
source_stats[source] = len(source_results["metadatas"])
return {
"sources": sorted(list(sources)),
"source_stats": source_stats,
"total_sources": len(sources),
"total_chunks": collection.count(),
"success": True
}
except Exception as e:
return {"error": f"Failed to get sources: {str(e)}", "sources": [], "success": False}
# MCP Tool Definitions
def search_rs_studies(
query: str,
num_results: int = 5,
source_filter: Optional[str] = None,
task_type: str = "search"
) -> str:
"""
Search the RS Studies knowledge base for relevant information.
This tool provides semantic search across RS trading system documentation,
Chennai meetup transcripts, and Q&A content with optimized EmbeddingGemma prompts.
Args:
query: Your search question or topic (required)
num_results: Number of results to return (1-50, default: 5)
source_filter: Limit search to specific source:
- 'rs_stkege_01': RS trading system documentation
- 'cheenai_meet_full': Chennai meetup transcripts
- 'QnAYoutubeChannel': Q&A discussions
- None: Search all sources (default)
task_type: Search optimization using EmbeddingGemma task-specific prompts:
- 'search'/'retrieval_query': General search (default)
- 'question'/'question_answering': Question answering format
- 'fact'/'fact_checking': Fact checking format
- 'classification': Text classification tasks
- 'clustering': Document clustering and grouping
- 'semantic_similarity': Semantic similarity assessment
- 'code_retrieval': Code search and retrieval
Returns:
JSON string with search results including content, sources, and similarity scores
"""
# Validate parameters
if not query or not query.strip():
return json.dumps({"error": "Query cannot be empty", "results": [], "success": False})
num_results = max(1, min(num_results, config.MAX_NUM_RESULTS))
if source_filter and source_filter not in config.VALID_SOURCES:
return json.dumps({
"error": f"Invalid source_filter. Must be one of: {config.VALID_SOURCES}",
"results": [],
"success": False
})
valid_task_types = list(EmbeddingGemmaPrompts.TASKS.keys())
if task_type not in valid_task_types:
return json.dumps({
"error": f"Invalid task_type. Must be one of: {valid_task_types}",
"results": [],
"success": False
})
# Perform search
results = search_knowledge_base(query, num_results, source_filter, task_type)
return json.dumps(results, indent=2)
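# Usage sketch: the MCP tools return JSON strings, so callers typically parse
# them back into dicts (query text here is illustrative):
#
#     payload = json.loads(search_rs_studies("RS trading system", num_results=2,
#                                            task_type="question"))
#     print(payload["num_results"], payload["success"])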
def get_rs_sources() -> str:
"""
Get information about available data sources in the RS Studies knowledge base.
Returns:
JSON string with list of available sources, their statistics, and collection info
"""
sources_info = get_available_sources()
return json.dumps(sources_info, indent=2)
def ask_rs_question(question: str, context_size: int = 3) -> str:
"""
Ask a specific question about RS trading systems and get contextual answers.
This is a higher-level tool that searches for relevant information and
provides it in a question-answering format with ranked context.
Args:
question: Your question about RS systems, trading, or related topics
context_size: Number of relevant chunks to include in context (1-10, default: 3)
Returns:
JSON string with the question, relevant context chunks, and analysis
"""
if not question or not question.strip():
return json.dumps({
"error": "Question cannot be empty",
"context": [],
"success": False
})
context_size = max(1, min(context_size, config.MAX_CONTEXT_SIZE))
# Search for relevant information using question task type
search_results = search_knowledge_base(question, context_size, task_type="question_answering")
if not search_results.get("success", False):
return json.dumps(search_results)
# Format as Q&A response
response = {
"question": question,
"context_chunks": len(search_results.get("results", [])),
"relevant_context": [],
"sources_used": set(),
"success": True
}
for i, result in enumerate(search_results.get("results", [])[:context_size]):
context_item = {
"rank": i + 1,
"content": result["content"],
"source": f"{result['source_folder']} (chunk {result['chunk_number']})",
"relevance_score": f"{result['similarity_score']:.3f}",
"chunk_file": result["chunk_file"]
}
response["relevant_context"].append(context_item)
response["sources_used"].add(result["source_folder"])
# Convert set to list for JSON serialization
response["sources_used"] = sorted(list(response["sources_used"]))
return json.dumps(response, indent=2)
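# Usage sketch (question is illustrative):
#
#     answer = json.loads(ask_rs_question("How does RS manage drawdowns?", context_size=2))
#     for ctx in answer.get("relevant_context", []):
#         print(ctx["rank"], ctx["relevance_score"], ctx["source"])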
def get_collection_info() -> str:
"""
Get detailed information about the RS Studies knowledge base collection.
Returns:
JSON string with collection statistics, configuration, and metadata structure
"""
try:
total_count = collection.count()
# Get sample of metadata to understand structure
sample_results = collection.get(limit=10, include=["metadatas"])
# Analyze metadata structure
metadata_keys = set()
for metadata in sample_results["metadatas"]:
metadata_keys.update(metadata.keys())
info = {
"collection_name": config.COLLECTION_NAME,
"total_documents": total_count,
"model_path": config.MODEL_PATH,
"device": device,
"metadata_structure": sorted(list(metadata_keys)),
"config": {
"max_results": config.MAX_NUM_RESULTS,
"valid_sources": config.VALID_SOURCES
},
"success": True
}
return json.dumps(info, indent=2)
except Exception as e:
return json.dumps({"error": f"Failed to get collection info: {str(e)}", "success": False})
def search_by_source(source_name: str, query: str = "", num_results: int = 10) -> str:
"""
Browse or search within a specific data source.
Args:
source_name: Name of the source to search within (use get_rs_sources to see available sources)
query: Optional search query (if empty, returns recent chunks from the source)
num_results: Number of results to return (1-50, default: 10)
Returns:
JSON string with results from the specified source
"""
if source_name not in config.VALID_SOURCES:
return json.dumps({
"error": f"Invalid source_name. Must be one of: {config.VALID_SOURCES}",
"results": [],
"success": False
})
num_results = max(1, min(num_results, config.MAX_NUM_RESULTS))
    if query.strip():
        # Search within the source
        results = search_knowledge_base(query, num_results, source_name)
    else:
        # Browse the source (get recent chunks)
        if collection is None:
            return json.dumps({
                "error": "Server not properly initialized",
                "results": [],
                "success": False
            })
try:
source_results = collection.get(
where={"source_folder": {"$eq": source_name}},
limit=num_results,
include=["documents", "metadatas"]
)
formatted_results = []
for i, (doc, metadata) in enumerate(zip(source_results["documents"], source_results["metadatas"])):
result = {
"rank": i + 1,
"content": doc,
"source_folder": metadata.get("source_folder", "unknown"),
"chunk_file": metadata.get("chunk_file", "unknown"),
"chunk_number": metadata.get("chunk_number", "unknown"),
"chunk_length": metadata.get("chunk_length", 0),
"metadata": metadata
}
formatted_results.append(result)
results = {
"source_name": source_name,
"query": query or "(browsing mode)",
"num_results": len(formatted_results),
"results": formatted_results,
"success": True
}
except Exception as e:
results = {
"error": f"Failed to browse source: {str(e)}",
"results": [],
"success": False
}
return json.dumps(results, indent=2)
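# Usage sketch: with an empty query the tool browses recent chunks instead of
# searching (the source name must be one of config.VALID_SOURCES):
#
#     browsed = json.loads(search_by_source("rs_stkege_01", num_results=5))
#     print(browsed["query"])  # "(browsing mode)"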
def verify_fact_rs(statement: str, num_evidence: int = 3) -> str:
"""
Verify a fact or statement against the RS Studies knowledge base.
This tool uses EmbeddingGemma's fact checking optimization to find evidence
that supports or contradicts the given statement.
Args:
statement: The statement or claim to verify
num_evidence: Number of evidence chunks to return (1-10, default: 3)
Returns:
JSON string with evidence chunks ranked by relevance to the fact claim
"""
if not statement or not statement.strip():
return json.dumps({
"error": "Statement cannot be empty",
"evidence": [],
"success": False
})
num_evidence = max(1, min(num_evidence, config.MAX_CONTEXT_SIZE))
# Search for evidence using fact checking optimization
search_results = search_knowledge_base(statement, num_evidence, task_type="fact_checking")
if not search_results.get("success", False):
return json.dumps(search_results)
# Format as fact verification response
response = {
"statement": statement,
"evidence_count": len(search_results.get("results", [])),
"evidence": [],
"sources_consulted": set(),
"success": True
}
for i, result in enumerate(search_results.get("results", [])):
evidence_item = {
"rank": i + 1,
"content": result["content"],
"source": f"{result['source_folder']} (chunk {result['chunk_number']})",
"relevance_score": f"{result['similarity_score']:.3f}",
"chunk_file": result["chunk_file"]
}
response["evidence"].append(evidence_item)
response["sources_consulted"].add(result["source_folder"])
# Convert set to list for JSON serialization
response["sources_consulted"] = sorted(list(response["sources_consulted"]))
return json.dumps(response, indent=2)
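# Usage sketch (the claim below is illustrative, not asserted as true):
#
#     verdict = json.loads(verify_fact_rs("RS system reduces risk by 30%", num_evidence=2))
#     print(verdict["evidence_count"], verdict["sources_consulted"])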
def compare_similarity_rs(text1: str, text2: str, context_size: int = 5) -> str:
"""
Compare semantic similarity between two concepts in the RS Studies context.
This tool finds content related to both concepts and assesses their relationship
using EmbeddingGemma's semantic similarity optimization.
Args:
text1: First concept, topic, or text to compare
text2: Second concept, topic, or text to compare
context_size: Number of relevant chunks to analyze for each concept (1-10, default: 5)
Returns:
JSON string with related content for both concepts and similarity analysis
"""
if not text1 or not text1.strip() or not text2 or not text2.strip():
return json.dumps({
"error": "Both text1 and text2 must be provided",
"analysis": {},
"success": False
})
context_size = max(1, min(context_size, config.MAX_CONTEXT_SIZE))
# Search for content related to each concept using semantic similarity optimization
results1 = search_knowledge_base(text1, context_size, task_type="semantic_similarity")
results2 = search_knowledge_base(text2, context_size, task_type="semantic_similarity")
if not results1.get("success", False) or not results2.get("success", False):
return json.dumps({
"error": "Failed to search for one or both concepts",
"analysis": {},
"success": False
})
# Analyze overlap and differences
sources1 = set(r["source_folder"] for r in results1.get("results", []))
sources2 = set(r["source_folder"] for r in results2.get("results", []))
response = {
"concept1": text1,
"concept2": text2,
"concept1_results": len(results1.get("results", [])),
"concept2_results": len(results2.get("results", [])),
"shared_sources": sorted(list(sources1.intersection(sources2))),
"concept1_unique_sources": sorted(list(sources1 - sources2)),
"concept2_unique_sources": sorted(list(sources2 - sources1)),
"concept1_context": [
{
"rank": i + 1,
"content": r["content"][:200] + "..." if len(r["content"]) > 200 else r["content"],
"source": f"{r['source_folder']} (chunk {r['chunk_number']})",
"relevance": f"{r['similarity_score']:.3f}"
}
for i, r in enumerate(results1.get("results", []))
],
"concept2_context": [
{
"rank": i + 1,
"content": r["content"][:200] + "..." if len(r["content"]) > 200 else r["content"],
"source": f"{r['source_folder']} (chunk {r['chunk_number']})",
"relevance": f"{r['similarity_score']:.3f}"
}
for i, r in enumerate(results2.get("results", []))
],
"success": True
}
return json.dumps(response, indent=2)
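# Usage sketch (the concept pair is illustrative):
#
#     comparison = json.loads(compare_similarity_rs("momentum", "mean reversion"))
#     print(comparison["shared_sources"], comparison["concept1_unique_sources"])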
def classify_content_rs(content: str, categories: Optional[List[str]] = None) -> str:
"""
Classify content against RS Studies knowledge categories.
Uses EmbeddingGemma's classification optimization to categorize content
based on the RS Studies knowledge base.
Args:
content: Text content to classify
categories: Optional list of specific categories to check against
(defaults to major RS topics)
Returns:
JSON string with classification results and supporting evidence
"""
if not content or not content.strip():
return json.dumps({
"error": "Content cannot be empty",
"classification": {},
"success": False
})
# Default categories based on RS Studies sources
if categories is None:
categories = [
"trading systems",
"market analysis",
"Chennai meetup discussions",
"Q&A topics",
"technical strategies"
]
# Search for similar content using classification optimization
search_results = search_knowledge_base(content, 8, task_type="classification")
if not search_results.get("success", False):
return json.dumps(search_results)
# Analyze which categories the content best fits
source_distribution = {}
for result in search_results.get("results", []):
source = result["source_folder"]
if source not in source_distribution:
source_distribution[source] = []
source_distribution[source].append({
"content": result["content"][:150] + "..." if len(result["content"]) > 150 else result["content"],
"similarity": result["similarity_score"]
})
response = {
"content": content[:200] + "..." if len(content) > 200 else content,
"available_categories": categories,
"source_distribution": source_distribution,
"top_matches": [
{
"rank": i + 1,
"content": r["content"][:150] + "..." if len(r["content"]) > 150 else r["content"],
"source_category": r["source_folder"],
"similarity_score": f"{r['similarity_score']:.3f}"
}
for i, r in enumerate(search_results.get("results", [])[:5])
],
"success": True
}
return json.dumps(response, indent=2)
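# Usage sketch (content is illustrative; omitting categories falls back to the
# defaults defined above):
#
#     labels = json.loads(classify_content_rs("Entries are taken on 52-week highs"))
#     for match in labels.get("top_matches", []):
#         print(match["source_category"], match["similarity_score"])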
# ==================================================
# QnA-ENHANCED EMBEDDING TOOLS
# ==================================================
def search_by_embedding_type(
query: str,
embedding_type: str = "content",
num_results: int = 5,
source_filter: Optional[str] = None
) -> str:
"""
Search the knowledge base using specific embedding types for optimized retrieval.
This tool leverages the QnA-enhanced embeddings to provide targeted search
based on different content representations of the same chunks.
Args:
query: Your search question or topic (required)
embedding_type: Type of embedding to search:
- 'content': Original chunk content (default)
- 'enhanced_content': Content enhanced with QnA context
- 'questions': Questions-only embeddings for question matching
- 'answers': Answers-only embeddings for factual retrieval
num_results: Number of results to return (1-50, default: 5)
source_filter: Limit to specific source folder (optional)
Returns:
JSON string with search results optimized for the specified embedding type
"""
# Validate parameters
if not query or not query.strip():
return json.dumps({"error": "Query cannot be empty", "results": [], "success": False})
valid_embedding_types = ["content", "enhanced_content", "questions", "answers"]
if embedding_type not in valid_embedding_types:
return json.dumps({
"error": f"Invalid embedding_type. Must be one of: {valid_embedding_types}",
"results": [],
"success": False
})
num_results = max(1, min(num_results, config.MAX_NUM_RESULTS))
try:
# Format query appropriately based on embedding type
if embedding_type == "questions":
formatted_query = EmbeddingGemmaPrompts.encode_query(query, "question_answering")
elif embedding_type == "answers":
formatted_query = EmbeddingGemmaPrompts.encode_query(query, "fact_checking")
else:
formatted_query = EmbeddingGemmaPrompts.encode_query(query, "search")
# Create query embedding
query_embedding = model.encode([formatted_query], device=device)
        # Build where clause to filter by embedding type. ChromaDB expects an
        # explicit $and when combining more than one metadata field.
        if source_filter:
            where_clause = {"$and": [
                {"embedding_type": {"$eq": embedding_type}},
                {"source_folder": {"$eq": source_filter}},
            ]}
        else:
            where_clause = {"embedding_type": {"$eq": embedding_type}}
        # Query ChromaDB
        search_results = collection.query(
            query_embeddings=query_embedding.tolist(),
            n_results=num_results,
            where=where_clause
        )
# Format results
results = []
for i, (doc, metadata, distance) in enumerate(zip(
search_results['documents'][0],
search_results['metadatas'][0],
search_results['distances'][0]
)):
results.append({
"rank": i + 1,
"content": doc,
"similarity_score": 1 - distance,
"embedding_type": metadata.get("embedding_type", "unknown"),
"enhanced": metadata.get("enhanced", False),
"qna_count": metadata.get("qna_count", 0),
"source_folder": metadata.get("source_folder", "unknown"),
"chunk_number": metadata.get("chunk_number", "unknown"),
"chunk_file": metadata.get("chunk_file", "unknown")
})
return json.dumps({
"query": query,
"embedding_type": embedding_type,
"results_found": len(results),
"source_filter": source_filter,
"results": results,
"success": True
}, indent=2)
except Exception as e:
return json.dumps({
"error": f"Search failed: {str(e)}",
"query": query,
"embedding_type": embedding_type,
"results": [],
"success": False
})
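# Usage sketch: the embedding_type argument picks which representation of each
# chunk is searched (query text is illustrative):
#
#     qna_hits = json.loads(search_by_embedding_type("stop placement",
#                                                    embedding_type="questions"))
#     print(qna_hits["results_found"])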
def smart_multi_search(
query: str,
num_results_per_type: int = 3,
source_filter: Optional[str] = None,
combine_strategy: str = "best_of_each"
) -> str:
"""
Perform intelligent multi-type search across different embedding types.
This tool searches across multiple embedding types and combines results
to provide comprehensive coverage of relevant information.
Args:
query: Your search question or topic (required)
num_results_per_type: Results per embedding type (1-10, default: 3)
source_filter: Limit to specific source folder (optional)
combine_strategy: How to combine results:
- 'best_of_each': Top results from each type
- 'relevance_ranked': All results ranked by similarity
- 'type_weighted': Weighted by embedding type appropriateness
Returns:
JSON string with combined search results and analysis
"""
if not query or not query.strip():
return json.dumps({"error": "Query cannot be empty", "results": [], "success": False})
num_results_per_type = max(1, min(num_results_per_type, 10))
try:
all_results = {}
embedding_types = ["content", "enhanced_content", "questions", "answers"]
# Search each embedding type
for emb_type in embedding_types:
search_result = search_by_embedding_type(
query, emb_type, num_results_per_type, source_filter
)
result_data = json.loads(search_result)
if result_data.get("success", False):
all_results[emb_type] = result_data["results"]
else:
all_results[emb_type] = []
# Combine results based on strategy
combined_results = []
if combine_strategy == "best_of_each":
            # Take the top num_results_per_type results from each type
for emb_type, results in all_results.items():
for result in results:
result["search_type"] = emb_type
combined_results.append(result)
elif combine_strategy == "relevance_ranked":
# Combine all and sort by similarity
for emb_type, results in all_results.items():
for result in results:
result["search_type"] = emb_type
combined_results.append(result)
combined_results.sort(key=lambda x: x["similarity_score"], reverse=True)
elif combine_strategy == "type_weighted":
# Apply weights based on query type analysis
query_lower = query.lower()
# Simple heuristics for weighting
weights = {
"content": 1.0,
"enhanced_content": 1.2, # Slightly favor enhanced
"questions": 1.5 if any(word in query_lower for word in ["what", "how", "why", "when", "where", "?"]) else 0.8,
"answers": 1.3 if any(word in query_lower for word in ["define", "explain", "meaning", "is"]) else 0.9
}
for emb_type, results in all_results.items():
for result in results:
result["search_type"] = emb_type
result["weighted_score"] = result["similarity_score"] * weights[emb_type]
combined_results.append(result)
combined_results.sort(key=lambda x: x["weighted_score"], reverse=True)
        # Deduplicate by chunk, keeping the best-scoring version of each one
        seen_chunks = {}
        for result in combined_results:
            chunk_key = f"{result['source_folder']}_chunk_{result['chunk_number']}"
            if chunk_key not in seen_chunks or result["similarity_score"] > seen_chunks[chunk_key]["similarity_score"]:
                seen_chunks[chunk_key] = result
        final_results = list(seen_chunks.values())
        final_results.sort(key=lambda x: x.get("weighted_score", x["similarity_score"]), reverse=True)
# Add ranking
for i, result in enumerate(final_results):
result["final_rank"] = i + 1
return json.dumps({
"query": query,
"combine_strategy": combine_strategy,
"total_results": len(final_results),
"embedding_types_searched": embedding_types,
"results_per_type": {emb_type: len(results) for emb_type, results in all_results.items()},
"source_filter": source_filter,
"results": final_results[:num_results_per_type * 2], # Limit final output
"success": True
}, indent=2)
except Exception as e:
return json.dumps({
"error": f"Multi-search failed: {str(e)}",
"query": query,
"results": [],
"success": False
})
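# Usage sketch comparing combine strategies (query is illustrative):
#
#     ranked = json.loads(smart_multi_search("drawdown control",
#                                            combine_strategy="relevance_ranked"))
#     weighted = json.loads(smart_multi_search("drawdown control",
#                                              combine_strategy="type_weighted"))
#     print(ranked["total_results"], weighted["total_results"])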
def analyze_embedding_coverage(source_filter: Optional[str] = None) -> str:
"""
Analyze the distribution and coverage of different embedding types in the knowledge base.
Args:
source_filter: Limit analysis to specific source folder (optional)
Returns:
JSON string with embedding type statistics and coverage analysis
"""
try:
# Build where clause
where_clause = {}
if source_filter:
where_clause["source_folder"] = source_filter
# Get all documents with metadata
if where_clause:
all_docs = collection.get(where=where_clause)
else:
all_docs = collection.get()
# Analyze embedding types
type_counts = {}
enhanced_counts = {"enhanced": 0, "original": 0}
source_breakdown = {}
qna_stats = {"with_qna": 0, "without_qna": 0}
for metadata in all_docs['metadatas']:
emb_type = metadata.get('embedding_type', 'unknown')
type_counts[emb_type] = type_counts.get(emb_type, 0) + 1
# Enhanced vs original
if metadata.get('enhanced', False):
enhanced_counts["enhanced"] += 1
else:
enhanced_counts["original"] += 1
# Source breakdown
source = metadata.get('source_folder', 'unknown')
if source not in source_breakdown:
source_breakdown[source] = {}
source_breakdown[source][emb_type] = source_breakdown[source].get(emb_type, 0) + 1
# QnA statistics
if metadata.get('qna_count', 0) > 0:
qna_stats["with_qna"] += 1
else:
qna_stats["without_qna"] += 1
total_embeddings = len(all_docs['metadatas'])
return json.dumps({
"total_embeddings": total_embeddings,
"source_filter": source_filter,
"embedding_type_distribution": type_counts,
"enhancement_status": enhanced_counts,
"qna_coverage": qna_stats,
"source_breakdown": source_breakdown,
"coverage_percentage": {
emb_type: round((count / total_embeddings) * 100, 2)
for emb_type, count in type_counts.items()
},
"success": True
}, indent=2)
except Exception as e:
return json.dumps({
"error": f"Analysis failed: {str(e)}",
"analysis": {},
"success": False
})
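# Usage sketch:
#
#     coverage = json.loads(analyze_embedding_coverage())
#     print(coverage["embedding_type_distribution"], coverage["coverage_percentage"])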
def find_related_questions(
topic: str,
num_questions: int = 5,
source_filter: Optional[str] = None
) -> str:
"""
Find questions related to a specific topic using the questions-only embeddings.
This tool is optimized for discovering what questions are available about
a topic, useful for exploration and understanding coverage.
Args:
topic: Topic or concept to find questions about (required)
num_questions: Number of related questions to return (1-20, default: 5)
source_filter: Limit to specific source folder (optional)
Returns:
JSON string with related questions and their context
"""
if not topic or not topic.strip():
return json.dumps({"error": "Topic cannot be empty", "questions": [], "success": False})
num_questions = max(1, min(num_questions, 20))
try:
# Search using questions-only embeddings
question_search = search_by_embedding_type(
topic, "questions", num_questions, source_filter
)
search_data = json.loads(question_search)
if not search_data.get("success", False):
return json.dumps({
"error": "Failed to search questions",
"topic": topic,
"questions": [],
"success": False
})
# Extract questions and add context
questions = []
for result in search_data["results"]:
# Parse the questions from the content (format: "Q1 | Q2 | Q3")
question_list = [q.strip() for q in result["content"].split("|")]
for question in question_list:
if question: # Skip empty questions
questions.append({
"question": question,
"relevance_score": result["similarity_score"],
"source": f"{result['source_folder']} (chunk {result['chunk_number']})",
"chunk_file": result["chunk_file"],
"qna_count": result.get("qna_count", 0)
})
# Sort by relevance and limit
questions.sort(key=lambda x: x["relevance_score"], reverse=True)
questions = questions[:num_questions]
# Add ranking
for i, q in enumerate(questions):
q["rank"] = i + 1
return json.dumps({
"topic": topic,
"total_questions_found": len(questions),
"source_filter": source_filter,
"questions": questions,
"success": True
}, indent=2)
except Exception as e:
return json.dumps({
"error": f"Question search failed: {str(e)}",
"topic": topic,
"questions": [],
"success": False
})
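# Usage sketch (topic is illustrative):
#
#     related = json.loads(find_related_questions("position sizing", num_questions=3))
#     for q in related.get("questions", []):
#         print(q["rank"], q["question"])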
with gr.Blocks() as demo:
    gr.Markdown(
        """
        This is an MCP-only tool for RS Studies: it connects to a remote
        ChromaDB instance and exposes API endpoints rather than a UI.
        """
    )
    gr.api(search_rs_studies)
    gr.api(get_rs_sources)
    gr.api(ask_rs_question)
    gr.api(search_by_source)
    gr.api(verify_fact_rs)
    gr.api(compare_similarity_rs)
    gr.api(classify_content_rs)
    gr.api(search_by_embedding_type)
    gr.api(smart_multi_search)
    gr.api(analyze_embedding_coverage)
    gr.api(find_related_questions)
_, url, _ = demo.launch(mcp_server=True)