JC321's picture
Upload app.py
b3daa10 verified
raw
history blame
18.8 kB
import gradio as gr
import requests
import json
import os
# Hugging Face Space that hosts the MCP backend, and its public base URL.
MCP_SPACE = "JC321/EasyReportDateMCP"
MCP_URL = "https://jc321-easyreportdatemcp.hf.space"

# HTTP headers sent with every request to the MCP backend.
HEADERS = {
    "Content-Type": "application/json",
    "User-Agent": "SEC-Query-Assistant/1.0 (jtyxabc@gmail.com)",
}
# Numeric display formatting
def format_value(value, is_money=True):
    """Format a number for display in the result tables.

    Args:
        value: The number to format. ``None`` and ``0`` are both treated as
            "no data available".
        is_money: When true, render as a dollar amount with a ``B`` suffix
            (callers pass amounts already divided by 1e9); otherwise render
            as a plain two-decimal number.

    Returns:
        ``"N/A"`` for missing/zero values, otherwise the formatted string.
    """
    if value is None or value == 0:
        return "N/A"
    if is_money:
        # Put the minus sign ahead of the currency symbol so that losses read
        # as "-$1.23B" rather than "$-1.23B".
        sign = "-" if value < 0 else ""
        return f"{sign}${abs(value):.2f}B"
    return f"{value:.2f}"
# MCP tool definitions
def create_mcp_tools():
    """Return the list of MCP tool descriptors offered by this app.

    The list currently holds a single tool, ``query_financial_data``, described
    with a JSON-Schema style ``parameters`` object. A new list is built on
    every call.
    """
    company_name_schema = {
        "type": "string",
        "description": "Company name or stock symbol (e.g., Apple, NVIDIA, AAPL)",
    }
    query_type_schema = {
        "type": "string",
        "enum": ["Latest Financial Data", "3-Year Trends", "5-Year Trends"],
        "description": "Type of financial query",
    }
    parameters_schema = {
        "type": "object",
        "properties": {
            "company_name": company_name_schema,
            "query_type": query_type_schema,
        },
        "required": ["company_name", "query_type"],
    }
    financial_tool = {
        "name": "query_financial_data",
        "description": "Query SEC financial data for US listed companies",
        "parameters": parameters_schema,
    }
    return [financial_tool]
# Tool dispatcher
def execute_tool(tool_name, **kwargs):
    """Run the MCP tool called ``tool_name`` with the given keyword arguments.

    Only ``query_financial_data`` is recognised; it receives the
    ``company_name`` and ``query_type`` keyword arguments (``None`` when
    absent). Any other name yields an ``"Unknown tool: ..."`` string.
    """
    if tool_name != "query_financial_data":
        return f"Unknown tool: {tool_name}"

    company = kwargs.get("company_name")
    requested_type = kwargs.get("query_type")
    return query_financial_data(company, requested_type)
# Hyperlink construction
def create_source_link(source_form, source_url=None):
    """Render a source form as a Markdown link when a usable URL is supplied.

    Args:
        source_form: Label of the source form (for example a filing type).
        source_url: URL supplied by the backend, if any.

    Returns:
        ``"[source_form](source_url)"`` when both values are non-empty and
        neither is the placeholder ``'N/A'``; otherwise ``source_form``
        unchanged (including ``None`` or an empty string).
    """
    linkable = (
        source_form
        and source_form != 'N/A'
        and source_url
        and source_url != 'N/A'
    )
    return f"[{source_form}]({source_url})" if linkable else source_form
# UI label -> internal query identifier. Labels not listed here pass through
# unchanged, so an internal identifier may also be supplied directly.
_QUERY_TYPE_ALIASES = {
    "Latest Financial Data": "最新财务数据",
    "3-Year Trends": "3年趋势",
    "5-Year Trends": "5年趋势",
    "Company Filings": "公司报表列表",
}

# Internal trend identifiers -> number of years requested from the backend.
_TREND_YEARS = {"3年趋势": 3, "5年趋势": 5}

_BAD_SHAPE_ERROR = "❌ Unexpected response format from MCP server"

_TREND_TABLE_HEADER = (
    "| Period | Revenue (B) | Net Income (B) | EPS | Operating Expenses (B) | Operating Cash Flow (B) | Source Form |\n"
    "|--------|-------------|----------------|-----|------------------------|-------------------------|-------------|\n"
)

