# stockany2 / app/analysis/etf_analyzer.py
# Uploaded by fiewolf1000 ("Upload 67 files", commit 0f10134, verified)
import akshare as ak
import pandas as pd
from datetime import datetime, timedelta
import numpy as np
from stockstats import StockDataFrame
class EtfAnalyzer:
    """Multi-step analysis of a single ETF built on akshare data.

    Each public step stores its output under its own key in
    ``self.analysis_result``. Every step catches its own exceptions and records
    ``{"error": ...}`` under its key instead of raising.
    """

    # Index used as the benchmark for alpha / beta / tracking error (沪深300).
    BENCHMARK_CODE = 'sh000300'
    # Look-back windows, in trading rows, for period returns.
    RETURN_PERIODS = {'近1周': 5, '近1个月': 21, '近3个月': 63, '近1年': 252}
    # Look-back windows, in trading rows, for the fund-flow summary.
    FLOW_PERIODS = {'近1周': 5, '近1个月': 21, '近3个月': 63}

    def __init__(self, etf_code, stock_analyzer_instance=None):
        """
        Args:
            etf_code: ETF code passed to akshare, e.g. '510300'.
            stock_analyzer_instance: object exposing
                ``get_ai_analysis_from_prompt(prompt)``, reused for the AI summary.
                ``None`` (the default) makes the AI step emit a placeholder message.
        """
        self.etf_code = etf_code
        self.analysis_result = {}
        # Daily history fetched by analyze_market_performance and reused by later steps.
        self.hist_df = None
        # Reused StockAnalyzer instance (only its AI-prompt method is called here).
        self.stock_analyzer = stock_analyzer_instance
        # Full benchmark history, fetched lazily once and shared by several steps.
        self._benchmark_full_df = None

    def run_analysis(self):
        """
        Run every analysis step in order and return ``self.analysis_result``.

        Order matters: market performance fills ``self.hist_df`` (used by fund-flow
        and risk), and basic info is read by the sector step.
        """
        self.get_basic_info()
        self.analyze_market_performance()
        self.analyze_fund_flow()
        self.analyze_risk_and_tracking()
        self.analyze_holdings()
        self.analyze_sector()
        self.get_ai_summary()
        return self.analysis_result

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _trailing_return(prices, days, fallback_to_first=True):
        """Return the % change of the last price versus the price ``days`` rows earlier.

        Args:
            prices: price Series ordered oldest -> newest.
            days: number of rows to look back.
            fallback_to_first: when the Series has ``days`` rows or fewer, measure from
                the first row (True) or return None (False).

        Returns:
            The percentage return, 0 when the base price is 0, or None when there is
            no data (or not enough data and ``fallback_to_first`` is False).
        """
        if prices is None or len(prices) == 0:
            return None
        latest = prices.iloc[-1]
        if len(prices) > days:
            base = prices.iloc[-days - 1]
        elif fallback_to_first:
            base = prices.iloc[0]
        else:
            return None
        return ((latest / base) - 1) * 100 if base != 0 else 0

    @staticmethod
    def _ytd_return(prices, year):
        """Return the % change since the first row in ``year``.

        ``prices`` must have a DatetimeIndex. If there is no row in ``year``, the
        return is measured from the first row of the Series instead.
        """
        ytd = prices[prices.index.year == year]
        base = ytd.iloc[0] if not ytd.empty else prices.iloc[0]
        latest = prices.iloc[-1]
        return ((latest / base) - 1) * 100 if base != 0 else 0

    @staticmethod
    def _fmt(value, spec, suffix='', default='N/A'):
        """Format a number for the AI prompt.

        None / NaN become ``default``. Values that cannot be formatted with ``spec``
        are rendered with ``str``. ``suffix`` is appended only to formatted numbers.
        """
        if value is None:
            return default
        try:
            if pd.isna(value):
                return default
        except (TypeError, ValueError):
            # pd.isna on a non-scalar returns an array; treat it as "not missing".
            pass
        try:
            return format(value, spec) + suffix
        except (TypeError, ValueError):
            return str(value)

    def _get_benchmark_df(self, start, end):
        """Return the benchmark's daily rows between ``start`` and ``end`` (inclusive).

        The full history is downloaded once per instance and cached. A copy of the
        slice is returned, so callers may add columns to it safely.
        """
        full_df = getattr(self, '_benchmark_full_df', None)
        if full_df is None:
            full_df = ak.stock_zh_index_daily(symbol=self.BENCHMARK_CODE)
            full_df['date'] = pd.to_datetime(full_df['date'])
            full_df.set_index('date', inplace=True)
            self._benchmark_full_df = full_df
        return full_df.loc[start:end].copy()

    @staticmethod
    def _compute_tech_indicators(hist_df):
        """Return the latest MA20, MA60, MACD line/signal/histogram and RSI-14 values."""
        # stockstats expects English OHLCV column names; rename on a copy so the
        # shared history keeps its original columns.
        ta_df = hist_df.copy()
        ta_df.rename(columns={'收盘': 'close', '开盘': 'open', '最高': 'high', '最低': 'low', '成交量': 'volume'}, inplace=True)
        sdf = StockDataFrame.retype(ta_df)
        # Accessing these columns makes stockstats compute them.
        sdf[['macd', 'macds', 'macdh']]
        sdf['rsi_14']
        sdf['close_20_sma']
        sdf['close_60_sma']
        latest = sdf.iloc[-1]
        return {
            'MA20': latest.get('close_20_sma'),
            'MA60': latest.get('close_60_sma'),
            'MACD': latest.get('macd'),
            'MACD_Signal': latest.get('macds'),
            'MACD_Hist': latest.get('macdh'),
            'RSI_14': latest.get('rsi_14'),
        }

    def _compare_with_benchmark(self, returns, end_date):
        """Add benchmark returns and alpha to ``analysis_result['market_performance']``.

        This step is best-effort. On failure it records ``benchmark_error`` and keeps
        the results already stored.
        """
        perf = self.analysis_result['market_performance']
        print(f"开始与基准 {self.BENCHMARK_CODE} 进行对比...")
        try:
            # Restrict the benchmark to the same date range as the ETF history.
            benchmark_df = self._get_benchmark_df(self.hist_df.index[0], self.hist_df.index[-1])
            benchmark_returns = {}
            alpha = {}
            if not benchmark_df.empty:
                closes = benchmark_df['close']
                for name, days in self.RETURN_PERIODS.items():
                    benchmark_returns[name] = self._trailing_return(closes, days)
                    # Excess return of the ETF over the benchmark, in percentage points.
                    alpha[name] = returns.get(name, 0) - benchmark_returns.get(name, 0)
                benchmark_returns['年初至今'] = self._ytd_return(closes, end_date.year)
                alpha['年初至今'] = returns.get('年初至今', 0) - benchmark_returns.get('年初至今', 0)
            perf['benchmark_returns'] = benchmark_returns
            perf['alpha'] = alpha
            perf['message'] = "完整的市场表现分析已完成。"
            print("与基准对比分析完成。")
        except Exception as e:
            print(f"与基准对比时出错: {e}")
            perf['benchmark_error'] = f"与基准对比失败: {e}"

    def _fetch_holdings(self):
        """Return this year's holdings table, else last year's; None if neither is usable."""
        this_year = datetime.now().year
        # Early in a year no report for the current year may be published yet.
        for year in (this_year, this_year - 1):
            try:
                df = ak.fund_portfolio_hold_em(symbol=self.etf_code, date=str(year))
            except Exception as e:
                print(f"获取 {year} 年持仓数据时出错: {e}")
                continue
            if df is not None and not df.empty and '股票代码' in df.columns:
                return df
        return None

    @staticmethod
    def _sector_valuation(sector_name):
        """Return the board's latest rolling PE and its historical percentile (best-effort)."""
        valuation = {"current_pe": None, "pe_percentile": None}
        try:
            # NOTE(review): confirm stock_board_industry_pe_em exists in the installed
            # akshare version; any failure is recorded here instead of aborting the step.
            pe_df = ak.stock_board_industry_pe_em(symbol=sector_name)
            if pe_df is None or pe_df.empty:
                return valuation
            pe_series = pd.to_numeric(pe_df['滚动市盈率'], errors='coerce').dropna()
            if pe_series.empty:
                return valuation
            latest_pe = pe_series.iloc[-1]
            valuation["current_pe"] = latest_pe
            # Percentage of historical values strictly below the latest value.
            valuation["pe_percentile"] = (pe_series < latest_pe).mean() * 100
        except Exception as e:
            print(f"获取板块估值数据时出错: {e}")
            valuation["error"] = f"获取板块估值失败: {e}"
        return valuation

    # ------------------------------------------------------------- analysis steps

    def get_basic_info(self):
        """
        1. Basic information.

        Stores a dict under ``analysis_result['basic_info']``. It maps the first
        column of each returned row to the second column.
        """
        print("开始获取基本信息...")
        try:
            # NOTE(review): akshare's fund_etf_fund_info_em appears to return historical
            # NAV rows (date / NAV columns) rather than a key/value profile. If so, keys
            # such as '跟踪标的' read by analyze_sector never appear. Confirm against the
            # installed akshare version.
            fund_info_df = ak.fund_etf_fund_info_em(fund=self.etf_code)
            if fund_info_df.empty:
                info_dict = {"error": "未能获取到该ETF的基本信息,请检查代码是否正确。"}
            else:
                info_dict = {
                    row.iloc[0]: row.iloc[1]
                    for _, row in fund_info_df.iterrows()
                    if len(row) >= 2
                }
            self.analysis_result['basic_info'] = info_dict
            print("基本信息获取完成。")
        except Exception as e:
            print(f"获取ETF基本信息时出错: {e}")
            self.analysis_result['basic_info'] = {"error": f"获取基本信息失败: {e}"}

    def analyze_market_performance(self):
        """
        2. Market performance and technical analysis.

        Fetches about one year of back-adjusted ("hfq") daily prices and stores them
        in ``self.hist_df``. It then records period returns, one-month liquidity, the
        latest technical indicators and, best-effort, the comparison with
        ``BENCHMARK_CODE`` under ``analysis_result['market_performance']``.
        """
        print("开始分析市场表现...")
        try:
            end_date = datetime.now()
            start_date = end_date - timedelta(days=365)
            hist_df = ak.fund_etf_hist_em(
                symbol=self.etf_code,
                start_date=start_date.strftime('%Y%m%d'),
                end_date=end_date.strftime('%Y%m%d'),
                adjust="hfq",
            )
            if hist_df.empty:
                self.analysis_result['market_performance'] = {"error": "未能获取到该ETF的历史行情数据。"}
                print("未能获取历史行情数据。")
                return

            # --- data preparation ---
            hist_df['日期'] = pd.to_datetime(hist_df['日期'])
            hist_df.set_index('日期', inplace=True)
            hist_df['收盘'] = pd.to_numeric(hist_df['收盘'])
            hist_df['成交额'] = pd.to_numeric(hist_df['成交额'], errors='coerce').fillna(0)
            hist_df['换手率'] = pd.to_numeric(hist_df['换手率'], errors='coerce').fillna(0)
            # Shared with the fund-flow and risk steps.
            self.hist_df = hist_df

            # --- returns ---
            closes = hist_df['收盘']
            returns = {
                name: self._trailing_return(closes, days)
                for name, days in self.RETURN_PERIODS.items()
            }
            returns['年初至今'] = self._ytd_return(closes, end_date.year)

            # --- liquidity over the last 21 trading rows ---
            last_month_df = hist_df.tail(21)
            has_rows = not last_month_df.empty
            liquidity = {
                '日均成交额(近一月)': last_month_df['成交额'].mean() if has_rows else None,
                '日均换手率(近一月)': last_month_df['换手率'].mean() if has_rows else None,
            }

            # --- technical indicators (computed on a copy; self.hist_df is untouched) ---
            tech_indicators = self._compute_tech_indicators(hist_df)

            self.analysis_result['market_performance'] = {
                "returns": returns,
                "liquidity": liquidity,
                "tech_indicators": tech_indicators,
                "message": "回报率、流动性和技术指标分析完成。"
            }
            print("市场表现分析(回报率、流动性、技术指标)完成。")
        except Exception as e:
            print(f"分析市场表现时出错: {e}")
            self.analysis_result['market_performance'] = {"error": f"分析市场表现失败: {e}"}
            return

        # The benchmark comparison handles its own errors, so a failure there can no
        # longer overwrite the results stored above.
        self._compare_with_benchmark(returns, end_date)

    def analyze_fund_flow(self):
        """
        3. Fund-flow analysis (rough estimate).

        Uses the day-over-day change in traded volume ('成交量') as a proxy for the
        change in fund shares, so the figures are approximations only. An exact value
        would need a dedicated share-history endpoint.
        """
        print("开始分析资金流向...")
        try:
            if self.hist_df is None or self.hist_df.empty:
                self.analysis_result['fund_flow'] = {"error": "历史数据缺失,无法进行资金流向分析。"}
                print("历史数据缺失,跳过资金流向分析。")
                return

            df = self.hist_df.copy()
            # Proxy for the change in shares (see docstring).
            df['份额变化'] = df['成交量'].diff().fillna(0)
            # Estimated net inflow = share change * close; positive means inflow.
            df['资金净流入估算'] = df['份额变化'] * df['收盘']

            # tail(days) returns the whole frame when it has fewer rows, which gives
            # the "use all available data" fallback.
            flow_summary = {
                name: df['资金净流入估算'].tail(days).sum()
                for name, days in self.FLOW_PERIODS.items()
            }

            # Last 60 rows for the chart, values converted to 亿元 (1e8).
            recent_flow_data = df.tail(60)[['资金净流入估算']].copy()
            recent_flow_data.index = recent_flow_data.index.strftime('%Y-%m-%d')
            recent_flow_data['资金净流入估算'] = (recent_flow_data['资金净流入估算'] / 1e8).round(4)
            # Chart format: [[date_string, value], ...]
            chart_data_list = recent_flow_data.reset_index().values.tolist()

            self.analysis_result['fund_flow'] = {
                "summary": flow_summary,
                "daily_flow_chart_data": {"data": chart_data_list},
                "message": "资金流向分析完成 (基于成交量估算)。"
            }
            print("资金流向分析完成。")
        except Exception as e:
            print(f"分析资金流向时出错: {e}")
            self.analysis_result['fund_flow'] = {"error": f"分析资金流向失败: {e}"}

    def analyze_risk_and_tracking(self):
        """
        4. Risk and tracking analysis.

        Records annualized volatility, beta, annualized tracking error and annualized
        Sharpe ratio versus ``BENCHMARK_CODE``. It also records the one-month average
        premium/discount when a '单位净值' column is present in the history.
        """
        print("开始分析风险与跟踪能力...")
        try:
            if self.hist_df is None or self.hist_df.empty:
                self.analysis_result['risk_and_tracking'] = {"error": "历史数据缺失,无法进行风险分析。"}
                print("历史数据缺失,跳过风险分析。")
                return

            df = self.hist_df.copy()
            df['etf_return'] = df['收盘'].pct_change().fillna(0)

            # 1. Annualized volatility (252 trading days).
            annualized_volatility = df['etf_return'].std() * np.sqrt(252)

            # 2. Benchmark comparison (the slice is already a copy, so adding a column is safe).
            benchmark_df = self._get_benchmark_df(df.index.min(), df.index.max())
            benchmark_df['benchmark_return'] = benchmark_df['close'].pct_change().fillna(0)
            merged_df = pd.merge(df[['etf_return']], benchmark_df[['benchmark_return']],
                                 left_index=True, right_index=True, how='inner')

            covariance = merged_df['etf_return'].cov(merged_df['benchmark_return'])
            variance = merged_df['benchmark_return'].var()
            beta = covariance / variance if variance != 0 else None

            merged_df['difference'] = merged_df['etf_return'] - merged_df['benchmark_return']
            tracking_error = merged_df['difference'].std() * np.sqrt(252)

            # Daily equivalent of an assumed 2% annual risk-free rate.
            risk_free_rate_daily = (1.02 ** (1 / 252)) - 1
            avg_daily_return = merged_df['etf_return'].mean()
            std_daily_return = merged_df['etf_return'].std()
            sharpe_ratio = (((avg_daily_return - risk_free_rate_daily) * 252) / (std_daily_return * np.sqrt(252))
                            if std_daily_return != 0 else None)

            # 3. Premium/discount (average of the last 21 rows), only if NAV is available.
            avg_premium_discount = None
            df_for_premium = self.hist_df.copy()
            if '单位净值' in df_for_premium.columns and not df_for_premium['单位净值'].isnull().all():
                df_for_premium['单位净值'] = pd.to_numeric(df_for_premium['单位净值'], errors='coerce')
                df_for_premium.dropna(subset=['单位净值'], inplace=True)
                df_for_premium = df_for_premium[df_for_premium['单位净值'] != 0]
                if not df_for_premium.empty:
                    df_for_premium['premium_discount'] = ((df_for_premium['收盘'] / df_for_premium['单位净值']) - 1) * 100
                    avg_premium_discount = df_for_premium.tail(21)['premium_discount'].mean()

            self.analysis_result['risk_and_tracking'] = {
                "annualized_volatility": annualized_volatility,
                "beta": beta,
                "tracking_error": tracking_error,
                "sharpe_ratio": sharpe_ratio,
                "avg_premium_discount_monthly": avg_premium_discount
            }
            print("风险与跟踪能力分析完成。")
        except Exception as e:
            print(f"分析风险与跟踪能力时出错: {e}")
            self.analysis_result['risk_and_tracking'] = {"error": f"分析风险与跟踪能力失败: {e}"}

    def analyze_holdings(self):
        """
        5. Holdings analysis: the top-10 positions and their combined weight (%).
        """
        print("开始分析持仓...")
        try:
            holdings_df = self._fetch_holdings()
            if holdings_df is None:
                self.analysis_result['holdings'] = {"error": "未能获取到该ETF的持仓数据。"}
                print("未能获取持仓数据。")
                return

            # The table can contain several reporting quarters. Keep only one so the
            # top 10 are not mixed across quarters. Assumes labels such as
            # '2024年4季度...' sort chronologically within a year — verify.
            if '季度' in holdings_df.columns:
                quarters = holdings_df['季度'].dropna().astype(str)
                if not quarters.empty:
                    holdings_df = holdings_df[holdings_df['季度'].astype(str) == quarters.max()]

            top_10_holdings = holdings_df.head(10)[['股票代码', '股票名称', '持仓市值', '占净值比例']].copy()
            top_10_holdings.rename(columns={'占净值比例': '占净值比例(%)'}, inplace=True)
            top_10_holdings['占净值比例(%)'] = pd.to_numeric(top_10_holdings['占净值比例(%)'], errors='coerce')
            # Concentration = summed NAV weight of the top-10 holdings.
            concentration = top_10_holdings['占净值比例(%)'].sum()

            self.analysis_result['holdings'] = {
                "top_10_holdings": top_10_holdings.to_dict('records'),
                "concentration": concentration
            }
            print("持仓分析完成。")
        except Exception as e:
            print(f"分析持仓时出错: {e}")
            self.analysis_result['holdings'] = {"error": f"分析持仓失败: {e}"}

    def analyze_sector(self):
        """
        6. Sector analysis.

        Derives a board name from ``basic_info['跟踪标的']`` by stripping '指数', '中证'
        and '国证'. This is a simplified mapping, e.g. '中证白酒指数' -> '白酒'. It then
        records the board's period returns and, best-effort, its rolling-PE
        valuation.
        """
        print("开始进行板块深度分析...")
        try:
            basic_info = self.analysis_result.get('basic_info', {})
            tracking_index = basic_info.get('跟踪标的', '')
            if not isinstance(tracking_index, str) or '指数' not in tracking_index:
                self.analysis_result['sector_analysis'] = {"error": "无法从基本信息中确定ETF跟踪的板块或行业指数。"}
                print("无法识别板块,跳过板块分析。")
                return

            sector_name = tracking_index.replace('指数', '').replace('中证', '').replace('国证', '')

            # Try an industry board first, then fall back to a concept board.
            try:
                sector_df = ak.stock_board_industry_hist_em(symbol=sector_name)
            except Exception:
                try:
                    sector_df = ak.stock_board_concept_hist_em(symbol=sector_name)
                except Exception as e:
                    print(f"无法获取板块 '{sector_name}' 的行情数据: {e}")
                    self.analysis_result['sector_analysis'] = {"error": f"无法获取板块 '{sector_name}' 的行情数据: {e}"}
                    return
            if sector_df is None or sector_df.empty:
                self.analysis_result['sector_analysis'] = {"error": f"无法获取板块 '{sector_name}' 的行情数据: 返回数据为空"}
                return

            sector_df['日期'] = pd.to_datetime(sector_df['日期'])
            sector_df.set_index('日期', inplace=True)

            # Unlike the ETF returns, a period is omitted when the history is too short.
            sector_returns = {}
            for name, days in self.RETURN_PERIODS.items():
                value = self._trailing_return(sector_df['收盘'], days, fallback_to_first=False)
                if value is not None:
                    sector_returns[name] = value

            self.analysis_result['sector_analysis'] = {
                "sector_name": sector_name,
                "returns": sector_returns,
                "valuation": self._sector_valuation(sector_name),
            }
            print("板块深度分析完成。")
        except Exception as e:
            print(f"分析板块时出错: {e}")
            self.analysis_result['sector_analysis'] = {"error": f"分析板块失败: {e}"}

    def get_ai_summary(self):
        """
        7. AI summary.

        Builds a prompt from the previously stored results and passes it to
        ``self.stock_analyzer.get_ai_analysis_from_prompt``. Missing or None values
        are shown as 'N/A' rather than aborting the step.
        """
        print("开始生成AI综合诊断...")
        try:
            fmt = self._fmt
            basic_info = self.analysis_result.get('basic_info', {})
            perf = self.analysis_result.get('market_performance', {})
            returns = perf.get('returns', {})
            alpha = perf.get('alpha', {})
            risk = self.analysis_result.get('risk_and_tracking', {})
            holdings = self.analysis_result.get('holdings', {})
            sector = self.analysis_result.get('sector_analysis', {})
            sector_val = sector.get('valuation', {})

            prompt_data = f"请为ETF代码 {self.etf_code} 生成一份全面的投资分析报告。请严格根据以下数据进行分析,不要使用外部知识:\n\n"
            prompt_data += f"**基本信息**: 跟踪指数: {basic_info.get('跟踪标的', 'N/A')}, 规模: {basic_info.get('基金规模', 'N/A')}, 管理人: {basic_info.get('基金管理人', 'N/A')}.\n"
            prompt_data += (
                f"**市场表现**: 近1个月回报率: {fmt(returns.get('近1个月'), '.2f', '%')}, "
                f"同期超额收益(相对沪深300): {fmt(alpha.get('近1个月'), '.2f', '%')}. "
                f"年初至今回报率: {fmt(returns.get('年初至今'), '.2f', '%')}, "
                f"同期超额收益: {fmt(alpha.get('年初至今'), '.2f', '%')}.\n"
            )
            prompt_data += (
                f"**风险指标**: 年化波动率: {fmt(risk.get('annualized_volatility'), '.2%')}, "
                f"Beta值: {fmt(risk.get('beta'), '.2f')}, "
                f"年化跟踪误差: {fmt(risk.get('tracking_error'), '.2%')}.\n"
            )
            prompt_data += f"**持仓结构**: 前十大持仓集中度: {fmt(holdings.get('concentration'), '.2f', '%')}.\n"
            prompt_data += (
                f"**板块分析**: 所属板块: {sector.get('sector_name', 'N/A')}, "
                f"当前滚动PE处于 {fmt(sector_val.get('pe_percentile'), '.2f', '%')} 历史分位点.\n\n"
            )
            prompt_data += "请根据以上数据,从以下三个方面进行分析,并给出一个最终总结:\n1. **核心优势**: 这只ETF最主要的投资亮点是什么?\n2. **潜在风险**: 投资者需要注意哪些潜在风险?\n3. **板块前景**: 结合板块估值和前景,对该ETF的赛道进行评价。\n4. **最终结论**: 给出一个不超过50字的简明投资结论。"

            if self.stock_analyzer:
                ai_summary = self.stock_analyzer.get_ai_analysis_from_prompt(prompt_data)
            else:
                ai_summary = "AI分析器未初始化,无法生成摘要。"
            self.analysis_result['ai_summary'] = {"message": ai_summary}
            print("AI综合诊断生成完成。")
        except Exception as e:
            print(f"生成AI摘要时出错: {e}")
            self.analysis_result['ai_summary'] = {"error": f"生成AI摘要失败: {e}"}
if __name__ == '__main__':
    # Manual smoke test: analyse the 沪深300 ETF without an AI backend.
    import json

    test_etf = '510300'  # 沪深300 ETF
    # The constructor takes a StockAnalyzer as its second argument; passing None
    # makes the AI step emit a placeholder instead of raising TypeError here.
    analyzer = EtfAnalyzer(test_etf, None)
    results = analyzer.run_analysis()
    # default=str is deliberate for this debug dump: it stringifies values such as
    # numpy integers that json cannot serialize natively.
    print(json.dumps(results, indent=4, ensure_ascii=False, default=str))