# Data processor
import logging
from typing import Any, Dict, List, Optional, Union

import numpy as np
import pandas as pd

try:
    import talib
except ImportError:
    # TA-Lib is a compiled extension that is often missing from an
    # environment. Only calculate_technical_indicators needs it, so the
    # rest of the class stays usable and the error is raised on first use.
    talib = None


class DataProcessor:
    """
    Data processor: cleans raw market data, derives technical and
    fundamental indicators, and computes risk metrics.
    """

    # Price columns that get forward-filled and outlier-checked.
    _PRICE_COLUMNS = ('open', 'high', 'low', 'close')
    # Columns always kept by prepare_strategy_data when they exist.
    _ESSENTIAL_COLUMNS = (
        'date', 'stock_code', 'open', 'high', 'low', 'close', 'volume')
    _MA_PERIODS = (5, 10, 20, 30, 60)
    _RSI_PERIODS = (6, 14, 21)

    def __init__(self):
        """
        Initialise the processor with a module-level logger.
        """
        self.logger = logging.getLogger(__name__)

    @staticmethod
    def _stock_keys(df: pd.DataFrame) -> np.ndarray:
        """Return one integer group key per row (all zeros without 'stock_code')."""
        if 'stock_code' in df.columns:
            # factorize maps a missing stock code to -1, which simply becomes
            # its own group instead of being dropped.
            codes, _ = pd.factorize(df['stock_code'])
            return np.asarray(codes)
        return np.zeros(len(df), dtype=np.intp)

    @staticmethod
    def _group_positions(keys: np.ndarray) -> List[np.ndarray]:
        """Split row positions into one array per key, keeping row order."""
        if keys.size == 0:
            return []
        # A stable sort keeps the original row order inside each group.
        order = np.argsort(keys, kind='stable')
        boundaries = np.flatnonzero(np.diff(keys[order])) + 1
        return np.split(order, boundaries)

    @staticmethod
    def _safe_ratio(numerator, denominator):
        """Divide, returning 0 where the denominator is exactly 0."""
        if isinstance(denominator, pd.Series):
            non_zero = denominator != 0
            return (numerator / denominator.where(non_zero)).where(non_zero, 0.0)
        return numerator / denominator if denominator != 0 else 0

    def preprocess(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        Clean raw data without modifying the input frame.

        Missing prices are forward-filled, missing volume becomes 0, and
        prices outside mean +/- 3 standard deviations are replaced by the
        median. When a 'stock_code' column is present all of this is done
        per stock, so values never leak from one stock into another.

        Args:
            data: Raw data. Only the columns 'open', 'high', 'low', 'close',
                'volume' and 'stock_code' are looked at, each if present.

        Returns:
            A cleaned copy of ``data``.
        """
        df = data.copy()
        if df.empty:
            return df

        price_cols = [col for col in self._PRICE_COLUMNS if col in df.columns]
        keys = self._stock_keys(df)

        # Missing values
        if df.isnull().any().any():
            self.logger.warning(f"数据中存在缺失值,进行处理")
            for col in price_cols:
                if df[col].isnull().any():
                    # ffill() replaces the deprecated fillna(method='ffill').
                    df[col] = df[col].groupby(keys).ffill()

            if 'volume' in df.columns and df['volume'].isnull().any():
                df['volume'] = df['volume'].fillna(0)

        # Outliers (3-sigma rule)
        for col in price_cols:
            grouped = df[col].groupby(keys)
            center = grouped.transform('mean')
            spread = grouped.transform('std')
            outliers = (
                (df[col] < center - 3 * spread)
                | (df[col] > center + 3 * spread)
            )
            if outliers.any():
                self.logger.warning(f"列 {col} 中存在异常值,进行处理")
                # mask() upcasts integer columns when the median is fractional.
                df[col] = df[col].mask(outliers, grouped.transform('median'))

        return df

    def calculate_technical_indicators(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        Add moving averages, MACD, RSI, Bollinger Bands and KDJ columns.

        Indicators are computed per stock. A frame without a 'stock_code'
        column is treated as a single stock; rows with a missing stock code
        are processed together as one group. Nothing is added when there is
        no 'close' column, and KDJ is added only when 'high' and 'low' are
        present as well.

        Args:
            data: Pre-processed data.

        Returns:
            A copy of ``data`` (sorted by 'stock_code' and 'date' when both
            columns exist) with the indicator columns appended.

        Raises:
            ImportError: If TA-Lib is not installed.
        """
        if talib is None:
            raise ImportError(
                "TA-Lib is required to calculate technical indicators")

        df = data.copy()

        # Rolling indicators need each stock's rows in chronological order.
        if 'date' in df.columns and 'stock_code' in df.columns:
            df = df.sort_values(['stock_code', 'date'])

        if 'close' not in df.columns:
            return df

        with_kdj = 'high' in df.columns and 'low' in df.columns

        # TA-Lib only accepts float64 arrays.
        close_all = df['close'].astype('float64').to_numpy()
        if with_kdj:
            high_all = df['high'].astype('float64').to_numpy()
            low_all = df['low'].astype('float64').to_numpy()

        names = [f'ma_{period}' for period in self._MA_PERIODS]
        names += ['macd', 'macd_signal', 'macd_hist']
        names += [f'rsi_{period}' for period in self._RSI_PERIODS]
        names += ['bb_upper', 'bb_middle', 'bb_lower']
        if with_kdj:
            names += ['kdj_k', 'kdj_d', 'kdj_j']
        out = {name: np.full(len(df), np.nan) for name in names}

        # Results are written by row position, so a non-unique index is safe.
        for pos in self._group_positions(self._stock_keys(df)):
            close = close_all[pos]

            # Moving averages
            close_series = pd.Series(close)
            for period in self._MA_PERIODS:
                out[f'ma_{period}'][pos] = (
                    close_series.rolling(window=period).mean().to_numpy())

            # MACD
            macd, macd_signal, macd_hist = talib.MACD(
                close, fastperiod=12, slowperiod=26, signalperiod=9)
            out['macd'][pos] = macd
            out['macd_signal'][pos] = macd_signal
            out['macd_hist'][pos] = macd_hist

            # RSI
            for period in self._RSI_PERIODS:
                out[f'rsi_{period}'][pos] = talib.RSI(close, timeperiod=period)

            # Bollinger Bands
            upper, middle, lower = talib.BBANDS(
                close, timeperiod=20, nbdevup=2, nbdevdn=2, matype=0)
            out['bb_upper'][pos] = upper
            out['bb_middle'][pos] = middle
            out['bb_lower'][pos] = lower

            # KDJ: K and D come from the stochastic oscillator, J = 3K - 2D.
            if with_kdj:
                k, d = talib.STOCH(
                    high_all[pos], low_all[pos], close,
                    fastk_period=9,
                    slowk_period=3, slowk_matype=0,
                    slowd_period=3, slowd_matype=0)
                out['kdj_k'][pos] = k
                out['kdj_d'][pos] = d
                out['kdj_j'][pos] = 3 * k - 2 * d

        for name, values in out.items():
            df[name] = values

        return df

    def calculate_fundamental_indicators(
            self,
            price_data: pd.DataFrame,
            financial_data: pd.DataFrame) -> pd.DataFrame:
        """
        Combine financial data with each stock's latest close.

        Each ratio is added only when the financial columns it needs exist.

        Args:
            price_data: Price data with 'stock_code', 'date' and 'close'.
            financial_data: Financial data with 'stock_code' and any of
                'total_shares', 'net_profit', 'total_equity', 'dividend',
                'total_assets'.

        Returns:
            ``financial_data`` left-joined with 'latest_price', plus whichever
            of 'market_cap', 'pe_ratio', 'pb_ratio', 'dividend_yield', 'roe'
            and 'roa' could be computed.
        """
        price_df = price_data.copy().sort_values(['stock_code', 'date'])
        financial_df = financial_data.copy()

        # After the sort, the last row per stock carries its most recent close.
        latest_prices = (
            price_df.groupby('stock_code')['close']
            .last()
            .reset_index()
            .rename(columns={'close': 'latest_price'})
        )

        result = pd.merge(
            financial_df, latest_prices, on='stock_code', how='left')

        fin_cols = financial_df.columns
        has_price = 'latest_price' in result.columns

        # Market capitalisation
        if 'total_shares' in fin_cols and has_price:
            result['market_cap'] = (
                result['total_shares'] * result['latest_price'])
        has_cap = 'market_cap' in result.columns

        # Price-to-earnings (PE)
        if 'net_profit' in fin_cols and has_cap:
            result['pe_ratio'] = result['market_cap'] / result['net_profit']

        # Price-to-book (PB)
        if 'total_equity' in fin_cols and has_cap:
            result['pb_ratio'] = result['market_cap'] / result['total_equity']

        # Dividend yield, in percent
        if 'dividend' in fin_cols and has_price:
            result['dividend_yield'] = (
                result['dividend'] / result['latest_price'] * 100)

        # Return on equity (ROE), in percent
        if 'net_profit' in fin_cols and 'total_equity' in fin_cols:
            result['roe'] = (
                result['net_profit'] / result['total_equity'] * 100)

        # Return on assets (ROA), in percent
        if 'net_profit' in fin_cols and 'total_assets' in fin_cols:
            result['roa'] = (
                result['net_profit'] / result['total_assets'] * 100)

        return result

    def calculate_risk_metrics(
            self,
            returns: Union[pd.Series, pd.DataFrame],
            risk_free_rate: float = 0.03,
            benchmark_return: float = 0.08,
            periods_per_year: int = 252) -> Dict[str, Any]:
        """
        Compute annualised risk metrics from periodic returns.

        Args:
            returns: Periodic returns. A Series yields scalar metrics; a
                DataFrame yields one Series per metric, indexed by column.
            risk_free_rate: Annual risk-free rate.
            benchmark_return: Annual benchmark return, treated as constant.
            periods_per_year: Number of return periods in a year.

        Returns:
            Dict with 'annual_return', 'annual_volatility', 'sharpe_ratio',
            'max_drawdown', 'sortino_ratio' and 'information_ratio'. A ratio
            is 0 where its denominator is exactly 0.
        """
        annualizer = np.sqrt(periods_per_year)

        annual_return = returns.mean() * periods_per_year
        annual_volatility = returns.std() * annualizer

        sharpe_ratio = self._safe_ratio(
            annual_return - risk_free_rate, annual_volatility)

        # Maximum drawdown: worst drop from a running peak of compounded value.
        cumulative_returns = (1 + returns).cumprod()
        max_drawdown = (
            cumulative_returns / cumulative_returns.cummax() - 1).min()

        # Sortino ratio only penalises negative returns.
        downside_deviation = returns[returns < 0].std() * annualizer
        sortino_ratio = self._safe_ratio(
            annual_return - risk_free_rate, downside_deviation)

        # Information ratio: excess return over the annualised tracking
        # error. The annualisation factor belongs in the denominator.
        active_returns = returns - benchmark_return / periods_per_year
        tracking_error = active_returns.std() * annualizer
        information_ratio = self._safe_ratio(
            annual_return - benchmark_return, tracking_error)

        return {
            "annual_return": annual_return,
            "annual_volatility": annual_volatility,
            "sharpe_ratio": sharpe_ratio,
            "max_drawdown": max_drawdown,
            "sortino_ratio": sortino_ratio,
            "information_ratio": information_ratio
        }

    def prepare_strategy_data(
            self,
            stock_data: pd.DataFrame,
            include_indicators: Optional[List[str]] = None) -> pd.DataFrame:
        """
        Pre-process stock data and attach technical indicators.

        Args:
            stock_data: Raw stock data.
            include_indicators: Indicator columns to keep. When None or
                empty, every computed column is returned.

        Returns:
            The prepared data. With ``include_indicators`` given, only the
            essential columns and the requested indicators that exist are
            kept, each at most once.
        """
        df = self.preprocess(stock_data)
        df = self.calculate_technical_indicators(df)

        if include_indicators:
            wanted = [
                col
                for col in (*self._ESSENTIAL_COLUMNS, *include_indicators)
                if col in df.columns
            ]
            # dict.fromkeys removes duplicates while keeping the order.
            df = df[list(dict.fromkeys(wanted))]

        return df