Spaces:
Runtime error
Runtime error
| """ | |
| SEC Financial Data MCP Server with Gradio UI | |
| Provides standard MCP service + Web interface for testing | |
| """ | |
| import os | |
| import gradio as gr | |
| import json | |
| from fastapi import FastAPI, Request | |
| from fastapi.responses import JSONResponse | |
| from mcp.server.fastmcp import FastMCP | |
| from edgar_client import EdgarDataClient | |
| from financial_analyzer import FinancialAnalyzer | |
# Single User-Agent string shared by both SEC EDGAR client objects below.
_SEC_USER_AGENT = "Juntao Peng Financial Report Metrics App (jtyxabc@gmail.com)"

# FastMCP server instance, created under the name "sec-financial-data".
mcp = FastMCP("sec-financial-data")

# EDGAR access objects; the tool functions in this module delegate to them.
edgar_client = EdgarDataClient(user_agent=_SEC_USER_AGENT)
financial_analyzer = FinancialAnalyzer(user_agent=_SEC_USER_AGENT)
| # Define MCP tools with optimized descriptions for LLM | |
def search_company(company_name: str) -> dict:
    """
    [STEP 1] Find company CIK by name/ticker. Always call this FIRST before other tools.

    Args:
        company_name: Company name or ticker (e.g. "Tesla", "TSLA", "google").

    Returns:
        {cik, name, ticker} - use cik for subsequent tool calls.
        If the lookup returns nothing, {"error": ...} is returned instead.
    """
    match = edgar_client.search_company_by_name(company_name)
    if not match:
        return {"error": f"No company found with name: {company_name}"}
    return match
def get_company_info(cik: str) -> dict:
    """Get company details (name, SIC, industry). Requires CIK from search_company.

    Returns the client's result unchanged, or {"error": ...} when the client
    returns a falsy value.
    """
    info = edgar_client.get_company_info(cik)
    if not info:
        return {"error": f"No company found with CIK: {cik}"}
    return info
def get_company_filings(cik: str, form_types: list[str] | None = None) -> dict:
    """List SEC filings (10-K, 10-Q, etc). Requires CIK. Use form_types to filter.

    Returns:
        {"total", "returned", "filings"} where "filings" holds at most the
        first 20 entries of the client's result and "total" is the full count;
        {"error": ...} when the client returns nothing.
    """
    # The client is handed a tuple, or None when no filter was supplied.
    if form_types:
        wanted_forms = tuple(form_types)
    else:
        wanted_forms = None

    filings = edgar_client.get_company_filings(cik, wanted_forms)
    if not filings:
        return {"error": f"No filings found for CIK: {cik}"}

    # Cap the payload size while still reporting how many filings exist.
    first_page = filings[:20]
    return {
        "total": len(filings),
        "returned": len(first_page),
        "filings": first_page,
    }
def get_financial_data(cik: str, period: str) -> dict:
    """Get financial data for specific period (e.g. '2024', '2024Q3'). Requires CIK.

    The client's result is returned only when it is non-empty and contains a
    "period" key; otherwise an {"error": ...} dict is returned.
    """
    data = edgar_client.get_financial_data_for_period(cik, period)
    if data and "period" in data:
        return data
    return {"error": f"No financial data found for CIK: {cik}, Period: {period}"}
def extract_financial_metrics(cik: str, years: int = 3) -> dict:
    """
    [PRIMARY TOOL] Get multi-year financials (annual+quarterly). Includes latest data.

    Use for: Any financial query (trends, growth, comparisons, or latest data)
    Workflow: search_company(name) -> extract_financial_metrics(cik, years)

    Args:
        cik: 10-digit CIK from search_company
        years: Years to retrieve (1-10, default 3)

    Returns: {periods, data: [{period, revenue, net_income, eps, ...}]}
    Note: Data includes latest year, so no need to call get_latest_financial_data separately.
    """
    # Validate the requested range before touching the analyzer.
    out_of_range = years < 1 or years > 10
    if out_of_range:
        return {"error": "Years parameter must be between 1 and 10"}

    raw_metrics = financial_analyzer.extract_financial_metrics(cik, years)
    if not raw_metrics:
        return {"error": f"No financial metrics extracted for CIK: {cik}"}

    rows = financial_analyzer.format_financial_data(raw_metrics)
    return {"periods": len(rows), "data": rows}
