"""
MCP Server for Hugging Face Space Deployment
Uses Server-Sent Events (SSE) transport for remote MCP access
Fully compatible with MCP clients like Claude Desktop
Returns clean JSON without emoji formatting
"""
import asyncio
import json
import os
import signal
import sys
import time
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, List, Optional

from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, JSONResponse, HTMLResponse, FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel

from edgar_client import EdgarDataClient
from financial_analyzer import FinancialAnalyzer
# Initialize FastAPI app
app = FastAPI(
    title="SEC Financial Report MCP Server API",
    description="Model Context Protocol Server for SEC EDGAR Financial Data",
    version="2.3.6",
)

# Server startup time and in-process counters used for monitoring.
server_start_time = time.time()
request_count = 0
error_count = 0

# Configure CORS for remote access.
# NOTE(review): FastAPI's CORS documentation says allow_origins must not be
# ["*"] when allow_credentials is True. The setting is kept as-is so existing
# browser clients keep working; confirm whether credentials are needed at all
# and, if not, set allow_credentials=False.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Request tracking middleware with timeout protection
@app.middleware("http")
async def track_requests(request: Request, call_next):
    """Count requests and errors, and bound how long a request may take.

    Increments the module-level ``request_count`` for every request and
    ``error_count`` for every response with status >= 400, every timeout and
    every exception raised by the downstream handler.

    Args:
        request: The incoming request.
        call_next: Callable that forwards the request down the stack and
            returns the response.

    Returns:
        The downstream response, or a 504 JSON response when the downstream
        call does not complete within 180 seconds.

    Raises:
        Exception: Any downstream exception is counted, logged and re-raised.
    """
    global request_count, error_count
    request_count += 1
    # Monotonic clock: the elapsed time in the log is immune to wall-clock jumps.
    started = time.monotonic()
    try:
        # Timeout for the entire request (3 minutes).
        response = await asyncio.wait_for(call_next(request), timeout=180.0)
    except asyncio.TimeoutError:
        error_count += 1
        print(f"Request timeout after {time.monotonic() - started:.2f}s: {request.url.path}")
        return JSONResponse(
            status_code=504,
            content={"error": "Request timeout", "message": "The request took too long to process. Please try again."},
        )
    except Exception as e:
        error_count += 1
        print(f"Request error: {e}")
        raise
    if response.status_code >= 400:
        error_count += 1
    return response
# Initialize EDGAR clients
# One constant for the User-Agent so both clients always send the same value.
EDGAR_USER_AGENT = "Juntao Peng Financial Report Metrics App (jtyxabc@gmail.com)"

edgar_client = EdgarDataClient(user_agent=EDGAR_USER_AGENT)
financial_analyzer = FinancialAnalyzer(user_agent=EDGAR_USER_AGENT)

# Preload company tickers data on startup for better performance.
# A failure here is deliberately non-fatal: it is logged and startup continues.
print("[Startup] Preloading company tickers data...")
try:
    edgar_client._load_company_tickers()
except Exception as e:
    print(f"[Startup] Warning: Failed to preload company tickers: {e}")
    print("[Startup] Will load on first request")
else:
    print("[Startup] Company tickers preloaded successfully")
# ==================== MCP Protocol Implementation ====================
class MCPRequest(BaseModel):
    """JSON-RPC 2.0 style request envelope for incoming MCP messages."""

    # Protocol version tag; defaults to "2.0" when the client omits it.
    jsonrpc: str = "2.0"
    # Request identifier, copied into the response by the message handler.
    id: Optional[Any] = None
    # Method name, e.g. "initialize", "tools/list" or "tools/call".
    method: str
    # Method parameters; the message handler treats None as an empty dict.
    params: Optional[Dict[str, Any]] = None
class MCPResponse(BaseModel):
    """JSON-RPC 2.0 style response envelope for MCP replies."""

    # Protocol version tag.
    jsonrpc: str = "2.0"
    # Identifier of the request this response answers.
    id: Optional[Any] = None
    # Payload of a successful call; left as None when ``error`` is set.
    result: Optional[Any] = None
    # Error object (``code`` and ``message``) of a failed call.
    error: Optional[Dict[str, Any]] = None
