Spaces:
Sleeping
Sleeping
| """ | |
| Tool for querying Vertex AI RAG corpora and retrieving relevant information. | |
| """ | |
| import logging | |
| from google.adk.tools.tool_context import ToolContext | |
| from vertexai import rag | |
| from ..config import ( | |
| DEFAULT_DISTANCE_THRESHOLD, | |
| DEFAULT_TOP_K, | |
| ) | |
| from .utils import check_corpus_exists, get_corpus_resource_name | |
def rag_query(
    corpus_name: str,
    query: str,
    tool_context: ToolContext,
) -> dict:
    """
    Query a Vertex AI RAG corpus with a user question and return relevant information.

    Args:
        corpus_name (str): The name of the corpus to query. If empty, the current corpus will be used.
            Preferably use the resource_name from list_corpora results.
        query (str): The text query to search for in the corpus. Must not be blank.
        tool_context (ToolContext): The tool context

    Returns:
        dict: The query results and status. Always contains "status"
            ("success", "warning" or "error"), "message", "query" and
            "corpus_name". When the retrieval call completes, it also contains
            "results" (a list of dicts with "source_uri", "source_name", "text"
            and "score") and "results_count". No exception is propagated to the
            caller; failures are reported through the "error" status.
    """
    # Reject blank queries up front: there is nothing to retrieve, so skip the
    # corpus lookup and the retrieval call entirely.
    if not isinstance(query, str) or not query.strip():
        return {
            "status": "error",
            "message": "Query is empty. Please provide a non-empty text query.",
            "query": query,
            "corpus_name": corpus_name,
        }

    try:
        # Check if the corpus exists
        if not check_corpus_exists(corpus_name, tool_context):
            return {
                "status": "error",
                "message": f"Corpus '{corpus_name}' does not exist. Please create it first using the create_corpus tool.",
                "query": query,
                "corpus_name": corpus_name,
            }

        # Get the corpus resource name
        corpus_resource_name = get_corpus_resource_name(corpus_name)

        # Configure retrieval parameters
        rag_retrieval_config = rag.RagRetrievalConfig(
            top_k=DEFAULT_TOP_K,
            filter=rag.Filter(vector_distance_threshold=DEFAULT_DISTANCE_THRESHOLD),
        )

        # Perform the query. Diagnostics go through logging (not print) so the
        # tool does not write to stdout.
        logging.info("Performing retrieval query...")
        response = rag.retrieval_query(
            rag_resources=[
                rag.RagResource(
                    rag_corpus=corpus_resource_name,
                )
            ],
            text=query,
            rag_retrieval_config=rag_retrieval_config,
        )

        # Process the response into a more usable format. Attributes that are
        # missing on a context fall back to an empty string / 0.0 score.
        retrieved = getattr(response, "contexts", None)
        results = [
            {
                "source_uri": getattr(ctx, "source_uri", ""),
                "source_name": getattr(ctx, "source_display_name", ""),
                "text": getattr(ctx, "text", ""),
                "score": getattr(ctx, "score", 0.0),
            }
            for ctx in (retrieved.contexts if retrieved else [])
        ]

        # If we didn't find any results
        if not results:
            return {
                "status": "warning",
                "message": f"No results found in corpus '{corpus_name}' for query: '{query}'",
                "query": query,
                "corpus_name": corpus_name,
                "results": [],
                "results_count": 0,
            }

        return {
            "status": "success",
            "message": f"Successfully queried corpus '{corpus_name}'",
            "query": query,
            "corpus_name": corpus_name,
            "results": results,
            "results_count": len(results),
        }
    except Exception as e:
        # Deliberately broad: any failure is turned into an error payload for
        # the caller instead of being raised. logging.exception keeps the
        # traceback in the log.
        error_msg = f"Error querying corpus: {str(e)}"
        logging.exception(error_msg)
        return {
            "status": "error",
            "message": error_msg,
            "query": query,
            "corpus_name": corpus_name,
        }