def get_latest_financial_data(cik: str) -> dict:
    """
    Get ONLY most recent annual report. Use extract_financial_metrics instead (it includes latest).

    Only use when: User explicitly wants ONLY the latest snapshot (rare case).

    Args:
        cik: 10-digit CIK from search_company

    Returns: {period, revenue, net_income, eps, ...}
        An {"error": ...} dict is returned when the analyzer result is empty
        or has no "period" key.
    """
    latest = financial_analyzer.get_latest_financial_data(cik)
    if latest and "period" in latest:
        return latest
    return {"error": f"No latest financial data found for CIK: {cik}"}
def advanced_search_company(company_input: str) -> dict:
    """
    Alternative to search_company. Accepts company name OR CIK. Same output format.

    Args:
        company_input: Name, ticker, or CIK

    Returns: {cik, name, ticker}
        When the analyzer reports an error, only {"error": <message>} is
        returned. When the analyzer returns nothing (None or an empty
        result), an {"error": ...} dict is returned as well, matching the
        behaviour of the other tools in this module instead of raising
        AttributeError on the missing result.
    """
    result = financial_analyzer.search_company(company_input)

    # Guard first: calling .get on None would raise AttributeError.
    if not result:
        return {"error": f"No company found for input: {company_input}"}

    error_message = result.get("error")
    if error_message:
        # Strip any extra keys so error payloads have a single, uniform shape.
        return {"error": error_message}
    return result
# Gradio wrapper functions (with debug logging and timeout handling)
def gradio_search_company(company_name: str):
    """Gradio wrapper for search_company.

    Returns a JSON string (indent=2). Blank input yields an error payload
    without calling the tool; any exception raised during the lookup is turned
    into an error payload as well. Debug lines are written to stderr.
    """
    import sys
    import traceback

    # None, "" and whitespace-only input are all rejected up front.
    query = company_name.strip() if company_name else ""
    if not query:
        return json.dumps({"error": "Company name cannot be empty"}, indent=2)

    try:
        print(f"[DEBUG] Searching company: {query}", file=sys.stderr)
        payload = search_company(query)
        print(f"[DEBUG] Search result type: {type(payload)}", file=sys.stderr)
        print(f"[DEBUG] Search result: {payload}", file=sys.stderr)
        # Ensure the UI always receives a JSON object.
        if not isinstance(payload, dict):
            payload = {"error": f"Unexpected result type: {type(payload)}"}
        return json.dumps(payload, indent=2)
    except TimeoutError as exc:
        timeout_payload = {"error": f"Request timeout: {exc}"}
        return json.dumps(timeout_payload, indent=2)
    except Exception as exc:
        traceback.print_exc()
        failure_payload = {"error": f"Exception: {exc}", "type": str(type(exc))}
        return json.dumps(failure_payload, indent=2)
def gradio_get_company_info(cik: str):
    """Gradio wrapper for get_company_info.

    Returns a JSON string (indent=2). A blank CIK yields an error payload
    without calling the tool; exceptions raised during the lookup are reported
    as error payloads. Debug lines are written to stderr.
    """
    import sys
    import traceback

    # None, "" and whitespace-only input are all rejected up front.
    cik_value = cik.strip() if cik else ""
    if not cik_value:
        return json.dumps({"error": "CIK cannot be empty"}, indent=2)

    try:
        print(f"[DEBUG] Getting company info for CIK: {cik_value}", file=sys.stderr)
        payload = get_company_info(cik_value)
        print(f"[DEBUG] Company info result: {payload}", file=sys.stderr)
        # Ensure the UI always receives a JSON object.
        if not isinstance(payload, dict):
            payload = {"error": f"Unexpected result type: {type(payload)}"}
        return json.dumps(payload, indent=2)
    except TimeoutError as exc:
        timeout_payload = {"error": f"Request timeout: {exc}"}
        return json.dumps(timeout_payload, indent=2)
    except Exception as exc:
        traceback.print_exc()
        failure_payload = {"error": f"Exception: {exc}", "type": str(type(exc))}
        return json.dumps(failure_payload, indent=2)
