Spaces:
Runtime error
Runtime error
| """ | |
| Simple FastAPI web interface for MCP Server usage guide | |
| Integrated with MCP server directly | |
| """ | |
| from fastapi import FastAPI, Request | |
| from fastapi.responses import HTMLResponse, JSONResponse, Response, FileResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pathlib import Path | |
| import uvicorn | |
| import json | |
| # Import MCP server components | |
| from mcp.server.fastmcp import FastMCP | |
| from edgar_client import EdgarDataClient | |
| from financial_analyzer import FinancialAnalyzer | |
| app = FastAPI(title="SEC Financial Data MCP Server") | |
| # Add CORS middleware | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # Initialize EDGAR clients | |
| edgar_client = EdgarDataClient( | |
| user_agent="Juntao Peng Financial Report Metrics App (jtyxabc@gmail.com)" | |
| ) | |
| financial_analyzer = FinancialAnalyzer( | |
| user_agent="Juntao Peng Financial Report Metrics App (jtyxabc@gmail.com)" | |
| ) | |
| # Create FastMCP server with pure JSON response | |
| mcp = FastMCP("sec-financial-data", json_response=True) | |
| def search_company(company_name: str) -> dict: | |
| """Search for a company by name in SEC EDGAR database.""" | |
| result = edgar_client.search_company_by_name(company_name) | |
| if result: | |
| return result | |
| else: | |
| return {"error": f"No company found with name: {company_name}"} | |
| def get_company_info(cik: str) -> dict: | |
| """Get detailed company information.""" | |
| result = edgar_client.get_company_info(cik) | |
| if result: | |
| return result | |
| else: | |
| return {"error": f"No company found with CIK: {cik}"} | |
| def get_company_filings(cik: str, form_types: list[str] | None = None) -> dict: | |
| """Get list of company SEC filings.""" | |
| # Convert list to tuple for caching | |
| if form_types: | |
| form_types_tuple = tuple(form_types) | |
| else: | |
| form_types_tuple = None | |
| result = edgar_client.get_company_filings(cik, form_types_tuple) | |
| if result: | |
| limited_result = result[:20] | |
| return { | |
| "total": len(result), | |
| "returned": len(limited_result), | |
| "filings": limited_result | |
| } | |
| else: | |
| return {"error": f"No filings found for CIK: {cik}"} | |
| def get_financial_data(cik: str, period: str) -> dict: | |
| """Get financial data for a specific period.""" | |
| result = edgar_client.get_financial_data_for_period(cik, period) | |
| if result and "period" in result: | |
| return result | |
| else: | |
| return {"error": f"No financial data found for CIK: {cik}, Period: {period}"} | |
| def extract_financial_metrics(cik: str, years: int = 3) -> dict: | |
| """Extract comprehensive financial metrics for multiple years.""" | |
| if years < 1 or years > 10: | |
| return {"error": "Years parameter must be between 1 and 10"} | |
| try: | |
| metrics = financial_analyzer.extract_financial_metrics(cik, years) | |
| if metrics: | |
| formatted = financial_analyzer.format_financial_data(metrics) | |
| return { | |
| "periods": len(formatted), | |
| "data": formatted | |
| } | |
| else: | |
| # Return helpful debug info | |
| filings_10k = edgar_client.get_company_filings(cik, ('10-K',)) | |
| filings_20f = edgar_client.get_company_filings(cik, ('20-F',)) | |
| return { | |
| "error": f"No financial metrics extracted for CIK: {cik}", | |
| "debug": { | |
| "cik": cik, | |
| "years_requested": years, | |
| "filings_found": { | |
| "10-K": len(filings_10k), | |
| "20-F": len(filings_20f) | |
| }, | |
| "latest_filings": [ | |
| {"form": f.get("form_type"), "date": f.get("filing_date")} | |
| for f in (filings_10k + filings_20f)[:5] | |
| ] | |
| }, | |
| "suggestion": "Try using get_latest_financial_data or get_financial_data with a specific period" | |
| } | |
| except Exception as e: | |
| return { | |
| "error": f"Exception extracting metrics: {str(e)}", | |
| "cik": cik, | |
| "years": years | |
| } | |
| def get_latest_financial_data(cik: str) -> dict: | |
| """Get the most recent financial data available.""" | |
| result = financial_analyzer.get_latest_financial_data(cik) | |
| if result and "period" in result: | |
| return result | |
| else: | |
| return {"error": f"No latest financial data found for CIK: {cik}"} | |
| def advanced_search_company(company_input: str) -> dict: | |
| """Advanced search supporting both company name and CIK code.""" | |
| result = financial_analyzer.search_company(company_input) | |
| if result.get("error"): | |
| return {"error": result["error"]} | |
| return result | |
| # Mount MCP server HTTP endpoint (support both /mcp and /sse for backward compatibility) | |
| async def mcp_http_endpoint(request: Request): | |
| """MCP HTTP endpoint - handle both GET and POST (accessible via /mcp or /sse)""" | |
| try: | |
| # For POST requests, get JSON body | |
| if request.method == "POST": | |
| body = await request.json() | |
| # Handle JSON-RPC request | |
| if body.get("method") == "tools/list": | |
| # List all tools | |
| tools = [] | |
| for tool_name, tool_func in mcp._tool_manager._tools.items(): | |
| tools.append({ | |
| "name": tool_name, | |
| "description": tool_func.__doc__ or "", | |
| "inputSchema": { | |
| "type": "object", | |
| "properties": {}, | |
| "required": [] | |
| } | |
| }) | |
| return JSONResponse({ | |
| "jsonrpc": "2.0", | |
| "id": body.get("id"), | |
| "result": {"tools": tools} | |
| }) | |
| elif body.get("method") == "tools/call": | |
| # Call a specific tool | |
| tool_name = body.get("params", {}).get("name") | |
| arguments = body.get("params", {}).get("arguments", {}) | |
| # Directly call the tool functions we defined | |
| tool_map = { | |
| "search_company": search_company, | |
| "get_company_info": get_company_info, | |
| "get_company_filings": get_company_filings, | |
| "get_financial_data": get_financial_data, | |
| "extract_financial_metrics": extract_financial_metrics, | |
| "get_latest_financial_data": get_latest_financial_data, | |
| "advanced_search_company": advanced_search_company | |
| } | |
| if tool_name in tool_map: | |
| tool_func = tool_map[tool_name] | |
| result = tool_func(**arguments) | |
| return JSONResponse({ | |
| "jsonrpc": "2.0", | |
| "id": body.get("id"), | |
| "result": { | |
| "content": [{ | |
| "type": "text", | |
| "text": json.dumps(result) | |
| }] | |
| } | |
| }) | |
| else: | |
| return JSONResponse({ | |
| "jsonrpc": "2.0", | |
| "id": body.get("id"), | |
| "error": { | |
| "code": -32601, | |
| "message": f"Tool not found: {tool_name}" | |
| } | |
| }) | |
| else: | |
| return JSONResponse({ | |
| "jsonrpc": "2.0", | |
| "id": body.get("id"), | |
| "error": { | |
| "code": -32601, | |
| "message": f"Method not found: {body.get('method')}" | |
| } | |
| }) | |
| else: | |
| # GET request - return info | |
| return JSONResponse({ | |
| "service": "SEC Financial Data MCP Server", | |
| "protocol": "MCP 2024-11-05", | |
| "transport": "HTTP", | |
| "status": "online" | |
| }) | |
| except Exception as e: | |
| return JSONResponse( | |
| content={"error": str(e), "type": "server_error"}, | |
| status_code=500 | |
| ) | |
| # Get the template directory path | |
| TEMPLATES_DIR = Path(__file__).parent / "templates" | |
| async def root(): | |
| """Serve the main index page (ignores query parameters like ?logs=container)""" | |
| index_path = TEMPLATES_DIR / "index.html" | |
| if index_path.exists(): | |
| return FileResponse(index_path, media_type="text/html") | |
| # Return error with debug info | |
| return HTMLResponse( | |
| f"<h1>Error: Template not found</h1>" | |
| f"<p>Looking for: {index_path}</p>" | |
| f"<p>Exists: {index_path.exists()}</p>" | |
| f"<p>Current dir: {Path(__file__).parent}</p>", | |
| status_code=404 | |
| ) | |
| async def test_page(): | |
| """Serve the test page""" | |
| test_path = TEMPLATES_DIR / "test.html" | |
| if test_path.exists(): | |
| return FileResponse(test_path) | |
| return HTMLResponse("<h1>Error: Test page not found</h1>", status_code=404) | |
| if __name__ == "__main__": | |
| uvicorn.run(app, host="0.0.0.0", port=7860) | |