# mcp_gateway.py
"""Aegis MCP Gateway (monolithic mode).

Mounts the Tavily, Alpha Vantage and private-portfolio MCP sub-apps onto a
single uvicorn server (port 8000) and exposes:

* REST endpoints consumed by the Next.js front-end
  (``/api/chat``, ``/api/research``, ``/api/portfolio``), and
* a generic ``/route_agent_request`` proxy that forwards agent calls to the
  mounted microservices over HTTP.
"""

from fastapi import FastAPI, HTTPException, Request, UploadFile, File
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import uvicorn
import httpx
import logging
import os
import io
from dotenv import load_dotenv

load_dotenv()

# --- Logging Setup ---
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger("MCP_Gateway")

# --- Import Microservices for Consolidation ---
try:
    from tavily_mcp import app as tavily_app
    from alphavantage_mcp import app as alphavantage_app
    from private_mcp import app as private_app
    logger.info("Successfully imported microservices for consolidation.")
except ImportError as e:
    # The gateway is useless without its sub-apps; fail fast and loudly.
    logger.critical(f"Failed to import microservices: {e}")
    raise

# --- Configuration (Updated for Monolithic Mode) ---
# Default to internal mounted paths on the same port (8000).
TAVILY_MCP_URL = os.getenv("TAVILY_MCP_URL", "http://127.0.0.1:8000/tavily/research")
ALPHAVANTAGE_MCP_URL = os.getenv("ALPHAVANTAGE_MCP_URL", "http://127.0.0.1:8000/alphavantage/market_data")
PRIVATE_MCP_URL = os.getenv("PRIVATE_MCP_URL", "http://127.0.0.1:8000/private/portfolio_data")

# --- FastAPI App ---
app = FastAPI(title="Aegis MCP Gateway (Monolith)")

# --- CORS Configuration ---
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# rejected by browsers under the CORS spec — tighten origins before production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# --- Mount Microservices ---
app.mount("/tavily", tavily_app)
app.mount("/alphavantage", alphavantage_app)
app.mount("/private", private_app)

# Shared client for gateway -> mounted-app self-referential calls.
client = httpx.AsyncClient()


@app.on_event("shutdown")
async def close_http_client():
    """Release the shared AsyncClient's connection pool on server shutdown.

    The client is created at import time; without this hook its sockets were
    never closed.
    """
    await client.aclose()


@app.middleware("http")
async def audit_log_middleware(request: Request, call_next):
    """Audit-log every request hitting the parent app.

    Requests to mounted sub-apps may bypass this middleware depending on path
    matching, which naturally limits sub-app noise.
    """
    logger.info(f"Request received: {request.method} {request.url}")
    response = await call_next(request)
    return response


# --- New REST Endpoints for Next.js ---
class ResearchRequest(BaseModel):
    # Stock ticker symbol to research, e.g. "AAPL".
    ticker: str


class ChatMessage(BaseModel):
    # Chat role — presumably "user" / "assistant"; verify against the front-end.
    role: str
    content: str


class ChatRequest(BaseModel):
    message: str
    # Pydantic deep-copies mutable defaults per instance, so [] is safe here.
    history: list[ChatMessage] = []


@app.post("/api/chat")
async def api_chat_orchestrator(request: ChatRequest):
    """Route a chat message to the research pipeline or the general chat agent.

    A lightweight "routing agent" LLM call decides intent: it replies with a
    ticker symbol (triggering the deep-research pipeline) or the literal word
    "CHAT" (triggering ordinary conversation).

    Raises:
        HTTPException(500): on any unexpected failure in either branch.
    """
    try:
        # Imported lazily so the gateway can start even when the feature
        # modules (and their heavy dependencies) are slow or unavailable.
        from features.utils import call_gemini
        from features.research_report import generate_report

        user_msg = request.message

        # 1. Routing Agent: Determine intent
        routing_prompt = f"""You are Sentinel's routing agent. The user said: "{user_msg}"
Determine if they want a deep research report on a specific stock ticker.
If YES, reply ONLY with the stock ticker symbol (e.g. AAPL, TSLA, NVDA).
If NO (they are just asking a general question or chatting), reply ONLY with the word "CHAT".
"""
        intent = call_gemini(routing_prompt, "You are a precise routing system.").strip().upper()

        if intent != "CHAT" and len(intent) <= 5 and intent.isalpha():
            # Trigger Research Pipeline
            logger.info(f"Routing to Research Report Pipeline for: {intent}")
            report = generate_report(intent)

            # Format the JSON report beautifully into Markdown for the Chat UI
            reply = f"### 📊 Sentinel Analysis Sequence Complete: **{report.get('_resolved_ticker', intent)}**\n\n"
            reply += f"**Executive Summary**\n{report.get('executive_summary', '')}\n\n"
            reply += f"***\n**Fundamentals**\n{report.get('fundamentals', '')}\n\n"
            reply += f"***\n**Latest Intelligence**\n{report.get('news', '')}\n\n"
            reply += f"***\n**⚠️ Risk Assessment**\n{report.get('risks', '')}\n\n"
            reply += f"***\n**🎯 Final Verdict & Price Target**\n{report.get('verdict', '')}"
            return {"reply": reply}

        # 2. General Conversation Agent
        logger.info("Routing to General Chat Agent")
        chat_context = ""
        for msg in request.history[-5:]:  # Keep last 5 messages for context
            chat_context += f"{msg.role.capitalize()}: {msg.content}\n"

        chat_prompt = f"""You are Sentinel, an elite AI financial intelligence operating system.
You are talking to a user through a sleek, neon 'Generative UI' terminal.
Keep your responses concise, sharp, and highly technical. Use markdown extensively.
Conversation History:
{chat_context}
User's new message: {user_msg}
"""
        reply = call_gemini(chat_prompt, "You are Sentinel, an elite financial AI.")
        return {"reply": reply}
    except HTTPException:
        # Let deliberate HTTP errors pass through unchanged.
        raise
    except Exception as e:
        # logger.exception keeps the traceback that logger.error dropped.
        logger.exception(f"Chat Orchestrator Error: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/api/research")
