# mcp_gateway.py
# (Scraped page header: Spaces / Running; File size: 8,865 Bytes; id 5d2eba0)
from fastapi import FastAPI, HTTPException, Request, UploadFile, File
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import uvicorn
import httpx
import logging
import os
import io
from dotenv import load_dotenv
load_dotenv()
# --- Logging Setup ---
# Configure logging once at import time (INFO level, timestamped records) and
# create the gateway's own named logger.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger("MCP_Gateway")
# --- Import Microservices for Consolidation ---
# The three service apps are mounted into this gateway further down, so a
# missing module is fatal: log it and let the ImportError abort startup.
try:
    from tavily_mcp import app as tavily_app
    from alphavantage_mcp import app as alphavantage_app
    from private_mcp import app as private_app
except ImportError as import_err:
    logger.critical(f"Failed to import microservices: {import_err}")
    raise
else:
    logger.info("Successfully imported microservices for consolidation.")
# --- Configuration (Updated for Monolithic Mode) ---
# Each URL can be overridden through the environment; the fallbacks target
# the paths mounted on this same server (port 8000).
TAVILY_MCP_URL = os.environ.get(
    "TAVILY_MCP_URL",
    "http://127.0.0.1:8000/tavily/research",
)
ALPHAVANTAGE_MCP_URL = os.environ.get(
    "ALPHAVANTAGE_MCP_URL",
    "http://127.0.0.1:8000/alphavantage/market_data",
)
PRIVATE_MCP_URL = os.environ.get(
    "PRIVATE_MCP_URL",
    "http://127.0.0.1:8000/private/portfolio_data",
)
# --- FastAPI App ---
app = FastAPI(title="Aegis MCP Gateway (Monolith)")

# --- CORS Configuration ---
# NOTE(review): a wildcard origin together with allow_credentials=True is a
# combination FastAPI's CORS documentation says must not be used (origins
# have to be listed explicitly when credentials are allowed). It is left
# unchanged here because the real front-end origin list is not visible from
# this file -- confirm and tighten.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# --- Mount Microservices ---
# Each imported service app is served under its own path prefix.
for _prefix, _sub_app in (
    ("/tavily", tavily_app),
    ("/alphavantage", alphavantage_app),
    ("/private", private_app),
):
    app.mount(_prefix, _sub_app)

# Shared async HTTP client used by route_agent_request to forward payloads.
# NOTE(review): created at import time; no aclose() call is visible in this
# file -- consider closing it on application shutdown.
client = httpx.AsyncClient()
@app.middleware("http")
async def audit_log_middleware(request: Request, call_next):
    """Log the method and URL of an incoming request, then continue the chain.

    Args:
        request: The incoming HTTP request.
        call_next: Callable that forwards the request to the next handler and
            returns its response.

    Returns:
        The response produced by ``call_next``, unmodified.
    """
    # This middleware is registered on the parent app. Whether calls aimed at
    # the mounted sub-apps also pass through it (and add log noise) depends on
    # path matching -- filter here if that turns out to be too chatty.
    logger.info(f"Request received: {request.method} {request.url}")
    return await call_next(request)
# --- New REST Endpoints for Next.js ---
class ResearchRequest(BaseModel):
    """Request body for the research endpoint: the ticker to report on."""

    # Symbol handed to the report generator as-is.
    ticker: str
class ChatMessage(BaseModel):
    """One prior turn of a chat conversation."""

    # Speaker label; it is capitalized when rendered into the chat prompt.
    role: str
    # Text of the message.
    content: str
class ChatRequest(BaseModel):
    """Request body for the chat endpoint."""

    # The user's newest message.
    message: str
    # Earlier turns, oldest first; optional and empty by default.
    history: list[ChatMessage] = []
def _format_report_markdown(report, fallback_ticker):
    """Render a research-report mapping as the Markdown reply for the chat UI."""
    return "".join(
        [
            f"### 📊 Sentinel Analysis Sequence Complete: **{report.get('_resolved_ticker', fallback_ticker)}**\n\n",
            f"**Executive Summary**\n{report.get('executive_summary', '')}\n\n",
            f"***\n**Fundamentals**\n{report.get('fundamentals', '')}\n\n",
            f"***\n**Latest Intelligence**\n{report.get('news', '')}\n\n",
            f"***\n**⚠️ Risk Assessment**\n{report.get('risks', '')}\n\n",
            f"***\n**🎯 Final Verdict & Price Target**\n{report.get('verdict', '')}",
        ]
    )


def _build_chat_context(history, limit=5):
    """Flatten the last ``limit`` chat turns into ``Role: content`` lines."""
    return "".join(
        f"{msg.role.capitalize()}: {msg.content}\n" for msg in history[-limit:]
    )


