Spaces:
Sleeping
Sleeping
from datetime import datetime
from pathlib import Path  # needed: RETRIEVERS_METADATA_PATH below is built with Path(...)
from typing import Dict, List, Optional
import json
import os

import aiohttp  # needed for the async HTTP requests to the Tavily API
from llama_index.core import (
    Settings,
    StorageContext,
    VectorStoreIndex,
    load_index_from_storage,
)
from llama_index.embeddings.azure_openai import AzureOpenAIEmbedding
from llama_index.llms.azure_openai import AzureOpenAI
from mcp.server.fastmcp import FastMCP
# Azure OpenAI connection settings. The API key comes from the environment;
# endpoint and API version are fixed for this deployment.
api_key = os.environ.get('AZURE_API_KEY')
azure_endpoint = "https://pharmaia-gpt.openai.azure.com/"
api_version = "2024-02-01"

# Chat-completion model used for all LLM calls.
llm = AzureOpenAI(
    model="gpt-4.1",
    deployment_name="gpt-4.1",
    api_key=api_key,
    azure_endpoint=azure_endpoint,
    api_version=api_version,
)

# You need to deploy your own embedding model as well as your own chat completion model
embed_model = AzureOpenAIEmbedding(
    model="text-embedding-3-large",
    deployment_name="text-embedding-3-large",
    api_key=api_key,
    azure_endpoint=azure_endpoint,
    api_version=api_version,
)

# Register both models as the global defaults used by llama_index.
Settings.llm = llm
Settings.embed_model = embed_model
# Initial configuration (this would probably live in another module).
DOCUMENTS_BASE_PATH = "./"
SOURCES = {
    "oms": "oms/",  # base folder that contains all the sub-indices
}

# Load indices recursively: any directory under a source folder that holds a
# "storage_nodes" subdirectory is treated as one persisted llama_index index.
indices: Dict[str, VectorStoreIndex] = {}
for source, rel_path in SOURCES.items():
    full_path = os.path.join(DOCUMENTS_BASE_PATH, rel_path)
    if not os.path.exists(full_path):
        continue
    # Scan every subfolder for persisted indices.
    for root, dirs, files in os.walk(full_path):
        if "storage_nodes" in dirs:
            # This folder contains an index.
            try:
                storage_path = os.path.join(root, "storage_nodes")
                storage_context = StorageContext.from_defaults(persist_dir=storage_path)
                # Use the parent folder name as the key (e.g. "vec_1").
                index_name = os.path.basename(root)
                full_index_name = f"{source}_{index_name}"  # e.g. "oms_vec_1"
                index = load_index_from_storage(storage_context, index_id="vector_index")
                indices[full_index_name] = index
            except Exception as e:
                # Best-effort load: report and skip indices that fail to open.
                print(f"Error cargando índice en {root}: {str(e)}")
                continue
# MCP server setup; PORT is provided by the hosting platform (default 7860).
port = int(os.getenv("PORT", 7860))
mcp = FastMCP("OnBase", port=port)

# Location of the retrievers.json metadata file.
RETRIEVERS_METADATA_PATH = Path("./retrievers.json")
def load_retrievers_metadata() -> Dict:
    """Load retriever metadata from RETRIEVERS_METADATA_PATH.

    Returns the parsed JSON mapping, or an empty dict (after printing a
    warning) when the file is missing or contains invalid JSON.
    """
    try:
        handle = open(RETRIEVERS_METADATA_PATH, 'r', encoding='utf-8')
    except FileNotFoundError:
        print(f"Warning: {RETRIEVERS_METADATA_PATH} not found. Using empty metadata.")
        return {}
    with handle:
        try:
            return json.load(handle)
        except json.JSONDecodeError:
            print(f"Warning: {RETRIEVERS_METADATA_PATH} is invalid JSON. Using empty metadata.")
            return {}
# Loaded once at import time; module-level cache used by the tool functions below.
retrievers_metadata = load_retrievers_metadata()
def get_retriever_titles() -> dict:
    """Resource: list only the titles/names of the available retrievers.

    Returns a dict with the retriever titles and how many there are.
    """
    titles = list(retrievers_metadata)  # iterating a dict yields its keys
    return {"titles": titles, "count": len(retrievers_metadata)}
def get_retriever_details(retriever_title: str) -> dict:
    """Resource: return the full metadata for one specific retriever.

    Parameters:
        retriever_title: the retriever's title/name (e.g. 'oms')

    Returns an error payload (with the available titles) when the
    title is unknown.
    """
    if retriever_title in retrievers_metadata:
        return {
            "retriever": retriever_title,
            "details": retrievers_metadata[retriever_title],
        }
    return {
        "error": f"Retriever '{retriever_title}' no encontrado",
        "available_titles": list(retrievers_metadata.keys()),
    }