def gradio_extract_metrics(cik: str, years: float):
    """Gradio wrapper for extract_financial_metrics.

    `years` arrives as a float and is truncated to int before the tool is
    called. Returns a JSON string (indent=2); a blank CIK yields an error
    payload without calling the tool, and exceptions (including a failed
    int conversion) are reported as error payloads. Debug lines go to stderr.
    """
    import sys
    import traceback

    # None, "" and whitespace-only input are all rejected up front.
    cik_value = cik.strip() if cik else ""
    if not cik_value:
        return json.dumps({"error": "CIK cannot be empty"}, indent=2)

    try:
        year_count = int(years)
        print(f"[DEBUG] Extracting metrics for CIK: {cik_value}, Years: {year_count}", file=sys.stderr)
        payload = extract_financial_metrics(cik_value, year_count)
        print(f"[DEBUG] Extract metrics result: {payload}", file=sys.stderr)
        # Ensure the UI always receives a JSON object.
        if not isinstance(payload, dict):
            payload = {"error": f"Unexpected result type: {type(payload)}"}
        return json.dumps(payload, indent=2)
    except TimeoutError as exc:
        timeout_payload = {"error": f"Request timeout: {exc}"}
        return json.dumps(timeout_payload, indent=2)
    except Exception as exc:
        traceback.print_exc()
        failure_payload = {"error": f"Exception: {exc}", "type": str(type(exc))}
        return json.dumps(failure_payload, indent=2)
def gradio_get_latest(cik: str):
    """Gradio wrapper for get_latest_financial_data.

    Returns a JSON string (indent=2). A blank CIK yields an error payload
    without calling the tool; exceptions raised during the lookup are reported
    as error payloads. Debug lines are written to stderr.
    """
    import sys
    import traceback

    # None, "" and whitespace-only input are all rejected up front.
    cik_value = cik.strip() if cik else ""
    if not cik_value:
        return json.dumps({"error": "CIK cannot be empty"}, indent=2)

    try:
        print(f"[DEBUG] Getting latest data for CIK: {cik_value}", file=sys.stderr)
        payload = get_latest_financial_data(cik_value)
        print(f"[DEBUG] Latest data result: {payload}", file=sys.stderr)
        # Ensure the UI always receives a JSON object.
        if not isinstance(payload, dict):
            payload = {"error": f"Unexpected result type: {type(payload)}"}
        return json.dumps(payload, indent=2)
    except TimeoutError as exc:
        timeout_payload = {"error": f"Request timeout: {exc}"}
        return json.dumps(timeout_payload, indent=2)
    except Exception as exc:
        traceback.print_exc()
        failure_payload = {"error": f"Exception: {exc}", "type": str(type(exc))}
        return json.dumps(failure_payload, indent=2)
# FastAPI application that hosts the MCP HTTP endpoint for client integration.
# The Gradio UI is mounted onto this same app further down in this file.
app = FastAPI()
def _jsonrpc_result(request_id, result):
    """Wrap `result` in a JSON-RPC 2.0 success envelope."""
    return JSONResponse({"jsonrpc": "2.0", "id": request_id, "result": result})


def _jsonrpc_error(request_id, code, message, status_code=200):
    """Wrap an error code/message in a JSON-RPC 2.0 error envelope."""
    return JSONResponse(
        {
            "jsonrpc": "2.0",
            "id": request_id,
            "error": {"code": code, "message": message},
        },
        status_code=status_code,
    )


def _describe_registered_tools():
    """Build the `tools/list` payload from the FastMCP tool registry."""
    tools = []
    for tool_name, tool in mcp._tool_manager._tools.items():
        # NOTE(review): registry entries are presumably FastMCP Tool objects
        # exposing `description` and `parameters` (a JSON schema) - confirm
        # against the installed SDK. Reading `__doc__` from such an object
        # yields the class docstring, not the tool's description, so it is
        # only used as a fallback here.
        description = getattr(tool, "description", None) or tool.__doc__ or ""
        input_schema = getattr(tool, "parameters", None) or {
            "type": "object",
            "properties": {},
            "required": [],
        }
        tools.append({
            "name": tool_name,
            "description": description,
            "inputSchema": input_schema,
        })
    return tools