async def api_research_report(request: ResearchRequest):
    """Run the deep-research pipeline for a single ticker.

    Returns:
        {"status": "success", "data": <report dict>} on success.

    Raises:
        HTTPException(500): on any pipeline failure.
    """
    try:
        from features.research_report import generate_report
        report = generate_report(request.ticker)
        return {"status": "success", "data": report}
    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"Research Report Error: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/api/portfolio")
async def api_portfolio_analyzer(file: UploadFile = File(...)):
    """Parse an uploaded holdings file (CSV/XLSX/XLS/PDF), enrich it, and run AI analysis.

    Returns:
        {"status": "success", "data": {"holdings": [...], "analysis": ...}}

    Raises:
        HTTPException(400): unsupported file format or unparseable holdings.
        HTTPException(500): any other failure.
    """
    try:
        from features.portfolio_analyzer import (
            _parse_csv,
            _parse_excel,
            _parse_pdf,
            _enrich_holdings,
            _generate_ai_analysis,
        )

        content = await file.read()
        file_obj = io.BytesIO(content)
        file_obj.name = file.filename
        # UploadFile.filename may be None; normalise once before suffix checks
        # (previously `.lower()` on None would raise AttributeError).
        filename = (file.filename or "").lower()

        if filename.endswith('.csv'):
            holdings = _parse_csv(file_obj)
        elif filename.endswith(('.xlsx', '.xls')):
            holdings = _parse_excel(file_obj)
        elif filename.endswith('.pdf'):
            holdings = _parse_pdf(file_obj)
        else:
            raise HTTPException(status_code=400, detail="Unsupported file format.")

        # Parsers appear to return a pandas DataFrame (`.empty`) — TODO confirm.
        if holdings is None or holdings.empty:
            raise HTTPException(status_code=400, detail="Could not parse holdings from the uploaded file.")

        enriched = _enrich_holdings(holdings)
        ai_result = _generate_ai_analysis(enriched)

        # Convert df to dict
        enriched_dict = enriched.to_dict(orient="records")
        return {
            "status": "success",
            "data": {
                "holdings": enriched_dict,
                "analysis": ai_result,
            },
        }
    except HTTPException:
        # BUG FIX: the bare `except Exception` below used to catch the
        # deliberate 400s raised above and re-raise them as 500s.
        raise
    except Exception as e:
        logger.exception(f"Portfolio Analyzer Error: {e}")
        raise HTTPException(status_code=500, detail=str(e))


@app.post("/route_agent_request")
async def route_agent_request(request_data: dict):
    """Proxy an agent request to one of the mounted microservices.

    Expects ``{"target_service": <key>, "payload": {...}}`` and forwards the
    payload via POST to the mapped URL, returning the upstream JSON verbatim.

    Raises:
        HTTPException(400): unknown target service.
        HTTPException(503): upstream unreachable.
        HTTPException(500): any other routing failure.
    """
    target_service = request_data.get("target_service")
    payload = request_data.get("payload", {})
    logger.info(f"Routing request for target service: {target_service}")

    url_map = {
        "tavily_research": TAVILY_MCP_URL,
        "alpha_vantage_market_data": ALPHAVANTAGE_MCP_URL,
        "alpha_vantage_overview": os.getenv("AV_OVERVIEW_URL", "http://127.0.0.1:8000/alphavantage/company_overview"),
        "alpha_vantage_quote": os.getenv("AV_QUOTE_URL", "http://127.0.0.1:8000/alphavantage/global_quote"),
        "internal_portfolio_data": PRIVATE_MCP_URL,
    }

    target_url = url_map.get(target_service)
    if not target_url:
        logger.error(f"Invalid target service specified: {target_service}")
        raise HTTPException(status_code=400, detail=f"Invalid target service: {target_service}")

    try:
        # Self-referential call (Gateway -> Mounted App on same server).
        # httpx.AsyncClient keeps this from blocking the event loop.
        response = await client.post(target_url, json=payload, timeout=180.0)
        response.raise_for_status()
        return JSONResponse(content=response.json(), status_code=response.status_code)
    except httpx.HTTPStatusError as e:
        logger.error(f"Error from microservice {target_service}: {e.response.text}")
        # The upstream error body may not be JSON; fall back to raw text so we
        # never raise a secondary decode error while reporting the first one.
        try:
            detail = e.response.json()
        except ValueError:
            detail = e.response.text
        raise HTTPException(status_code=e.response.status_code, detail=detail)
    except httpx.RequestError as e:
        logger.error(f"Could not connect to microservice {target_service}: {e}")
        raise HTTPException(status_code=503, detail=f"Service '{target_service}' is unavailable.")
    except Exception as e:
        logger.critical(f"An unexpected error occurred during routing: {e}")
        raise HTTPException(status_code=500, detail="Internal server error in MCP Gateway.")


@app.get("/")
def read_root():
    """Health-check / landing endpoint."""
    return {"message": "Aegis MCP Gateway (Monolithic) is operational."}


if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=8000)