def get_available_retrievers(retriever_title: Optional[str] = None) -> dict:
    """Resource: describe the loaded retrievers, optionally filtered by title.

    Parameters:
        retriever_title: optional; when given, only retrievers whose source
            prefix matches this title are returned.
    """
    matching = []
    for name in indices:
        pieces = name.split('_')
        prefix = pieces[0]
        # Skip entries from other sources when a title filter was given.
        if retriever_title and prefix != retriever_title:
            continue
        # Pull metadata from retrievers.json when present; fall back to defaults.
        entry_meta = retrievers_metadata.get(prefix, {}).get(name, {})
        matching.append({
            "retriever_name": name,
            "source": prefix,
            "index_name": '_'.join(pieces[1:]) if len(pieces) > 1 else "default",
            "description": entry_meta.get("description", f"Documentos de {prefix.upper()}"),
            "content_info": entry_meta.get("content_info", "No description available"),
            "last_updated": entry_meta.get("last_updated", "unknown"),
        })

    if retriever_title and not matching:
        return {
            "error": f"No hay retrievers para el título '{retriever_title}'",
            "available_titles": list(retrievers_metadata.keys()),
        }
    return {
        "retrievers": matching,
        "count": len(matching),
        "filtered_by": retriever_title or "all",
    }
def retrieve_docs(
    query: str,
    retrievers: List[str],
    top_k: int = 3
) -> dict:
    """Semantic search across one or more loaded regulation indices.

    Parameters:
        query: search query (required).
        retrievers: names of the specific retrievers to use (required).
        top_k: number of results to return per retriever (default: 3).

    Example:
        retrieve_docs(
            query="salud pública",
            retrievers=["oms_vec_1", "oms_tree_2"],
            top_k=2,
        )
    """
    # Guard clauses for the two required parameters.
    if not query:
        return {"error": "Query parameter is required"}
    if not retrievers:
        return {"error": "At least one retriever must be specified", "available_retrievers": list(indices.keys())}

    # Reject the whole request when any requested retriever is unknown.
    invalid_retrievers = [name for name in retrievers if name not in indices]
    if invalid_retrievers:
        return {
            "error": f"Invalid retrievers specified: {invalid_retrievers}",
            "available_retrievers": list(indices.keys()),
        }

    results = {}
    for name in retrievers:
        try:
            retriever = indices[name].as_retriever(similarity_top_k=top_k)
            # A list value marks success; a dict value marks failure (see below).
            results[name] = [
                {
                    "content": hit.get_content(),
                    "metadata": hit.metadata,
                    "score": hit.score,
                }
                for hit in retriever.retrieve(query)
            ]
        except Exception as e:
            results[name] = {"error": f"Error retrieving documents: {str(e)}"}

    succeeded = [name for name in retrievers if isinstance(results[name], list)]
    failed = [name for name in retrievers if not isinstance(results[name], list)]
    return {
        "results": results,
        "query": query,
        "retrievers_used": retrievers,
        "top_k": top_k,
        "successful_retrievers": succeeded,
        "failed_retrievers": failed,
    }
async def search_tavily(
    query: str,
    days: int = 7,
    max_results: int = 1,
    include_answer: bool = False
) -> dict:
    """Run a web search through the Tavily REST API.

    Args:
        query: Search query string (required)
        days: Restrict search to last N days (default: 7)
        max_results: Maximum results to return (default: 1)
        include_answer: Include a direct answer only when requested by the user (default: False)

    Returns:
        dict: Tavily's JSON response, or an error payload on failure.

    Raises:
        ValueError: when TAVILY_API_KEY is not set in the environment.
    """
    # The API key is read from the environment on every call.
    tavily_api_key = os.environ.get('TAVILY_API_KEY')
    if not tavily_api_key:
        raise ValueError("TAVILY_API_KEY environment variable not set")

    request_headers = {
        "Authorization": f"Bearer {tavily_api_key}",
        "Content-Type": "application/json",
    }
    body = {
        "query": query,
        "search_depth": "basic",
        "max_results": max_results,
        "days": days if days else None,  # falsy days -> explicit null in payload
        "include_answer": include_answer,
    }
    try:
        async with aiohttp.ClientSession() as session:
            response = await session.post(
                "https://api.tavily.com/search",
                headers=request_headers,
                json=body,
            )
            async with response:
                response.raise_for_status()
                return await response.json()
    except Exception as e:
        # Network/HTTP problems are reported back as data, not raised.
        return {"error": str(e), "status": "failed", "query": query}
# Entry point: serve the MCP tools over the SSE transport.
if __name__ == "__main__":
    mcp.run("sse")