@app.post("/api/chat")
async def api_chat_orchestrator(request: ChatRequest):
    """Route a chat message to the research pipeline or the general chat agent.

    A routing prompt is sent to ``call_gemini``. If the answer is a purely
    alphabetic token of at most five characters other than ``CHAT``, it is
    treated as a ticker and ``generate_report`` is run for it; otherwise the
    message, with the last five history turns, is sent to the chat agent.

    Args:
        request: The new user message plus optional conversation history.

    Returns:
        ``{"reply": <markdown or model text>}``.

    Raises:
        HTTPException: 500 with the error text if any step fails.
    """
    try:
        from fastapi.concurrency import run_in_threadpool
        from features.utils import call_gemini
        from features.research_report import generate_report

        user_msg = request.message

        # 1. Routing Agent: Determine intent
        # The prompt is assembled from explicit lines so its text does not
        # depend on how this source file happens to be indented.
        routing_prompt = (
            "You are Sentinel's routing agent. "
            f'The user said: "{user_msg}"\n'
            "Determine if they want a deep research report on a specific stock ticker.\n"
            "If YES, reply ONLY with the stock ticker symbol (e.g. AAPL, TSLA, NVDA).\n"
            'If NO (they are just asking a general question or chatting), reply ONLY with the word "CHAT".\n'
        )

        # call_gemini / generate_report are plain synchronous callables (their
        # results are used without await). Running them in a worker thread
        # keeps the event loop free for other requests -- which matters in
        # this single-process monolith, where (presumably) the pipeline may
        # call back into this same server over HTTP.
        raw_intent = await run_in_threadpool(
            call_gemini, routing_prompt, "You are a precise routing system."
        )
        intent = raw_intent.strip().upper()

        wants_report = intent != "CHAT" and len(intent) <= 5 and intent.isalpha()
        if wants_report:
            # Trigger Research Pipeline
            logger.info(f"Routing to Research Report Pipeline for: {intent}")
            report = await run_in_threadpool(generate_report, intent)
            # Format the JSON report into Markdown for the Chat UI
            return {"reply": _format_report_markdown(report, intent)}

        # 2. General Conversation Agent
        logger.info("Routing to General Chat Agent")
        chat_context = _build_chat_context(request.history)  # last 5 turns
        chat_prompt = (
            "You are Sentinel, an elite AI financial intelligence operating system.\n"
            "You are talking to a user through a sleek, neon 'Generative UI' terminal.\n"
            "Keep your responses concise, sharp, and highly technical. Use markdown extensively.\n"
            "Conversation History:\n"
            f"{chat_context}\n"
            "User's new message:\n"
            f"{user_msg}\n"
        )
        reply = await run_in_threadpool(
            call_gemini, chat_prompt, "You are Sentinel, an elite financial AI."
        )
        return {"reply": reply}
    except HTTPException:
        # Never rewrap an HTTP error that already carries its own status.
        raise
    except Exception as e:
        # logger.exception records the traceback alongside the message.
        logger.exception(f"Chat Orchestrator Error: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/api/research")
async def api_research_report(request: ResearchRequest):
    """Generate a research report for the requested ticker.

    Args:
        request: Body carrying the ticker symbol.

    Returns:
        ``{"status": "success", "data": <report>}`` where ``<report>`` is
        whatever ``generate_report`` returns.

    Raises:
        HTTPException: 500 with the error text if report generation fails.
    """
    try:
        from fastapi.concurrency import run_in_threadpool
        from features.research_report import generate_report

        # generate_report is a synchronous call; run it in a worker thread so
        # this async handler does not block the event loop while it works.
        report = await run_in_threadpool(generate_report, request.ticker)
        return {"status": "success", "data": report}
    except HTTPException:
        # Keep any HTTP error's own status instead of turning it into a 500.
        raise
    except Exception as e:
        logger.exception(f"Research Report Error: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/api/portfolio")
