QSSS_new / backend /data /data_processor.py
misonL's picture
特性:实现对话框、表单、输入框、标签、下拉菜单、分隔线、骨架屏、滑块、表格、文本区域的 UI 组件
c78ce9e
Raw
History Blame Contribute Delete
9.61 kB
# 数据处理器
import pandas as pd
import numpy as np
from typing import Dict, List, Any, Optional
import talib
import logging
class DataProcessor:
    """
    Data processor: preprocessing, feature engineering and indicator
    calculation (technical, fundamental and risk) for stock data.

    Price frames use the columns ``open``/``high``/``low``/``close``/
    ``volume``; multi-stock panels additionally carry ``stock_code`` (and
    ``date`` for ordering). Per-stock work is done inside each
    ``stock_code`` group so stocks at very different price levels never
    contaminate each other.
    """

    # Price columns cleaned by preprocess().
    _PRICE_COLUMNS = ('open', 'high', 'low', 'close')
    # Look-back windows of the moving averages and RSI variants.
    _MA_PERIODS = (5, 10, 20, 30, 60)
    _RSI_PERIODS = (6, 14, 21)

    def __init__(self):
        """Create the processor and a logger named after this module."""
        self.logger = logging.getLogger(__name__)

    def preprocess(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        Clean raw price data: fill missing values and replace outliers.

        Missing prices are forward-filled and missing volume is set to 0.
        Price values outside ``mean +/- 3 * std`` are replaced by the median.
        When a ``stock_code`` column exists, the forward fill and the
        mean/std/median are computed per stock (in the current row order),
        so one stock's prices never leak into or distort another's.

        Args:
            data: Raw data. It is not modified.

        Returns:
            A preprocessed copy of ``data``.
        """
        df = data.copy()
        has_code = 'stock_code' in df.columns

        def per_stock(column):
            # dropna=False keeps rows with a missing stock_code in their own
            # group instead of silently turning their values into NaN.
            return df.groupby('stock_code', dropna=False)[column]

        # Missing values: forward-fill prices, zero-fill volume.
        if df.isnull().any().any():
            self.logger.warning("数据中存在缺失值,进行处理")
            for col in self._PRICE_COLUMNS:
                if col in df.columns and df[col].isnull().any():
                    if has_code:
                        df[col] = per_stock(col).ffill()
                    else:
                        df[col] = df[col].ffill()
            if 'volume' in df.columns and df['volume'].isnull().any():
                df['volume'] = df['volume'].fillna(0)

        # Outliers: values beyond 3 standard deviations become the median.
        for col in self._PRICE_COLUMNS:
            if col not in df.columns:
                continue
            if has_code:
                grouped = per_stock(col)
                mean = grouped.transform('mean')
                std = grouped.transform('std')
                median = grouped.transform('median')
            else:
                mean = df[col].mean()
                std = df[col].std()
                median = df[col].median()
            outliers = (df[col] < mean - 3 * std) | (df[col] > mean + 3 * std)
            if outliers.any():
                self.logger.warning(f"列 {col} 中存在异常值,进行处理")
                df[col] = df[col].mask(outliers, median)
        return df

    def calculate_technical_indicators(self, data: pd.DataFrame) -> pd.DataFrame:
        """
        Add moving-average, MACD, RSI, Bollinger-band and KDJ columns.

        Indicators are computed per ``stock_code`` (rows are first sorted by
        stock and date when both columns exist); a frame without
        ``stock_code`` is treated as a single stock. Nothing is added when
        ``close`` is missing, and the KDJ columns require ``high`` and
        ``low`` as well.

        Added columns:
            ma_5, ma_10, ma_20, ma_30, ma_60,
            macd, macd_signal, macd_hist,
            rsi_6, rsi_14, rsi_21,
            bb_upper, bb_middle, bb_lower,
            kdj_k, kdj_d, kdj_j (J = 3K - 2D)

        Args:
            data: Preprocessed data. It is not modified.

        Returns:
            A copy of ``data`` with the indicator columns appended.
        """
        df = data.copy()
        if 'date' in df.columns and 'stock_code' in df.columns:
            df = df.sort_values(['stock_code', 'date'])
        if 'close' not in df.columns:
            return df

        with_kdj = 'high' in df.columns and 'low' in df.columns
        output_columns = (
            [f'ma_{period}' for period in self._MA_PERIODS]
            + ['macd', 'macd_signal', 'macd_hist']
            + [f'rsi_{period}' for period in self._RSI_PERIODS]
            + ['bb_upper', 'bb_middle', 'bb_lower']
            + (['kdj_k', 'kdj_d', 'kdj_j'] if with_kdj else [])
        )
        # Create every output column up front so the schema is the same even
        # for empty input; rows outside any group keep NaN.
        for column in output_columns:
            df[column] = np.nan

        for _, group in self._stock_groups(df):
            rows = group.index
            close_series = group['close']

            for period in self._MA_PERIODS:
                df.loc[rows, f'ma_{period}'] = (
                    close_series.rolling(window=period).mean().to_numpy())

            # TA-Lib only accepts float64 arrays; integer prices would raise.
            close = close_series.to_numpy(dtype=float)

            macd, macd_signal, macd_hist = talib.MACD(
                close, fastperiod=12, slowperiod=26, signalperiod=9)
            df.loc[rows, 'macd'] = macd
            df.loc[rows, 'macd_signal'] = macd_signal
            df.loc[rows, 'macd_hist'] = macd_hist

            for period in self._RSI_PERIODS:
                df.loc[rows, f'rsi_{period}'] = talib.RSI(close, timeperiod=period)

            upper, middle, lower = talib.BBANDS(
                close, timeperiod=20, nbdevup=2, nbdevdn=2, matype=0)
            df.loc[rows, 'bb_upper'] = upper
            df.loc[rows, 'bb_middle'] = middle
            df.loc[rows, 'bb_lower'] = lower

            if with_kdj:
                k, d = talib.STOCH(
                    group['high'].to_numpy(dtype=float),
                    group['low'].to_numpy(dtype=float),
                    close,
                    fastk_period=9,
                    slowk_period=3,
                    slowk_matype=0,
                    slowd_period=3,
                    slowd_matype=0,
                )
                df.loc[rows, 'kdj_k'] = k
                df.loc[rows, 'kdj_d'] = d
                df.loc[rows, 'kdj_j'] = 3 * k - 2 * d
        return df

    def calculate_fundamental_indicators(
            self,
            price_data: pd.DataFrame,
            financial_data: pd.DataFrame) -> pd.DataFrame:
        """
        Join each stock's latest close onto its financial data and derive
        valuation / profitability ratios.

        The latest price is the last non-null ``close`` per ``stock_code``
        after sorting by stock and date. Depending on which financial
        columns exist, these are added: ``market_cap``, ``pe_ratio``,
        ``pb_ratio``, ``dividend_yield`` (%), ``roe`` (%) and ``roa`` (%).
        Ratios with a zero denominator are NaN rather than +/-inf.

        Args:
            price_data: Price data with ``stock_code``, ``date``, ``close``.
            financial_data: Financial data keyed by ``stock_code``.

        Returns:
            ``financial_data`` left-joined with ``latest_price`` plus the
            derived indicator columns (inputs are not modified).
        """
        financial_columns = financial_data.columns
        price_df = price_data.sort_values(['stock_code', 'date'])

        latest_prices = (
            price_df.groupby('stock_code')['close'].last()
            .rename('latest_price')
            .reset_index()
        )
        result = pd.merge(financial_data, latest_prices, on='stock_code', how='left')
        has_price = 'latest_price' in result.columns

        if 'total_shares' in financial_columns and has_price:
            result['market_cap'] = result['total_shares'] * result['latest_price']
        if 'net_profit' in financial_columns and 'market_cap' in result.columns:
            result['pe_ratio'] = self._finite_ratio(
                result['market_cap'], result['net_profit'])
        if 'total_equity' in financial_columns and 'market_cap' in result.columns:
            result['pb_ratio'] = self._finite_ratio(
                result['market_cap'], result['total_equity'])
        if 'dividend' in financial_columns and has_price:
            result['dividend_yield'] = self._finite_ratio(
                result['dividend'], result['latest_price']) * 100
        if 'net_profit' in financial_columns and 'total_equity' in financial_columns:
            result['roe'] = self._finite_ratio(
                result['net_profit'], result['total_equity']) * 100
        if 'net_profit' in financial_columns and 'total_assets' in financial_columns:
            result['roa'] = self._finite_ratio(
                result['net_profit'], result['total_assets']) * 100
        return result

    def calculate_risk_metrics(
            self,
            returns: pd.DataFrame,
            risk_free_rate: float = 0.03,
            benchmark_return: float = 0.08,
            periods_per_year: int = 252) -> Dict[str, Any]:
        """
        Compute annualised risk/return metrics from periodic simple returns.

        Args:
            returns: Periodic returns (daily by default). A Series yields
                scalar metrics; a DataFrame yields one value per column.
            risk_free_rate: Annual risk-free rate used by Sharpe and Sortino.
            benchmark_return: Annual return of a constant benchmark, used by
                the information ratio.
            periods_per_year: Return periods per year (252 trading days).

        Returns:
            Dict with ``annual_return``, ``annual_volatility``,
            ``sharpe_ratio``, ``max_drawdown``, ``sortino_ratio`` and
            ``information_ratio``. A ratio whose denominator is exactly 0 is
            reported as 0.
        """
        annual_return = returns.mean() * periods_per_year
        annual_volatility = returns.std() * np.sqrt(periods_per_year)
        sharpe_ratio = self._guarded_ratio(
            annual_return - risk_free_rate, annual_volatility)

        # Max drawdown: the running peak starts at the initial capital (1.0),
        # so a loss in the very first period counts as a drawdown.
        cumulative_returns = (1 + returns).cumprod()
        running_peak = cumulative_returns.cummax().clip(lower=1.0)
        max_drawdown = (cumulative_returns / running_peak - 1).min()

        # Sortino: downside deviation is the root-mean-square of returns
        # below 0 over *all* periods (gains count as 0). The std of only the
        # negative returns would report zero risk for identical losses.
        downside = returns.clip(upper=0)
        downside_deviation = (
            np.sqrt((downside ** 2).mean()) * np.sqrt(periods_per_year))
        sortino_ratio = self._guarded_ratio(
            annual_return - risk_free_rate, downside_deviation)

        # Information ratio: annual excess return over the benchmark divided
        # by the annualised tracking error. The annual benchmark is turned
        # into a per-period rate before being compared with periodic returns.
        tracking_error = (
            (returns - benchmark_return / periods_per_year).std()
            * np.sqrt(periods_per_year))
        information_ratio = self._guarded_ratio(
            annual_return - benchmark_return, tracking_error)

        return {
            "annual_return": annual_return,
            "annual_volatility": annual_volatility,
            "sharpe_ratio": sharpe_ratio,
            "max_drawdown": max_drawdown,
            "sortino_ratio": sortino_ratio,
            "information_ratio": information_ratio
        }

    def prepare_strategy_data(
            self,
            stock_data: pd.DataFrame,
            include_indicators: Optional[List[str]] = None) -> pd.DataFrame:
        """
        Preprocess stock data and attach technical indicators for a strategy.

        Args:
            stock_data: Raw stock data.
            include_indicators: Optional indicator columns to keep. When
                given, the result is reduced to the essential columns
                (date, stock_code, OHLC, volume) plus these indicators;
                names that do not exist are skipped and duplicates dropped.

        Returns:
            The prepared strategy data.
        """
        df = self.preprocess(stock_data)
        df = self.calculate_technical_indicators(df)
        if include_indicators:
            essential_cols = [
                'date',
                'stock_code',
                'open',
                'high',
                'low',
                'close',
                'volume']
            wanted = essential_cols + list(include_indicators)
            # dict.fromkeys de-duplicates while keeping order; absent columns
            # are skipped instead of raising KeyError.
            cols_to_keep = [col for col in dict.fromkeys(wanted) if col in df.columns]
            df = df[cols_to_keep]
        return df

    @staticmethod
    def _stock_groups(df: pd.DataFrame):
        """Group rows by ``stock_code``, or as one group when it is absent."""
        if 'stock_code' in df.columns:
            return df.groupby('stock_code')
        return df.groupby(np.zeros(len(df), dtype=int))

    @staticmethod
    def _finite_ratio(numerator: pd.Series, denominator: pd.Series) -> pd.Series:
        """Element-wise division where +/-inf (zero denominators) becomes NaN."""
        return (numerator / denominator).replace([np.inf, -np.inf], np.nan)

    @staticmethod
    def _guarded_ratio(numerator, denominator):
        """Return numerator / denominator, or 0 where the denominator is 0.

        Works for scalars (Series input) and Series (DataFrame input); a NaN
        denominator still yields NaN.
        """
        if isinstance(denominator, pd.Series):
            return (numerator / denominator).where(denominator != 0, 0.0)
        return numerator / denominator if denominator != 0 else 0