async def mcp_http_endpoint(request: Request):
    """MCP HTTP endpoint - handle both GET and POST (accessible via /mcp or /sse).

    POST bodies are treated as JSON-RPC 2.0 requests. Supported methods:

    * ``tools/list`` - describe the registered tools.
    * ``tools/call`` - run one of the tool functions of this module and return
      its result as a JSON text content item.

    Any other HTTP method returns a small service-info document.

    Error handling:
        * malformed JSON        -> JSON-RPC error -32700 (HTTP 400)
        * body is not an object -> JSON-RPC error -32600 (HTTP 400)
        * unknown method/tool   -> JSON-RPC error -32601
        * bad params/arguments  -> JSON-RPC error -32602
        * anything else         -> HTTP 500 with {"error", "type": "server_error"}

    NOTE(review): no route registration for this handler is visible in this
    view of the file - confirm it is attached to the /mcp and /sse paths.
    """
    import asyncio
    import inspect

    if request.method != "POST":
        # GET (or any non-POST) request - return service info.
        return JSONResponse({
            "service": "SEC Financial Data MCP Server",
            "protocol": "MCP 2024-11-05",
            "transport": "HTTP",
            "status": "online"
        })

    # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
    try:
        body = await request.json()
    except ValueError:
        return _jsonrpc_error(
            None, -32700, "Parse error: request body is not valid JSON", status_code=400
        )

    if not isinstance(body, dict):
        return _jsonrpc_error(
            None, -32600, "Invalid Request: expected a JSON object", status_code=400
        )

    request_id = body.get("id")
    method = body.get("method")

    try:
        if method == "tools/list":
            return _jsonrpc_result(request_id, {"tools": _describe_registered_tools()})

        if method != "tools/call":
            return _jsonrpc_error(request_id, -32601, f"Method not found: {method}")

        params = body.get("params") or {}
        if not isinstance(params, dict):
            return _jsonrpc_error(request_id, -32602, "Invalid params: expected an object")
        tool_name = params.get("name")
        arguments = params.get("arguments") or {}
        if not isinstance(arguments, dict):
            return _jsonrpc_error(request_id, -32602, "Invalid params: arguments must be an object")

        # Directly dispatch to the tool functions defined in this module.
        tool_map = {
            "search_company": search_company,
            "get_company_info": get_company_info,
            "get_company_filings": get_company_filings,
            "get_financial_data": get_financial_data,
            "extract_financial_metrics": extract_financial_metrics,
            "get_latest_financial_data": get_latest_financial_data,
            "advanced_search_company": advanced_search_company
        }
        tool_func = tool_map.get(tool_name)
        if tool_func is None:
            return _jsonrpc_error(request_id, -32601, f"Tool not found: {tool_name}")

        # Check the argument names against the signature before running the
        # tool, so a client mistake is reported as "invalid params" and is not
        # confused with a TypeError raised inside the tool itself.
        try:
            inspect.signature(tool_func).bind(**arguments)
        except TypeError as exc:
            return _jsonrpc_error(
                request_id, -32602, f"Invalid params for {tool_name}: {exc}"
            )

        # The tool functions are synchronous; run them in a worker thread so
        # they do not stall this coroutine's event loop while they work.
        result = await asyncio.to_thread(tool_func, **arguments)
        return _jsonrpc_result(request_id, {
            "content": [{
                "type": "text",
                "text": json.dumps(result)
            }]
        })
    except Exception as e:
        # Top-level boundary: report unexpected failures instead of crashing.
        return JSONResponse(
            content={"error": str(e), "type": "server_error"},
            status_code=500
        )