async def api_portfolio_analyzer(file: UploadFile = File(...)):
    """Parse an uploaded portfolio file, enrich the holdings and analyse them.

    The parser is chosen from the file name's extension (``.csv``, ``.xlsx`` /
    ``.xls`` or ``.pdf``, case-insensitive).

    Args:
        file: The uploaded portfolio document.

    Returns:
        ``{"status": "success", "data": {"holdings": [...], "analysis": ...}}``
        where ``holdings`` is the enriched table as a list of row dicts.

    Raises:
        HTTPException: 400 for an unsupported extension or when no holdings
            could be parsed; 500 with the error text for any other failure.
    """
    try:
        from fastapi.concurrency import run_in_threadpool
        from features.portfolio_analyzer import (
            _parse_csv,
            _parse_excel,
            _parse_pdf,
            _enrich_holdings,
            _generate_ai_analysis,
        )

        # An upload may arrive without a file name; treat that as an
        # unsupported format rather than crashing on None.lower().
        filename = file.filename or ""
        lowered = filename.lower()
        if lowered.endswith(".csv"):
            parser = _parse_csv
        elif lowered.endswith((".xlsx", ".xls")):
            parser = _parse_excel
        elif lowered.endswith(".pdf"):
            parser = _parse_pdf
        else:
            raise HTTPException(status_code=400, detail="Unsupported file format.")

        content = await file.read()
        file_obj = io.BytesIO(content)
        file_obj.name = filename

        # The parsing / enrichment / analysis helpers are synchronous; run
        # them in a worker thread so the event loop stays responsive.
        holdings = await run_in_threadpool(parser, file_obj)
        if holdings is None or holdings.empty:
            raise HTTPException(
                status_code=400,
                detail="Could not parse holdings from the uploaded file.",
            )

        enriched = await run_in_threadpool(_enrich_holdings, holdings)
        ai_result = await run_in_threadpool(_generate_ai_analysis, enriched)

        # Convert the table to a list of row dicts.
        # NOTE(review): if the enriched table can contain NaN, JSON encoding
        # of the response may fail -- confirm against _enrich_holdings.
        enriched_dict = enriched.to_dict(orient="records")
        return {
            "status": "success",
            "data": {
                "holdings": enriched_dict,
                "analysis": ai_result,
            },
        }
    except HTTPException:
        # The 400s raised above must reach the client as 400s; previously the
        # generic handler below converted them into 500s.
        raise
    except Exception as e:
        logger.exception(f"Portfolio Analyzer Error: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/route_agent_request")
async def route_agent_request(request_data: dict):
    """Forward an agent payload to the service named in the request.

    Args:
        request_data: JSON object with ``target_service`` (one of the keys of
            the URL map below) and an optional ``payload`` object that is
            POSTed to that service as JSON.

    Returns:
        A JSONResponse mirroring the service's JSON body and status code.

    Raises:
        HTTPException: 400 for an unknown ``target_service``; the service's
            own status code when it answers with an error; 503 when it cannot
            be reached; 500 for anything unexpected.
    """
    target_service = request_data.get("target_service")
    payload = request_data.get("payload", {})
    logger.info(f"Routing request for target service: {target_service}")

    # Resolved per request so environment overrides are read at call time.
    url_map = {
        "tavily_research": TAVILY_MCP_URL,
        "alpha_vantage_market_data": ALPHAVANTAGE_MCP_URL,
        "alpha_vantage_overview": os.getenv("AV_OVERVIEW_URL", "http://127.0.0.1:8000/alphavantage/company_overview"),
        "alpha_vantage_quote": os.getenv("AV_QUOTE_URL", "http://127.0.0.1:8000/alphavantage/global_quote"),
        "internal_portfolio_data": PRIVATE_MCP_URL,
    }
    target_url = url_map.get(target_service)
    if not target_url:
        logger.error(f"Invalid target service specified: {target_service}")
        raise HTTPException(status_code=400, detail=f"Invalid target service: {target_service}")

    try:
        # Self-referential call (Gateway -> Mounted App on same server); the
        # async client awaits the reply without blocking the event loop.
        response = await client.post(target_url, json=payload, timeout=180.0)
        response.raise_for_status()
        return JSONResponse(content=response.json(), status_code=response.status_code)
    except httpx.HTTPStatusError as e:
        logger.error(f"Error from microservice {target_service}: {e.response.text}")
        # Error bodies are not guaranteed to be JSON (e.g. a plain-text 502).
        # Decoding unconditionally used to raise inside this handler and
        # escape as an unhandled error, so fall back to the raw text.
        try:
            detail = e.response.json()
        except ValueError:
            detail = e.response.text
        raise HTTPException(status_code=e.response.status_code, detail=detail) from e
    except httpx.RequestError as e:
        logger.error(f"Could not connect to microservice {target_service}: {e}")
        raise HTTPException(status_code=503, detail=f"Service '{target_service}' is unavailable.") from e
    except Exception as e:
        # Includes a successful response whose body is not valid JSON.
        logger.critical(f"An unexpected error occurred during routing: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Internal server error in MCP Gateway.") from e
@app.get("/")
def read_root():
    """Return a fixed status message for the gateway's root path."""
    status = {"message": "Aegis MCP Gateway (Monolithic) is operational."}
    return status
if __name__ == "__main__":
    # Direct execution: serve the gateway (and its mounted sub-apps) on the
    # loopback interface, port 8000 -- the same address the default service
    # URLs point at.
    uvicorn.run(
        app,
        host="127.0.0.1",
        port=8000,
    )
|