# MCP Tools Definition
# Each entry advertises one tool: its name, a human-readable description and
# a JSON Schema ("inputSchema") describing the arguments it accepts.
MCP_TOOLS = [
    {
        "name": "search_company",
        "description": "Search for a company by name in SEC EDGAR database. Returns company CIK, name, and ticker symbol.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "company_name": {
                    "type": "string",
                    "description": "Company name to search (e.g., 'Microsoft', 'Apple', 'Tesla')",
                },
            },
            "required": ["company_name"],
        },
    },
    {
        "name": "get_company_info",
        "description": "Get detailed company information including name, tickers, SIC code, and industry description.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "cik": {
                    "type": "string",
                    "description": "Company CIK code (10-digit format, e.g., '0000789019')",
                },
            },
            "required": ["cik"],
        },
    },
    {
        "name": "get_company_filings",
        "description": "Get list of company SEC filings (10-K, 10-Q, 20-F, etc.) with filing dates and document links.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "cik": {
                    "type": "string",
                    "description": "Company CIK code",
                },
                "form_types": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Optional: Filter by form types (e.g., ['10-K', '10-Q'])",
                },
            },
            "required": ["cik"],
        },
    },
    {
        "name": "get_financial_data",
        "description": "Get financial data for a specific period including revenue, net income, EPS, operating expenses, and cash flow.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "cik": {
                    "type": "string",
                    "description": "Company CIK code",
                },
                "period": {
                    "type": "string",
                    "description": "Period in format 'YYYY' for annual or 'YYYYQX' for quarterly (e.g., '2024', '2024Q3')",
                },
            },
            "required": ["cik", "period"],
        },
    },
    {
        "name": "extract_financial_metrics",
        "description": "Extract comprehensive financial metrics for multiple years including both annual and quarterly data. Returns data in chronological order (newest first).",
        "inputSchema": {
            "type": "object",
            "properties": {
                "cik": {
                    "type": "string",
                    "description": "Company CIK code",
                },
                "years": {
                    "type": "integer",
                    "description": "Number of recent years to extract (1-10, default: 3)",
                    "minimum": 1,
                    "maximum": 10,
                    "default": 3,
                },
            },
            "required": ["cik"],
        },
    },
    {
        "name": "get_latest_financial_data",
        "description": "Get the most recent financial data available for a company.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "cik": {
                    "type": "string",
                    "description": "Company CIK code",
                },
            },
            "required": ["cik"],
        },
    },
    {
        "name": "advanced_search_company",
        "description": "Advanced search supporting both company name and CIK code. Automatically detects input type.",
        "inputSchema": {
            "type": "object",
            "properties": {
                "company_input": {
                    "type": "string",
                    "description": "Company name, ticker, or CIK code",
                },
            },
            "required": ["company_input"],
        },
    },
]
@contextmanager
def timeout_context(seconds):
    """Context manager that aborts its body with ``TimeoutError`` after *seconds*.

    The limit is enforced with ``SIGALRM``. Where that signal cannot be used
    (no ``SIGALRM`` on the platform, or the handler cannot be installed from
    the current thread) the body runs without any time limit.

    Args:
        seconds: Time limit in seconds; fractions are accepted. ``0`` arms no
            timer.

    Raises:
        TimeoutError: Raised inside the body when the limit expires.
    """

    def timeout_handler(signum, frame):
        raise TimeoutError(f"Operation timeout after {seconds} seconds")

    # Only the handler installation is guarded. Keeping the body's ``yield``
    # outside this ``except`` means a ValueError/AttributeError raised by the
    # body propagates unchanged instead of triggering a second ``yield``.
    try:
        old_handler = signal.signal(signal.SIGALRM, timeout_handler)
    except (AttributeError, ValueError):
        # Windows or signal not available in this thread
        armed = False
    else:
        armed = True

    if not armed:
        yield
        return

    try:
        signal.setitimer(signal.ITIMER_REAL, seconds)
        yield
    finally:
        # Always disarm the timer and put the previous handler back.
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, old_handler)
def _text_content(payload: Any) -> Dict[str, Any]:
    """Wrap *payload* as an MCP text content item holding its JSON encoding."""
    return {"type": "text", "text": json.dumps(payload, ensure_ascii=False)}


def _error_content(message: str) -> Dict[str, Any]:
    """Build a text content item whose JSON body is ``{"error": message}``."""
    return _text_content({"error": message})


