| |
| import pandas as pd |
| import numpy as np |
| from typing import Dict, List, Any, Optional |
| import talib |
| import logging |
|
|
|
|
class DataProcessor:
    """
    Data processor: preprocessing, feature engineering and indicator calculation.

    Price frames carry ``open``/``high``/``low``/``close`` (and optionally
    ``volume``) columns, plus ``stock_code`` and ``date`` when several stocks
    are mixed in one frame. Per-stock work is done inside each ``stock_code``
    group so values from one stock never leak into another; a frame without
    ``stock_code`` is treated as a single stock.
    """

    # Price columns cleaned by ``preprocess``.
    _PRICE_COLUMNS = ('open', 'high', 'low', 'close')
    # Keys of the dict returned by ``calculate_risk_metrics``.
    _RISK_METRIC_NAMES = (
        "annual_return",
        "annual_volatility",
        "sharpe_ratio",
        "max_drawdown",
        "sortino_ratio",
        "information_ratio",
    )

    def __init__(self):
        """Create the processor and attach a module-level logger."""
        self.logger = logging.getLogger(__name__)

    @staticmethod
    def _stock_positions(df: pd.DataFrame) -> List[np.ndarray]:
        """Return the positional row indices of each stock in ``df``."""
        if 'stock_code' in df.columns:
            # Positional (not label) indices, so duplicate index labels are harmless.
            return list(df.groupby('stock_code', sort=False).indices.values())
        return [np.arange(len(df))]

    @staticmethod
    def _safe_divide(numerator: pd.Series, denominator: pd.Series) -> pd.Series:
        """Divide element-wise, giving NaN instead of +/-inf where the denominator is 0."""
        return numerator / denominator.where(denominator != 0)

    def preprocess(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        Clean raw price data.

        * Missing ``open``/``high``/``low``/``close`` values are forward-filled
          within each stock. Rows are assumed to already be in chronological
          order per stock (this method does not sort); leading gaps stay NaN.
        * Missing ``volume`` values are set to 0.
        * Price values outside mean +/- 3 std, computed per stock, are replaced
          by that stock's median.

        Args:
            data: Raw price data; not modified.

        Returns:
            A cleaned copy of ``data`` with the same index and row order.
        """
        df = data.copy()
        by_stock = 'stock_code' in df.columns
        price_cols = [col for col in self._PRICE_COLUMNS if col in df.columns]

        if df.isnull().any().any():
            self.logger.warning("数据中存在缺失值,进行处理")
            for col in price_cols:
                if df[col].isnull().any():
                    # Fill within each stock so one stock's last price never
                    # spills into the first rows of the next stock.
                    column = df.groupby('stock_code')[col] if by_stock else df[col]
                    df[col] = column.ffill()

            if 'volume' in df.columns and df['volume'].isnull().any():
                df['volume'] = df['volume'].fillna(0)

        for col in price_cols:
            # Statistics must be per stock: pooling stocks with different price
            # levels hides real outliers and flags normal high-priced series.
            if by_stock:
                grouped = df.groupby('stock_code')[col]
                mean = grouped.transform('mean')
                std = grouped.transform('std')
                median = grouped.transform('median')
            else:
                mean = df[col].mean()
                std = df[col].std()
                median = df[col].median()

            lower_bound = mean - 3 * std
            upper_bound = mean + 3 * std
            outliers = (df[col] < lower_bound) | (df[col] > upper_bound)
            if outliers.any():
                self.logger.warning(f"列 {col} 中存在异常值,进行处理")
                df[col] = df[col].mask(outliers, median)

        return df

    def calculate_technical_indicators(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        Append moving-average, MACD, RSI, Bollinger-band and KDJ columns.

        When a ``date`` column exists, rows are sorted by (``stock_code``,
        ``date``), or by ``date`` alone for a frame without ``stock_code``.
        Every indicator is computed independently per stock. Added columns:

        * ``ma_5``, ``ma_10``, ``ma_20``, ``ma_30``, ``ma_60``: rolling means of ``close``
        * ``macd``, ``macd_signal``, ``macd_hist``: MACD(12, 26, 9)
        * ``rsi_6``, ``rsi_14``, ``rsi_21``
        * ``bb_upper``, ``bb_middle``, ``bb_lower``: 20-period, 2-std bands
        * ``kdj_k``, ``kdj_d``, ``kdj_j``: only when ``high`` and ``low`` exist

        Warm-up rows, and stocks whose prices are entirely missing, get NaN.
        Without a ``close`` column the (sorted) copy is returned unchanged.

        Args:
            data: Preprocessed price data; not modified.

        Returns:
            A copy of ``data`` (sorted as above) with the indicator columns added.
        """
        df = data.copy()

        if 'date' in df.columns:
            sort_keys = ['stock_code', 'date'] if 'stock_code' in df.columns else ['date']
            df = df.sort_values(sort_keys)

        if 'close' not in df.columns:
            return df

        # TA-Lib only accepts float64 arrays.
        close = df['close'].to_numpy(dtype=float)
        has_high_low = 'high' in df.columns and 'low' in df.columns
        high = df['high'].to_numpy(dtype=float) if has_high_low else None
        low = df['low'].to_numpy(dtype=float) if has_high_low else None

        ma_periods = (5, 10, 20, 30, 60)
        rsi_periods = (6, 14, 21)
        columns = (
            [f'ma_{period}' for period in ma_periods]
            + ['macd', 'macd_signal', 'macd_hist']
            + [f'rsi_{period}' for period in rsi_periods]
            + ['bb_upper', 'bb_middle', 'bb_lower']
        )
        if has_high_low:
            columns += ['kdj_k', 'kdj_d', 'kdj_j']
        out = {name: np.full(len(df), np.nan) for name in columns}

        for pos in self._stock_positions(df):
            stock_close = close[pos]

            for period in ma_periods:
                out[f'ma_{period}'][pos] = (
                    pd.Series(stock_close).rolling(window=period).mean().to_numpy()
                )

            # TA-Lib raises "inputs are all NaN" instead of returning NaNs, which
            # would abort the whole frame because of one fully-missing stock.
            if np.isnan(stock_close).all():
                continue

            macd, macd_signal, macd_hist = talib.MACD(
                stock_close, fastperiod=12, slowperiod=26, signalperiod=9
            )
            out['macd'][pos] = macd
            out['macd_signal'][pos] = macd_signal
            out['macd_hist'][pos] = macd_hist

            for period in rsi_periods:
                out[f'rsi_{period}'][pos] = talib.RSI(stock_close, timeperiod=period)

            upper, middle, lower = talib.BBANDS(
                stock_close, timeperiod=20, nbdevup=2, nbdevdn=2, matype=0
            )
            out['bb_upper'][pos] = upper
            out['bb_middle'][pos] = middle
            out['bb_lower'][pos] = lower

            if has_high_low:
                stock_high, stock_low = high[pos], low[pos]
                usable = ~(np.isnan(stock_high) | np.isnan(stock_low) | np.isnan(stock_close))
                if usable.any():
                    # NOTE(review): TA-Lib STOCH with SMA smoothing differs from the
                    # exponentially smoothed KDJ of many charting tools — confirm intended.
                    k, d = talib.STOCH(
                        stock_high,
                        stock_low,
                        stock_close,
                        fastk_period=9,
                        slowk_period=3,
                        slowk_matype=0,
                        slowd_period=3,
                        slowd_matype=0,
                    )
                    out['kdj_k'][pos] = k
                    out['kdj_d'][pos] = d
                    out['kdj_j'][pos] = 3 * k - 2 * d

        # Arrays are positional, so assignment never depends on index labels.
        for name in columns:
            df[name] = out[name]
        return df

    def calculate_fundamental_indicators(
            self,
            price_data: pd.DataFrame,
            financial_data: pd.DataFrame) -> pd.DataFrame:
        """
        Attach valuation and profitability ratios to financial data.

        The latest non-null ``close`` of each stock (ordered by ``date``) is
        left-joined onto ``financial_data`` as ``latest_price``. Ratios are then
        computed for whichever inputs are present:

        * ``market_cap`` = total_shares * latest_price
        * ``pe_ratio`` = market_cap / net_profit
        * ``pb_ratio`` = market_cap / total_equity
        * ``dividend_yield`` = dividend / latest_price * 100
        * ``roe`` = net_profit / total_equity * 100
        * ``roa`` = net_profit / total_assets * 100

        A zero denominator yields NaN rather than +/-inf. Units are used as
        given; presumably shares, prices and financial figures are in
        consistent units (verify against the data source).

        Args:
            price_data: Price data with ``stock_code``, ``date`` and ``close``.
            financial_data: Financial data keyed by ``stock_code``; not modified.

        Returns:
            ``financial_data`` joined with ``latest_price`` plus the ratio columns.
        """
        price_df = price_data.sort_values(['stock_code', 'date'])
        financial_df = financial_data.copy()

        # GroupBy.last() skips NaN, so this is the latest *available* close.
        latest_prices = (
            price_df.groupby('stock_code')['close'].last()
            .rename('latest_price')
            .reset_index()
        )
        result = pd.merge(financial_df, latest_prices, on='stock_code', how='left')
        fin_cols = financial_df.columns

        if 'total_shares' in fin_cols and 'latest_price' in result.columns:
            result['market_cap'] = result['total_shares'] * result['latest_price']

        if 'net_profit' in fin_cols and 'market_cap' in result.columns:
            result['pe_ratio'] = self._safe_divide(result['market_cap'], result['net_profit'])

        if 'total_equity' in fin_cols and 'market_cap' in result.columns:
            result['pb_ratio'] = self._safe_divide(result['market_cap'], result['total_equity'])

        if 'dividend' in fin_cols and 'latest_price' in result.columns:
            result['dividend_yield'] = (
                self._safe_divide(result['dividend'], result['latest_price']) * 100
            )

        if 'net_profit' in fin_cols and 'total_equity' in fin_cols:
            result['roe'] = self._safe_divide(result['net_profit'], result['total_equity']) * 100

        if 'net_profit' in fin_cols and 'total_assets' in fin_cols:
            result['roa'] = self._safe_divide(result['net_profit'], result['total_assets']) * 100

        return result

    @staticmethod
    def _series_risk_metrics(
            returns: pd.Series,
            risk_free_rate: float,
            benchmark_return: float,
            periods_per_year: int) -> Dict[str, Any]:
        """Compute the risk metrics for a single return series."""
        scale = np.sqrt(periods_per_year)
        annual_return = returns.mean() * periods_per_year
        annual_volatility = returns.std() * scale
        sharpe_ratio = ((annual_return - risk_free_rate) / annual_volatility
                        if annual_volatility != 0 else 0)

        cumulative_returns = (1 + returns).cumprod()
        max_drawdown = (cumulative_returns / cumulative_returns.cummax() - 1).min()

        # Downside deviation is the RMS of the negative part of *all* returns
        # (target 0). The std of only the losing periods measures how spread out
        # the losses are, not how large they are (it is 0 when all losses match).
        downside_deviation = np.sqrt((returns.clip(upper=0) ** 2).mean()) * scale
        sortino_ratio = ((annual_return - risk_free_rate) / downside_deviation
                         if downside_deviation != 0 else 0)

        # Tracking error is the annualised std of the active return. The
        # benchmark is an annual rate, so de-annualise it per period.
        active_returns = returns - benchmark_return / periods_per_year
        tracking_error = active_returns.std() * scale
        information_ratio = ((annual_return - benchmark_return) / tracking_error
                             if tracking_error != 0 else 0)

        return {
            "annual_return": annual_return,
            "annual_volatility": annual_volatility,
            "sharpe_ratio": sharpe_ratio,
            "max_drawdown": max_drawdown,
            "sortino_ratio": sortino_ratio,
            "information_ratio": information_ratio,
        }

    def calculate_risk_metrics(
            self,
            returns: pd.DataFrame,
            risk_free_rate: float = 0.03,
            benchmark_return: float = 0.08,
            periods_per_year: int = 252) -> Dict[str, Any]:
        """
        Compute annualised risk/performance metrics from periodic returns.

        Args:
            returns: Periodic (daily by default) simple returns. A Series gives
                scalar metrics; a DataFrame is evaluated column by column and
                each metric becomes a Series indexed by column name.
            risk_free_rate: Annual risk-free rate for the Sharpe and Sortino ratios.
            benchmark_return: Annual benchmark return for the information ratio.
            periods_per_year: Return periods per year (252 trading days by default).

        Returns:
            Dict with ``annual_return``, ``annual_volatility``, ``sharpe_ratio``,
            ``max_drawdown``, ``sortino_ratio`` and ``information_ratio``.
            A ratio whose denominator is 0 is reported as 0.
        """
        if isinstance(returns, pd.DataFrame):
            per_column = {
                col: self._series_risk_metrics(
                    returns[col], risk_free_rate, benchmark_return, periods_per_year
                )
                for col in returns.columns
            }
            return {
                name: pd.Series(
                    {col: metrics[name] for col, metrics in per_column.items()},
                    dtype=float,
                )
                for name in self._RISK_METRIC_NAMES
            }
        return self._series_risk_metrics(
            returns, risk_free_rate, benchmark_return, periods_per_year
        )

    def prepare_strategy_data(
            self,
            stock_data: pd.DataFrame,
            include_indicators: Optional[List[str]] = None) -> pd.DataFrame:
        """
        Build the frame a strategy consumes: preprocess, then add indicators.

        Args:
            stock_data: Raw stock price data.
            include_indicators: If given (non-empty), keep only the essential
                columns (``date``, ``stock_code``, ``open``, ``high``, ``low``,
                ``close``, ``volume``) plus these indicator columns. Names that
                are not present are skipped, and duplicates are kept once.

        Returns:
            The prepared strategy data.
        """
        df = self.preprocess(stock_data)
        df = self.calculate_technical_indicators(df)

        if include_indicators:
            essential_cols = [
                'date',
                'stock_code',
                'open',
                'high',
                'low',
                'close',
                'volume',
            ]
            wanted = essential_cols + list(include_indicators)
            # dict.fromkeys de-duplicates while keeping first-seen order.
            cols_to_keep = list(dict.fromkeys(col for col in wanted if col in df.columns))
            df = df[cols_to_keep]

        return df
|
|