stockany2 / app /analysis /stock_analyzer.py
fiewolf1000's picture
Upload 67 files
0f10134 verified
# -*- coding: utf-8 -*-
"""
智能分析系统(股票) - 股票市场数据分析系统
修改:熊猫大侠
版本:v2.1.0
许可证:MIT License
"""
# stock_analyzer.py
import time
import traceback
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import os
import requests
from typing import Dict, List, Optional, Tuple
from dotenv import load_dotenv
import logging
import math
import json
import threading
from urllib.parse import urlparse
from openai import OpenAI
# 线程局部存储
thread_local = threading.local()
class StockAnalyzer:
"""
股票分析器 - 原有API保持不变,内部实现增强
"""
def __init__(self, initial_cash=1000000):
# 设置日志
self.logger = logging.getLogger(__name__)
# 加载环境变量
load_dotenv()
# 设置 OpenAI API (原 Gemini API)
self.openai_api_key = os.getenv('OPENAI_API_KEY', os.getenv('OPENAI_API_KEY'))
self.openai_api_url = os.getenv('OPENAI_API_URL', 'https://api.openai.com/v1')
self.openai_model = os.getenv('OPENAI_API_MODEL', 'gemini-2.0-pro-exp-02-05')
self.function_call_model = os.getenv('FUNCTION_CALL_MODEL','gpt-4o')
self.news_model = os.getenv('NEWS_MODEL')
self.client = OpenAI(
api_key=self.openai_api_key,
base_url=self.openai_api_url
)
# 配置参数
self.params = {
'ma_periods': {'short': 5, 'medium': 20, 'long': 60},
'rsi_period': 14,
'bollinger_period': 20,
'bollinger_std': 2,
'volume_ma_period': 20,
'atr_period': 14
}
# 添加缓存初始化
self.data_cache = {}
# JSON匹配标志
self.json_match_flag = True
def get_stock_data(self, stock_code, market_type='A', start_date=None, end_date=None):
"""获取股票数据 - 增强版,具备更强的容错能力"""
import akshare as ak
self.logger.info(f"开始获取股票 {stock_code} 数据,市场类型: {market_type}")
cache_key = f"{stock_code}_{market_type}_{start_date}_{end_date}_price"
if cache_key in self.data_cache:
return self.data_cache[cache_key].copy()
if start_date is None:
start_date = (datetime.now() - timedelta(days=365)).strftime('%Y%m%d')
if end_date is None:
end_date = datetime.now().strftime('%Y%m%d')
try:
df = None
if market_type == 'A':
df = ak.stock_zh_a_hist(symbol=stock_code, start_date=start_date, end_date=end_date, adjust="qfq")
elif market_type == 'HK':
df = ak.stock_hk_daily(symbol=stock_code, adjust="qfq")
elif market_type == 'US':
df = ak.stock_us_hist(symbol=stock_code, start_date=start_date, end_date=end_date, adjust="qfq")
else:
raise ValueError(f"不支持的市场类型: {market_type}")
if df is None or df.empty:
raise ValueError("akshare返回了空的DataFrame")
# 1. 标准化列名
rename_map = {
"日期": "date", "开盘": "open", "收盘": "close", "最高": "high",
"最低": "low", "成交量": "volume", "成交额": "amount",
"trade_date": "date" # 兼容不同命名
}
df.rename(columns=rename_map, inplace=True)
# 2. 验证关键列是否存在
essential_columns = ['date', 'open', 'close', 'high', 'low', 'volume']
missing_cols = [col for col in essential_columns if col not in df.columns]
if missing_cols:
raise ValueError(f"数据中缺少关键列: {', '.join(missing_cols)}. 可用列: {df.columns.tolist()}")
# 3. 数据清洗和类型转换
df['date'] = pd.to_datetime(df['date'], errors='coerce')
df.dropna(subset=['date'], inplace=True)
for col in ['open', 'close', 'high', 'low', 'volume']:
df[col] = pd.to_numeric(df[col], errors='coerce')
df.dropna(subset=essential_columns, inplace=True)
if df.empty:
raise ValueError("数据清洗后DataFrame为空")
# 4. 排序并返回
result = df.sort_values('date').reset_index(drop=True)
self.data_cache[cache_key] = result.copy()
return result
except Exception as e:
self.logger.error(f"获取股票 {stock_code} 数据失败: {e}")
# 返回一个空的DataFrame以避免下游崩溃
return pd.DataFrame()
def get_north_flow_history(self, stock_code, start_date=None, end_date=None):
"""获取单个股票的北向资金历史持股数据"""
try:
import akshare as ak
# 获取历史持股数据
if start_date is None and end_date is None:
# 默认获取近90天数据
north_hist_data = ak.stock_hsgt_hist_em(symbol=stock_code)
else:
north_hist_data = ak.stock_hsgt_hist_em(symbol=stock_code, start_date=start_date, end_date=end_date)
if north_hist_data.empty:
return {"history": []}
# 转换为列表格式返回
history = []
for _, row in north_hist_data.iterrows():
history.append({
"date": row.get('日期', ''),
"holding": float(row.get('持股数', 0)) if '持股数' in row else 0,
"ratio": float(row.get('持股比例', 0)) if '持股比例' in row else 0,
"change": float(row.get('持股变动', 0)) if '持股变动' in row else 0,
"market_value": float(row.get('持股市值', 0)) if '持股市值' in row else 0
})
return {"history": history}
except Exception as e:
self.logger.error(f"获取北向资金历史数据出错: {str(e)}")
return {"history": []}
def calculate_ema(self, series, period):
"""计算指数移动平均线"""
return series.ewm(span=period, adjust=False).mean()
def calculate_rsi(self, series, period):
"""计算RSI指标"""
delta = series.diff()
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean()
rs = gain / loss
return 100 - (100 / (1 + rs))
def calculate_macd(self, series):
"""计算MACD指标"""
exp1 = series.ewm(span=12, adjust=False).mean()
exp2 = series.ewm(span=26, adjust=False).mean()
macd = exp1 - exp2
signal = macd.ewm(span=9, adjust=False).mean()
hist = macd - signal
return macd, signal, hist
def calculate_bollinger_bands(self, series, period, std_dev):
"""计算布林带"""
middle = series.rolling(window=period).mean()
std = series.rolling(window=period).std()
upper = middle + (std * std_dev)
lower = middle - (std * std_dev)
return upper, middle, lower
def calculate_atr(self, df, period):
"""计算ATR指标"""
high = df['high']
low = df['low']
close = df['close'].shift(1)
tr1 = high - low
tr2 = abs(high - close)
tr3 = abs(low - close)
tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
return tr.rolling(window=period).mean()
def format_indicator_data(self, df):
"""格式化指标数据,控制小数位数"""
# 格式化价格数据 (2位小数)
price_columns = ['open', 'close', 'high', 'low', 'MA5', 'MA20', 'MA60', 'BB_upper', 'BB_middle', 'BB_lower']
for col in price_columns:
if col in df.columns:
df[col] = df[col].round(2)
# 格式化MACD相关指标 (3位小数)
macd_columns = ['MACD', 'Signal', 'MACD_hist']
for col in macd_columns:
if col in df.columns:
df[col] = df[col].round(3)
# 格式化其他技术指标 (2位小数)
other_columns = ['RSI', 'Volatility', 'ROC', 'Volume_Ratio']
for col in other_columns:
if col in df.columns:
df[col] = df[col].round(2)
return df
def calculate_indicators(self, df):
"""计算技术指标"""
try:
# 计算移动平均线
df['MA5'] = self.calculate_ema(df['close'], self.params['ma_periods']['short'])
df['MA20'] = self.calculate_ema(df['close'], self.params['ma_periods']['medium'])
df['MA60'] = self.calculate_ema(df['close'], self.params['ma_periods']['long'])
# 计算RSI
df['RSI'] = self.calculate_rsi(df['close'], self.params['rsi_period'])
# 计算MACD
df['MACD'], df['Signal'], df['MACD_hist'] = self.calculate_macd(df['close'])
# 计算布林带
df['BB_upper'], df['BB_middle'], df['BB_lower'] = self.calculate_bollinger_bands(
df['close'],
self.params['bollinger_period'],
self.params['bollinger_std']
)
# 成交量分析
df['Volume_MA'] = df['volume'].rolling(window=self.params['volume_ma_period']).mean()
df['Volume_Ratio'] = df['volume'] / df['Volume_MA']
# 计算ATR和波动率
df['ATR'] = self.calculate_atr(df, self.params['atr_period'])
df['Volatility'] = df['ATR'] / df['close'] * 100
# 动量指标
df['ROC'] = df['close'].pct_change(periods=10) * 100
# 格式化数据
df = self.format_indicator_data(df)
return df
except Exception as e:
self.logger.error(f"计算技术指标时出错: {str(e)}")
raise
def calculate_score(self, df, market_type='A'):
"""
计算股票评分 - 使用时空共振交易系统增强
根据不同的市场特征调整评分权重和标准
"""
try:
score = 0
latest = df.iloc[-1]
prev_days = min(30, len(df) - 1) # Get the most recent 30 days or all available data
# 时空共振框架 - 维度1:多时间框架分析
# 基础权重配置
weights = {
'trend': 0.30, # 趋势因子权重(日线级别)
'volatility': 0.15, # 波动率因子权重
'technical': 0.25, # 技术指标因子权重
'volume': 0.20, # 成交量因子权重(能量守恒维度)
'momentum': 0.10 # 动量因子权重(周线级别)
}
# 根据市场类型调整权重(维度1:时间框架嵌套)
if market_type == 'US':
# 美股优先考虑长期趋势
weights['trend'] = 0.35
weights['volatility'] = 0.10
weights['momentum'] = 0.15
elif market_type == 'HK':
# 港股调整波动率和成交量权重
weights['volatility'] = 0.20
weights['volume'] = 0.25
# 1. 趋势评分(最高30分)- 日线级别分析
trend_score = 0
# 均线评估 - "三线形态"分析
if latest['MA5'] > latest['MA20'] and latest['MA20'] > latest['MA60']:
# 完美多头排列(维度1:日线形态)
trend_score += 15
elif latest['MA5'] > latest['MA20']:
# 短期上升趋势(维度1:5分钟形态)
trend_score += 10
elif latest['MA20'] > latest['MA60']:
# 中期上升趋势
trend_score += 5
# 价格位置评估
if latest['close'] > latest['MA5']:
trend_score += 5
if latest['close'] > latest['MA20']:
trend_score += 5
if latest['close'] > latest['MA60']:
trend_score += 5
# 确保不超过最高分数限制
trend_score = min(30, trend_score)
# 2. 波动率评分(最高15分)- 维度2:过滤
volatility_score = 0
# 适度的波动率最理想
volatility = latest['Volatility']
if 1.0 <= volatility <= 2.5:
# 最佳波动率范围
volatility_score += 15
elif 2.5 < volatility <= 4.0:
# 较高波动率,次优选择
volatility_score += 10
elif volatility < 1.0:
# 波动率过低,缺乏能量
volatility_score += 5
else:
# 波动率过高,风险较大
volatility_score += 0
# 3. 技术指标评分(最高25分)- "峰值检测系统"
technical_score = 0
# RSI指标评估(10分)
rsi = latest['RSI']
if 40 <= rsi <= 60:
# 中性区域,趋势稳定
technical_score += 7
elif 30 <= rsi < 40 or 60 < rsi <= 70:
# 阈值区域,可能出现反转信号
technical_score += 10
elif rsi < 30:
# 超卖区域,可能出现买入机会
technical_score += 8
elif rsi > 70:
# 超买区域,可能存在卖出风险
technical_score += 2
# MACD指标评估(10分)- "峰值预警信号"
if latest['MACD'] > latest['Signal'] and latest['MACD_hist'] > 0:
# MACD金叉且柱状图为正
technical_score += 10
elif latest['MACD'] > latest['Signal']:
# MACD金叉
technical_score += 8
elif latest['MACD'] < latest['Signal'] and latest['MACD_hist'] < 0:
# MACD死叉且柱状图为负
technical_score += 0
elif latest['MACD_hist'] > df.iloc[-2]['MACD_hist']:
# MACD柱状图增长,可能出现反转信号
technical_score += 5
# 布林带位置评估(5分)
bb_position = (latest['close'] - latest['BB_lower']) / (latest['BB_upper'] - latest['BB_lower'])
if 0.3 <= bb_position <= 0.7:
# 价格在布林带中间区域,趋势稳定
technical_score += 3
elif bb_position < 0.2:
# 价格接近下轨,可能超卖
technical_score += 5
elif bb_position > 0.8:
# 价格接近上轨,可能超买
technical_score += 1
# 确保最大分数限制
technical_score = min(25, technical_score)
# 4. 成交量评分(最高20分)- "能量守恒维度"
volume_score = 0
# 成交量趋势分析
recent_vol_ratio = [df.iloc[-i]['Volume_Ratio'] for i in range(1, min(6, len(df)))]
avg_vol_ratio = sum(recent_vol_ratio) / len(recent_vol_ratio)
if avg_vol_ratio > 1.5 and latest['close'] > df.iloc[-2]['close']:
# 成交量放大且价格上涨 - "成交量能量阈值突破"
volume_score += 20
elif avg_vol_ratio > 1.2 and latest['close'] > df.iloc[-2]['close']:
# 成交量和价格同步上涨
volume_score += 15
elif avg_vol_ratio < 0.8 and latest['close'] < df.iloc[-2]['close']:
# 成交量和价格同步下跌,可能是健康回调
volume_score += 10
elif avg_vol_ratio > 1.2 and latest['close'] < df.iloc[-2]['close']:
# 成交量增加但价格下跌,可能存在较大卖压
volume_score += 0
else:
# 其他情况
volume_score += 8
# 5. 动量评分(最高10分)- 维度1:周线级别
momentum_score = 0
# ROC动量指标
roc = latest['ROC']
if roc > 5:
# Strong upward momentum
momentum_score += 10
elif 2 <= roc <= 5:
# Moderate upward momentum
momentum_score += 8
elif 0 <= roc < 2:
# Weak upward momentum
momentum_score += 5
elif -2 <= roc < 0:
# Weak downward momentum
momentum_score += 3
else:
# Strong downward momentum
momentum_score += 0
# 根据加权因子计算总分 - “共振公式”
final_score = (
trend_score * weights['trend'] / 0.30 +
volatility_score * weights['volatility'] / 0.15 +
technical_score * weights['technical'] / 0.25 +
volume_score * weights['volume'] / 0.20 +
momentum_score * weights['momentum'] / 0.10
)
# 特殊市场调整 - “市场适应机制”
if market_type == 'US':
# 美国市场额外调整因素
# 检查是否为财报季
is_earnings_season = self._is_earnings_season()
if is_earnings_season:
# Earnings season has higher volatility, adjust score certainty
final_score = 0.9 * final_score + 5 # Slight regression to the mean
elif market_type == 'HK':
# 港股特殊调整
# 检查A股联动效应
a_share_linkage = self._check_a_share_linkage(df)
if a_share_linkage > 0.7: # High linkage
# 根据大陆市场情绪调整
mainland_sentiment = self._get_mainland_market_sentiment()
if mainland_sentiment > 0:
final_score += 5
else:
final_score -= 5
# Ensure score remains within 0-100 range
final_score = max(0, min(100, round(final_score)))
# Store sub-scores for display
self.score_details = {
'trend': trend_score,
'volatility': volatility_score,
'technical': technical_score,
'volume': volume_score,
'momentum': momentum_score,
'total': final_score
}
return final_score
except Exception as e:
self.logger.error(f"Error calculating score: {str(e)}")
# Return neutral score on error
return 50
def calculate_position_size(self, stock_code, risk_percent=2.0, stop_loss_percent=5.0):
"""
根据风险管理原则计算最佳仓位大小
实施时空共振系统的“仓位大小公式”
参数:
stock_code: 要分析的股票代码
risk_percent: 在此交易中承担风险的总资本百分比(默认为2%)
stop_loss_percent: 从入场点的止损百分比(默认为5%)
返回:
仓位大小占总资本的百分比
"""
try:
# Get stock data
df = self.get_stock_data(stock_code)
df = self.calculate_indicators(df)
# 获取波动率因子(来自维度3:能量守恒)
latest = df.iloc[-1]
volatility = latest['Volatility']
# 计算波动率调整因子(较高波动率=较小仓位)
volatility_factor = 1.0
if volatility > 4.0:
volatility_factor = 0.6 # Reduce position for high volatility stocks
elif volatility > 2.5:
volatility_factor = 0.8 # Slightly reduce position
elif volatility < 1.0:
volatility_factor = 1.2 # Can increase position for low volatility stocks
# Calculate position size using risk formula
# 公式:position_size = (风险金额) / (止损 * 波动率因子)
position_size = (risk_percent) / (stop_loss_percent * volatility_factor)
# 限制最大仓位为25%以实现多元化
position_size = min(position_size, 25.0)
return position_size
except Exception as e:
self.logger.error(f"Error calculating position size: {str(e)}")
# 返回保守的默认仓位大小(出错时)
return 5.0
def get_recommendation(self, score, market_type='A', technical_data=None, news_data=None):
"""
根据得分和附加信息生成投资建议
使用时空共振交易系统策略增强
"""
try:
# 1. Base recommendation logic - Dynamic threshold adjustment based on score
if score >= 85:
base_recommendation = '强烈建议买入'
confidence = 'high'
action = 'strong_buy'
elif score >= 70:
base_recommendation = '建议买入'
confidence = 'medium_high'
action = 'buy'
elif score >= 55:
base_recommendation = '谨慎买入'
confidence = 'medium'
action = 'cautious_buy'
elif score >= 45:
base_recommendation = '持观望态度'
confidence = 'medium'
action = 'hold'
elif score >= 30:
base_recommendation = '谨慎持有'
confidence = 'medium'
action = 'cautious_hold'
elif score >= 15:
base_recommendation = '建议减仓'
confidence = 'medium_high'
action = 'reduce'
else:
base_recommendation = '建议卖出'
confidence = 'high'
action = 'sell'
# 2. Consider market characteristics (Dimension 1: Timeframe Nesting)
market_adjustment = ""
if market_type == 'US':
# US market adjustment factors
if self._is_earnings_season():
if confidence == 'high' or confidence == 'medium_high':
confidence = 'medium'
market_adjustment = "(财报季临近,波动可能加大,建议适当控制仓位)"
elif market_type == 'HK':
# HK market adjustment factors
mainland_sentiment = self._get_mainland_market_sentiment()
if mainland_sentiment < -0.3 and (action == 'buy' or action == 'strong_buy'):
action = 'cautious_buy'
confidence = 'medium'
market_adjustment = "(受大陆市场情绪影响,建议控制风险)"
elif market_type == 'A':
# A-share specific adjustment factors
if technical_data and 'Volatility' in technical_data:
vol = technical_data.get('Volatility', 0)
if vol > 4.0 and (action == 'buy' or action == 'strong_buy'):
action = 'cautious_buy'
confidence = 'medium'
market_adjustment = "(市场波动较大,建议分批买入)"
# 3. Consider market sentiment (Dimension 2: Filtering)
sentiment_adjustment = ""
if news_data and 'market_sentiment' in news_data:
sentiment = news_data.get('market_sentiment', 'neutral')
if sentiment == 'bullish' and action in ['hold', 'cautious_hold']:
action = 'cautious_buy'
sentiment_adjustment = "(市场氛围积极,可适当提高仓位)"
elif sentiment == 'bearish' and action in ['buy', 'cautious_buy']:
action = 'hold'
sentiment_adjustment = "(市场氛围悲观,建议等待更好买点)"
elif self.json_match_flag==False and news_data:
import re
# 如果JSON解析失败,尝试从原始内容中匹配市场情绪
sentiment_pattern = r'(bullish|neutral|bearish)'
sentiment_match = re.search(sentiment_pattern, news_data.get('original_content', ''))
if sentiment_match:
sentiment_map = {
'bullish': 'bullish',
'neutral': 'neutral',
'bearish': 'bearish'
}
sentiment = sentiment_map.get(sentiment_match.group(1), 'neutral')
if sentiment == 'bullish' and action in ['hold', 'cautious_hold']:
action = 'cautious_buy'
sentiment_adjustment = "(市场氛围积极,可适当提高仓位)"
elif sentiment == 'bearish' and action in ['buy', 'cautious_buy']:
action = 'hold'
sentiment_adjustment = "(市场氛围悲观,建议等待更好买点)"
# 4. Technical indicators adjustment (Dimension 2: "Peak Detection System")
technical_adjustment = ""
if technical_data:
rsi = technical_data.get('RSI', 50)
macd_signal = technical_data.get('MACD_signal', 'neutral')
# RSI overbought/oversold adjustment
if rsi > 80 and action in ['buy', 'strong_buy']:
action = 'hold'
technical_adjustment = "(RSI指标显示超买,建议等待回调)"
elif rsi < 20 and action in ['sell', 'reduce']:
action = 'hold'
technical_adjustment = "(RSI指标显示超卖,可能存在反弹机会)"
# MACD signal adjustment
if macd_signal == 'bullish' and action in ['hold', 'cautious_hold']:
action = 'cautious_buy'
if not technical_adjustment:
technical_adjustment = "(MACD显示买入信号)"
elif macd_signal == 'bearish' and action in ['cautious_buy', 'buy']:
action = 'hold'
if not technical_adjustment:
technical_adjustment = "(MACD显示卖出信号)"
# 5. Convert adjusted action to final recommendation
action_to_recommendation = {
'strong_buy': '强烈建议买入',
'buy': '建议买入',
'cautious_buy': '谨慎买入',
'hold': '持观望态度',
'cautious_hold': '谨慎持有',
'reduce': '建议减仓',
'sell': '建议卖出'
}
final_recommendation = action_to_recommendation.get(action, base_recommendation)
# 6. Combine all adjustment factors
adjustments = " ".join(filter(None, [market_adjustment, sentiment_adjustment, technical_adjustment]))
if adjustments:
return f"{final_recommendation} {adjustments}"
else:
return final_recommendation
except Exception as e:
self.logger.error(f"Error generating investment recommendation: {str(e)}")
# Return safe default recommendation on error
return "无法提供明确建议,请结合多种因素谨慎决策"
def check_consecutive_losses(self, trade_history, max_consecutive_losses=3):
"""
实施“冷静期风险控制” - 连续亏损后停止交易
参数:
trade_history: 最近交易结果列表 (True 表示盈利, False 表示亏损)
max_consecutive_losses: 允许的最大连续亏损次数
返回:
Boolean: True 如果应该暂停交易, False 如果可以继续交易
"""
consecutive_losses = 0
# Count consecutive losses from most recent trades
for trade in reversed(trade_history):
if not trade: # If trade is a loss
consecutive_losses += 1
else:
break # Break on first profitable trade
# Return True if we've hit max consecutive losses
return consecutive_losses >= max_consecutive_losses
def check_profit_taking(self, current_profit_percent, threshold=20.0):
"""
当回报超过阈值时,实施获利了结机制
属于“能量守恒维度”的一部分
参数:
current_profit_percent: 当前利润百分比
threshold: 用于获利了结的利润百分比阈值
返回:
Float: 减少仓位的百分比 (0.0-1.0)
"""
if current_profit_percent >= threshold:
# If profit exceeds threshold, suggest reducing position by 50%
return 0.5
return 0.0 # No position reduction recommended
def _is_earnings_season(self):
"""检查当前是否处于财报季(辅助函数)"""
from datetime import datetime
current_month = datetime.now().month
# 美股财报季大致在1月、4月、7月和10月
return current_month in [1, 4, 7, 10]
def _check_a_share_linkage(self, df, window=20):
"""检查港股与A股的联动性(辅助函数)"""
# 该函数需要获取对应的A股指数数据
# 简化版实现:
try:
# 获取恒生指数与上证指数的相关系数
# 实际实现中需要获取真实数据
correlation = 0.6 # 示例值
return correlation
except:
return 0.5 # 默认中等关联度
def _get_mainland_market_sentiment(self):
"""获取中国大陆市场情绪(辅助函数)"""
# 实际实现中需要分析上证指数、北向资金等因素
try:
# 简化版实现,返回-1到1之间的值,1表示积极情绪
sentiment = 0.2 # 示例值
return sentiment
except:
return 0 # 默认中性情绪
def get_stock_news(self, stock_code, market_type='A', limit=5):
"""
获取股票相关新闻和实时信息,直接调用搜索工具
参数:
stock_code: 股票代码
market_type: 市场类型 (A/HK/US)
limit: 返回的新闻条数上限
返回:
包含新闻和公告的字典
"""
try:
self.logger.info(f"获取股票 {stock_code} 的相关新闻和信息")
# 缓存键
cache_key = f"{stock_code}_{market_type}_news"
if cache_key in self.data_cache and (
datetime.now() - self.data_cache[cache_key]['timestamp']).seconds < 3600:
# 缓存1小时内的数据
return self.data_cache[cache_key]['data']
# 获取股票基本信息
stock_info = self.get_stock_info(stock_code)
stock_name = stock_info.get('股票名称', '未知')
industry = stock_info.get('行业', '未知')
market_name = "A股" if market_type == 'A' else "港股" if market_type == 'HK' else "美股"
def search_news_local():
"""实际执行搜索的函数"""
try:
# 获取API密钥
serp_api_key = os.getenv('SERP_API_KEY')
tavily_api_key = os.getenv('TAVILY_API_KEY')
if not serp_api_key and not tavily_api_key:
self.logger.error("未找到SERP_API_KEY或TAVILY_API_KEY环境变量")
return {"error": "未配置搜索API密钥"}
# 构建搜索查询
search_query = f"{stock_name} {stock_code} {market_name} 最新新闻 公告"
industry_query = f"{industry} {market_name} 行业动态 最新消息"
news_results = []
industry_news = []
# 如果配置了SERP API,使用SERP API搜索
if serp_api_key:
# ... (SERP API logic remains the same)
pass
# 如果配置了Tavily API,使用Tavily API搜索
if tavily_api_key and tavily_api_key != 'your_tavily_api_key':
self.logger.info(f"使用Tavily API搜索新闻: {search_query}")
try:
from tavily import TavilyClient
client = TavilyClient(tavily_api_key)
# Search for stock news
tavily_response = client.search(query=search_query, topic="finance", search_depth="advanced")
if "results" in tavily_response:
for item in tavily_response["results"][:limit]:
source = urlparse(item.get("url")).netloc if item.get("url") else ""
news_results.append({
"title": item.get("title", ""), "date": datetime.now().strftime("%Y-%m-%d"),
"source": source, "link": item.get("url", ""), "snippet": item.get("content", "")
})
# Search for industry news
tavily_industry_response = client.search(query=industry_query, topic="finance", search_depth="advanced")
if "results" in tavily_industry_response:
for item in tavily_industry_response["results"][:limit]:
source = urlparse(item.get("url")).netloc if item.get("url") else ""
industry_news.append({
"title": item.get("title", ""), "date": datetime.now().strftime("%Y-%m-%d"),
"source": source, "summary": item.get("content", "")
})
except ImportError:
self.logger.error("Tavily client not installed. Please run: pip install tavily-python")
except Exception as e:
self.logger.error(f"Error during Tavily API search: {e}", exc_info=True)
# 移除可能的重复结果
unique_news = [dict(t) for t in {tuple(d.items()) for d in news_results}]
unique_industry_news = [dict(t) for t in {tuple(d.items()) for d in industry_news}]
# 分析市场情绪
sentiment_keywords = {
'bullish': ['上涨', '增长', '利好', '突破', '强势', '看好', '机会', '利润'],
'slightly_bullish': ['回升', '改善', '企稳', '向好', '期待'],
'neutral': ['稳定', '平稳', '持平', '不变'],
'slightly_bearish': ['回调', '承压', '谨慎', '风险', '下滑'],
'bearish': ['下跌', '亏损', '跌破', '利空', '警惕', '危机', '崩盘']
}
sentiment_scores = {k: 0 for k in sentiment_keywords}
all_text = " ".join([n.get("title", "") + " " + n.get("snippet", "") for n in unique_news])
for sentiment, keywords in sentiment_keywords.items():
for keyword in keywords:
if keyword in all_text:
sentiment_scores[sentiment] += 1
market_sentiment = max(sentiment_scores, key=sentiment_scores.get) if any(sentiment_scores.values()) else "neutral"
self.logger.info(f"搜索完成,共获取到 {len(unique_news)} 条新闻和 {len(unique_industry_news)} 条行业新闻")
return {
"news": unique_news,
"announcements": [],
"industry_news": unique_industry_news,
"market_sentiment": market_sentiment
}
except Exception as e:
self.logger.error(f"搜索新闻时出错: {e}", exc_info=True)
return {"error": str(e)}
news_data = search_news_local()
# 确保数据结构完整
news_data.setdefault('news', [])
news_data.setdefault('announcements', [])
news_data.setdefault('industry_news', [])
news_data.setdefault('market_sentiment', 'neutral')
news_data['timestamp'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# 缓存结果
self.data_cache[cache_key] = {'data': news_data, 'timestamp': datetime.now()}
return news_data
except Exception as e:
self.logger.error(f"获取股票新闻时出错: {e}", exc_info=True)
return {
'news': [], 'announcements': [], 'industry_news': [],
'market_sentiment': 'neutral',
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
def get_ai_analysis_from_prompt(self, prompt: str) -> str:
"""
接收一个已经构建好的prompt,并返回AI模型的分析结果。
"""
try:
import queue
import threading
messages = [{"role": "user", "content": prompt}]
result_queue = queue.Queue()
def call_api():
try:
response = self.client.chat.completions.create(
model=self.openai_model,
messages=messages,
temperature=0.8,
max_tokens=4000,
stream=False,
timeout=300
)
result_queue.put(response)
except Exception as e:
result_queue.put(e)
api_thread = threading.Thread(target=call_api)
api_thread.daemon = True
api_thread.start()
try:
result = result_queue.get(timeout=240)
if isinstance(result, Exception):
raise result
assistant_reply = result.choices[0].message.content.strip()
return assistant_reply
except queue.Empty:
return "AI分析超时,无法获取分析结果。请稍后再试。"
except Exception as e:
return f"AI分析过程中发生错误: {str(e)}"
except Exception as e:
self.logger.error(f"从prompt进行AI分析时出错: {str(e)}")
return f"AI分析过程中发生错误,请稍后再试。错误信息: {str(e)}"
def _build_stock_prompt_and_get_analysis(self, df, stock_code, market_type='A'):
"""
为个股分析构建详细的prompt,并调用AI模型。
"""
try:
# 1. 获取最近K线数据
recent_data = df.tail(20).to_dict('records')
# 2. 计算技术指标摘要
technical_summary = {
'trend': 'upward' if df.iloc[-1]['MA5'] > df.iloc[-1]['MA20'] else 'downward',
'volatility': f"{df.iloc[-1]['Volatility']:.2f}%",
'volume_trend': 'increasing' if df.iloc[-1]['Volume_Ratio'] > 1 else 'decreasing',
'rsi_level': df.iloc[-1]['RSI'],
'macd_signal': 'bullish' if df.iloc[-1]['MACD'] > df.iloc[-1]['Signal'] else 'bearish',
'bb_position': self._calculate_bb_position(df)
}
# 3. 获取支撑压力位
sr_levels = self.identify_support_resistance(df)
# 4. 获取股票基本信息
stock_info = self.get_stock_info(stock_code)
stock_name = stock_info.get('股票名称', '未知')
industry = stock_info.get('行业', '未知')
# 5. 获取相关新闻和实时信息
self.logger.info(f"获取 {stock_code} 的相关新闻和市场信息")
news_data = self.get_stock_news(stock_code, market_type)
# 6. 评分分解
score = self.calculate_score(df, market_type)
score_details = getattr(self, 'score_details', {'total': score})
# 7. 获取投资建议
tech_data = {
'RSI': technical_summary['rsi_level'],
'MACD_signal': technical_summary['macd_signal'],
'Volatility': df.iloc[-1]['Volatility']
}
recommendation = self.get_recommendation(score, market_type, tech_data, news_data)
# 8. 构建全面的prompt
prompt = f"""作为专业的股票分析师,请对{stock_name}({stock_code})进行全面分析:
1. 基本信息:
- 股票名称: {stock_name}
- 股票代码: {stock_code}
- 行业: {industry}
- 市场类型: {"A股" if market_type == 'A' else "港股" if market_type == 'HK' else "美股"}
2. 技术指标摘要:
- 趋势: {technical_summary['trend']}
- 波动率: {technical_summary['volatility']}
- 成交量趋势: {technical_summary['volume_trend']}
- RSI: {technical_summary['rsi_level']:.2f}
- MACD信号: {technical_summary['macd_signal']}
- 布林带位置: {technical_summary['bb_position']}
3. 支撑与压力位:
- 短期支撑位: {', '.join([str(level) for level in sr_levels['support_levels']['short_term']])}
- 中期支撑位: {', '.join([str(level) for level in sr_levels['support_levels']['medium_term']])}
- 短期压力位: {', '.join([str(level) for level in sr_levels['resistance_levels']['short_term']])}
- 中期压力位: {', '.join([str(level) for level in sr_levels['resistance_levels']['medium_term']])}
4. 综合评分: {score_details['total']}
- 趋势评分: {score_details.get('trend', 0)}
- 波动率评分: {score_details.get('volatility', 0)}
- 技术指标评分: {score_details.get('technical', 0)}
- 成交量评分: {score_details.get('volume', 0)}
- 动量评分: {score_details.get('momentum', 0)}
5. 投资建议: {recommendation}"""
# 检查是否有JSON解析失败的情况
if hasattr(self, 'json_match_flag') and not self.json_match_flag and 'original_content' in news_data:
# 如果JSON解析失败,直接使用原始内容
prompt += f"""
6. 相关新闻和市场信息:
{news_data.get('original_content', '无法获取相关新闻')}
"""
else:
# 正常情况下使用格式化的新闻数据
prompt += f"""
6. 近期相关新闻:
{self._format_news_for_prompt(news_data.get('news', []))}
7. 公司公告:
{self._format_announcements_for_prompt(news_data.get('announcements', []))}
8. 行业动态:
{self._format_news_for_prompt(news_data.get('industry_news', []))}
9. 市场情绪: {news_data.get('market_sentiment', 'neutral')}
请提供以下内容:
1. 技术面分析 - 详细分析价格走势、支撑压力位、主要技术指标的信号
2. 行业和市场环境 - 结合新闻和行业动态分析公司所处环境
3. 风险因素 - 识别潜在风险点
4. 具体交易策略 - 给出明确的买入/卖出建议,包括入场点、止损位和目标价位
5. 短期(1周)、中期(1-3个月)和长期(半年)展望
请基于数据给出客观分析,不要过度乐观或悲观。分析应该包含具体数据和百分比,避免模糊表述。
"""
return self.get_ai_analysis_from_prompt(prompt)
except Exception as e:
self.logger.error(f"构建个股分析prompt时出错: {str(e)}")
return f"AI分析过程中发生错误: {str(e)}"
def _calculate_bb_position(self, df):
"""计算价格在布林带中的位置"""
latest = df.iloc[-1]
bb_width = latest['BB_upper'] - latest['BB_lower']
if bb_width == 0:
return "middle"
position = (latest['close'] - latest['BB_lower']) / bb_width
if position < 0.2:
return "near lower band (potential oversold)"
elif position < 0.4:
return "below middle band"
elif position < 0.6:
return "near middle band"
elif position < 0.8:
return "above middle band"
else:
return "near upper band (potential overbought)"
def _format_news_for_prompt(self, news_list):
"""格式化新闻列表为prompt字符串"""
if not news_list:
return " 无最新相关新闻"
formatted = ""
for i, news in enumerate(news_list[:3]): # 最多显示3条
date = news.get('date', '')
title = news.get('title', '')
source = news.get('source', '')
formatted += f" {i + 1}. [{date}] {title} (来源: {source})\\n"
return formatted
def _format_announcements_for_prompt(self, announcements):
"""格式化公告列表为prompt字符串"""
if not announcements:
return " 无最新公告"
formatted = ""
for i, ann in enumerate(announcements[:3]): # 最多显示3条
date = ann.get('date', '')
title = ann.get('title', '')
type_ = ann.get('type', '')
formatted += f" {i + 1}. [{date}] {title} (类型: {type_})\\n"
return formatted
# 原有API:保持接口不变
def analyze_stock(self, stock_code, market_type='A'):
"""分析单个股票"""
try:
# self.clear_cache(stock_code, market_type)
# 获取股票数据
df = self.get_stock_data(stock_code, market_type)
self.logger.info(f"获取股票数据完成")
# 计算技术指标
df = self.calculate_indicators(df)
self.logger.info(f"计算技术指标完成")
# 评分系统
score = self.calculate_score(df)
self.logger.info(f"评分系统完成")
# 获取最新数据
latest = df.iloc[-1]
prev = df.iloc[-2]
# 获取基本信息
stock_info = self.get_stock_info(stock_code)
stock_name = stock_info.get('股票名称', '未知')
industry = stock_info.get('行业', '未知')
# 生成报告(保持原有格式)
report = {
'stock_code': stock_code,
'stock_name': stock_name,
'industry': industry,
'analysis_date': datetime.now().strftime('%Y-%m-%d'),
'score': score,
'price': latest['close'],
'price_change': (latest['close'] - prev['close']) / prev['close'] * 100,
'ma_trend': 'UP' if latest['MA5'] > latest['MA20'] else 'DOWN',
'rsi': latest['RSI'],
'macd_signal': 'BUY' if latest['MACD'] > latest['Signal'] else 'SELL',
'volume_status': '放量' if latest['Volume_Ratio'] > 1.5 else '平量',
'recommendation': self.get_recommendation(score),
'ai_analysis': self._build_stock_prompt_and_get_analysis(df, stock_code)
}
return report
except Exception as e:
self.logger.error(f"分析股票时出错: {str(e)}")
raise
# 原有API:保持接口不变
def scan_market(self, stock_list, min_score=60, market_type='A'):
"""扫描市场,寻找符合条件的股票"""
recommendations = []
total_stocks = len(stock_list)
self.logger.info(f"开始市场扫描,共 {total_stocks} 只股票")
start_time = time.time()
processed = 0
# 批量处理,减少日志输出
batch_size = 10
for i in range(0, total_stocks, batch_size):
batch = stock_list[i:i + batch_size]
batch_results = []
for stock_code in batch:
try:
# 使用简化版分析以加快速度
report = self.quick_analyze_stock(stock_code, market_type)
if report['score'] >= min_score:
batch_results.append(report)
except Exception as e:
self.logger.error(f"分析股票 {stock_code} 时出错: {str(e)}")
continue
# 添加批处理结果
recommendations.extend(batch_results)
# 更新处理进度
processed += len(batch)
elapsed = time.time() - start_time
remaining = (elapsed / processed) * (total_stocks - processed) if processed > 0 else 0
self.logger.info(
f"已处理 {processed}/{total_stocks} 只股票,耗时 {elapsed:.1f}秒,预计剩余 {remaining:.1f}秒")
# 按得分排序
recommendations.sort(key=lambda x: x['score'], reverse=True)
total_time = time.time() - start_time
self.logger.info(
f"市场扫描完成,共分析 {total_stocks} 只股票,找到 {len(recommendations)} 只符合条件的股票,总耗时 {total_time:.1f}秒")
return recommendations
# def quick_analyze_stock(self, stock_code, market_type='A'):
# """快速分析股票,用于市场扫描"""
# try:
# # 获取股票数据
# df = self.get_stock_data(stock_code, market_type)
# # 计算技术指标
# df = self.calculate_indicators(df)
# # 简化评分计算
# score = self.calculate_score(df)
# # 获取最新数据
# latest = df.iloc[-1]
# prev = df.iloc[-2] if len(df) > 1 else latest
# # 尝试获取股票名称和行业
# try:
# stock_info = self.get_stock_info(stock_code)
# stock_name = stock_info.get('股票名称', '未知')
# industry = stock_info.get('行业', '未知')
# except:
# stock_name = '未知'
# industry = '未知'
# # 生成简化报告
# report = {
# 'stock_code': stock_code,
# 'stock_name': stock_name,
# 'industry': industry,
# 'analysis_date': datetime.now().strftime('%Y-%m-%d'),
# 'score': score,
# 'price': float(latest['close']),
# 'price_change': float((latest['close'] - prev['close']) / prev['close'] * 100),
# 'ma_trend': 'UP' if latest['MA5'] > latest['MA20'] else 'DOWN',
# 'rsi': float(latest['RSI']),
# 'macd_signal': 'BUY' if latest['MACD'] > latest['Signal'] else 'SELL',
# 'volume_status': '放量' if latest['Volume_Ratio'] > 1.5 else '平量',
# 'recommendation': self.get_recommendation(score)
# }
# return report
# except Exception as e:
# self.logger.error(f"快速分析股票 {stock_code} 时出错: {str(e)}")
# raise
def quick_analyze_stock(self, stock_code, market_type='A'):
"""快速分析股票,用于市场扫描"""
try:
# 获取股票数据
df = self.get_stock_data(stock_code, market_type)
if df.empty:
self.logger.warning(f"无法为 {stock_code} 获取有效数据,跳过分析。")
raise ValueError(f"股票 {stock_code} 的数据为空或无法处理")
# 计算技术指标
df = self.calculate_indicators(df)
# 简化评分计算
score = self.calculate_score(df)
# 获取最新数据
latest = df.iloc[-1]
prev = df.iloc[-2] if len(df) > 1 else latest
# 先获取股票信息再生成报告
try:
stock_info = self.get_stock_info(stock_code)
stock_name = stock_info.get('股票名称', '未知')
industry = stock_info.get('行业', '未知')
# 添加日志
self.logger.info(f"股票 {stock_code} 信息: 名称={stock_name}, 行业={industry}")
except Exception as e:
self.logger.error(f"获取股票 {stock_code} 信息时出错: {str(e)}")
stock_name = '未知'
industry = '未知'
# 生成简化报告
report = {
'stock_code': stock_code,
'stock_name': stock_name,
'industry': industry,
'analysis_date': datetime.now().strftime('%Y-%m-%d'),
'score': score,
'price': float(latest['close']),
'price_change': float((latest['close'] - prev['close']) / prev['close'] * 100),
'ma_trend': 'UP' if latest['MA5'] > latest['MA20'] else 'DOWN',
'rsi': float(latest['RSI']),
'macd_signal': 'BUY' if latest['MACD'] > latest['Signal'] else 'SELL',
'volume_status': 'HIGH' if latest['Volume_Ratio'] > 1.5 else 'NORMAL',
'recommendation': self.get_recommendation(score)
}
return report
except Exception as e:
self.logger.error(f"快速分析股票 {stock_code} 时出错: {str(e)}")
raise
# ======================== 新增功能 ========================#
def get_stock_info(self, stock_code):
"""获取股票基本信息"""
import akshare as ak
cache_key = f"{stock_code}_info"
if cache_key in self.data_cache:
return self.data_cache[cache_key]
try:
# 获取A股股票基本信息
stock_info = ak.stock_individual_info_em(symbol=stock_code)
# 修改:使用列名而不是索引访问数据
info_dict = {}
for _, row in stock_info.iterrows():
# 使用iloc安全地获取数据
if len(row) >= 2: # 确保有至少两列
info_dict[row.iloc[0]] = row.iloc[1]
# 获取股票名称
try:
stock_name_df = ak.stock_info_a_code_name()
# 标准化列名
rename_map = {
"代码": "code", "名称": "name", "symbol": "code", "股票代码": "code", "stock_code": "code",
"股票名称": "name", "stock_name": "name"
}
stock_name_df.rename(columns=lambda c: rename_map.get(c, c), inplace=True)
if 'code' in stock_name_df.columns and 'name' in stock_name_df.columns:
name_series = stock_name_df.set_index('code')['name']
name = name_series.get(str(stock_code))
if not name:
self.logger.warning(f"无法从 stock_info_a_code_name 找到股票代码 {stock_code} 的名称")
name = "未知"
else:
self.logger.warning(f"stock_info_a_code_name 返回的DataFrame缺少 'code' 或 'name' 列: {stock_name_df.columns.tolist()}")
name = "未知"
except Exception as e:
self.logger.error(f"获取股票名称时出错: {str(e)}")
name = "未知"
info_dict['股票名称'] = name
# 确保基本字段存在
if '行业' not in info_dict:
info_dict['行业'] = "未知"
if '地区' not in info_dict:
info_dict['地区'] = "未知"
# 增加更多日志来调试问题
self.logger.info(f"获取到股票信息: 名称={name}, 行业={info_dict.get('行业', '未知')}")
self.data_cache[cache_key] = info_dict
return info_dict
except Exception as e:
self.logger.error(f"获取股票信息失败: {str(e)}")
return {"股票名称": "未知", "行业": "未知", "地区": "未知"}
def identify_support_resistance(self, df):
"""识别支撑位和压力位"""
latest_price = df['close'].iloc[-1]
# 使用布林带作为支撑压力参考
support_levels = [df['BB_lower'].iloc[-1]]
resistance_levels = [df['BB_upper'].iloc[-1]]
# 添加主要均线作为支撑压力
if latest_price < df['MA5'].iloc[-1]:
resistance_levels.append(df['MA5'].iloc[-1])
else:
support_levels.append(df['MA5'].iloc[-1])
if latest_price < df['MA20'].iloc[-1]:
resistance_levels.append(df['MA20'].iloc[-1])
else:
support_levels.append(df['MA20'].iloc[-1])
# 添加整数关口
price_digits = len(str(int(latest_price)))
base = 10 ** (price_digits - 1)
lower_integer = math.floor(latest_price / base) * base
upper_integer = math.ceil(latest_price / base) * base
if lower_integer < latest_price:
support_levels.append(lower_integer)
if upper_integer > latest_price:
resistance_levels.append(upper_integer)
# 排序并格式化
support_levels = sorted(set([round(x, 2) for x in support_levels if x < latest_price]), reverse=True)
resistance_levels = sorted(set([round(x, 2) for x in resistance_levels if x > latest_price]))
# 分类为短期和中期
short_term_support = support_levels[:1] if support_levels else []
medium_term_support = support_levels[1:2] if len(support_levels) > 1 else []
short_term_resistance = resistance_levels[:1] if resistance_levels else []
medium_term_resistance = resistance_levels[1:2] if len(resistance_levels) > 1 else []
return {
'support_levels': {
'short_term': short_term_support,
'medium_term': medium_term_support
},
'resistance_levels': {
'short_term': short_term_resistance,
'medium_term': medium_term_resistance
}
}
def calculate_technical_score(self, df):
"""计算技术面评分 (0-40分)"""
try:
score = 0
# 确保有足够的数据
if len(df) < 2:
self.logger.warning("数据不足,无法计算技术面评分")
return {'total': 0, 'trend': 0, 'indicators': 0, 'support_resistance': 0, 'volatility_volume': 0}
latest = df.iloc[-1]
prev = df.iloc[-2] # 获取前一个时间点的数据
prev_close = prev['close']
# 1. 趋势分析 (0-10分)
trend_score = 0
# 均线排列情况
if latest['MA5'] > latest['MA20'] > latest['MA60']: # 多头排列
trend_score += 5
elif latest['MA5'] < latest['MA20'] < latest['MA60']: # 空头排列
trend_score = 0
else: # 交叉状态
if latest['MA5'] > latest['MA20']:
trend_score += 3
if latest['MA20'] > latest['MA60']:
trend_score += 2
# 价格与均线关系
if latest['close'] > latest['MA5']:
trend_score += 3
elif latest['close'] > latest['MA20']:
trend_score += 2
# 限制最大值
trend_score = min(trend_score, 10)
score += trend_score
# 2. 技术指标分析 (0-10分)
indicator_score = 0
# RSI
if 40 <= latest['RSI'] <= 60: # 中性
indicator_score += 2
elif 30 <= latest['RSI'] < 40 or 60 < latest['RSI'] <= 70: # 边缘区域
indicator_score += 4
elif latest['RSI'] < 30: # 超卖
indicator_score += 5
elif latest['RSI'] > 70: # 超买
indicator_score += 0
# MACD
if latest['MACD'] > latest['Signal']: # MACD金叉或在零轴上方
indicator_score += 3
else:
# 修复:比较当前和前一个时间点的MACD柱状图值
if latest['MACD_hist'] > prev['MACD_hist']: # 柱状图上升
indicator_score += 1
# 限制最大值和最小值
indicator_score = max(0, min(indicator_score, 10))
score += indicator_score
# 3. 支撑压力位分析 (0-10分)
sr_score = 0
# 识别支撑位和压力位
middle_price = latest['close']
upper_band = latest['BB_upper']
lower_band = latest['BB_lower']
# 距离布林带上下轨的距离
upper_distance = (upper_band - middle_price) / middle_price * 100
lower_distance = (middle_price - lower_band) / middle_price * 100
if lower_distance < 2: # 接近下轨
sr_score += 5
elif lower_distance < 5:
sr_score += 3
if upper_distance > 5: # 距上轨较远
sr_score += 5
elif upper_distance > 2:
sr_score += 2
# 限制最大值
sr_score = min(sr_score, 10)
score += sr_score
# 4. 波动性和成交量分析 (0-10分)
vol_score = 0
# 波动率分析
if latest['Volatility'] < 2: # 低波动率
vol_score += 3
elif latest['Volatility'] < 4: # 中等波动率
vol_score += 2
# 成交量分析
if 'Volume_Ratio' in df.columns:
if latest['Volume_Ratio'] > 1.5 and latest['close'] > prev_close: # 放量上涨
vol_score += 4
elif latest['Volume_Ratio'] < 0.8 and latest['close'] < prev_close: # 缩量下跌
vol_score += 3
elif latest['Volume_Ratio'] > 1 and latest['close'] > prev_close: # 普通放量上涨
vol_score += 2
# 限制最大值
vol_score = min(vol_score, 10)
score += vol_score
# 保存各个维度的分数
technical_scores = {
'total': score,
'trend': trend_score,
'indicators': indicator_score,
'support_resistance': sr_score,
'volatility_volume': vol_score
}
return technical_scores
except Exception as e:
self.logger.error(f"计算技术面评分时出错: {str(e)}")
self.logger.error(f"错误详情: {traceback.format_exc()}")
return {'total': 0, 'trend': 0, 'indicators': 0, 'support_resistance': 0, 'volatility_volume': 0}
def perform_enhanced_analysis(self, stock_code, market_type='A'):
"""执行增强版分析"""
try:
# 记录开始时间,便于性能分析
start_time = time.time()
self.logger.info(f"开始执行股票 {stock_code} 的增强分析")
# 获取股票数据
df = self.get_stock_data(stock_code, market_type)
data_time = time.time()
self.logger.info(f"获取股票数据耗时: {data_time - start_time:.2f}秒")
# 计算技术指标
df = self.calculate_indicators(df)
indicator_time = time.time()
self.logger.info(f"计算技术指标耗时: {indicator_time - data_time:.2f}秒")
# 获取最新数据
latest = df.iloc[-1]
prev = df.iloc[-2] if len(df) > 1 else latest
# 获取支撑压力位
sr_levels = self.identify_support_resistance(df)
# 计算技术面评分
technical_score = self.calculate_technical_score(df)
# 获取股票信息
stock_info = self.get_stock_info(stock_code)
# 确保technical_score包含必要的字段
if 'total' not in technical_score:
technical_score['total'] = 0
# 生成增强版报告
enhanced_report = {
'basic_info': {
'stock_code': stock_code,
'stock_name': stock_info.get('股票名称', '未知'),
'industry': stock_info.get('行业', '未知'),
'analysis_date': datetime.now().strftime('%Y-%m-%d')
},
'price_data': {
'current_price': float(latest['close']), # 确保是Python原生类型
'price_change': float((latest['close'] - prev['close']) / prev['close'] * 100),
'price_change_value': float(latest['close'] - prev['close'])
},
'technical_analysis': {
'trend': {
'ma_trend': 'UP' if latest['MA5'] > latest['MA20'] else 'DOWN',
'ma_status': "多头排列" if latest['MA5'] > latest['MA20'] > latest['MA60'] else
"空头排列" if latest['MA5'] < latest['MA20'] < latest['MA60'] else
"交叉状态",
'ma_values': {
'ma5': float(latest['MA5']),
'ma20': float(latest['MA20']),
'ma60': float(latest['MA60'])
}
},
'indicators': {
# 确保所有指标都存在并是原生类型
'rsi': float(latest['RSI']) if 'RSI' in latest else 50.0,
'macd': float(latest['MACD']) if 'MACD' in latest else 0.0,
'macd_signal': float(latest['Signal']) if 'Signal' in latest else 0.0,
'macd_histogram': float(latest['MACD_hist']) if 'MACD_hist' in latest else 0.0,
'volatility': float(latest['Volatility']) if 'Volatility' in latest else 0.0
},
'volume': {
'current_volume': float(latest['volume']) if 'volume' in latest else 0.0,
'volume_ratio': float(latest['Volume_Ratio']) if 'Volume_Ratio' in latest else 1.0,
'volume_status': '放量' if 'Volume_Ratio' in latest and latest['Volume_Ratio'] > 1.5 else '平量'
},
'support_resistance': sr_levels
},
'scores': technical_score,
'recommendation': {
'action': self.get_recommendation(technical_score['total']),
'key_points': []
},
'ai_analysis': self._build_stock_prompt_and_get_analysis(df, stock_code)
}
# 最后检查并修复报告结构
self._validate_and_fix_report(enhanced_report)
# 在函数结束时记录总耗时
end_time = time.time()
self.logger.info(f"执行增强分析总耗时: {end_time - start_time:.2f}秒")
return enhanced_report
except Exception as e:
self.logger.error(f"执行增强版分析时出错: {str(e)}")
self.logger.error(traceback.format_exc())
# 返回基础错误报告
return {
'basic_info': {
'stock_code': stock_code,
'stock_name': '分析失败',
'industry': '未知',
'analysis_date': datetime.now().strftime('%Y-%m-%d')
},
'price_data': {
'current_price': 0.0,
'price_change': 0.0,
'price_change_value': 0.0
},
'technical_analysis': {
'trend': {
'ma_trend': 'UNKNOWN',
'ma_status': '未知',
'ma_values': {'ma5': 0.0, 'ma20': 0.0, 'ma60': 0.0}
},
'indicators': {
'rsi': 50.0,
'macd': 0.0,
'macd_signal': 0.0,
'macd_histogram': 0.0,
'volatility': 0.0
},
'volume': {
'current_volume': 0.0,
'volume_ratio': 0.0,
'volume_status': 'NORMAL'
},
'support_resistance': {
'support_levels': {'short_term': [], 'medium_term': []},
'resistance_levels': {'short_term': [], 'medium_term': []}
}
},
'scores': {'total': 0},
'recommendation': {'action': '分析出错,无法提供建议'},
'ai_analysis': f"分析过程中出错: {str(e)}"
}
return error_report
# 添加一个辅助方法确保报告结构完整
def _validate_and_fix_report(self, report):
"""确保分析报告结构完整"""
# 检查必要的顶级字段
required_sections = ['basic_info', 'price_data', 'technical_analysis', 'scores', 'recommendation',
'ai_analysis']
for section in required_sections:
if section not in report:
self.logger.warning(f"报告缺少 {section} 部分,添加空对象")
report[section] = {}
# 检查technical_analysis的结构
if 'technical_analysis' in report:
tech = report['technical_analysis']
if not isinstance(tech, dict):
report['technical_analysis'] = {}
tech = report['technical_analysis']
# 检查indicators部分
if 'indicators' not in tech or not isinstance(tech['indicators'], dict):
tech['indicators'] = {
'rsi': 50.0,
'macd': 0.0,
'macd_signal': 0.0,
'macd_histogram': 0.0,
'volatility': 0.0
}
# 转换所有指标为原生Python类型
for key, value in tech['indicators'].items():
try:
tech['indicators'][key] = float(value)
except (TypeError, ValueError):
tech['indicators'][key] = 0.0