"""Agentic RAG tools for autonomous vector store exploration.

This module provides tools that allow an agent to autonomously:
1. Search semantically across the vector store
2. Filter by metadata fields
3. Retrieve specific documents
4. List available categories
5. Perform hybrid search with filters

The agent uses a ReAct loop to iteratively explore and refine its search.
"""

import json
from functools import wraps
from typing import Optional, Dict, Any, List

from langchain_core.tools import tool

# Global references to be set at initialization
_encoder = None
_vector_store = None

# Upper bound advertised in the semantic_search tool description.
_SEMANTIC_SEARCH_MAX_LIMIT = 20

# When hybrid_search has to apply country/form_type filters in Python after a
# cause-based lookup, fetch this many times the requested limit so filtering
# does not leave the caller with an under-filled (or empty) result list.
_POST_FILTER_OVERFETCH = 3

# NOTE: the docstrings of the @tool functions below are runtime data -- the
# decorator exposes them to the agent as the tool description. Keep
# implementation notes in comments rather than in those docstrings.


def set_rag_dependencies(encoder, vector_store):
    """Set the encoder and vector store instances for RAG tools.

    Args:
        encoder: The SeaLion encoder instance
        vector_store: The DonorVectorStore instance
    """
    global _encoder, _vector_store
    _encoder = encoder
    _vector_store = vector_store


def _clamp_limit(limit: int, maximum: Optional[int] = None) -> int:
    """Return ``limit`` forced to be at least 1 and at most ``maximum``.

    Agents occasionally pass zero or negative limits; a negative value would
    otherwise reach SQL ``LIMIT`` and surface as a database error.

    Args:
        limit: Requested number of results.
        maximum: Optional inclusive upper bound; ``None`` means unbounded.

    Returns:
        The bounded limit.
    """
    bounded = max(1, limit)
    if maximum is not None:
        bounded = min(bounded, maximum)
    return bounded


def _escape_like(text: str) -> str:
    """Escape LIKE/ILIKE wildcards so ``text`` is matched literally.

    PostgreSQL uses backslash as the default LIKE escape character, so the
    backslash itself must be escaped first.

    Args:
        text: Raw text to embed in a LIKE pattern.

    Returns:
        ``text`` with ``\\``, ``%`` and ``_`` escaped.
    """
    return (
        text.replace("\\", "\\\\")
        .replace("%", "\\%")
        .replace("_", "\\_")
    )


def _collect_causes(rows) -> set:
    """Gather the distinct cause names found in ``text_content`` rows.

    Each row's first column is either a JSON string or an already-decoded
    object. Rows that cannot be decoded, or whose payload is not a JSON
    object, are skipped so one malformed record cannot abort the listing.

    Args:
        rows: Iterable of row tuples whose first element is the text content.

    Returns:
        Set of cause strings.
    """
    found = set()
    for row in rows:
        raw = row[0]
        if isinstance(raw, str):
            try:
                data = json.loads(raw)
            except json.JSONDecodeError:
                continue
        else:
            data = raw
        if not isinstance(data, dict):
            continue
        causes = data.get("causes", [])
        if isinstance(causes, list):
            # Keep only strings: guarantees hashable entries and a sortable set.
            found.update(c for c in causes if isinstance(c, str))
    return found


def _matches_filters(result, form_type: Optional[str], country: Optional[str]) -> bool:
    """Return True when ``result`` satisfies the optional form/country filters."""
    if form_type and result.form_type != form_type:
        return False
    # form_data may be empty/None (see _format_results), so guard the lookup.
    if country and (result.form_data or {}).get("country") != country:
        return False
    return True


def _format_results(results: List[Any], include_details: bool = True) -> str:
    """Format search results for agent consumption.

    Args:
        results: List of SimilarityResult objects
        include_details: Whether to include full form data

    Returns:
        Formatted string representation of results
    """
    if not results:
        return "No results found."

    formatted = []
    for rank, result in enumerate(results, 1):
        entry = {
            "rank": rank,
            "id": result.id,
            "form_type": result.form_type,
            "similarity_score": round(result.score, 4),
        }

        if include_details and result.form_data:
            # Extract key fields for readability
            form_data = result.form_data
            entry["name"] = form_data.get("name", "Unknown")
            entry["country"] = form_data.get("country", "Unknown")
            entry["causes"] = form_data.get("causes", [])

            # Include type-specific fields
            if result.form_type == "donor":
                entry["donor_type"] = form_data.get("donor_type", "Unknown")
                entry["donation_frequency"] = form_data.get("donation_frequency")
            elif result.form_type == "volunteer":
                entry["volunteer_type"] = form_data.get("volunteer_type", "Unknown")
                entry["skills"] = form_data.get("skills", [])
                entry["availability"] = form_data.get("availability")

        formatted.append(entry)

    return json.dumps(formatted, indent=2, default=str)


