Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import requests | |
| import json | |
| import os | |
| MCP_SPACE = "JC321/EasyReportDateMCP" | |
| MCP_URL = "https://jc321-easyreportdatemcp.hf.space" | |
| # 设置请求头 | |
| HEADERS = { | |
| "Content-Type": "application/json", | |
| "User-Agent": "SEC-Query-Assistant/1.0 (jtyxabc@gmail.com)" | |
| } | |
| # 格式化数值显示 | |
| def format_value(value, is_money=True): | |
| """格式化数值:0显示为N/A,其他显示为带单位的格式""" | |
| if value is None or value == 0: | |
| return "N/A" | |
| if is_money: | |
| return f"${value:.2f}B" | |
| else: | |
| return f"{value:.2f}" | |
| # MCP 工具定义 | |
| def create_mcp_tools(): | |
| """创建 MCP 工具列表""" | |
| return [ | |
| { | |
| "name": "query_financial_data", | |
| "description": "Query SEC financial data for US listed companies", | |
| "parameters": { | |
| "type": "object", | |
| "properties": { | |
| "company_name": { | |
| "type": "string", | |
| "description": "Company name or stock symbol (e.g., Apple, NVIDIA, AAPL)" | |
| }, | |
| "query_type": { | |
| "type": "string", | |
| "enum": ["Latest Financial Data", "3-Year Trends", "5-Year Trends"], | |
| "description": "Type of financial query" | |
| } | |
| }, | |
| "required": ["company_name", "query_type"] | |
| } | |
| } | |
| ] | |
| # 工具执行函数 | |
| def execute_tool(tool_name, **kwargs): | |
| """执行 MCP 工具""" | |
| if tool_name == "query_financial_data": | |
| return query_financial_data(kwargs.get("company_name"), kwargs.get("query_type")) | |
| return f"Unknown tool: {tool_name}" | |
| # 创建超链接 | |
| def create_source_link(source_form, cik=None): | |
| """为Source Form创建SEC EDGAR超链接""" | |
| if not source_form or source_form == 'N/A' or not cik: | |
| return source_form | |
| # 构建SEC EDGAR链接(这是一个示例,实际链接需要更多参数) | |
| edgar_url = f"https://www.sec.gov/edgar/browse/?CIK={cik}" | |
| return f"[{source_form}]({edgar_url})" | |
| def query_financial_data(company_name, query_type): | |
| """查询财务数据的主函数""" | |
| if not company_name: | |
| return "Please enter a company name or stock symbol" | |
| # 翻译英文查询类型为中文(用于后端处理) | |
| query_type_mapping = { | |
| "Latest Financial Data": "最新财务数据", | |
| "3-Year Trends": "3年趋势", | |
| "5-Year Trends": "5年趋势", | |
| "Company Filings": "公司报表列表" | |
| } | |
| internal_query_type = query_type_mapping.get(query_type, query_type) | |
| try: | |
| # 直接调用 FastAPI 的 REST API 端点 | |
| # 先搜索公司 | |
| search_resp = requests.post( | |
| f"{MCP_URL}/api/advanced_search", | |
| json={"company_input": company_name}, | |
| headers=HEADERS, | |
| timeout=30 | |
| ) | |
| if search_resp.status_code != 200: | |
| return f"❌ Server Error: HTTP {search_resp.status_code}\n\nResponse: {search_resp.text[:500]}" | |
| try: | |
| company = search_resp.json() | |
| except (ValueError, KeyError) as e: | |
| return f"❌ JSON Parse Error: {str(e)}\n\nResponse: {search_resp.text[:500]}" | |
| if isinstance(company, dict) and company.get("error"): | |
| return f"❌ 错误: {company['error']}" | |
| result = f"# {company.get('name', 'Unknown')}\n\n" | |
| result += f"**股票代码**: {company.get('tickers', ['N/A'])[0] if company.get('tickers') else 'N/A'}\n" | |
| result += f"**行业**: {company.get('sic_description', 'N/A')}\n\n---\n\n" | |
| cik = company.get('cik') | |
| # 根据查询类型获取数据 | |
| if internal_query_type == "最新财务数据": | |
| data_resp = requests.post( | |
| f"{MCP_URL}/api/get_latest_financial_data", | |
| json={"cik": cik}, | |
| headers=HEADERS, | |
| timeout=30 | |
| ) | |
| if data_resp.status_code != 200: | |
| return result + f"❌ Server Error: HTTP {data_resp.status_code}\n\n{data_resp.text[:500]}" | |
| try: | |
| data = data_resp.json() | |
| except (ValueError, KeyError) as e: | |
| return result + f"❌ JSON Parse Error: {str(e)}\n\n{data_resp.text[:500]}" | |
| if isinstance(data, dict) and data.get("error"): | |
| return result + f"❌ {data['error']}" | |
| cik = data.get('cik') | |
| result += f"## Fiscal Year {data.get('period', 'N/A')}\n\n" | |
| total_revenue = data.get('total_revenue', 0) / 1e9 if data.get('total_revenue') else 0 | |
| net_income = data.get('net_income', 0) / 1e9 if data.get('net_income') else 0 | |
| eps = data.get('earnings_per_share', 0) if data.get('earnings_per_share') else 0 | |
| opex = data.get('operating_expenses', 0) / 1e9 if data.get('operating_expenses') else 0 | |
| ocf = data.get('operating_cash_flow', 0) / 1e9 if data.get('operating_cash_flow') else 0 | |
| result += f"- **Total Revenue**: {format_value(total_revenue)}\n" | |
| result += f"- **Net Income**: {format_value(net_income)}\n" | |
| result += f"- **Earnings Per Share**: {format_value(eps, False)}\n" | |
| result += f"- **Operating Expenses**: {format_value(opex)}\n" | |
| result += f"- **Operating Cash Flow**: {format_value(ocf)}\n" | |
| result += f"- **Source Form**: {create_source_link(data.get('source_form', 'N/A'), cik)}\n" | |
| elif internal_query_type == "3年趋势": | |
| metrics_resp = requests.post( | |
| f"{MCP_URL}/api/extract_financial_metrics", | |
| json={"cik": cik, "years": 3}, | |
| headers=HEADERS, | |
| timeout=60 | |
| ) | |
| if metrics_resp.status_code != 200: | |
| return result + f"❌ Server Error: HTTP {metrics_resp.status_code}\n\n{metrics_resp.text[:500]}" | |
| try: | |
| metrics = metrics_resp.json() | |
| except (ValueError, KeyError) as e: | |
| return result + f"❌ JSON Parse Error: {str(e)}\n\n{metrics_resp.text[:500]}" | |
| if isinstance(metrics, dict) and metrics.get("error"): | |
| return result + f"❌ {metrics['error']}" | |
| result += f"## 3-Year Financial Trends ({metrics.get('count', 0)} periods)\n\n" | |
| # 显示所有数据(包括年度和季度) | |
| all_data = metrics.get('metrics', []) | |
| result += "| Period | Revenue (B) | Net Income (B) | EPS | Operating Expenses (B) | Operating Cash Flow (B) | Source Form |\n" | |
| result += "|--------|-------------|----------------|-----|------------------------|-------------------------|-------------|\n" | |
| for m in all_data: | |
| period = m.get('period', 'N/A') | |
| rev = (m.get('total_revenue') or 0) / 1e9 | |
| inc = (m.get('net_income') or 0) / 1e9 | |
| eps_val = m.get('earnings_per_share') or 0 | |
| opex = (m.get('operating_expenses') or 0) / 1e9 | |
| ocf = (m.get('operating_cash_flow') or 0) / 1e9 | |
| source_form = m.get('source_form', 'N/A') | |
| cik_val = m.get('cik') or cik | |
| # 区分年度和季度 | |
| period_prefix = "FY" if 'Q' not in period else "" | |
| source_link = create_source_link(source_form, cik_val) | |
| result += f"| {period_prefix}{period} | {format_value(rev)} | {format_value(inc)} | {format_value(eps_val, False)} | {format_value(opex)} | {format_value(ocf)} | {source_link} |\n" | |
| elif internal_query_type == "5年趋势": | |
| metrics_resp = requests.post( | |
| f"{MCP_URL}/api/extract_financial_metrics", | |
| json={"cik": cik, "years": 5}, | |
| headers=HEADERS, | |
| timeout=60 | |
| ) | |
| if metrics_resp.status_code != 200: | |
| return result + f"❌ Server Error: HTTP {metrics_resp.status_code}\n\n{metrics_resp.text[:500]}" | |
| try: | |
| metrics = metrics_resp.json() | |
| except (ValueError, KeyError) as e: | |
| return result + f"❌ JSON Parse Error: {str(e)}\n\n{metrics_resp.text[:500]}" | |
| if isinstance(metrics, dict) and metrics.get("error"): | |
| return result + f"❌ {metrics['error']}" | |
| # 显示所有数据(包括年度和季度) | |
| all_data = metrics.get('metrics', []) | |
| result += f"## 5-Year Financial Trends ({metrics.get('count', 0)} periods)\n\n" | |
| result += "| Period | Revenue (B) | Net Income (B) | EPS | Operating Expenses (B) | Operating Cash Flow (B) | Source Form |\n" | |
| result += "|--------|-------------|----------------|-----|------------------------|-------------------------|-------------|\n" | |
| for m in all_data: | |
| period = m.get('period', 'N/A') | |
| rev = (m.get('total_revenue') or 0) / 1e9 | |
| inc = (m.get('net_income') or 0) / 1e9 | |
| eps_val = m.get('earnings_per_share') or 0 | |
| opex = (m.get('operating_expenses') or 0) / 1e9 | |
| ocf = (m.get('operating_cash_flow') or 0) / 1e9 | |
| source_form = m.get('source_form', 'N/A') | |
| cik_val = m.get('cik') or cik | |
| # 区分年度和季度 | |
| period_prefix = "FY" if 'Q' not in period else "" | |
| source_link = create_source_link(source_form, cik_val) | |
| result += f"| {period_prefix}{period} | {format_value(rev)} | {format_value(inc)} | {format_value(eps_val, False)} | {format_value(opex)} | {format_value(ocf)} | {source_link} |\n" | |
| elif internal_query_type == "公司报表列表": | |
| # 查询公司所有报表 | |
| filings_resp = requests.post( | |
| f"{MCP_URL}/api/get_company_filings", | |
| json={"cik": cik, "limit": 50}, # 限制最多50条 | |
| headers=HEADERS, | |
| timeout=60 | |
| ) | |
| if filings_resp.status_code != 200: | |
| return result + f"❌ Server Error: HTTP {filings_resp.status_code}\n\n{filings_resp.text[:500]}" | |
| try: | |
| filings_data = filings_resp.json() | |
| except (ValueError, KeyError) as e: | |
| return result + f"❌ JSON Parse Error: {str(e)}\n\n{filings_resp.text[:500]}" | |
| if isinstance(filings_data, dict) and filings_data.get("error"): | |
| return result + f"❌ {filings_data['error']}" | |
| filings = filings_data.get('filings', []) if isinstance(filings_data, dict) else filings_data | |
| result += f"## Company Filings ({len(filings)} records)\n\n" | |
| result += "| Form Type | Filing Date | Accession Number | Primary Document |\n" | |
| result += "|-----------|-------------|------------------|------------------|\n" | |
| for filing in filings: | |
| form_type = filing.get('form_type', 'N/A') | |
| filing_date = filing.get('filing_date', 'N/A') | |
| accession_num = filing.get('accession_number', 'N/A') | |
| primary_doc = filing.get('primary_document', 'N/A') | |
| # 创建 SEC EDGAR 原始文档链接 | |
| if accession_num != 'N/A' and primary_doc != 'N/A': | |
| # 移除 accession_number 中的连字符以匹配 SEC URL 格式 | |
| acc_no_clean = accession_num.replace('-', '') | |
| doc_url = f"https://www.sec.gov/Archives/edgar/data/{cik}/{acc_no_clean}/{primary_doc}" | |
| form_link = f"[{form_type}]({doc_url})" | |
| primary_doc_link = f"[{primary_doc}]({doc_url})" | |
| else: | |
| form_link = form_type | |
| primary_doc_link = primary_doc | |
| result += f"| {form_link} | {filing_date} | {accession_num} | {primary_doc_link} |\n" | |
| return result | |
| except requests.exceptions.RequestException as e: | |
| return f"❌ Network Error: {str(e)}\n\nMCP Server: {MCP_URL}" | |
| except Exception as e: | |
| import traceback | |
| return f"❌ Unexpected Error: {str(e)}\n\nTraceback:\n{traceback.format_exc()}" | |
| # Chatbot 功能:使用MCP工具 | |
| def chatbot_response(message, history): | |
| """聊天机器人响应函数,集成MCP工具""" | |
| try: | |
| # 检查是否是财务查询相关问题 | |
| financial_keywords = ['financial', 'revenue', 'income', 'earnings', 'cash flow', 'expenses', '财务', '收入', '利润', 'data', 'trend', 'performance'] | |
| if any(keyword in message.lower() for keyword in financial_keywords): | |
| # 提取公司名称和查询类型 | |
| company_keywords = ['apple', 'microsoft', 'nvidia', 'tesla', 'alibaba', 'google', 'amazon', 'meta', 'tsla', 'aapl', 'msft', 'nvda', 'googl', 'amzn'] | |
| detected_company = None | |
| for company in company_keywords: | |
| if company in message.lower(): | |
| if company in ['aapl']: detected_company = 'Apple' | |
| elif company in ['msft']: detected_company = 'Microsoft' | |
| elif company in ['nvda']: detected_company = 'NVIDIA' | |
| elif company in ['tsla']: detected_company = 'Tesla' | |
| elif company in ['googl']: detected_company = 'Google' | |
| elif company in ['amzn']: detected_company = 'Amazon' | |
| else: detected_company = company.capitalize() | |
| break | |
| if detected_company: | |
| # 根据问题内容选择查询类型 | |
| if any(word in message.lower() for word in ['trend', '趋势', 'history', 'historical', 'over time']): | |
| if any(word in message for word in ['5', 'five', '五年']): | |
| query_type = '5-Year Trends' | |
| else: | |
| query_type = '3-Year Trends' | |
| else: | |
| query_type = 'Latest Financial Data' | |
| # 调用财务查询函数 | |
| result = query_financial_data(detected_company, query_type) | |
| return f"Here's the financial information for {detected_company}:\n\n{result}" | |
| else: | |
| return "I can help you query financial data! Please specify a company name. For example: 'Show me Apple's latest financial data' or 'What's NVIDIA's 3-year trend?' \n\nSupported companies include: Apple, Microsoft, NVIDIA, Tesla, Alibaba, Google, Amazon, and more." | |
| # 如果不是财务查询,返回通用回复 | |
| return "Hello! I'm a financial data assistant powered by SEC EDGAR data. I can help you query financial information for US listed companies.\n\n**What I can do:**\n- Get latest financial data (revenue, income, EPS, etc.)\n- Show 3-year or 5-year financial trends\n- Provide detailed financial metrics\n\n**Try asking:**\n- 'Show me Apple's latest financial data'\n- 'What's NVIDIA's 3-year financial trend?'\n- 'How is Microsoft performing financially?'" | |
| except Exception as e: | |
| return f"Sorry, I encountered an error: {str(e)}. Please try asking about financial data for specific companies like Apple, Microsoft, NVIDIA, Tesla, etc." | |
| # 创建 Gradio 界面 | |
| with gr.Blocks(title="SEC Financial Data Query Assistant") as demo: | |
| gr.Markdown("# 🤖 SEC Financial Data Query Assistant") | |
| gr.Markdown("Query SEC financial data for US listed companies through MCP Server") | |
| with gr.Tab("AI Assistant"): | |
| # 使用 Gradio 6.0.1 的 ChatInterface 和 MCP 支持 | |
| chat = gr.ChatInterface( | |
| fn=chatbot_response, | |
| title="💬 Financial AI Assistant", | |
| description="Ask me about financial data for any US listed company! I can help you query latest data or analyze trends.", | |
| examples=[ | |
| "Show me Apple's latest financial data", | |
| "What's NVIDIA's 3-year financial trend?", | |
| "Get Microsoft's 5-year financial trends", | |
| "How is Tesla performing financially?" | |
| ], | |
| cache_examples=False, | |
| chatbot=gr.Chatbot(height=700), | |
| textbox=gr.Textbox(placeholder="Ask me about any company's financial data...", scale=7), | |
| submit_btn="Send", | |
| retry_btn="🔄 Retry", | |
| undo_btn="⬅️ Undo", | |
| clear_btn="🗑️ Clear" | |
| ) | |
| with gr.Tab("Direct Query"): | |
| gr.Markdown("## 🔍 Direct Financial Data Query") | |
| with gr.Row(): | |
| company_input = gr.Textbox( | |
| label="Company Name or Stock Symbol", | |
| placeholder="e.g., NVIDIA, Apple, Alibaba, AAPL", | |
| scale=2 | |
| ) | |
| query_type = gr.Radio( | |
| ["Latest Financial Data", "3-Year Trends", "5-Year Trends", "Company Filings"], | |
| label="Query Type", | |
| value="Latest Financial Data", | |
| scale=1 | |
| ) | |
| submit_btn = gr.Button("🔍 Query", variant="primary", size="lg") | |
| output = gr.Markdown(label="Query Results") | |
| # 示例 | |
| gr.Examples( | |
| examples=[ | |
| ["NVIDIA", "Latest Financial Data"], | |
| ["Apple", "3-Year Trends"], | |
| ["Microsoft", "5-Year Trends"], | |
| ["Alibaba", "Company Filings"], | |
| ["Tesla", "3-Year Trends"] | |
| ], | |
| inputs=[company_input, query_type], | |
| outputs=output, | |
| fn=query_financial_data, | |
| cache_examples=False | |
| ) | |
| submit_btn.click( | |
| fn=query_financial_data, | |
| inputs=[company_input, query_type], | |
| outputs=output | |
| ) | |
| gr.Markdown("---") | |
| gr.Markdown(f"**Data Source**: SEC EDGAR | **MCP Server**: `{MCP_URL}`") | |
| demo.launch() |