_FILINGS_TABLE_HEADER = (
    "| Form Type | Filing Date | Accession Number | Primary Document |\n"
    "|-----------|-------------|------------------|------------------|\n"
)


def _post_json(endpoint, payload, timeout, body_label="", error_prefix="❌ "):
    """POST ``payload`` to an MCP endpoint and decode the JSON reply.

    Returns a ``(parsed, error)`` pair: ``error`` is ``None`` on success,
    otherwise it is a ready-to-display message and ``parsed`` is ``None``.
    Network failures are not caught here; they propagate to the caller.
    """
    response = requests.post(
        f"{MCP_URL}{endpoint}",
        json=payload,
        headers=HEADERS,
        timeout=timeout,
    )
    if response.status_code != 200:
        return None, (
            f"❌ Server Error: HTTP {response.status_code}\n\n"
            f"{body_label}{response.text[:500]}"
        )
    try:
        parsed = response.json()
    except (ValueError, KeyError) as e:
        return None, f"❌ JSON Parse Error: {str(e)}\n\n{body_label}{response.text[:500]}"
    if isinstance(parsed, dict) and parsed.get("error"):
        return None, f"{error_prefix}{parsed['error']}"
    return parsed, None


def _billions(record, key):
    """Return ``record[key]`` divided by 1e9, treating missing/falsy as 0."""
    return (record.get(key) or 0) / 1e9


def _period_text(entry, default):
    """Return the entry's period as text so sorting and 'Q' tests never fail on non-strings."""
    period = entry.get('period', default)
    return default if period is None else str(period)


def _latest_section(cik):
    """Fetch and render the latest financial data section for ``cik``."""
    data, error = _post_json("/api/get_latest_financial_data", {"cik": cik}, 30)
    if error is not None:
        return error
    if not isinstance(data, dict):
        return _BAD_SHAPE_ERROR

    source_link = create_source_link(
        data.get('source_form', 'N/A'),
        data.get('source_url', None),  # URL comes from the backend
    )
    eps = data.get('earnings_per_share') or 0
    return "".join([
        f"## Fiscal Year {data.get('period', 'N/A')}\n\n",
        f"- **Total Revenue**: {format_value(_billions(data, 'total_revenue'))}\n",
        f"- **Net Income**: {format_value(_billions(data, 'net_income'))}\n",
        f"- **Earnings Per Share**: {format_value(eps, False)}\n",
        f"- **Operating Expenses**: {format_value(_billions(data, 'operating_expenses'))}\n",
        f"- **Operating Cash Flow**: {format_value(_billions(data, 'operating_cash_flow'))}\n",
        f"- **Source Form**: {source_link}\n",
    ])


def _trend_section(cik, years):
    """Fetch and render the multi-year metrics table for ``cik``."""
    metrics, error = _post_json(
        "/api/extract_financial_metrics", {"cik": cik, "years": years}, 60
    )
    if error is not None:
        return error
    if not isinstance(metrics, dict):
        return _BAD_SHAPE_ERROR

    # Every returned period is shown; newest first.
    entries = sorted(
        metrics.get('metrics') or [],
        key=lambda entry: _period_text(entry, '0000'),
        reverse=True,
    )
    parts = [
        f"## {years}-Year Financial Trends ({metrics.get('count', 0)} periods)\n\n",
        _TREND_TABLE_HEADER,
    ]
    for m in entries:
        period = _period_text(m, 'N/A')
        # Periods without a 'Q' marker are labelled as fiscal years.
        period_prefix = "FY" if 'Q' not in period else ""
        source_link = create_source_link(
            m.get('source_form', 'N/A'), m.get('source_url', None)
        )
        parts.append(
            f"| {period_prefix}{period}"
            f" | {format_value(_billions(m, 'total_revenue'))}"
            f" | {format_value(_billions(m, 'net_income'))}"
            f" | {format_value(m.get('earnings_per_share') or 0, False)}"
            f" | {format_value(_billions(m, 'operating_expenses'))}"
            f" | {format_value(_billions(m, 'operating_cash_flow'))}"
            f" | {source_link} |\n"
        )
    return "".join(parts)