@tool
async def semantic_search(query: str, limit: int = 5, form_type: Optional[str] = None) -> str:
    """Search documents by semantic similarity.

    Use this to find donors/volunteers whose profiles match a natural language query.
    The search uses vector embeddings to find semantically similar entries.

    Args:
        query: Natural language description of what you're looking for.
            Examples: "corporate donors interested in education",
            "volunteers with tech skills in Singapore"
        limit: Maximum number of results to return (default: 5, max: 20)
        form_type: Optional filter - "donor" or "volunteer"

    Returns:
        JSON formatted list of matching profiles with similarity scores
    """
    print(f"[Agentic RAG] semantic_search called - query: '{query}', limit: {limit}, form_type: {form_type}")

    if _encoder is None or _vector_store is None:
        return "Error: RAG tools not initialized. Call set_rag_dependencies first."

    try:
        # Encode the query
        embedding = await _encoder.encode(query)

        # Search the vector store; the limit is bounded on both sides.
        results = await _vector_store.find_similar(
            query_embedding=embedding,
            form_type=form_type,
            limit=_clamp_limit(limit, _SEMANTIC_SEARCH_MAX_LIMIT),
        )

        return _format_results(results)
    except Exception as e:
        # Tool boundary: report the failure to the agent instead of raising.
        return f"Search error: {str(e)}"


@tool
async def filter_by_metadata(
    field: str,
    value: str,
    limit: int = 10
) -> str:
    """Browse documents filtered by metadata field.

    Use this to find all entries matching a specific metadata value.
    Useful for exploring what's available before doing semantic search.

    Args:
        field: The metadata field to filter on. Valid fields:
            "form_type", "donor_type", "volunteer_type", "country", "availability"
        value: The value to match. Examples:
            form_type="donor", country="SG", donor_type="corporate"
        limit: Maximum number of results (default: 10)

    Returns:
        JSON formatted list of matching entries
    """
    print(f"[Agentic RAG] filter_by_metadata called - field: '{field}', value: '{value}', limit: {limit}")

    if _vector_store is None:
        return "Error: RAG tools not initialized."

    try:
        limit = _clamp_limit(limit)

        # Map field to actual database query approach
        if field == "form_type":
            results = await _vector_store.find_by_form_type(value, limit=limit)
        else:
            # Imported lazily, as in the original; presumably to avoid a
            # circular import at module load time -- TODO confirm.
            from recommender.vector_store import SimilarityResult, _parse_json_field

            # Other fields are matched against the serialized JSON text.
            # Escape LIKE wildcards so '_' in names such as "donor_type" and
            # any '%'/'_' in the value are matched literally.
            # NOTE(review): assumes text_content is serialized with
            # '"key": "value"' spacing (json.dumps default) -- confirm.
            pattern = "%" + _escape_like(f'"{field}": "{value}"') + "%"

            async with _vector_store.pool.connection() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(
                        """
                        SELECT source_id, text_content, metadata
                        FROM my_embeddings
                        WHERE text_content ILIKE %s
                        LIMIT %s
                        """,
                        (pattern, limit)
                    )
                    rows = await cur.fetchall()

            # Convert to SimilarityResult-like format
            results = []
            for row in rows:
                form_data = _parse_json_field(row[1])
                metadata = _parse_json_field(row[2])
                # Metadata may be missing or not a JSON object; fall back
                # instead of failing the whole request.
                if isinstance(metadata, dict):
                    row_form_type = metadata.get("form_type", "unknown")
                else:
                    row_form_type = "unknown"
                results.append(SimilarityResult(
                    id=row[0],
                    form_data=form_data,
                    form_type=row_form_type,
                    # Not a similarity query: scores are placeholders.
                    score=1.0,
                    distance=0.0
                ))

        return _format_results(results)
    except Exception as e:
        # Tool boundary: report the failure to the agent instead of raising.
        return f"Filter error: {str(e)}"


@tool
async def get_document_by_id(doc_id: str) -> str:
    """Retrieve a specific document by ID for deeper inspection.

    Use this when you've identified a promising result from search
    and want to see the complete profile details.

    Args:
        doc_id: The unique document/form ID (e.g., "donor_12345")

    Returns:
        Complete JSON representation of the document
    """
    print(f"[Agentic RAG] get_document_by_id called - doc_id: '{doc_id}'")

    if _vector_store is None:
        return "Error: RAG tools not initialized."

    try:
        result = await _vector_store.get_embedding(doc_id)

        if result is None:
            return f"Document with ID '{doc_id}' not found."

        # Return full document details
        document = {
            "id": result.id,
            "form_type": result.form_type,
            "data": result.form_data
        }

        return json.dumps(document, indent=2, default=str)
    except Exception as e:
        # Tool boundary: report the failure to the agent instead of raising.
        return f"Retrieval error: {str(e)}"


