EasyReportDataMCP / financial_analyzer.py
JC321's picture
Upload 10 files
fa4c2b2 verified
raw
history blame
9.77 kB
"""Financial Data Analysis Module"""
from edgar_client import EdgarDataClient
from datetime import datetime
import json
class FinancialAnalyzer:
    """Select companies and reporting periods and collect their financial data.

    All data access is delegated to the ``EdgarDataClient`` stored on
    ``self.edgar_client``; this class decides which periods to request and
    normalizes the returned records for display.
    """

    # Forms treated as annual reports when discovering fiscal years.
    _ANNUAL_FORMS = ("10-K", "20-F")
    # Taxonomies probed in company facts, in order of preference.
    _DATA_SOURCES = ("us-gaap", "ifrs-full")
    # Revenue tags probed (in order) to discover which fiscal years exist.
    _REVENUE_TAGS = (
        "Revenues",
        "RevenueFromContractWithCustomerExcludingAssessedTax",
        "Revenue",
        "RevenueFromContractWithCustomer",
    )
    # Keys guaranteed to be present in every formatted record.
    _KEY_FIELDS = (
        'total_revenue',
        'net_income',
        'earnings_per_share',
        'operating_expenses',
        'operating_cash_flow',
        'source_url',
        'source_form',
    )

    def __init__(self, user_agent="Juntao Peng Financial Report Metrics App (jtyxabc@gmail.com)"):
        """Create the analyzer and its EDGAR client.

        Args:
            user_agent (str): User-agent string identifying the source of
                requests; passed straight to ``EdgarDataClient``.
        """
        self.edgar_client = EdgarDataClient(user_agent)

    def search_company(self, company_input):
        """Look up company information by name or by CIK.

        Args:
            company_input (str): Company name or CIK. Surrounding whitespace
                is ignored and non-string values are converted with ``str``.

        Returns:
            dict: The client's company info when available; otherwise a
                basic ``{"cik", "name", "tickers"}`` dict built from the name
                search result; otherwise ``{"error": <message>}``.
        """
        query = "" if company_input is None else str(company_input).strip()
        if not query:
            # Nothing to search for; do not send an empty query to the client.
            return {"error": "未找到匹配的公司"}

        # An all-digit input of at least 8 characters is assumed to be a CIK.
        # NOTE(review): shorter numeric inputs are routed to the name search;
        # confirm whether unpadded CIKs should be accepted here as well.
        if query.isdigit() and len(query) >= 8:
            company_info = self.edgar_client.get_company_info(query)
            if company_info:
                return company_info
            return {"error": "未找到指定CIK的公司"}

        company = self.edgar_client.search_company_by_name(query)
        if not company:
            return {"error": "未找到匹配的公司"}

        company_info = self.edgar_client.get_company_info(company['cik'])
        if company_info:
            return company_info

        # Detailed info is unavailable: fall back to the search result.
        ticker = company.get('ticker')
        return {
            "cik": company['cik'],
            "name": company.get('name'),
            "tickers": [ticker] if ticker else [],
        }

    def get_company_filings_list(self, cik, form_types=None):
        """Return the list of filings for a company.

        Args:
            cik (str): Company CIK.
            form_types (list): Form types to include. Defaults to a fresh
                ``['10-K', '10-Q']`` list on every call, so no list object is
                shared between calls.

        Returns:
            list: Whatever ``EdgarDataClient.get_company_filings`` returns.
        """
        if form_types is None:
            form_types = ['10-K', '10-Q']
        return self.edgar_client.get_company_filings(cik, form_types)

    def extract_financial_metrics(self, cik, years=3):
        """Collect annual and quarterly financial data for the latest years.

        Fiscal years are taken from the company facts when possible, which
        avoids mismatches between filing date and fiscal year. If the facts
        yield no year, a range of years ending at the latest annual filing
        year is used instead.

        Args:
            cik (str): Company CIK.
            years (int): Number of fiscal years to extract. Defaults to 3.
                Zero or negative values yield an empty list.

        Returns:
            list: One record per period for which the client returned data,
                ordered newest year first and, within a year, as
                ``YYYY, YYYYQ4, YYYYQ3, YYYYQ2, YYYYQ1``.
        """
        # A negative value would otherwise slice "all but the oldest |years|"
        # years below instead of returning nothing.
        if years is not None and years <= 0:
            return []

        facts = self.edgar_client.get_company_facts(cik)
        if not facts:
            return []

        available_years = self._annual_years_from_facts(facts)
        if not available_years:
            # Fall back to filing dates when the facts give no usable year.
            latest_filing_year = self._latest_annual_filing_year(cik)
            if latest_filing_year is None:
                return []
            # Widen the range (2x) so more candidate years are available.
            available_years = {
                latest_filing_year - offset for offset in range(years * 2)
            }

        # Newest years first; each year contributes the annual period
        # followed by its quarters in Q4..Q1 order.
        periods = []
        for year in sorted(available_years, reverse=True)[:years]:
            periods.append(str(year))
            periods.extend(f"{year}Q{quarter}" for quarter in range(4, 0, -1))

        financial_data = []
        for period in periods:
            data = self.edgar_client.get_financial_data_for_period(cik, period)
            # Keep partially filled records too; only empty results are dropped.
            if data:
                financial_data.append(data)
        return financial_data

    def get_latest_financial_data(self, cik):
        """Return financial data for the year of the latest annual filing.

        Args:
            cik (str): Company CIK.

        Returns:
            dict: The client's data for that year, or ``{}`` when no 10-K or
                20-F filing with a parseable filing date exists.
        """
        # NOTE(review): the *filing* year is used as the period here, while
        # extract_financial_metrics prefers fiscal years from the facts;
        # confirm the client resolves this period as intended.
        latest_filing_year = self._latest_annual_filing_year(cik)
        if latest_filing_year is None:
            return {}
        return self.edgar_client.get_financial_data_for_period(
            cik, str(latest_filing_year)
        )

    def format_financial_data(self, financial_data):
        """Format one financial record, or a list of them, for display.

        Args:
            financial_data (dict or list): A record or a list of records.

        Returns:
            dict or list: Formatted copy or copies; the input is not modified.
        """
        if isinstance(financial_data, list):
            return [
                self._format_single_financial_data(record)
                for record in financial_data
            ]
        return self._format_single_financial_data(financial_data)

    def _format_single_financial_data(self, data):
        """Format a single financial record.

        Args:
            data (dict): Financial record. ``None`` or an empty dict is
                treated as a record with no values.

        Returns:
            dict: Shallow copy of ``data`` in which every key field exists
                (``None`` when missing) and a numeric EPS is rounded to two
                decimals. Other values keep their original units.
        """
        formatted = dict(data) if data else {}
        for key in self._KEY_FIELDS:
            formatted.setdefault(key, None)

        eps = formatted['earnings_per_share']
        if isinstance(eps, (int, float)):
            formatted['earnings_per_share'] = round(eps, 2)
        return formatted

    def _annual_years_from_facts(self, facts):
        """Return the fiscal years found on annual USD revenue facts.

        Taxonomies and revenue tags are probed in order, and the first tag
        that yields at least one year wins; later tags are not merged in.
        """
        all_facts = facts.get("facts") or {}
        for data_source in self._DATA_SOURCES:
            source_data = all_facts.get(data_source)
            if not source_data:
                continue
            for tag in self._REVENUE_TAGS:
                if tag not in source_data:
                    continue
                units = source_data[tag].get("units") or {}
                found = set()
                for entry in units.get("USD") or []:
                    # Only annual reports (10-K or 20-F) are considered.
                    if entry.get("form") not in self._ANNUAL_FORMS:
                        continue
                    year = self._fiscal_year_of(entry)
                    if year is not None:
                        found.add(year)
                if found:
                    return found
        return set()

    @staticmethod
    def _fiscal_year_of(entry):
        """Return the fiscal year of a fact entry, or ``None`` if unknown."""
        fy = entry.get("fy")
        # Prefer the explicit fiscal-year field. It may be present but null,
        # so it must be type-checked before being compared with a number.
        if isinstance(fy, int) and not isinstance(fy, bool) and fy > 0:
            return fy
        if fy:
            return None
        # No fiscal year: fall back to the year of the period end date.
        # This can be off for companies whose fiscal year is not the
        # calendar year.
        end_date = entry.get("end") or ""
        if len(end_date) < 4:
            return None
        try:
            return int(end_date[:4])
        except (TypeError, ValueError):
            return None

    def _latest_annual_filing_year(self, cik):
        """Return the most recent 10-K/20-F filing year, or ``None``."""
        filings = []
        for form in self._ANNUAL_FORMS:
            filings.extend(self.edgar_client.get_company_filings(cik, [form]) or [])

        latest_year = None
        for filing in filings:
            filing_date = filing.get('filing_date')
            if not filing_date:
                continue
            try:
                filing_year = int(filing_date[:4])
            except (TypeError, ValueError):
                # Skip entries whose date cannot be parsed.
                continue
            if latest_year is None or filing_year > latest_year:
                latest_year = filing_year
        return latest_year