# Create Gradio interface
# One tab per wrapper function, plus a static documentation tab. Each tab lays
# out its inputs in the left column and a JSON code viewer in the right column,
# then wires the button's click event to the matching gradio_* wrapper.
# NOTE(review): several labels/headings below contain mis-encoded characters
# (they look like emoji damaged by an encoding round-trip) - restore them from
# the original file; they are left untouched here because they are UI strings.
with gr.Blocks(title="SEC Financial Data MCP Server") as demo:
    # Page header.
    gr.Markdown("""
# π SEC Financial Data MCP Server
Access real-time SEC EDGAR financial data via Model Context Protocol
**MCP Endpoint:** Use `mcp_server_fastmcp.py` for standard MCP client connections
""")
    # Tab: company lookup by name -> gradio_search_company.
    with gr.Tab("π Search Company"):
        gr.Markdown("### Search for a company by name")
        with gr.Row():
            with gr.Column():
                company_input = gr.Textbox(label="Company Name", placeholder="Tesla", value="Tesla")
                search_btn = gr.Button("Search", variant="primary")
            with gr.Column():
                search_output = gr.Code(label="Result", language="json", lines=15)
        search_btn.click(gradio_search_company, inputs=company_input, outputs=search_output)
    # Tab: company details by CIK -> gradio_get_company_info.
    with gr.Tab("βΉοΈ Company Info"):
        gr.Markdown("### Get detailed company information")
        with gr.Row():
            with gr.Column():
                cik_input = gr.Textbox(label="Company CIK", placeholder="0001318605", value="0001318605")
                info_btn = gr.Button("Get Info", variant="primary")
            with gr.Column():
                info_output = gr.Code(label="Result", language="json", lines=15)
        info_btn.click(gradio_get_company_info, inputs=cik_input, outputs=info_output)
    # Tab: multi-year metrics -> gradio_extract_metrics (CIK + years slider, 1-10).
    with gr.Tab("π Financial Metrics"):
        gr.Markdown("### Extract multi-year financial metrics β")
        with gr.Row():
            with gr.Column():
                metrics_cik = gr.Textbox(label="Company CIK", placeholder="0001318605", value="0001318605")
                metrics_years = gr.Slider(minimum=1, maximum=10, value=3, step=1, label="Years")
                metrics_btn = gr.Button("Extract Metrics", variant="primary")
            with gr.Column():
                metrics_output = gr.Code(label="Result", language="json", lines=20)
        metrics_btn.click(gradio_extract_metrics, inputs=[metrics_cik, metrics_years], outputs=metrics_output)
    # Tab: latest financial data -> gradio_get_latest.
    with gr.Tab("π Latest Data"):
        gr.Markdown("### Get latest financial data")
        with gr.Row():
            with gr.Column():
                latest_cik = gr.Textbox(label="Company CIK", placeholder="0001318605", value="0001318605")
                latest_btn = gr.Button("Get Latest", variant="primary")
            with gr.Column():
                latest_output = gr.Code(label="Result", language="json", lines=15)
        latest_btn.click(gradio_get_latest, inputs=latest_cik, outputs=latest_output)
    # Tab: static help text (tool list, MCP client setup, data sources).
    with gr.Tab("π Documentation"):
        gr.Markdown("""
## π οΈ Available Tools (7)
1. **search_company** - Search by company name
2. **get_company_info** - Get company details by CIK
3. **get_company_filings** - List SEC filings
4. **get_financial_data** - Get specific period data
5. **extract_financial_metrics** β - Multi-year trends
6. **get_latest_financial_data** - Latest snapshot
7. **advanced_search_company** - Flexible search
## π MCP Integration
For MCP client integration, run:
```bash
python mcp_server_fastmcp.py
```
Then configure your MCP client (e.g., Claude Desktop):
```json
{
"mcpServers": {
"sec-financial-data": {
"command": "python",
"args": ["path/to/mcp_server_fastmcp.py"]
}
}
}
```
## π Data Source
- **SEC EDGAR API** - Official SEC data
- **Financial Statements** - 10-K, 10-Q, 20-F forms
- **XBRL Data** - Structured metrics
""")
# Mount Gradio app to FastAPI at the root path; `app` is rebound to the value
# returned by gr.mount_gradio_app.
app = gr.mount_gradio_app(app, demo, path="/")

# When executed as a script, serve the combined app with uvicorn on all
# network interfaces, port 7860.
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)