@tool
async def list_available_categories() -> str:
    """List all unique values for filterable fields.

    Use this first to understand what categories exist in the database
    before performing filtered searches. Returns available:
    - Form types (donor, volunteer)
    - Countries (ASEAN country codes)
    - Causes (education, health, etc.)
    - Donor types (individual, corporate, foundation)
    - Volunteer types (regular, event_based, skilled)

    Returns:
        JSON object with distinct values for each category
    """
    print("[Agentic RAG] list_available_categories called")

    if _vector_store is None:
        return "Error: RAG tools not initialized."

    try:
        async with _vector_store.pool.connection() as conn:
            async with conn.cursor() as cur:
                # Get form type counts
                await cur.execute("""
                    SELECT metadata->>'form_type' as form_type, COUNT(*) as count
                    FROM my_embeddings
                    GROUP BY metadata->>'form_type'
                """)
                form_types = {row[0]: row[1] for row in await cur.fetchall()}

                # Get distinct countries
                await cur.execute("""
                    SELECT DISTINCT text_content::json->>'country' as country
                    FROM my_embeddings
                    WHERE text_content::json->>'country' IS NOT NULL
                """)
                countries = [row[0] for row in await cur.fetchall() if row[0]]

                # Get distinct donor types
                await cur.execute("""
                    SELECT DISTINCT text_content::json->>'donor_type' as dtype
                    FROM my_embeddings
                    WHERE text_content::json->>'donor_type' IS NOT NULL
                """)
                donor_types = [row[0] for row in await cur.fetchall() if row[0]]

                # Get distinct volunteer types
                await cur.execute("""
                    SELECT DISTINCT text_content::json->>'volunteer_type' as vtype
                    FROM my_embeddings
                    WHERE text_content::json->>'volunteer_type' IS NOT NULL
                """)
                volunteer_types = [row[0] for row in await cur.fetchall() if row[0]]

                # Get all causes (need to aggregate from arrays).
                # Only a sample of up to 100 rows is inspected, so rarely
                # used causes may be absent from the listing.
                await cur.execute("""
                    SELECT text_content
                    FROM my_embeddings
                    WHERE text_content LIKE '%causes%'
                    LIMIT 100
                """)
                cause_rows = await cur.fetchall()

        all_causes = _collect_causes(cause_rows)

        categories = {
            "form_types": form_types,
            "countries": sorted(countries),
            "donor_types": sorted(donor_types),
            "volunteer_types": sorted(volunteer_types),
            "causes": sorted(all_causes),
            "total_records": sum(form_types.values()) if form_types else 0
        }

        return json.dumps(categories, indent=2)
    except Exception as e:
        # Tool boundary: report the failure to the agent instead of raising.
        return f"Error listing categories: {str(e)}"


@tool
async def hybrid_search(
    query: str,
    country: Optional[str] = None,
    form_type: Optional[str] = None,
    causes: Optional[List[str]] = None,
    limit: int = 10
) -> str:
    """Combine semantic search with metadata filters.

    Use this for targeted searches that combine meaning (semantic)
    with specific constraints (filters). More precise than pure
    semantic search when you know specific criteria.

    Args:
        query: Natural language query for semantic matching
        country: Optional country code filter (e.g., "SG", "MY", "TH")
        form_type: Optional form type filter ("donor" or "volunteer")
        causes: Optional list of cause categories to match
        limit: Maximum number of results (default: 10)

    Returns:
        JSON formatted list of results matching both semantic query and filters
    """
    print(f"[Agentic RAG] hybrid_search called - query: '{query}', country: {country}, form_type: {form_type}, causes: {causes}, limit: {limit}")

    if _encoder is None or _vector_store is None:
        return "Error: RAG tools not initialized."

    try:
        limit = _clamp_limit(limit)

        # Encode the query
        embedding = await _encoder.encode(query)

        # Use cause-based hybrid search if causes specified
        if causes:
            needs_post_filter = bool(form_type or country)
            # The cause lookup is called without form/country filters, so
            # those are applied here. Over-fetch first; otherwise filtering
            # the top `limit` rows can discard most of them even though
            # further matches exist.
            fetch_limit = limit * _POST_FILTER_OVERFETCH if needs_post_filter else limit

            results = await _vector_store.find_by_causes(
                target_causes=causes,
                query_embedding=embedding,
                limit=fetch_limit
            )

            # Apply additional filters if needed
            if needs_post_filter:
                results = [
                    r for r in results if _matches_filters(r, form_type, country)
                ][:limit]
        else:
            # Standard similarity search with filters
            results = await _vector_store.find_similar(
                query_embedding=embedding,
                form_type=form_type,
                limit=limit,
                country_filter=country
            )

        return _format_results(results)
    except Exception as e:
        # Tool boundary: report the failure to the agent instead of raising.
        return f"Hybrid search error: {str(e)}"


@tool
async def get_statistics() -> str:
    """Get overall statistics about the vector store.

    Use this to understand the size and composition of the database
    before starting your search.

    Returns:
        JSON with counts by form type and other aggregate stats
    """
    print("[Agentic RAG] get_statistics called")

    if _vector_store is None:
        return "Error: RAG tools not initialized."

    try:
        counts = await _vector_store.count_by_type()

        return json.dumps({
            "database_statistics": counts,
            "description": "Number of entries by form type in the vector store"
        }, indent=2)
    except Exception as e:
        # Tool boundary: report the failure to the agent instead of raising.
        return f"Error getting statistics: {str(e)}"


# Export all RAG tools as a list for easy registration
RAG_TOOLS = [
    semantic_search,
    filter_by_metadata,
    get_document_by_id,
    list_available_categories,
    hybrid_search,
    get_statistics,
]