def _require_arg(arguments: Dict[str, Any], name: str) -> Any:
    """Return ``arguments[name]`` or raise ``ValueError`` naming the missing key."""
    try:
        return arguments[name]
    except KeyError:
        raise ValueError(f"Missing required argument: {name}") from None


def _tool_search_company(arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Look a company up by name through the EDGAR client."""
    company_name = _require_arg(arguments, "company_name")
    result = edgar_client.search_company_by_name(company_name)
    if result:
        return _text_content(result)
    return _error_content(f"No company found with name: {company_name}")


def _tool_get_company_info(arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Fetch company information for a CIK through the EDGAR client."""
    cik = _require_arg(arguments, "cik")
    result = edgar_client.get_company_info(cik)
    if result:
        return _text_content(result)
    return _error_content(f"No company found with CIK: {cik}")


def _tool_get_company_filings(arguments: Dict[str, Any]) -> Dict[str, Any]:
    """List filings for a CIK, returning at most the first 20."""
    cik = _require_arg(arguments, "cik")
    form_types = arguments.get("form_types")
    result = edgar_client.get_company_filings(cik, form_types)
    if not result:
        return _error_content(f"No filings found for CIK: {cik}")
    # Limit to 20 results; "total" still reports the unfiltered count.
    limited_result = result[:20]
    return _text_content({
        "total": len(result),
        "returned": len(limited_result),
        "filings": limited_result,
    })


def _tool_get_financial_data(arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Fetch financial data for one CIK and period."""
    cik = _require_arg(arguments, "cik")
    period = _require_arg(arguments, "period")
    result = edgar_client.get_financial_data_for_period(cik, period)
    if result and "period" in result:
        return _text_content(result)
    return _error_content(f"No financial data found for CIK: {cik}, Period: {period}")


def _tool_extract_financial_metrics(arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Extract and format multi-year metrics for a CIK."""
    years = arguments.get("years", 3)
    # Reject non-integers up front so a string or null yields the range error
    # rather than a raw comparison TypeError.
    if isinstance(years, bool) or not isinstance(years, int) or not 1 <= years <= 10:
        return _error_content("Years parameter must be between 1 and 10")
    cik = _require_arg(arguments, "cik")
    metrics = financial_analyzer.extract_financial_metrics(cik, years)
    if not metrics:
        return _error_content(f"No financial metrics found for CIK: {cik}")
    formatted = financial_analyzer.format_financial_data(metrics)
    return _text_content({"periods": len(formatted), "data": formatted})


def _tool_get_latest_financial_data(arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Fetch the latest financial data for a CIK through the analyzer."""
    cik = _require_arg(arguments, "cik")
    result = financial_analyzer.get_latest_financial_data(cik)
    if result and "period" in result:
        return _text_content(result)
    return _error_content(f"No latest financial data found for CIK: {cik}")


def _tool_advanced_search(arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Search by free-form company input through the analyzer."""
    result = financial_analyzer.search_company(_require_arg(arguments, "company_input"))
    error = result.get("error")
    if error:
        return _error_content(error)
    return _text_content(result)


# Tool name -> handler. Handlers resolve the module-level clients at call time.
_TOOL_HANDLERS = {
    "search_company": _tool_search_company,
    "get_company_info": _tool_get_company_info,
    "get_company_filings": _tool_get_company_filings,
    "get_financial_data": _tool_get_financial_data,
    "extract_financial_metrics": _tool_extract_financial_metrics,
    "get_latest_financial_data": _tool_get_latest_financial_data,
    "advanced_search_company": _tool_advanced_search,
    # Support both names for backward compatibility
    "advanced_search": _tool_advanced_search,
}


def execute_tool(tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
    """Execute an MCP tool and return its result as a text content item.

    Args:
        tool_name: Name of the tool to run.
        arguments: Tool arguments; ``None`` is treated as an empty dict.

    Returns:
        ``{"type": "text", "text": <JSON string>}``. Failures (unknown tool,
        missing argument, no data, timeout, any exception from the clients)
        are reported inside the JSON as ``{"error": "..."}``; this function
        does not raise.
    """
    handler = _TOOL_HANDLERS.get(tool_name) if isinstance(tool_name, str) else None
    if handler is None:
        return _error_content(f"Unknown tool: {tool_name}")
    try:
        # 90 seconds per tool call.
        with timeout_context(90):
            return handler(arguments or {})
    except Exception as e:
        return _error_content(f"Error executing {tool_name}: {str(e)}")
# ==================== MCP Endpoints ====================
def _jsonrpc_response(request_id: Any, *, result: Any = None, error: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 response carrying exactly one of result / error.

    JSON-RPC 2.0 forbids a response that contains both members, so the
    envelope is assembled by hand instead of dumping a model that would emit
    the unused member as null.
    """
    envelope: Dict[str, Any] = {"jsonrpc": "2.0", "id": request_id}
    if error is not None:
        envelope["error"] = error
    else:
        envelope["result"] = result
    return envelope


@app.post("/message")
async def handle_mcp_message(request: MCPRequest):
    """
    Main MCP message handler
    Supports: initialize, tools/list, tools/call

    Returns a JSON-RPC 2.0 response dict: a ``result`` for supported methods,
    error -32602 when tools/call has no tool name, and error -32601 for any
    other method.
    """
    method = request.method
    params = request.params or {}

    # Handle initialize
    if method == "initialize":
        return _jsonrpc_response(
            request.id,
            result={
                "protocolVersion": "2024-11-05",
                "capabilities": {"tools": {}},
                "serverInfo": {"name": "sec-financial-data", "version": "2.3.6"},
            },
        )

    # Handle tools/list
    if method == "tools/list":
        return _jsonrpc_response(request.id, result={"tools": MCP_TOOLS})

    # Handle tools/call
    if method == "tools/call":
        tool_name = params.get("name")
        # An explicit null for "arguments" is treated like an omitted key.
        arguments = params.get("arguments") or {}
        if not tool_name:
            return _jsonrpc_response(
                request.id,
                error={"code": -32602, "message": "Missing tool name"},
            )
        # NOTE(review): execute_tool is a plain synchronous function called
        # inline here, so this coroutine does not yield while a tool runs.
        result = execute_tool(tool_name, arguments)
        return _jsonrpc_response(request.id, result={"content": [result]})

    return _jsonrpc_response(
        request.id,
        error={"code": -32601, "message": f"Method not found: {method}"},
    )
# NOTE(review): the route path is not visible in this file; "/sse" is assumed
# from the endpoint's purpose — confirm against the client configuration.
@app.get("/sse")
async def sse_endpoint(request: Request):
    """
    Server-Sent Events endpoint for MCP transport
    Keeps connection alive and handles MCP messages

    The stream first announces the message endpoint ("/message") and then
    emits a "ping" event every 20 seconds until the stream is cancelled.
    """

    async def event_stream():
        # Send initial endpoint message
        init_message = {
            "jsonrpc": "2.0",
            "method": "endpoint",
            "params": {"endpoint": "/message"},
        }
        yield f"data: {json.dumps(init_message)}\n\n"

        # Keep connection alive with a periodic ping
        try:
            while True:
                await asyncio.sleep(20)  # 20 seconds ping interval for stability
                ping_message = {
                    "jsonrpc": "2.0",
                    "method": "ping",
                    "timestamp": datetime.now().isoformat(),
                }
                yield f"data: {json.dumps(ping_message)}\n\n"
        except asyncio.CancelledError:
            # Cancellation is treated as a normal end of the stream.
            pass

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache, no-transform",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
            "Content-Type": "text/event-stream",
        },
    )
@app.get("/", response_class=HTMLResponse)
async def root():
    """Interactive landing page with tool testing functionality.

    Serves ``templates/index.html`` from the directory containing this file.
    When the template is missing, responds with status 500 and an HTML page
    of debugging information (expected path, directory listing, suggested fix).
    """
    # Get the path to the templates directory
    current_dir = Path(__file__).parent
    template_path = current_dir / "templates" / "index.html"

    # Read and return the HTML file
    try:
        html_content = template_path.read_text(encoding="utf-8")
    except FileNotFoundError:
        from html import escape

        # Paths and file names are escaped before being placed into HTML so
        # characters such as "<" or "&" cannot break the page.
        looking_for = escape(str(template_path.absolute()))
        working_dir = escape(str(current_dir.absolute()))
        listing = escape(chr(10).join(os.listdir(current_dir)))
        debug_info = f"""
<h1>Error: Template not found</h1>
<h2>Debugging Information:</h2>
<p><strong>Looking for:</strong> {looking_for}</p>
<p><strong>Exists:</strong> {template_path.exists()}</p>
<p><strong>Current dir:</strong> {working_dir}</p>
<h3>Directory contents:</h3>
<pre>{listing}</pre>
<h3>Fix:</h3>
<p>Ensure Dockerfile contains: <code>COPY templates/ templates/</code></p>
"""
        return HTMLResponse(content=debug_info, status_code=500)
    return HTMLResponse(content=html_content)
@app.get("/tools")
async def list_tools():
    """List all available MCP tools (for documentation).

    Returns:
        ``{"tools": MCP_TOOLS}`` — the same definitions served by tools/list.
    """
    return {"tools": MCP_TOOLS}
@app.get("/health")
async def health_check():
    """Enhanced health check endpoint with diagnostics.

    Returns:
        A dict with static server identity fields, uptime in seconds, the
        Python version, the request/error counters, the error rate as a
        percentage, and the current timestamp.
    """
    uptime_seconds = time.time() - server_start_time
    # max(..., 1) avoids division by zero before any request has been counted.
    error_rate = round(error_count / max(request_count, 1) * 100, 2)
    return {
        "status": "healthy",
        "server": "sec-financial-data",
        "version": "2.3.6",
        "protocol": "MCP",
        "transport": "SSE",
        "tools_count": len(MCP_TOOLS),
        "uptime_seconds": round(uptime_seconds, 2),
        "python_version": sys.version,
        "request_count": request_count,
        "error_count": error_count,
        "error_rate": error_rate,
        "timestamp": datetime.now().isoformat(),
    }
# NOTE(review): the route path is not visible in this file; "/ready" is
# assumed — confirm against the load balancer / probe configuration.
@app.get("/ready")
async def readiness_check():
    """Readiness check for load balancers.

    Returns:
        ``{"status": "ready", "timestamp": ...}`` when both clients are set.

    Raises:
        HTTPException: 503 with detail "Services not initialized" when a
            client is missing, or "Not ready: <error>" when the check itself
            fails.
    """
    try:
        # Quick validation that clients are initialized
        clients_ready = bool(edgar_client and financial_analyzer)
    except Exception as e:
        raise HTTPException(status_code=503, detail=f"Not ready: {str(e)}") from e
    # Raised outside the try block so this 503 is not caught and re-wrapped
    # into a second, differently worded HTTPException.
    if not clients_ready:
        raise HTTPException(status_code=503, detail="Services not initialized")
    return {"status": "ready", "timestamp": datetime.now().isoformat()}
# NOTE(review): the "/api/{path}" prefix comes from the response body below;
# the accepted HTTP methods are an assumption — confirm which verbs the old
# REST API exposed.
@app.api_route("/api/{path:path}", methods=["GET", "POST", "PUT", "PATCH", "DELETE"])
async def redirect_old_api(path: str):
    """Handle old REST API endpoints with helpful message.

    Args:
        path: The part of the requested URL after ``/api/``.

    Returns:
        A 404 JSON response explaining that the server now speaks MCP, with
        an example tools/call request and pointers to /tools and /health.
    """
    example_request = {
        "method": "POST",
        "url": "/message",
        "body": {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "tools/call",
            "params": {
                "name": "advanced_search_company",
                "arguments": {"company_input": "Microsoft"},
            },
        },
    }
    return JSONResponse(
        status_code=404,
        content={
            "error": "API endpoint not found",
            "message": "This server now uses the MCP (Model Context Protocol).",
            "migration_guide": {
                "old_endpoint": f"/api/{path}",
                "new_method": "Use MCP protocol via POST /message",
                "documentation": "Visit / for setup instructions",
                "example": example_request,
            },
            "available_tools": "/tools",
            "health_check": "/health",
        },
    )
if __name__ == "__main__":
    import uvicorn

    # NOTE(review): the import string assumes this file is named
    # mcp_server_sse.py, and reload=True enables uvicorn's auto-reload —
    # confirm both are intended for the deployed container.
    uvicorn.run(
        "mcp_server_sse:app",
        host="0.0.0.0",
        port=7860,
        reload=True,
    )