def _filings_section(cik):
    """Fetch and render the company filings table for ``cik`` (at most 50 rows requested)."""
    filings_data, error = _post_json(
        "/api/get_company_filings", {"cik": cik, "limit": 50}, 60
    )
    if error is not None:
        return error

    if isinstance(filings_data, dict):
        filings = filings_data.get('filings', [])
    else:
        filings = filings_data
    filings = filings or []  # tolerate a null payload / null list

    parts = [
        f"## Company Filings ({len(filings)} records)\n\n",
        _FILINGS_TABLE_HEADER,
    ]
    for filing in filings:
        form_type = filing.get('form_type', 'N/A')
        filing_date = filing.get('filing_date', 'N/A')
        accession_num = filing.get('accession_number', 'N/A')
        primary_doc = filing.get('primary_document', 'N/A')
        filing_url = filing.get('filing_url', None)  # URL comes from the backend
        if filing_url and filing_url != 'N/A':
            form_link = f"[{form_type}]({filing_url})"
            primary_doc_link = f"[{primary_doc}]({filing_url})"
        else:
            form_link = form_type
            primary_doc_link = primary_doc
        parts.append(
            f"| {form_link} | {filing_date} | {accession_num} | {primary_doc_link} |\n"
        )
    return "".join(parts)


def query_financial_data(company_name, query_type):
    """Look up a company on the MCP server and render the requested report.

    Args:
        company_name: Company name or stock symbol typed by the user.
        query_type: One of the UI labels ("Latest Financial Data",
            "3-Year Trends", "5-Year Trends", "Company Filings") or the
            corresponding internal identifier.

    Returns:
        A Markdown string: the company header followed by the requested
        section, or an error message prefixed with "❌". Request failures and
        unexpected exceptions are converted to messages rather than raised.
    """
    if isinstance(company_name, str):
        company_name = company_name.strip()
    if not company_name:
        return "Please enter a company name or stock symbol"

    internal_query_type = _QUERY_TYPE_ALIASES.get(query_type, query_type)

    try:
        # Resolve the company first; every report endpoint is keyed by CIK.
        company, error = _post_json(
            "/api/advanced_search",
            {"company_input": company_name},
            30,
            body_label="Response: ",
            error_prefix="❌ 错误: ",
        )
        if error is not None:
            return error
        if not isinstance(company, dict):
            return _BAD_SHAPE_ERROR

        tickers = company.get('tickers')
        ticker = tickers[0] if tickers else 'N/A'
        result = (
            f"# {company.get('name', 'Unknown')}\n\n"
            f"**股票代码**: {ticker}\n"
            f"**行业**: {company.get('sic_description', 'N/A')}\n\n---\n\n"
        )
        cik = company.get('cik')

        if internal_query_type == "最新财务数据":
            return result + _latest_section(cik)
        if internal_query_type in _TREND_YEARS:
            return result + _trend_section(cik, _TREND_YEARS[internal_query_type])
        if internal_query_type == "公司报表列表":
            return result + _filings_section(cik)

        # Previously an unrecognised type silently produced the header only.
        return result + f"❌ Unsupported query type: {query_type}"
    except requests.exceptions.RequestException as e:
        return f"❌ Network Error: {str(e)}\n\nMCP Server: {MCP_URL}"
    except Exception as e:
        import traceback
        return f"❌ Unexpected Error: {str(e)}\n\nTraceback:\n{traceback.format_exc()}"
