JC321's picture
Upload 4 files
e0facb5 verified
raw
history blame
9.95 kB
"""
Simple FastAPI web interface for MCP Server usage guide
Integrated with MCP server directly
"""
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, JSONResponse, Response, FileResponse
from fastapi.middleware.cors import CORSMiddleware
from pathlib import Path
import uvicorn
import json
# Import MCP server components
from mcp.server.fastmcp import FastMCP
from edgar_client import EdgarDataClient
from financial_analyzer import FinancialAnalyzer
app = FastAPI(title="SEC Financial Data MCP Server")
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Initialize EDGAR clients
edgar_client = EdgarDataClient(
user_agent="Juntao Peng Financial Report Metrics App (jtyxabc@gmail.com)"
)
financial_analyzer = FinancialAnalyzer(
user_agent="Juntao Peng Financial Report Metrics App (jtyxabc@gmail.com)"
)
# Create FastMCP server with pure JSON response
mcp = FastMCP("sec-financial-data", json_response=True)
@mcp.tool()
def search_company(company_name: str) -> dict:
"""Search for a company by name in SEC EDGAR database."""
result = edgar_client.search_company_by_name(company_name)
if result:
return result
else:
return {"error": f"No company found with name: {company_name}"}
@mcp.tool()
def get_company_info(cik: str) -> dict:
"""Get detailed company information."""
result = edgar_client.get_company_info(cik)
if result:
return result
else:
return {"error": f"No company found with CIK: {cik}"}
@mcp.tool()
def get_company_filings(cik: str, form_types: list[str] | None = None) -> dict:
"""Get list of company SEC filings."""
# Convert list to tuple for caching
if form_types:
form_types_tuple = tuple(form_types)
else:
form_types_tuple = None
result = edgar_client.get_company_filings(cik, form_types_tuple)
if result:
limited_result = result[:20]
return {
"total": len(result),
"returned": len(limited_result),
"filings": limited_result
}
else:
return {"error": f"No filings found for CIK: {cik}"}
@mcp.tool()
def get_financial_data(cik: str, period: str) -> dict:
"""Get financial data for a specific period."""
result = edgar_client.get_financial_data_for_period(cik, period)
if result and "period" in result:
return result
else:
return {"error": f"No financial data found for CIK: {cik}, Period: {period}"}
@mcp.tool()
def extract_financial_metrics(cik: str, years: int = 3) -> dict:
"""Extract comprehensive financial metrics for multiple years."""
if years < 1 or years > 10:
return {"error": "Years parameter must be between 1 and 10"}
try:
metrics = financial_analyzer.extract_financial_metrics(cik, years)
if metrics:
formatted = financial_analyzer.format_financial_data(metrics)
return {
"periods": len(formatted),
"data": formatted
}
else:
# Return helpful debug info
filings_10k = edgar_client.get_company_filings(cik, ('10-K',))
filings_20f = edgar_client.get_company_filings(cik, ('20-F',))
return {
"error": f"No financial metrics extracted for CIK: {cik}",
"debug": {
"cik": cik,
"years_requested": years,
"filings_found": {
"10-K": len(filings_10k),
"20-F": len(filings_20f)
},
"latest_filings": [
{"form": f.get("form_type"), "date": f.get("filing_date")}
for f in (filings_10k + filings_20f)[:5]
]
},
"suggestion": "Try using get_latest_financial_data or get_financial_data with a specific period"
}
except Exception as e:
return {
"error": f"Exception extracting metrics: {str(e)}",
"cik": cik,
"years": years
}
@mcp.tool()
def get_latest_financial_data(cik: str) -> dict:
"""Get the most recent financial data available."""
result = financial_analyzer.get_latest_financial_data(cik)
if result and "period" in result:
return result
else:
return {"error": f"No latest financial data found for CIK: {cik}"}
@mcp.tool()
def advanced_search_company(company_input: str) -> dict:
"""Advanced search supporting both company name and CIK code."""
result = financial_analyzer.search_company(company_input)
if result.get("error"):
return {"error": result["error"]}
return result
# Mount MCP server HTTP endpoint (support both /mcp and /sse for backward compatibility)
@app.post("/mcp")
@app.get("/mcp")
@app.post("/sse")
@app.get("/sse")
async def mcp_http_endpoint(request: Request):
"""MCP HTTP endpoint - handle both GET and POST (accessible via /mcp or /sse)"""
try:
# For POST requests, get JSON body
if request.method == "POST":
body = await request.json()
# Handle JSON-RPC request
if body.get("method") == "tools/list":
# List all tools
tools = []
for tool_name, tool_func in mcp._tool_manager._tools.items():
tools.append({
"name": tool_name,
"description": tool_func.__doc__ or "",
"inputSchema": {
"type": "object",
"properties": {},
"required": []
}
})
return JSONResponse({
"jsonrpc": "2.0",
"id": body.get("id"),
"result": {"tools": tools}
})
elif body.get("method") == "tools/call":
# Call a specific tool
tool_name = body.get("params", {}).get("name")
arguments = body.get("params", {}).get("arguments", {})
# Directly call the tool functions we defined
tool_map = {
"search_company": search_company,
"get_company_info": get_company_info,
"get_company_filings": get_company_filings,
"get_financial_data": get_financial_data,
"extract_financial_metrics": extract_financial_metrics,
"get_latest_financial_data": get_latest_financial_data,
"advanced_search_company": advanced_search_company
}
if tool_name in tool_map:
tool_func = tool_map[tool_name]
result = tool_func(**arguments)
return JSONResponse({
"jsonrpc": "2.0",
"id": body.get("id"),
"result": {
"content": [{
"type": "text",
"text": json.dumps(result)
}]
}
})
else:
return JSONResponse({
"jsonrpc": "2.0",
"id": body.get("id"),
"error": {
"code": -32601,
"message": f"Tool not found: {tool_name}"
}
})
else:
return JSONResponse({
"jsonrpc": "2.0",
"id": body.get("id"),
"error": {
"code": -32601,
"message": f"Method not found: {body.get('method')}"
}
})
else:
# GET request - return info
return JSONResponse({
"service": "SEC Financial Data MCP Server",
"protocol": "MCP 2024-11-05",
"transport": "HTTP",
"status": "online"
})
except Exception as e:
return JSONResponse(
content={"error": str(e), "type": "server_error"},
status_code=500
)
# Get the template directory path
TEMPLATES_DIR = Path(__file__).parent / "templates"
@app.get("/", response_class=HTMLResponse)
async def root():
"""Serve the main index page (ignores query parameters like ?logs=container)"""
index_path = TEMPLATES_DIR / "index.html"
if index_path.exists():
return FileResponse(index_path, media_type="text/html")
# Return error with debug info
return HTMLResponse(
f"<h1>Error: Template not found</h1>"
f"<p>Looking for: {index_path}</p>"
f"<p>Exists: {index_path.exists()}</p>"
f"<p>Current dir: {Path(__file__).parent}</p>",
status_code=404
)
@app.get("/test", response_class=HTMLResponse)
async def test_page():
"""Serve the test page"""
test_path = TEMPLATES_DIR / "test.html"
if test_path.exists():
return FileResponse(test_path)
return HTMLResponse("<h1>Error: Test page not found</h1>", status_code=404)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=7860)