# Chatbot: keyword-driven front end for the financial query tool
def chatbot_response(message, history):
    """Answer a chat message, running a financial query when one is recognised.

    The message is matched case-insensitively by substring against a list of
    financial keywords and a list of known company names/tickers. When both
    match, the query type is chosen from trend-related words and the result of
    ``query_financial_data`` is returned.

    Args:
        message: The user's chat message.
        history: Chat history supplied by the chat UI; not used.

    Returns:
        The reply text. Any exception is converted into an apology message.
    """
    try:
        lowered = message.lower()

        # Is this a finance-related question at all?
        financial_keywords = ['financial', 'revenue', 'income', 'earnings', 'cash flow', 'expenses', '财务', '收入', '利润', 'data', 'trend', 'performance']
        if not any(keyword in lowered for keyword in financial_keywords):
            # Not a financial query: return the generic introduction.
            return "Hello! I'm a financial data assistant powered by SEC EDGAR data. I can help you query financial information for US listed companies.\n\n**What I can do:**\n- Get latest financial data (revenue, income, EPS, etc.)\n- Show 3-year or 5-year financial trends\n- Provide detailed financial metrics\n\n**Try asking:**\n- 'Show me Apple's latest financial data'\n- 'What's NVIDIA's 3-year financial trend?'\n- 'How is Microsoft performing financially?'"

        # Keyword -> company name to query. Insertion order is the match
        # precedence (names before tickers). Matching is by plain substring.
        company_aliases = {
            'apple': 'Apple',
            'microsoft': 'Microsoft',
            'nvidia': 'Nvidia',
            'tesla': 'Tesla',
            'alibaba': 'Alibaba',
            'google': 'Google',
            'amazon': 'Amazon',
            'meta': 'Meta',
            'tsla': 'Tesla',
            'aapl': 'Apple',
            'msft': 'Microsoft',
            'nvda': 'NVIDIA',
            'googl': 'Google',
            'amzn': 'Amazon',
        }
        detected_company = next(
            (name for keyword, name in company_aliases.items() if keyword in lowered),
            None,
        )
        if not detected_company:
            return "I can help you query financial data! Please specify a company name. For example: 'Show me Apple's latest financial data' or 'What's NVIDIA's 3-year trend?' \n\nSupported companies include: Apple, Microsoft, NVIDIA, Tesla, Alibaba, Google, Amazon, and more."

        # Choose the query type from the wording of the question.
        trend_words = ['trend', '趋势', 'history', 'historical', 'over time']
        if any(word in lowered for word in trend_words):
            # Test the lowercased text so "Five"/"FIVE" select the 5-year view.
            wants_five = any(word in lowered for word in ['5', 'five', '五年'])
            query_type = '5-Year Trends' if wants_five else '3-Year Trends'
        else:
            query_type = 'Latest Financial Data'

        result = query_financial_data(detected_company, query_type)
        return f"Here's the financial information for {detected_company}:\n\n{result}"
    except Exception as e:
        return f"Sorry, I encountered an error: {str(e)}. Please try asking about financial data for specific companies like Apple, Microsoft, NVIDIA, Tesla, etc."
# Build the Gradio interface: a chat tab and a direct-query form tab.
with gr.Blocks(title="SEC Financial Data Query Assistant") as demo:
    gr.Markdown("# 🤖 SEC Financial Data Query Assistant")
    gr.Markdown("Query SEC financial data for US listed companies through MCP Server")

    with gr.Tab("AI Assistant"):
        # Chat front end backed by chatbot_response (original note: written
        # against Gradio 6.0.1's ChatInterface).
        chat = gr.ChatInterface(
            fn=chatbot_response,
            title="💬 Financial AI Assistant",
            description="Ask me about financial data for any US listed company! I can help you query latest data or analyze trends.",
            examples=[
                "Show me Apple's latest financial data",
                "What's NVIDIA's 3-year financial trend?",
                "Get Microsoft's 5-year financial trends",
                "How is Tesla performing financially?"
            ],
            cache_examples=False,
            chatbot=gr.Chatbot(height=700),
            textbox=gr.Textbox(placeholder="Ask me about any company's financial data...", scale=7)
        )

    with gr.Tab("Direct Query"):
        gr.Markdown("## 🔍 Direct Financial Data Query")
        with gr.Row():
            company_input = gr.Textbox(
                label="Company Name or Stock Symbol",
                placeholder="e.g., NVIDIA, Apple, Alibaba, AAPL",
                scale=2
            )
            query_type = gr.Radio(
                ["Latest Financial Data", "3-Year Trends", "5-Year Trends", "Company Filings"],
                label="Query Type",
                value="Latest Financial Data",
                scale=1
            )
        submit_btn = gr.Button("🔍 Query", variant="primary", size="lg")
        output = gr.Markdown(label="Query Results")

        # Clickable examples that fill the form and run the query.
        gr.Examples(
            examples=[
                ["NVIDIA", "Latest Financial Data"],
                ["Apple", "3-Year Trends"],
                ["Microsoft", "5-Year Trends"],
                ["Alibaba", "Company Filings"],
                ["Tesla", "3-Year Trends"]
            ],
            inputs=[company_input, query_type],
            outputs=output,
            fn=query_financial_data,
            cache_examples=False
        )

        submit_btn.click(
            fn=query_financial_data,
            inputs=[company_input, query_type],
            outputs=output
        )

    # Footer. NOTE(review): the source's indentation was lost, so placing the
    # footer at Blocks level (below the tabs) is an assumption; confirm.
    gr.Markdown("---")
    gr.Markdown(f"**Data Source**: SEC EDGAR | **MCP Server**: `{MCP_URL}`")

# Start the server only when run as a script, so importing this module
# (for example from tests) does not launch it.
if __name__ == "__main__":
    demo.launch()