diff --git "a/app/web/web_server.py" "b/app/web/web_server.py" --- "a/app/web/web_server.py" +++ "b/app/web/web_server.py" @@ -1,2234 +1,2201 @@ -# -*- coding: utf-8 -*- -""" -智能分析系统(股票) - 股票市场数据分析系统 -修改:熊猫大侠 -版本:v2.1.0 -""" -# web_server.py - -import numpy as np -import pandas as pd -from flask import Flask, render_template, request, jsonify, redirect, url_for -from app.analysis.stock_analyzer import StockAnalyzer -from app.analysis.us_stock_service import USStockService -import threading -import logging -from logging.handlers import RotatingFileHandler -import traceback -import os -import json -from datetime import date, datetime, timedelta -from flask_cors import CORS -from pathlib import Path -import time -from flask_caching import Cache -import threading -import sys -from flask_swagger_ui import get_swaggerui_blueprint -from app.core.database import get_session, StockInfo, AnalysisResult, Portfolio, USE_DATABASE -from dotenv import load_dotenv -from app.analysis.industry_analyzer import IndustryAnalyzer -from app.analysis.fundamental_analyzer import FundamentalAnalyzer -from app.analysis.capital_flow_analyzer import CapitalFlowAnalyzer -from app.analysis.scenario_predictor import ScenarioPredictor -from app.analysis.stock_qa import StockQA -from app.analysis.risk_monitor import RiskMonitor -from app.analysis.index_industry_analyzer import IndexIndustryAnalyzer -from app.analysis.news_fetcher import news_fetcher, start_news_scheduler -from app.analysis.etf_analyzer import EtfAnalyzer - -import sys -import os - -# 将 tradingagents 目录添加到系统路径 -# 这允许应用从 tradingagents 代码库中导入模块 -sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../tradingagents'))) - - -# 加载环境变量 -load_dotenv() - -# 检查是否需要初始化数据库 -if USE_DATABASE: - init_db() - -# 配置Swagger -SWAGGER_URL = '/api/docs' -API_URL = '/static/swagger.json' -swaggerui_blueprint = get_swaggerui_blueprint( - SWAGGER_URL, - API_URL, - config={ - 'app_name': "股票智能分析系统 API文档" - } -) - -app = Flask(__name__) -CORS(app, resources={r"/*": {"origins": "*"}}) -analyzer = StockAnalyzer() -us_stock_service = USStockService() - -# 配置缓存 -cache_config = { - 'CACHE_TYPE': 'SimpleCache', - 'CACHE_DEFAULT_TIMEOUT': 300 -} - -# 如果配置了Redis,使用Redis作为缓存后端 -if os.getenv('USE_REDIS_CACHE', 'False').lower() == 'true' and os.getenv('REDIS_URL'): - cache_config = { - 'CACHE_TYPE': 'RedisCache', - 'CACHE_REDIS_URL': os.getenv('REDIS_URL'), - 'CACHE_DEFAULT_TIMEOUT': 300 - } - -cache = Cache(config={'CACHE_TYPE': 'SimpleCache'}) -cache.init_app(app) - -app.register_blueprint(swaggerui_blueprint, url_prefix=SWAGGER_URL) - - -# 确保全局变量在重新加载时不会丢失 -if 'analyzer' not in globals(): - try: - from app.analysis.stock_analyzer import StockAnalyzer - - analyzer = StockAnalyzer() - print("成功初始化全局StockAnalyzer实例") - except Exception as e: - print(f"初始化StockAnalyzer时出错: {e}", file=sys.stderr) - raise - -# 初始化模块实例 -fundamental_analyzer = FundamentalAnalyzer() -capital_flow_analyzer = CapitalFlowAnalyzer() -scenario_predictor = ScenarioPredictor(analyzer, os.getenv('OPENAI_API_KEY'), os.getenv('OPENAI_API_MODEL')) -stock_qa = StockQA(analyzer, os.getenv('OPENAI_API_KEY')) -risk_monitor = RiskMonitor(analyzer) -index_industry_analyzer = IndexIndustryAnalyzer(analyzer) -industry_analyzer = IndustryAnalyzer() - -news_scheduler = start_news_scheduler() # 接收返回的scheduler实例 - -# 线程本地存储 -thread_local = threading.local() - - -def get_analyzer(): - """获取线程本地的分析器实例""" - # 如果线程本地存储中没有分析器实例,创建一个新的 - if not hasattr(thread_local, 'analyzer'): - thread_local.analyzer = StockAnalyzer() - return thread_local.analyzer - - -# 配置日志 -# 从环境变量读取日志级别和文件路径 -log_level = os.getenv('LOG_LEVEL', 'INFO').upper() -log_file = os.getenv('LOG_FILE', 'data/logs/server.log') - -# 确保日志目录存在 -os.makedirs(os.path.dirname(log_file), exist_ok=True) - -# 创建日志格式化器 -formatter = logging.Formatter( - '[%(asctime)s] [%(process)d:%(thread)d] [%(levelname)s] [%(name)s:%(lineno)d] - %(message)s', - datefmt='%Y-%m-%d %H:%M:%S' -) - -# 配置根日志记录器 -root_logger = logging.getLogger() -root_logger.setLevel(log_level) - -# 清除所有现有的处理器,以避免重复日志 -if root_logger.hasHandlers(): - root_logger.handlers.clear() - -# 添加文件处理器 -file_handler = RotatingFileHandler(log_file, maxBytes=1024*1024*10, backupCount=5, encoding='utf-8') # 10MB -file_handler.setFormatter(formatter) -root_logger.addHandler(file_handler) - -# 添加控制台处理器 -console_handler = logging.StreamHandler(sys.stdout) -console_handler.setFormatter(formatter) -root_logger.addHandler(console_handler) - -# 将Flask的默认处理器移除,使其日志也遵循我们的配置 -from flask.logging import default_handler -app.logger.removeHandler(default_handler) -app.logger.propagate = True - -# 将 werkzeug 日志记录器的级别也设置为 .env 中定义的级别 -logging.getLogger('werkzeug').setLevel(log_level) - -app.logger.info(f"日志系统已初始化,级别: {log_level}, 文件: {log_file}") - - -# 扩展任务管理系统以支持不同类型的任务 -task_types = { - 'scan': 'market_scan', # 市场扫描任务 - 'analysis': 'stock_analysis', # 个股分析任务 - 'agent_analysis': 'agent_analysis', # 智能体分析任务 - 'etf_analysis': 'etf_analysis' # ETF分析任务 -} - -# 任务数据存储 -tasks = { - 'market_scan': {}, - 'stock_analysis': {}, - 'etf_analysis': {}, -} - - - -def get_task_store(task_type): - """获取指定类型的任务存储""" - return tasks.get(task_type, {}) - - -def generate_task_key(task_type, **params): - """生成任务键""" - if task_type == 'stock_analysis': - # 对于个股分析,使用股票代码和市场类型作为键 - return f"{params.get('stock_code')}_{params.get('market_type', 'A')}" - if task_type == 'etf_analysis': - return f"{params.get('etf_code')}" - return None # 其他任务类型不使用预生成的键 - - -def get_or_create_task(task_type, **params): - """获取或创建任务""" - store = get_task_store(task_type) - task_key = generate_task_key(task_type, **params) - - # 检查是否有现有任务 - if task_key and task_key in store: - task = store[task_key] - # 检查任务是否仍然有效 - if task['status'] in [TASK_PENDING, TASK_RUNNING]: - return task['id'], task, False - if task['status'] == TASK_COMPLETED and 'result' in task: - # 任务已完成且有结果,重用它 - return task['id'], task, False - - # 创建新任务 - task_id = generate_task_id() - task = { - 'id': task_id, - 'key': task_key, # 存储任务键以便以后查找 - 'type': task_type, - 'status': TASK_PENDING, - 'progress': 0, - 'created_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), - 'updated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), - 'params': params - } - - with task_lock: - if task_key: - store[task_key] = task - store[task_id] = task - - return task_id, task, True - - -# 添加到web_server.py顶部 -# 任务管理系统 -scan_tasks = {} # 存储扫描任务的状态和结果 -task_lock = threading.Lock() # 用于线程安全操作 - - -# 自定义异常,用于任务取消 -class TaskCancelledException(Exception): - pass - -# 任务状态常量 -TASK_PENDING = 'pending' -TASK_RUNNING = 'running' -TASK_COMPLETED = 'completed' -TASK_FAILED = 'failed' -TASK_CANCELLED = 'cancelled' - - -def generate_task_id(): - """生成唯一的任务ID""" - import uuid - return str(uuid.uuid4()) - - -def start_market_scan_task_status(task_id, status, progress=None, result=None, error=None): - """更新任务状态 - 保持原有签名""" - with task_lock: - if task_id in scan_tasks: - task = scan_tasks[task_id] - task['status'] = status - if progress is not None: - task['progress'] = progress - if result is not None: - task['result'] = result - if error is not None: - task['error'] = error - task['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') - - -def update_task_status(task_type, task_id, status, progress=None, result=None, error=None): - """更新任务状态""" - with task_lock: - task = None - if task_type == 'agent_analysis': - task = agent_session_manager.load_task(task_id) - else: - store = get_task_store(task_type) - if task_id in store: - task = store.get(task_id) - - if not task: - app.logger.warning(f"更新任务状态时未找到任务: {task_id} (类型: {task_type})") - return - - # 更新任务属性 - task['status'] = status - if progress is not None: - task['progress'] = progress - if result is not None: - if 'result' not in task or not isinstance(task['result'], dict): - task['result'] = {} - task['result'].update(result) - if error is not None: - task['error'] = error - task['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') - - # 保存更新后的任务 - if task_type == 'agent_analysis': - agent_session_manager.save_task(task) - else: - # 更新键索引的任务 (如果适用) - store = get_task_store(task_type) - if 'key' in task and task.get('key') and task['key'] in store: - store[task['key']] = task - store[task_id] = task # also save by id - - -analysis_tasks = {} - - -def get_or_create_analysis_task(stock_code, market_type='A'): - """获取或创建个股分析任务""" - # 创建一个键,用于查找现有任务 - task_key = f"{stock_code}_{market_type}" - - with task_lock: - # 检查是否有现有任务 - for task_id, task in analysis_tasks.items(): - if task.get('key') == task_key: - # 检查任务是否仍然有效 - if task['status'] in [TASK_PENDING, TASK_RUNNING]: - return task_id, task, False - if task['status'] == TASK_COMPLETED and 'result' in task: - # 任务已完成且有结果,重用它 - return task_id, task, False - - # 创建新任务 - task_id = generate_task_id() - task = { - 'id': task_id, - 'key': task_key, - 'status': TASK_PENDING, - 'progress': 0, - 'created_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), - 'updated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), - 'params': { - 'stock_code': stock_code, - 'market_type': market_type - } - } - - analysis_tasks[task_id] = task - - return task_id, task, True - - -def update_analysis_task(task_id, status, progress=None, result=None, error=None): - """更新个股分析任务状态""" - with task_lock: - if task_id in analysis_tasks: - task = analysis_tasks[task_id] - task['status'] = status - if progress is not None: - task['progress'] = progress - if result is not None: - task['result'] = result - if error is not None: - task['error'] = error - task['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') - - -# 定义自定义JSON编码器 - - -# 在web_server.py中,更新convert_numpy_types函数以处理NaN值 - -# 将NumPy类型转换为Python原生类型的函数 -def convert_numpy_types(obj): - """递归地将字典和列表中的NumPy类型转换为Python原生类型""" - try: - import numpy as np - import math - - if isinstance(obj, dict): - return {convert_numpy_types(key): convert_numpy_types(value) for key, value in obj.items()} - elif isinstance(obj, list): - return [convert_numpy_types(item) for item in obj] - elif isinstance(obj, np.integer): - return int(obj) - elif isinstance(obj, np.floating): - # Handle NaN and Infinity specifically - if np.isnan(obj): - return None - elif np.isinf(obj): - return None if obj < 0 else 1e308 # Use a very large number for +Infinity - return float(obj) - elif isinstance(obj, np.ndarray): - return obj.tolist() - elif isinstance(obj, np.bool_): - return bool(obj) - # Handle Python's own float NaN and Infinity - elif isinstance(obj, float): - if math.isnan(obj): - return None - elif math.isinf(obj): - return None - return obj - # 添加对date和datetime类型的处理 - elif isinstance(obj, (date, datetime)): - return obj.isoformat() - else: - return obj - except ImportError: - # 如果没有安装numpy,但需要处理date和datetime - import math - if isinstance(obj, dict): - return {convert_numpy_types(key): convert_numpy_types(value) for key, value in obj.items()} - elif isinstance(obj, list): - return [convert_numpy_types(item) for item in obj] - elif isinstance(obj, (date, datetime)): - return obj.isoformat() - # Handle Python's own float NaN and Infinity - elif isinstance(obj, float): - if math.isnan(obj): - return None - elif math.isinf(obj): - return None - return obj - return obj - - -# 同样更新 NumpyJSONEncoder 类 -class NumpyJSONEncoder(json.JSONEncoder): - def default(self, obj): - # Handle LangChain message objects first - try: - from langchain_core.messages import BaseMessage - if isinstance(obj, BaseMessage): - return {"type": obj.__class__.__name__, "content": str(obj.content)} - except ImportError: - pass # If langchain is not installed, just proceed - - # For NumPy data types - try: - import numpy as np - import math - if isinstance(obj, np.integer): - return int(obj) - elif isinstance(obj, np.floating): - # Handle NaN and Infinity specifically - if np.isnan(obj): - return None - elif np.isinf(obj): - return None - return float(obj) - elif isinstance(obj, np.ndarray): - return obj.tolist() - elif isinstance(obj, np.bool_): - return bool(obj) - # Handle Python's own float NaN and Infinity - elif isinstance(obj, float): - if math.isnan(obj): - return None - elif math.isinf(obj): - return None - return obj - except ImportError: - # Handle Python's own float NaN and Infinity if numpy is not available - import math - if isinstance(obj, float): - if math.isnan(obj): - return None - elif math.isinf(obj): - return None - - # 添加对date和datetime类型的处理 - if isinstance(obj, (date, datetime)): - return obj.isoformat() - - # Fallback for other non-serializable types - try: - return super(NumpyJSONEncoder, self).default(obj) - except TypeError: - # For LangChain messages or other complex objects, convert to string - return str(obj) - - -# Helper to convert LangChain messages to JSON serializable format -def convert_messages_to_dict(obj): - """Recursively convert LangChain message objects to dictionaries.""" - # Check if langchain_core is available and if the object is a message - try: - from langchain_core.messages import BaseMessage - is_message = isinstance(obj, BaseMessage) - except ImportError: - is_message = False - - if is_message: - # Base case: convert message object to dict - return {"type": obj.__class__.__name__, "content": str(obj.content)} - elif isinstance(obj, dict): - # Recursive step for dictionaries - return {k: convert_messages_to_dict(v) for k, v in obj.items()} - elif isinstance(obj, list): - # Recursive step for lists - return [convert_messages_to_dict(elem) for elem in obj] - else: - # Return the object as is if no conversion is needed - return obj - - -# 使用我们的编码器的自定义 jsonify 函数 -def custom_jsonify(data): - return app.response_class( - json.dumps(convert_numpy_types(data), cls=NumpyJSONEncoder), - mimetype='application/json' - ) - - -# 保持API兼容的路由 -@app.route('/') -def index_redirect(): - # 返回一个简单的HTML页面,包含跳转按钮 - return """ - - - - 跳转页面 - - - - - - - """ - - - -@app.route('/index', methods=['GET']) -def index(): - return render_template('index.html') - - -@app.route('/analyze', methods=['POST']) -def analyze(): - try: - data = request.json - stock_codes = data.get('stock_codes', []) - market_type = data.get('market_type', 'A') - - if not stock_codes: - return jsonify({'error': '请输入代码'}), 400 - - app.logger.info(f"分析股票请求: {stock_codes}, 市场类型: {market_type}") - - # 设置最大处理时间,每只股票10秒 - max_time_per_stock = 10 # 秒 - max_total_time = max(30, min(60, len(stock_codes) * max_time_per_stock)) # 至少30秒,最多60秒 - - start_time = time.time() - results = [] - - for stock_code in stock_codes: - try: - # 检查是否已超时 - if time.time() - start_time > max_total_time: - app.logger.warning(f"分析股票请求已超过{max_total_time}秒,提前返回已处理的{len(results)}只股票") - break - - # 使用线程本地缓存的分析器实例 - current_analyzer = get_analyzer() - result = current_analyzer.quick_analyze_stock(stock_code.strip(), market_type) - - app.logger.info( - f"分析结果: 股票={stock_code}, 名称={result.get('stock_name', '未知')}, 行业={result.get('industry', '未知')}") - results.append(result) - except Exception as e: - app.logger.error(f"分析股票 {stock_code} ��出错: {str(e)}") - results.append({ - 'stock_code': stock_code, - 'error': str(e), - 'stock_name': '分析失败', - 'industry': '未知' - }) - - return jsonify({'results': results}) - except Exception as e: - app.logger.error(f"分析股票时出错: {traceback.format_exc()}") - return jsonify({'error': str(e)}), 500 - - -@app.route('/api/north_flow_history', methods=['POST']) -def api_north_flow_history(): - try: - data = request.json - stock_code = data.get('stock_code') - days = data.get('days', 10) # 默认为10天,对应前端的默认选项 - - # 计算 end_date 为当前时间 - end_date = datetime.now().strftime('%Y%m%d') - - # 计算 start_date 为 end_date 减去指定的天数 - start_date = (datetime.now() - timedelta(days=int(days))).strftime('%Y%m%d') - - if not stock_code: - return jsonify({'error': '请提供股票代码'}), 400 - - # 调用北向资金历史数据方法 - - analyzer = CapitalFlowAnalyzer() - result = analyzer.get_north_flow_history(stock_code, start_date, end_date) - - return custom_jsonify(result) - except Exception as e: - app.logger.error(f"获取北向资金历史数据出错: {traceback.format_exc()}") - return jsonify({'error': str(e)}), 500 - - -@app.route('/search_us_stocks', methods=['GET']) -def search_us_stocks(): - try: - keyword = request.args.get('keyword', '') - if not keyword: - return jsonify({'error': '请输入搜索关键词'}), 400 - - results = us_stock_service.search_us_stocks(keyword) - return jsonify({'results': results}) - - except Exception as e: - app.logger.error(f"搜索美股代码时出错: {str(e)}") - return jsonify({'error': str(e)}), 500 - - -# 新增可视化分析页面路由 -@app.route('/dashboard') -def dashboard(): - return render_template('dashboard.html') - - -@app.route('/stock_detail/') -def stock_detail(stock_code): - market_type = request.args.get('market_type', 'A') - return render_template('stock_detail.html', stock_code=stock_code, market_type=market_type) - - -@app.route('/portfolio') -def portfolio(): - return render_template('portfolio.html') - - -@app.route('/market_scan') -def market_scan(): - return render_template('market_scan.html') - - -# 基本面分析页面 -@app.route('/fundamental') -def fundamental(): - return render_template('fundamental.html') - - -# 资金流向页面 -@app.route('/capital_flow') -def capital_flow(): - return render_template('capital_flow.html') - - -# 情景预测页面 -@app.route('/scenario_predict') -def scenario_predict(): - return render_template('scenario_predict.html') - - -# 风险监控页面 -@app.route('/risk_monitor') -def risk_monitor_page(): - return render_template('risk_monitor.html') - - -# 智能问答页面 -@app.route('/qa') -def qa_page(): - return render_template('qa.html') - - -# 行业分析页面 -@app.route('/industry_analysis') -def industry_analysis(): - return render_template('industry_analysis.html') - - - -# 智能体分析页面 -@app.route('/agent_analysis') -def agent_analysis_page(): - return render_template('agent_analysis.html') - - -@app.route('/etf_analysis') -def etf_analysis_page(): - return render_template('etf_analysis.html') - - - - - -def make_cache_key_with_stock(): - """创建包含股票代码的自定义缓存键""" - path = request.path - - # 从请求体中获取股票代码 - stock_code = None - if request.is_json: - stock_code = request.json.get('stock_code') - - # 构建包含股票代码的键 - if stock_code: - return f"{path}_{stock_code}" - else: - return path - - -@app.route('/api/start_stock_analysis', methods=['POST']) -def start_stock_analysis(): - """启动个股分析任务""" - try: - data = request.json - stock_code = data.get('stock_code') - market_type = data.get('market_type', 'A') - - if not stock_code: - return jsonify({'error': '请输入股票代码'}), 400 - - app.logger.info(f"准备分析股票: {stock_code}") - - # 获取或创建任务 - task_id, task, is_new = get_or_create_task( - 'stock_analysis', - stock_code=stock_code, - market_type=market_type - ) - - # 如果是已完成的任务,直接返回结果 - if task['status'] == TASK_COMPLETED and 'result' in task: - app.logger.info(f"使用缓存的分析结果: {stock_code}") - return jsonify({ - 'task_id': task_id, - 'status': task['status'], - 'result': task['result'] - }) - - # 如果是新创���的任务,启动后台处理 - if is_new: - app.logger.info(f"创建新的分析任务: {task_id}") - - # 启动后台线程执行分析 - def run_analysis(): - try: - update_task_status('stock_analysis', task_id, TASK_RUNNING, progress=10) - - # 执行分析 - result = analyzer.perform_enhanced_analysis(stock_code, market_type) - - # 更新任务状态为完成 - update_task_status('stock_analysis', task_id, TASK_COMPLETED, progress=100, result=result) - app.logger.info(f"分析任务 {task_id} 完成") - - except Exception as e: - app.logger.error(f"分析任务 {task_id} 失败: {str(e)}") - app.logger.error(traceback.format_exc()) - update_task_status('stock_analysis', task_id, TASK_FAILED, error=str(e)) - - # 启动后台线程 - thread = threading.Thread(target=run_analysis) - thread.daemon = True - thread.start() - - # 返回任务ID和状态 - return jsonify({ - 'task_id': task_id, - 'status': task['status'], - 'message': f'已启动分析任务: {stock_code}' - }) - - except Exception as e: - app.logger.error(f"启动个股分析任务时出错: {traceback.format_exc()}") - return jsonify({'error': str(e)}), 500 - - -@app.route('/api/analysis_status/', methods=['GET']) -def get_analysis_status(task_id): - """获取个股分析任务状态""" - store = get_task_store('stock_analysis') - with task_lock: - if task_id not in store: - return jsonify({'error': '找不到指定的分析任务'}), 404 - - task = store[task_id] - - # 基本状态信息 - status = { - 'id': task['id'], - 'status': task['status'], - 'progress': task.get('progress', 0), - 'created_at': task['created_at'], - 'updated_at': task['updated_at'] - } - - # 如果任务完成,包含结果 - if task['status'] == TASK_COMPLETED and 'result' in task: - status['result'] = task['result'] - - # 如果任务失败,包含错误信息 - if task['status'] == TASK_FAILED and 'error' in task: - status['error'] = task['error'] - - return custom_jsonify(status) - - -@app.route('/api/cancel_analysis/', methods=['POST']) -def cancel_analysis(task_id): - """取消个股分析任务""" - store = get_task_store('stock_analysis') - with task_lock: - if task_id not in store: - return jsonify({'error': '找不到指定的分析任务'}), 404 - - task = store[task_id] - - if task['status'] in [TASK_COMPLETED, TASK_FAILED]: - return jsonify({'message': '任务已完成或失败,无法取消'}) - - # 更新状态为失败 - task['status'] = TASK_FAILED - task['error'] = '用户取消任务' - task['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') - - # 更新键索引的任务 - if 'key' in task and task['key'] in store: - store[task['key']] = task - - return jsonify({'message': '任务已取消'}) - - -# ETF 分析路由 -@app.route('/api/start_etf_analysis', methods=['POST']) -def start_etf_analysis(): - """启动ETF分析任务""" - try: - data = request.json - etf_code = data.get('etf_code') - - if not etf_code: - return jsonify({'error': '请输入ETF代码'}), 400 - - app.logger.info(f"准备分析ETF: {etf_code}") - - task_id, task, is_new = get_or_create_task( - 'etf_analysis', - etf_code=etf_code - ) - - if task['status'] == TASK_COMPLETED and 'result' in task: - app.logger.info(f"使用缓存的ETF分析结果: {etf_code}") - return jsonify({ - 'task_id': task_id, - 'status': task['status'], - 'result': task['result'] - }) - - if is_new: - app.logger.info(f"创建新的ETF分析任务: {task_id}") - - def run_etf_analysis(): - try: - update_task_status('etf_analysis', task_id, TASK_RUNNING, progress=10) - - # 使用一个新的 EtfAnalyzer 实例, 并传入stock_analyzer - etf_analyzer_instance = EtfAnalyzer(etf_code, analyzer) - result = etf_analyzer_instance.run_analysis() - - update_task_status('etf_analysis', task_id, TASK_COMPLETED, progress=100, result=result) - app.logger.info(f"ETF分析任务 {task_id} 完成") - - except Exception as e: - app.logger.error(f"ETF分析任务 {task_id} 失败: {str(e)}") - app.logger.error(traceback.format_exc()) - update_task_status('etf_analysis', task_id, TASK_FAILED, error=str(e)) - - thread = threading.Thread(target=run_etf_analysis) - thread.daemon = True - thread.start() - - return jsonify({ - 'task_id': task_id, - 'status': task['status'], - 'message': f'已启动ETF分析任务: {etf_code}' - }) - - except Exception as e: - app.logger.error(f"启动ETF分析任务时出错: {traceback.format_exc()}") - return jsonify({'error': str(e)}), 500 - - -@app.route('/api/etf_analysis_status/', methods=['GET']) -def get_etf_analysis_status(task_id): - """获取ETF分析任务状态""" - store = get_task_store('etf_analysis') - with task_lock: - if task_id not in store: - return jsonify({'error': '找不到指定的ETF分析任务'}), 404 - - task = store[task_id] - - status = { - 'id': task['id'], - 'status': task['status'], - 'progress': task.get('progress', 0), - 'created_at': task['created_at'], - 'updated_at': task['updated_at'] - } - - if task['status'] == TASK_COMPLETED and 'result' in task: - status['result'] = task['result'] - - if task['status'] == TASK_FAILED and 'error' in task: - status['error'] = task['error'] - - return custom_jsonify(status) - - -# 保留原有API用于向后兼容 -@app.route('/api/enhanced_analysis', methods=['POST']) -def enhanced_analysis(): - """原增强分析API的向后兼容版本""" - try: - data = request.json - stock_code = data.get('stock_code') - market_type = data.get('market_type', 'A') - - if not stock_code: - return custom_jsonify({'error': '请输入股票代码'}), 400 - - # 调用新的任务系统,但模拟同步行为 - # 这会导致和之前一样的超时问题,但保持兼容 - timeout = 300 - start_time = time.time() - - # 获取或创建任务 - task_id, task, is_new = get_or_create_task( - 'stock_analysis', - stock_code=stock_code, - market_type=market_type - ) - - # 如果是已完成的任务,直接返回结果 - if task['status'] == TASK_COMPLETED and 'result' in task: - app.logger.info(f"使用缓存的分析结果: {stock_code}") - return custom_jsonify({'result': task['result']}) - - # 启动分析(如果是新任务) - if is_new: - # 同步执行分析 - try: - result = analyzer.perform_enhanced_analysis(stock_code, market_type) - update_task_status('stock_analysis', task_id, TASK_COMPLETED, progress=100, result=result) - app.logger.info(f"分析完成: {stock_code},耗时 {time.time() - start_time:.2f} 秒") - return custom_jsonify({'result': result}) - except Exception as e: - app.logger.error(f"分析过程中出错: {str(e)}") - update_task_status('stock_analysis', task_id, TASK_FAILED, error=str(e)) - return custom_jsonify({'error': f'分析过程中出错: {str(e)}'}), 500 - else: - # 已存在正在处理的任务,等待其完成 - max_wait = timeout - (time.time() - start_time) - wait_interval = 0.5 - waited = 0 - - while waited < max_wait: - with task_lock: - current_task = store[task_id] - if current_task['status'] == TASK_COMPLETED and 'result' in current_task: - return custom_jsonify({'result': current_task['result']}) - if current_task['status'] == TASK_FAILED: - error = current_task.get('error', '任务失败,无详细信息') - return custom_jsonify({'error': error}), 500 - - time.sleep(wait_interval) - waited += wait_interval - - # 超时 - return custom_jsonify({'error': '处理超时,请稍后重试'}), 504 - - except Exception as e: - app.logger.error(f"执行增强版分析时出错: {traceback.format_exc()}") - return custom_jsonify({'error': str(e)}), 500 - - -# 添加在web_server.py主代码中 -@app.errorhandler(404) -def not_found(error): - """处理404错误""" - if request.path.startswith('/api/'): - # 为API请求返回JSON格式的错误 - return jsonify({ - 'error': '找不到请求的API端点', - 'path': request.path, - 'method': request.method - }), 404 - # 为网页请求返回HTML错误页 - return render_template('error.html', error_code=404, message="找不到请求的页面"), 404 - - -@app.errorhandler(500) -def server_error(error): - """处理500错误""" - app.logger.error(f"服务器错误: {str(error)}") - if request.path.startswith('/api/'): - # 为API请求返回JSON格式的错误 - return jsonify({ - 'error': '服务器内部错误', - 'message': str(error) - }), 500 - # 为网页请求返回HTML错误页 - return render_template('error.html', error_code=500, message="服务器内部错误"), 500 - - -# Update the get_stock_data function in web_server.py to handle date formatting properly -@app.route('/api/stock_data', methods=['GET']) -@cache.cached(timeout=300, query_string=True) -def get_stock_data(): - try: - stock_code = request.args.get('stock_code') - market_type = request.args.get('market_type', 'A') - period = request.args.get('period', '1y') # 默认1年 - - if not stock_code: - return custom_jsonify({'error': '请提供股票代码'}), 400 - - # 根据period计算start_date - end_date = datetime.now().strftime('%Y%m%d') - if period == '1m': - start_date = (datetime.now() - timedelta(days=30)).strftime('%Y%m%d') - elif period == '3m': - start_date = (datetime.now() - timedelta(days=90)).strftime('%Y%m%d') - elif period == '6m': - start_date = (datetime.now() - timedelta(days=180)).strftime('%Y%m%d') - elif period == '1y': - start_date = (datetime.now() - timedelta(days=365)).strftime('%Y%m%d') - else: - start_date = (datetime.now() - timedelta(days=365)).strftime('%Y%m%d') - - # 获取股票历史数据 - app.logger.info( - f"获取股票 {stock_code} 的历史数据,市场: {market_type}, 起始日期: {start_date}, 结束日期: {end_date}") - df = analyzer.get_stock_data(stock_code, market_type, start_date, end_date) - - # 检查数据是否为空 - if df.empty: - app.logger.warning(f"股票 {stock_code} 的数据为空") - return custom_jsonify({'error': '未找到股票数据'}), 404 - - # 计算技术指标 - app.logger.info(f"计算股票 {stock_code} 的技术指标") - df = analyzer.calculate_indicators(df) - - # 将DataFrame转为JSON格式 - app.logger.info(f"将数据转换为JSON格式,行数: {len(df)}") - - # 确保日期列是字符串格式 - 修复缓存问题 - if 'date' in df.columns: - try: - if pd.api.types.is_datetime64_any_dtype(df['date']): - df['date'] = df['date'].dt.strftime('%Y-%m-%d') - else: - df = df.copy() - df['date'] = pd.to_datetime(df['date'], errors='coerce') - df['date'] = df['date'].dt.strftime('%Y-%m-%d') - except Exception as e: - app.logger.error(f"处理日期列时出错: {str(e)}") - df['date'] = df['date'].astype(str) - - # 将NaN值替换为None - df = df.replace({np.nan: None, np.inf: None, -np.inf: None}) - - records = df.to_dict('records') - - app.logger.info(f"数据处理完成,返回 {len(records)} 条记录") - return custom_jsonify({'data': records}) - except Exception as e: - app.logger.error(f"获取股票数据时出错: {str(e)}") - app.logger.error(traceback.format_exc()) - return custom_jsonify({'error': str(e)}), 500 - - -# @app.route('/api/market_scan', methods=['POST']) -# def api_market_scan(): -# try: -# data = request.json -# stock_list = data.get('stock_list', []) -# min_score = data.get('min_score', 60) -# market_type = data.get('market_type', 'A') - -# if not stock_list: -# return jsonify({'error': '请提供股票列表'}), 400 - -# # 限制股票数量,避免过长处理时间 -# if len(stock_list) > 100: -# app.logger.warning(f"股票列表过长 ({len(stock_list)}只),截取前100只") -# stock_list = stock_list[:100] - -# # 执行市场扫描 -# app.logger.info(f"开始扫描 {len(stock_list)} 只股票,最低分数: {min_score}") - -# # 使用线程池优化处理 -# results = [] -# max_workers = min(10, len(stock_list)) # 最多10个工作线程 - -# # 设置较长的超时时间 -# timeout = 300 # 5分钟 - -# def scan_thread(): -# try: -# return analyzer.scan_market(stock_list, min_score, market_type) -# except Exception as e: -# app.logger.error(f"扫描线程出错: {str(e)}") -# return [] - -# thread = threading.Thread(target=lambda: results.append(scan_thread())) -# thread.start() -# thread.join(timeout) - -# if thread.is_alive(): -# app.logger.error(f"市场扫描超时���已扫描 {len(stock_list)} 只股票超过 {timeout} 秒") -# return custom_jsonify({'error': '扫描超时,请减少股票数量或稍后再试'}), 504 - -# if not results or not results[0]: -# app.logger.warning("扫描结果为空") -# return custom_jsonify({'results': []}) - -# scan_results = results[0] -# app.logger.info(f"扫描完成,找到 {len(scan_results)} 只符合条件的股票") - -# # 使用自定义JSON格式处理NumPy数据类型 -# return custom_jsonify({'results': scan_results}) -# except Exception as e: -# app.logger.error(f"执行市场扫描时出错: {traceback.format_exc()}") -# return custom_jsonify({'error': str(e)}), 500 - -@app.route('/api/start_market_scan', methods=['POST']) -def start_market_scan(): - """启动市场扫描任务""" - try: - data = request.json - stock_list = data.get('stock_list', []) - min_score = data.get('min_score', 60) - market_type = data.get('market_type', 'A') - - if not stock_list: - return jsonify({'error': '请提供股票列表'}), 400 - - # 限制股票数量,避免过长处理时间 - if len(stock_list) > 100: - app.logger.warning(f"股票列表过长 ({len(stock_list)}只),截取前100只") - stock_list = stock_list[:100] - - # 创建新任务 - task_id = generate_task_id() - task = { - 'id': task_id, - 'status': TASK_PENDING, - 'progress': 0, - 'total': len(stock_list), - 'created_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), - 'updated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), - 'params': { - 'stock_list': stock_list, - 'min_score': min_score, - 'market_type': market_type - } - } - - with task_lock: - scan_tasks[task_id] = task - - # 启动后台线程执行扫描 - def run_scan(): - try: - start_market_scan_task_status(task_id, TASK_RUNNING) - - # 执行分批处理 - results = [] - total = len(stock_list) - batch_size = 10 - - for i in range(0, total, batch_size): - if task_id not in scan_tasks or scan_tasks[task_id]['status'] != TASK_RUNNING: - # 任务被取消 - app.logger.info(f"扫描任务 {task_id} 被取消") - return - - batch = stock_list[i:i + batch_size] - batch_results = [] - - for stock_code in batch: - try: - report = analyzer.quick_analyze_stock(stock_code, market_type) - if report['score'] >= min_score: - batch_results.append(report) - except Exception as e: - app.logger.error(f"分析股票 {stock_code} 时出错: {str(e)}") - continue - - results.extend(batch_results) - - # 更新进度 - progress = min(100, int((i + len(batch)) / total * 100)) - start_market_scan_task_status(task_id, TASK_RUNNING, progress=progress) - - # 按得分排序 - results.sort(key=lambda x: x['score'], reverse=True) - - # 更新任务状态为完成 - start_market_scan_task_status(task_id, TASK_COMPLETED, progress=100, result=results) - app.logger.info(f"扫描任务 {task_id} 完成,找到 {len(results)} 只符合条件的股票") - - except Exception as e: - app.logger.error(f"扫描任务 {task_id} 失败: {str(e)}") - app.logger.error(traceback.format_exc()) - start_market_scan_task_status(task_id, TASK_FAILED, error=str(e)) - - # 启动后台线程 - thread = threading.Thread(target=run_scan) - thread.daemon = True - thread.start() - - return jsonify({ - 'task_id': task_id, - 'status': 'pending', - 'message': f'已启动扫描任务,正在处理 {len(stock_list)} 只股票' - }) - - except Exception as e: - app.logger.error(f"启动市场扫描任务时出错: {traceback.format_exc()}") - return jsonify({'error': str(e)}), 500 - - -@app.route('/api/scan_status/', methods=['GET']) -def get_scan_status(task_id): - """获取扫描任务状态""" - with task_lock: - if task_id not in scan_tasks: - return jsonify({'error': '找不到指定的扫描任务'}), 404 - - task = scan_tasks[task_id] - - # 基本状态信息 - status = { - 'id': task['id'], - 'status': task['status'], - 'progress': task.get('progress', 0), - 'total': task.get('total', 0), - 'created_at': task['created_at'], - 'updated_at': task['updated_at'] - } - - # 如果任务完成,包含结果 - if task['status'] == TASK_COMPLETED and 'result' in task: - status['result'] = task['result'] - - # 如果任务失败,包含错误信息 - if task['status'] == TASK_FAILED and 'error' in task: - status['error'] = task['error'] - - return custom_jsonify(status) - - -@app.route('/api/cancel_scan/', methods=['POST']) -def cancel_scan(task_id): - """取消扫描任务""" - with task_lock: - if task_id not in scan_tasks: - return jsonify({'error': '找不到指定的扫描任务'}), 404 - - task = scan_tasks[task_id] - - if task['status'] in [TASK_COMPLETED, TASK_FAILED]: - return jsonify({'message': '任务已完成或失败,无法取消'}) - - # 更新状态为失败 - task['status'] = TASK_FAILED - task['error'] = '用户取消任务' - task['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') - - return jsonify({'message': '任务已取消'}) - - -@app.route('/api/index_stocks', methods=['GET']) -def get_index_stocks(): - """获取指数成分股""" - try: - import akshare as ak - index_code = request.args.get('index_code', '000300') # 默认沪深300 - - # 获取指数成分股 - app.logger.info(f"获取指数 {index_code} 成分股") - if index_code == '000300': - # 沪深300成分股 - stocks = ak.index_stock_cons_weight_csindex(symbol="000300") - elif index_code == '000905': - # 中证500成分股 - stocks = ak.index_stock_cons_weight_csindex(symbol="000905") - elif index_code == '000852': - # 中证1000成分股 - stocks = ak.index_stock_cons_weight_csindex(symbol="000852") - elif index_code == '000001': - # 上证指数 - stocks = ak.index_stock_cons_weight_csindex(symbol="000001") - else: - return jsonify({'error': '不支持的指数代码'}), 400 - - # 提取股票代码列表 - stock_list = stocks['成分券代码'].tolist() if '成分券代码' in stocks.columns else [] - app.logger.info(f"找到 {len(stock_list)} 只成分股") - - return jsonify({'stock_list': stock_list}) - except Exception as e: - app.logger.error(f"获取指数成分股时出错: {traceback.format_exc()}") - return jsonify({'error': str(e)}), 500 - - -@app.route('/api/industry_stocks', methods=['GET']) -def get_industry_stocks(): - """获取行业成分股""" - try: - import akshare as ak - industry = request.args.get('industry', '') - - if not industry: - return jsonify({'error': '请提供行业名称'}), 400 - - # 获取行业成分股 - app.logger.info(f"获取 {industry} 行业成分股") - stocks = ak.stock_board_industry_cons_em(symbol=industry) - - # 提取股票代码列表 - stock_list = stocks['代码'].tolist() if '代码' in stocks.columns else [] - app.logger.info(f"找到 {len(stock_list)} 只 {industry} 行业股票") - - return jsonify({'stock_list': stock_list}) - except Exception as e: - app.logger.error(f"获取行业成分股时出错: {traceback.format_exc()}") - return jsonify({'error': str(e)}), 500 - - -# 添加到web_server.py -def clean_old_tasks(): - """清理旧的扫描任务""" - with task_lock: - now = datetime.now() - to_delete = [] - - for task_id, task in scan_tasks.items(): - # 解析更新时间 - try: - updated_at = datetime.strptime(task['updated_at'], '%Y-%m-%d %H:%M:%S') - # 如果任务完成或失败且超过1小时,或者任务状态异常且超过3小时,清理它 - if ((task['status'] in [TASK_COMPLETED, TASK_FAILED] and - (now - updated_at).total_seconds() > 3600) or - ((now - updated_at).total_seconds() > 10800)): - to_delete.append(task_id) - except: - # 日期解析错误,添加到删除列表 - to_delete.append(task_id) - - # 删除旧任务 - for task_id in to_delete: - del scan_tasks[task_id] - - return len(to_delete) - - -# 修改 run_task_cleaner 函数,使其每 5 分钟运行一次并在 16:30 左右清理所有缓存 -def run_task_cleaner(): - """定期运行任务清理,并在每天 16:30 左右清理所有缓存""" - while True: - try: - now = datetime.now() - # 判断是否在收盘时间附近(16:25-16:35) - is_market_close_time = (now.hour == 16 and 25 <= now.minute <= 35) - - cleaned = clean_old_tasks() - - # 如果是收盘时间,清理所有缓存 - if is_market_close_time: - # 清理分析器的数据缓存 - analyzer.data_cache.clear() - - # 清理 Flask 缓存 - cache.clear() - - # 清理任务存储 - with task_lock: - for task_type in tasks: - task_store = tasks[task_type] - completed_tasks = [task_id for task_id, task in task_store.items() - if task['status'] == TASK_COMPLETED] - for task_id in completed_tasks: - del task_store[task_id] - - app.logger.info("市场收盘时间检测到,已清理所有缓存数据") - - if cleaned > 0: - app.logger.info(f"清理了 {cleaned} 个旧的扫描任务") - except Exception as e: - app.logger.error(f"任务清理出错: {str(e)}") - - # 每 5 分钟运行一次,而不是每小时 - time.sleep(600) - - -# 基本面分析路由 -@app.route('/api/fundamental_analysis', methods=['POST']) -def api_fundamental_analysis(): - try: - data = request.json - stock_code = data.get('stock_code') - - if not stock_code: - return jsonify({'error': '请提供股票代码'}), 400 - - # 获取基本面分析结果 - result = fundamental_analyzer.calculate_fundamental_score(stock_code) - - return custom_jsonify(result) - except Exception as e: - app.logger.error(f"基本面分析出错: {traceback.format_exc()}") - return jsonify({'error': str(e)}), 500 - - -# 资金流向分析路由 -# Add to web_server.py - -# 获取概念资金流向的API端点 -@app.route('/api/concept_fund_flow', methods=['GET']) -def api_concept_fund_flow(): - try: - period = request.args.get('period', '10日排行') # Default to 10-day ranking - - # Get concept fund flow data - result = capital_flow_analyzer.get_concept_fund_flow(period) - - return custom_jsonify(result) - except Exception as e: - app.logger.error(f"Error getting concept fund flow: {traceback.format_exc()}") - return jsonify({'error': str(e)}), 500 - - -# 获取个股资金流向排名的API端点 -@app.route('/api/individual_fund_flow_rank', methods=['GET']) -def api_individual_fund_flow_rank(): - try: - period = request.args.get('period', '10日') # Default to today - - # Get individual fund flow ranking data - result = capital_flow_analyzer.get_individual_fund_flow_rank(period) - - return custom_jsonify(result) - except Exception as e: - app.logger.error(f"Error getting individual fund flow ranking: {traceback.format_exc()}") - return jsonify({'error': str(e)}), 500 - - -# 获取个股资金流向的API端点 -@app.route('/api/individual_fund_flow', methods=['GET']) -def api_individual_fund_flow(): - try: - stock_code = request.args.get('stock_code') - market_type = request.args.get('market_type', '') # Auto-detect if not provided - re_date = request.args.get('period-select') - - if not stock_code: - return jsonify({'error': 'Stock code is required'}), 400 - - # Get individual fund flow data - result = capital_flow_analyzer.get_individual_fund_flow(stock_code, market_type, re_date) - return custom_jsonify(result) - except Exception as e: - app.logger.error(f"Error getting individual fund flow: {traceback.format_exc()}") - return jsonify({'error': str(e)}), 500 - - -# 获取板块内股票的API端点 -@app.route('/api/sector_stocks', methods=['GET']) -def api_sector_stocks(): - try: - sector = request.args.get('sector') - - if not sector: - return jsonify({'error': 'Sector name is required'}), 400 - - # Get sector stocks data - result = capital_flow_analyzer.get_sector_stocks(sector) - - return custom_jsonify(result) - except Exception as e: - app.logger.error(f"Error getting sector stocks: {traceback.format_exc()}") - return jsonify({'error': str(e)}), 500 - - -# Update the existing capital flow API endpoint -@app.route('/api/capital_flow', methods=['POST']) -def api_capital_flow(): - try: - data = request.json - stock_code = data.get('stock_code') - market_type = data.get('market_type', '') # Auto-detect if not provided - - if not stock_code: - return jsonify({'error': 'Stock code is required'}), 400 - - # Calculate capital flow score - result = capital_flow_analyzer.calculate_capital_flow_score(stock_code, market_type) - - return custom_jsonify(result) - except Exception as e: - app.logger.error(f"Error calculating capital flow score: {traceback.format_exc()}") - return jsonify({'error': str(e)}), 500 - - -# 情景预测路由 -@app.route('/api/scenario_predict', methods=['POST']) -def api_scenario_predict(): - try: - data = request.json - stock_code = data.get('stock_code') - market_type = data.get('market_type', 'A') - days = data.get('days', 60) - - if not stock_code: - return jsonify({'error': '请提供股票代码'}), 400 - - # 获取情景预测结果 - result = scenario_predictor.generate_scenarios(stock_code, market_type, days) - - return custom_jsonify(result) - except Exception as e: - app.logger.error(f"情景预测出错: {traceback.format_exc()}") - return jsonify({'error': str(e)}), 500 - - -# 智能问答路由 -@app.route('/api/qa', methods=['POST']) -def api_qa(): - try: - data = request.json - stock_code = data.get('stock_code') - question = data.get('question') - market_type = data.get('market_type', 'A') - - if not stock_code or not question: - return jsonify({'error': '请提供股票代码和问题'}), 400 - - # 获取智能问答结果 - result = stock_qa.answer_question(stock_code, question, market_type) - - return custom_jsonify(result) - except Exception as e: - app.logger.error(f"智能问答出错: {traceback.format_exc()}") - return jsonify({'error': str(e)}), 500 - - -# 风险分析路由 -@app.route('/api/risk_analysis', methods=['POST']) -def api_risk_analysis(): - try: - data = request.json - stock_code = data.get('stock_code') - market_type = data.get('market_type', 'A') - - if not stock_code: - return jsonify({'error': '请提供股票代码'}), 400 - - # 获取风险分析结果 - result = risk_monitor.analyze_stock_risk(stock_code, market_type) - - return custom_jsonify(result) - except Exception as e: - app.logger.error(f"风险分析出错: {traceback.format_exc()}") - return jsonify({'error': str(e)}), 500 - - -# 投资组合风险分析路由 -@app.route('/api/portfolio_risk', methods=['POST']) -def api_portfolio_risk(): - try: - data = request.json - portfolio = data.get('portfolio', []) - - if not portfolio: - return jsonify({'error': '请提供投资组合'}), 400 - - # 获取投资组合风险分析结果 - result = risk_monitor.analyze_portfolio_risk(portfolio) - - return custom_jsonify(result) - except Exception as e: - app.logger.error(f"投资组合风险分析出错: {traceback.format_exc()}") - return jsonify({'error': str(e)}), 500 - - -# 指数分析路由 -@app.route('/api/index_analysis', methods=['GET']) -def api_index_analysis(): - try: - index_code = request.args.get('index_code') - limit = int(request.args.get('limit', 30)) - - if not index_code: - return jsonify({'error': '请提供指数代码'}), 400 - - # 获取指数分析结果 - result = index_industry_analyzer.analyze_index(index_code, limit) - - return custom_jsonify(result) - except Exception as e: - app.logger.error(f"指数分析出错: {traceback.format_exc()}") - return jsonify({'error': str(e)}), 500 - - -# 行业分析路由 -@app.route('/api/industry_analysis', methods=['GET']) -def api_industry_analysis(): - try: - industry = request.args.get('industry') - limit = int(request.args.get('limit', 30)) - - if not industry: - return jsonify({'error': '请提供行业名称'}), 400 - - # 获取行业分析结果 - result = index_industry_analyzer.analyze_industry(industry, limit) - - return custom_jsonify(result) - except Exception as e: - app.logger.error(f"行业分析出错: {traceback.format_exc()}") - return jsonify({'error': str(e)}), 500 - - -@app.route('/api/industry_fund_flow', methods=['GET']) -def api_industry_fund_flow(): - """获取行业资金流向数据""" - try: - symbol = request.args.get('symbol', '即时') - - result = industry_analyzer.get_industry_fund_flow(symbol) - - return custom_jsonify(result) - except Exception as e: - app.logger.error(f"获取行业资金流向数据出错: {traceback.format_exc()}") - return jsonify({'error': str(e)}), 500 - - -@app.route('/api/industry_detail', methods=['GET']) -def api_industry_detail(): - """获取行业详细信息""" - try: - industry = request.args.get('industry') - - if not industry: - return jsonify({'error': '请提供行业名称'}), 400 - - result = industry_analyzer.get_industry_detail(industry) - - app.logger.info(f"返回前 (result):{result}") - if not result: - return jsonify({'error': f'未找到行业 {industry} 的详细信息'}), 404 - - return custom_jsonify(result) - except Exception as e: - app.logger.error(f"获取行业详细信息出错: {traceback.format_exc()}") - return jsonify({'error': str(e)}), 500 - - -# 行业比较路由 -@app.route('/api/industry_compare', methods=['GET']) -def api_industry_compare(): - try: - limit = int(request.args.get('limit', 10)) - - # 获取行业比较结果 - result = index_industry_analyzer.compare_industries(limit) - - return custom_jsonify(result) - except Exception as e: - app.logger.error(f"行业比较出错: {traceback.format_exc()}") - return jsonify({'error': str(e)}), 500 - - -# 保存股票分析结果到数据库 -def save_analysis_result(stock_code, market_type, result): - """保存分析结果到数据库""" - if not USE_DATABASE: - return - - try: - session = get_session() - - # 创建新的分析结果记录 - analysis = AnalysisResult( - stock_code=stock_code, - market_type=market_type, - score=result.get('scores', {}).get('total', 0), - recommendation=result.get('recommendation', {}).get('action', ''), - technical_data=result.get('technical_analysis', {}), - fundamental_data=result.get('fundamental_data', {}), - capital_flow_data=result.get('capital_flow_data', {}), - ai_analysis=result.get('ai_analysis', '') - ) - - session.add(analysis) - session.commit() - - except Exception as e: - app.logger.error(f"保存分析结果到数据库时出错: {str(e)}") - if session: - session.rollback() - finally: - if session: - session.close() - - -# 从数据库获取历史分析结果 -@app.route('/api/history_analysis', methods=['GET']) -def get_history_analysis(): - """获取股票的历史分析结果""" - if not USE_DATABASE: - return jsonify({'error': '数据库功能未启用'}), 400 - - stock_code = request.args.get('stock_code') - limit = int(request.args.get('limit', 10)) - - if not stock_code: - return jsonify({'error': '请提供股票代码'}), 400 - - try: - session = get_session() - - # 查询历史分析结果 - results = session.query(AnalysisResult) \ - .filter(AnalysisResult.stock_code == stock_code) \ - .order_by(AnalysisResult.analysis_date.desc()) \ - .limit(limit) \ - .all() - - # 转换为字典列表 - history = [result.to_dict() for result in results] - - return jsonify({'history': history}) - - except Exception as e: - app.logger.error(f"获取历史分析结果时出错: {str(e)}") - return jsonify({'error': str(e)}), 500 - finally: - if session: - session.close() - -# 添加新闻API端点 -# 添加到web_server.py文件中 -@app.route('/api/latest_news', methods=['GET']) -def get_latest_news(): - try: - days = int(request.args.get('days', 1)) # 默认获取1天的新闻 - limit = int(request.args.get('limit', 1000)) # 默认最多获取1000条 - only_important = request.args.get('important', '0') == '1' # 是否只看重要新闻 - news_type = request.args.get('type', 'all') # 新闻类型,可选值: all, hotspot - - # 从news_fetcher模块获取新闻数据 - news_data = news_fetcher.get_latest_news(days=days, limit=limit) - - # 过滤新闻 - if only_important: - # 根据关键词过滤重要新闻 - important_keywords = ['重要', '利好', '重磅', '突发', '关注'] - news_data = [news for news in news_data if - any(keyword in (news.get('content', '') or '') for keyword in important_keywords)] - - if news_type == 'hotspot': - # 过滤舆情热点相关新闻 - hotspot_keywords = [ - # 舆情直接相关词 - '舆情', '舆论', '热点', '热议', '热搜', '话题', - - # 关注度相关词 - '关注度', '高度关注', '引发关注', '市场关注', '持续关注', '重点关注', - '密切关注', '广泛关注', '集中关注', '投资者关注', - - # 传播相关词 - '爆文', '刷屏', '刷爆', '冲上热搜', '纷纷转发', '广泛传播', - '热传', '病毒式传播', '迅速扩散', '高度转发', - - # 社交媒体相关词 - '微博热搜', '微博话题', '知乎热议', '抖音热门', '今日头条', '朋友圈热议', - '微信热文', '社交媒体热议', 'APP热榜', - - # 情绪相关词 - '情绪高涨', '市场情绪', '投资情绪', '恐慌情绪', '亢奋情绪', - '乐观情绪', '悲观情绪', '投资者情绪', '公众情绪', - - # 突发事件相关 - '突发', '紧急', '爆发', '突现', '紧急事态', '快讯', '突发事件', - '重大事件', '意外事件', '突发新闻', - - # 行业动态相关 - '行业动向', '市场动向', '板块轮动', '资金流向', '产业趋势', - '政策导向', '监管动态', '风口', '市场风向', - - # 舆情分析相关 - '舆情分析', '舆情监测', '舆情报告', '舆情数据', '舆情研判', - '舆情趋势', '舆情预警', '舆情通报', '舆情简报', - - # 市场焦点相关 - '市场焦点', '焦点话题', '焦点股', '焦点事件', '投资焦点', - '关键词', '今日看点', '重点关切', '核心议题', - - # 传统媒体相关 - '头版头条', '财经头条', '要闻', '重磅新闻', '独家报道', - '深度报道', '特别关注', '重点报道', '专题报道', - - # 特殊提示词 - '投资舆情', '今日舆情', '今日热点', '投资热点', '市场热点', - '每日热点', '关注要点', '交易热点', '今日重点', - - # AI基础技术 - '人工智能', 'AI', '机器学习', '深度学习', '神经网络', '大模型', - 'LLM', '大语言模型', '生成式AI', '生成式人工智能', '算法', - - # AI细分技术 - '自然语言处理', 'NLP', '计算机视觉', 'CV', '语音识别', - '图像生成', '多模态', '强化学习', '联邦学习', '知识图谱', - '边缘计算', '量子计算', '类脑计算', '神经形态计算', - - # 热门AI模型/产品 - 'GPT', 'GPT-4', 'GPT-5', 'GPT-4o', 'ChatGPT', 'Claude', - 'Gemini', 'Llama', 'Llama3', 'Stable Diffusion', 'DALL-E', - 'Midjourney', 'Sora', 'Anthropic', 'Runway', 'Copilot', - 'Bard', 'GLM', 'Ernie', '文心一言', '通义千问', '讯飞星火','DeepSeek', - - # AI应用领域 - 'AIGC', '智能驾驶', '自动驾驶', '智能助手', '智能医疗', - '智能制造', '智能客服', '智能金融', '智能教育', - '智能家居', '机器人', 'RPA', '数字人', '虚拟人', - '智能安防', '计算机辅助', - - # AI硬件 - 'AI芯片', 'GPU', 'TPU', 'NPU', 'FPGA', '算力', '推理芯片', - '训练芯片', 'NVIDIA', '英伟达', 'AMD', '高性能计算', - - # AI企业 - 'OpenAI', '微软AI', '谷歌AI', 'Google DeepMind', 'Meta AI', - '百度智能云', '阿里云AI', '腾讯AI', '华为AI', '商汤科技', - '旷视科技', '智源人工智能', '云从科技', '科大讯飞', - - # AI监管/伦理 - 'AI监管', 'AI伦理', 'AI安全', 'AI风险', 'AI治理', - 'AI对齐', 'AI偏见', 'AI隐私', 'AGI', '通用人工智能', - '超级智能', 'AI法规', 'AI责任', 'AI透明度', - - # AI市场趋势 - 'AI创业', 'AI投资', 'AI融资', 'AI估值', 'AI泡沫', - 'AI风口', 'AI赛道', 'AI产业链', 'AI应用落地', 'AI转型', - 'AI红利', 'AI市值', 'AI概念股', - - # 新兴AI概念 - 'AI Agent', 'AI智能体', '多智能体', '自主AI', - 'AI搜索引擎', 'RAG', '检索增强生成', '思维链', 'CoT', - '大模型微调', '提示工程', 'Prompt Engineering', - '基础模型', 'Foundation Model', '小模型', '专用模型', - - # 人工智能舆情专用 - 'AI热点', 'AI风潮', 'AI革命', 'AI热议', 'AI突破', - 'AI进展', 'AI挑战', 'AI竞赛', 'AI战略', 'AI政策', - 'AI风险', 'AI恐慌', 'AI威胁', 'AI机遇' - ] - - # 在API处理中使用 - if news_type == 'hotspot': - # 过滤舆情热点相关新闻 - def has_keyword(item): - title = item.get('title', '') - content = item.get('content', '') - return any(keyword in title for keyword in hotspot_keywords) or \ - any(keyword in content for keyword in hotspot_keywords) - - news_data = [news for news in news_data if has_keyword(news)] - - return jsonify({'success': True, 'news': news_data}) - except Exception as e: - app.logger.error(f"获取最新新闻数据时出错: {str(e)}") - return jsonify({'success': False, 'error': str(e)}), 500 - - - -# --- Start of new FileSessionManager implementation --- -class FileSessionManager: - """A Flask-compatible file-based session manager for agent tasks.""" - def __init__(self, data_dir): - self.data_dir = Path(data_dir) - self.data_dir.mkdir(parents=True, exist_ok=True) - - def _get_task_path(self, task_id): - return self.data_dir / f"{task_id}.json" - - def save_task(self, task_data): - if 'id' not in task_data: - app.logger.error("Attempted to save task without an 'id'") - return - task_id = task_data['id'] - task_file = self._get_task_path(task_id) - with open(task_file, 'w', encoding='utf-8') as f: - json.dump(task_data, f, ensure_ascii=False, indent=4, cls=NumpyJSONEncoder) - - def load_task(self, task_id): - task_file = self._get_task_path(task_id) - if not task_file.exists(): - return None - with open(task_file, 'r', encoding='utf-8') as f: - try: - return json.load(f) - except json.JSONDecodeError: - app.logger.error(f"Failed to decode JSON for task {task_id}") - return None - - def get_all_tasks(self): - tasks = [] - for task_file in self.data_dir.glob("*.json"): - with open(task_file, 'r', encoding='utf-8') as f: - try: - tasks.append(json.load(f)) - except json.JSONDecodeError: - app.logger.warning(f"Skipping corrupted task file: {task_file.name}") - continue - return tasks - - def cleanup_stale_tasks(self, timeout_hours=2): - """Clean up stale 'running' tasks that have exceeded a timeout.""" - app.logger.info("开始清理过时的任务...") - cleaned_count = 0 - now = datetime.now() - - tasks = self.get_all_tasks() - for task in tasks: - if task.get('status') == TASK_RUNNING: - try: - updated_at = datetime.strptime(task.get('updated_at'), '%Y-%m-%d %H:%M:%S') - if (now - updated_at).total_seconds() > timeout_hours * 3600: - task['status'] = TASK_FAILED - task['error'] = '任务因服务器重启或超时而中止' - task['updated_at'] = now.strftime('%Y-%m-%d %H:%M:%S') - self.save_task(task) - cleaned_count += 1 - app.logger.warning(f"清理了过时的任务 {task.get('id')},该任务已运行超过 {timeout_hours} 小时。") - except (ValueError, TypeError) as e: - app.logger.error(f"解析任务 {task.get('id')} 的 updated_at 时出错: {e}") - continue - - if cleaned_count > 0: - app.logger.info(f"清理完成,共处理了 {cleaned_count} 个过时的任务。") - else: - app.logger.info("没有发现需要清理的过时任务。") - - def delete_task(self, task_id): - """Safely delete a task file.""" - try: - task_file = self._get_task_path(task_id) - if task_file.exists(): - task_file.unlink() - return True - except Exception as e: - app.logger.error(f"Failed to delete task {task_id}: {e}") - return False - -# Instantiate the manager -AGENT_SESSIONS_DIR = os.path.join(os.path.dirname(__file__), '../../data/agent_sessions') -agent_session_manager = FileSessionManager(AGENT_SESSIONS_DIR) -agent_session_manager.cleanup_stale_tasks() -# --- End of new FileSessionManager implementation --- - - -# 智能体分析路由 -@app.route('/api/start_agent_analysis', methods=['POST']) -def start_agent_analysis(): - """启动智能体分析任务""" - try: - data = request.json - stock_code = data.get('stock_code') - research_depth = data.get('research_depth', 3) - market_type = data.get('market_type', 'A') - selected_analysts = data.get('selected_analysts', ["market", "social", "news", "fundamentals"]) - analysis_date = data.get('analysis_date') - enable_memory = data.get('enable_memory', True) - max_output_length = data.get('max_output_length', 2048) - - if not stock_code: - return jsonify({'error': '请提供股票代码'}), 400 - - # 创建新任务 - task_id = generate_task_id() - task = { - 'id': task_id, - 'status': TASK_PENDING, - 'progress': 0, - 'current_step': '任务已创建', - 'created_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), - 'updated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), - 'params': { - 'stock_code': stock_code, - 'research_depth': research_depth, - 'market_type': market_type, - 'selected_analysts': selected_analysts, - 'analysis_date': analysis_date, - 'enable_memory': enable_memory, - 'max_output_length': max_output_length - } - } - - # 为任务创建取消事件 - task['cancel_event'] = threading.Event() - agent_session_manager.save_task(task) - - def run_agent_analysis(): - """在后台线程中运行智能体分析""" - try: - from tradingagents.graph.trading_graph import TradingAgentsGraph - from tradingagents.default_config import DEFAULT_CONFIG - - update_task_status('agent_analysis', task_id, TASK_RUNNING, progress=5, result={'current_step': '正在初始化智能体...'}) - - # --- 修复 Start: 强制使用主应用的OpenAI代理配置 --- - config = DEFAULT_CONFIG.copy() - config['llm_provider'] = 'openai' - config['backend_url'] = os.getenv('OPENAI_API_URL') - main_model = os.getenv('OPENAI_API_MODEL', 'gpt-4o') - config['deep_think_llm'] = main_model - config['quick_think_llm'] = main_model - config['memory_enabled'] = enable_memory - config['max_tokens'] = max_output_length - - if not os.getenv('OPENAI_API_KEY'): - raise ValueError("主应用的 OPENAI_API_KEY 未在.env文件中设置") - - app.logger.info(f"强制使用主应用代理配置进行智能体分析: provider={config['llm_provider']}, url={config['backend_url']}, model={config['deep_think_llm']}") - - ta = TradingAgentsGraph( - selected_analysts=selected_analysts, - debug=True, - config=config - ) - # --- 修复 End --- - - def progress_callback(progress, step): - current_task = agent_session_manager.load_task(task_id) - if not current_task or current_task.get('status') == TASK_CANCELLED: - raise TaskCancelledException(f"任务 {task_id} 已被用户取消") - update_task_status('agent_analysis', task_id, TASK_RUNNING, progress=progress, result={'current_step': step}) - - today = analysis_date or datetime.now().strftime('%Y-%m-%d') - state, decision = ta.propagate(stock_code, today, market_type=market_type, progress_callback=progress_callback) - - # 修复:在任务完成时,获取并添加公司名称到最终结果中 - try: - stock_info = analyzer.get_stock_info(stock_code) - stock_name = stock_info.get('股票名称', '未知') - # 将公司名称添加到 state 字典中,前端将从这里读取 - if isinstance(state, dict): - state['company_name'] = stock_name - except Exception as e: - app.logger.error(f"为 {stock_code} 获取公司名称时出错: {e}") - if isinstance(state, dict): - state['company_name'] = '名称获取失败' - - update_task_status('agent_analysis', task_id, TASK_COMPLETED, progress=100, result={'decision': decision, 'final_state': state, 'current_step': '分析完成'}) - app.logger.info(f"智能体分析任务 {task_id} 完成") - - except TaskCancelledException as e: - app.logger.info(str(e)) - update_task_status('agent_analysis', task_id, TASK_FAILED, error='任务已被用户取消', result={'current_step': '任务已被用户取消'}) - except Exception as e: - app.logger.error(f"智能体分析任务 {task_id} 失败: {str(e)}") - app.logger.error(traceback.format_exc()) - update_task_status('agent_analysis', task_id, TASK_FAILED, error=str(e), result={'current_step': f'分析失败: {e}'}) - - thread = threading.Thread(target=run_agent_analysis) - thread.daemon = True - thread.start() - - return jsonify({ - 'task_id': task_id, - 'status': 'pending', - 'message': f'已启动对 {stock_code} 的智能体分析' - }) - - except Exception as e: - app.logger.error(f"启动智能体分析时出错: {traceback.format_exc()}") - return jsonify({'error': str(e)}), 500 - -@app.route('/api/agent_analysis_status/', methods=['GET']) -def get_agent_analysis_status(task_id): - """获取智能体分析任务的状态""" - task = agent_session_manager.load_task(task_id) - - if not task: - return jsonify({'error': '找不到指定的智能体分析任务'}), 404 - - # 准备要返回的数据 - response_data = { - 'id': task['id'], - 'status': task['status'], - 'progress': task.get('progress', 0), - 'created_at': task['created_at'], - 'updated_at': task['updated_at'], - 'params': task.get('params', {}) - } - - if 'result' in task: - response_data['result'] = convert_messages_to_dict(task['result']) - if 'error' in task: - response_data['error'] = task['error'] - - return custom_jsonify(response_data) - - - -# 在web_server.py中添加(建议放在路由定义区域) -@app.route('/health', methods=['GET']) -def health_check(): - """健康检查端点,供Hugging Face确认应用状态""" - return jsonify({"status": "healthy", "timestamp": datetime.now().isoformat()}), 200 - - - - - -@app.route('/api/agent_analysis_history', methods=['GET']) -def get_agent_analysis_history(): - """获取已完成的智能体分析任务历史""" - try: - all_tasks = agent_session_manager.get_all_tasks() - history = [ - task for task in all_tasks - if task.get('status') in [TASK_COMPLETED, TASK_FAILED] - ] - # 按更新时间排序,最新的在前 - history.sort(key=lambda x: x.get('updated_at', ''), reverse=True) - return custom_jsonify({'history': history}) - except Exception as e: - app.logger.error(f"获取分析历史时出错: {traceback.format_exc()}") - return jsonify({'error': str(e)}), 500 - - -@app.route('/api/delete_agent_analysis', methods=['POST']) -def delete_agent_analysis(): - """Cancel and/or delete one or more agent analysis tasks.""" - try: - data = request.json - task_ids = data.get('task_ids', []) - if not isinstance(task_ids, list): - return jsonify({'error': 'task_ids 必须是一个列表'}), 400 - - if not task_ids: - return jsonify({'error': '请提供要删除的任务ID'}), 400 - - deleted_count = 0 - cancelled_count = 0 - - for task_id in task_ids: - task = agent_session_manager.load_task(task_id) - if not task: - app.logger.warning(f"尝试删除一个不存在的任务: {task_id}") - continue - - # If the task is running, mark it as cancelled - if task.get('status') == TASK_RUNNING: - task['status'] = TASK_CANCELLED - task['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') - task['error'] = '任务已被用户取消' - agent_session_manager.save_task(task) - cancelled_count += 1 - app.logger.info(f"任务 {task_id} 已被标记为取消。") - - # For all other states (or after cancelling), delete the task file - if agent_session_manager.delete_task(task_id): - deleted_count += 1 - - message = f"请求处理 {len(task_ids)} 个任务。已取消 {cancelled_count} 个运行中的任务,并删除了 {deleted_count} 个任务文件。" - app.logger.info(message) - return jsonify({'success': True, 'message': message}) - - except Exception as e: - app.logger.error(f"删除分析历史时出错: {traceback.format_exc()}") - return jsonify({'error': str(e)}), 500 - - - -@app.route('/api/active_tasks', methods=['GET']) -def get_active_tasks(): - """获取所有正在进行的智能体分析任务""" - try: - all_tasks = agent_session_manager.get_all_tasks() - active_tasks_list = [] - for task in all_tasks: - if task.get('status') == TASK_RUNNING: - task_info = { - 'task_id': task['id'], - 'stock_code': task.get('params', {}).get('stock_code'), - 'progress': task.get('progress', 0), - 'current_step': task.get('result', {}).get('current_step', '加载中...') - } - active_tasks_list.append(task_info) - # 按创建时间排序,最新的在前 - active_tasks_list.sort(key=lambda x: x.get('created_at', ''), reverse=True) - return custom_jsonify({'active_tasks': active_tasks_list}) - except Exception as e: - app.logger.error(f"获取活动任务时出错: {traceback.format_exc()}") - return jsonify({'error': str(e)}), 500 - - -# 在应用启动时启动清理线程(保持原有代码不变) -cleaner_thread = threading.Thread(target=run_task_cleaner) -cleaner_thread.daemon = True -cleaner_thread.start() - -if __name__ == '__main__': - # 强制禁用Flask的调试模式,以确保日志配置生效 +# -*- coding: utf-8 -*- +""" +智能分析系统(股票) - 股票市场数据分析系统 +修改:熊猫大侠 +版本:v2.1.0 +""" +# web_server.py + +import numpy as np +import pandas as pd +from flask import Flask, render_template, request, jsonify, redirect, url_for +from app.analysis.stock_analyzer import StockAnalyzer +from app.analysis.us_stock_service import USStockService +import threading +import logging +from logging.handlers import RotatingFileHandler +import traceback +import os +import json +from datetime import date, datetime, timedelta +from flask_cors import CORS +from pathlib import Path +import time +from flask_caching import Cache +import threading +import sys +from flask_swagger_ui import get_swaggerui_blueprint +from app.core.database import get_session, StockInfo, AnalysisResult, Portfolio, USE_DATABASE +from dotenv import load_dotenv +from app.analysis.industry_analyzer import IndustryAnalyzer +from app.analysis.fundamental_analyzer import FundamentalAnalyzer +from app.analysis.capital_flow_analyzer import CapitalFlowAnalyzer +from app.analysis.scenario_predictor import ScenarioPredictor +from app.analysis.stock_qa import StockQA +from app.analysis.risk_monitor import RiskMonitor +from app.analysis.index_industry_analyzer import IndexIndustryAnalyzer +from app.analysis.news_fetcher import news_fetcher, start_news_scheduler +from app.analysis.etf_analyzer import EtfAnalyzer + +import sys +import os + +# 将 tradingagents 目录添加到系统路径 +# 这允许应用从 tradingagents 代码库中导入模块 +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../tradingagents'))) + + +# 加载环境变量 +load_dotenv() + +# 检查是否需要初始化数据库 +if USE_DATABASE: + init_db() + +# 配置Swagger +SWAGGER_URL = '/api/docs' +API_URL = '/static/swagger.json' +swaggerui_blueprint = get_swaggerui_blueprint( + SWAGGER_URL, + API_URL, + config={ + 'app_name': "股票智能分析系统 API文档" + } +) + +app = Flask(__name__) +CORS(app, resources={r"/*": {"origins": "*"}}) +analyzer = StockAnalyzer() +us_stock_service = USStockService() + +# 配置缓存 +cache_config = { + 'CACHE_TYPE': 'SimpleCache', + 'CACHE_DEFAULT_TIMEOUT': 300 +} + +# 如果配置了Redis,使用Redis作为缓存后端 +if os.getenv('USE_REDIS_CACHE', 'False').lower() == 'true' and os.getenv('REDIS_URL'): + cache_config = { + 'CACHE_TYPE': 'RedisCache', + 'CACHE_REDIS_URL': os.getenv('REDIS_URL'), + 'CACHE_DEFAULT_TIMEOUT': 300 + } + +cache = Cache(config={'CACHE_TYPE': 'SimpleCache'}) +cache.init_app(app) + +app.register_blueprint(swaggerui_blueprint, url_prefix=SWAGGER_URL) + + +# 确保全局变量在重新加载时不会丢失 +if 'analyzer' not in globals(): + try: + from app.analysis.stock_analyzer import StockAnalyzer + + analyzer = StockAnalyzer() + print("成功初始化全局StockAnalyzer实例") + except Exception as e: + print(f"初始化StockAnalyzer时出错: {e}", file=sys.stderr) + raise + +# 初始化模块实例 +fundamental_analyzer = FundamentalAnalyzer() +capital_flow_analyzer = CapitalFlowAnalyzer() +scenario_predictor = ScenarioPredictor(analyzer, os.getenv('OPENAI_API_KEY'), os.getenv('OPENAI_API_MODEL')) +stock_qa = StockQA(analyzer, os.getenv('OPENAI_API_KEY')) +risk_monitor = RiskMonitor(analyzer) +index_industry_analyzer = IndexIndustryAnalyzer(analyzer) +industry_analyzer = IndustryAnalyzer() + +start_news_scheduler() + +# 线程本地存储 +thread_local = threading.local() + + +def get_analyzer(): + """获取线程本地的分析器实例""" + # 如果线程本地存储中没有分析器实例,创建一个新的 + if not hasattr(thread_local, 'analyzer'): + thread_local.analyzer = StockAnalyzer() + return thread_local.analyzer + + +# 配置日志 +# 从环境变量读取日志级别和文件路径 +log_level = os.getenv('LOG_LEVEL', 'INFO').upper() +log_file = os.getenv('LOG_FILE', 'data/logs/server.log') + +# 确保日志目录存在 +os.makedirs(os.path.dirname(log_file), exist_ok=True) + +# 创建日志格式化器 +formatter = logging.Formatter( + '[%(asctime)s] [%(process)d:%(thread)d] [%(levelname)s] [%(name)s:%(lineno)d] - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' +) + +# 配置根日志记录器 +root_logger = logging.getLogger() +root_logger.setLevel(log_level) + +# 清除所有现有的处理器,以避免重复日志 +if root_logger.hasHandlers(): + root_logger.handlers.clear() + +# 添加文件处理器 +file_handler = RotatingFileHandler(log_file, maxBytes=1024*1024*10, backupCount=5, encoding='utf-8') # 10MB +file_handler.setFormatter(formatter) +root_logger.addHandler(file_handler) + +# 添加控制台处理器 +console_handler = logging.StreamHandler(sys.stdout) +console_handler.setFormatter(formatter) +root_logger.addHandler(console_handler) + +# 将Flask的默认处理器移除,使其日志也遵循我们的配置 +from flask.logging import default_handler +app.logger.removeHandler(default_handler) +app.logger.propagate = True + +# 将 werkzeug 日志记录器的级别也设置为 .env 中定义的级别 +logging.getLogger('werkzeug').setLevel(log_level) + +app.logger.info(f"日志系统已初始化,级别: {log_level}, 文件: {log_file}") + + +# 扩展任务管理系统以支持不同类型的任务 +task_types = { + 'scan': 'market_scan', # 市场扫描任务 + 'analysis': 'stock_analysis', # 个股分析任务 + 'agent_analysis': 'agent_analysis', # 智能体分析任务 + 'etf_analysis': 'etf_analysis' # ETF分析任务 +} + +# 任务数据存储 +tasks = { + 'market_scan': {}, + 'stock_analysis': {}, + 'etf_analysis': {}, +} + + + +def get_task_store(task_type): + """获取指定类型的任务存储""" + return tasks.get(task_type, {}) + + +def generate_task_key(task_type, **params): + """生成任务键""" + if task_type == 'stock_analysis': + # 对于个股分析,使用股票代码和市场类型作为键 + return f"{params.get('stock_code')}_{params.get('market_type', 'A')}" + if task_type == 'etf_analysis': + return f"{params.get('etf_code')}" + return None # 其他任务类型不使用预生成的键 + + +def get_or_create_task(task_type, **params): + """获取或创建任务""" + store = get_task_store(task_type) + task_key = generate_task_key(task_type, **params) + + # 检查是否有现有任务 + if task_key and task_key in store: + task = store[task_key] + # 检查任务是否仍然有效 + if task['status'] in [TASK_PENDING, TASK_RUNNING]: + return task['id'], task, False + if task['status'] == TASK_COMPLETED and 'result' in task: + # 任务已完成且有结果,重用它 + return task['id'], task, False + + # 创建新任务 + task_id = generate_task_id() + task = { + 'id': task_id, + 'key': task_key, # 存储任务键以便以后查找 + 'type': task_type, + 'status': TASK_PENDING, + 'progress': 0, + 'created_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), + 'updated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), + 'params': params + } + + with task_lock: + if task_key: + store[task_key] = task + store[task_id] = task + + return task_id, task, True + + +# 添加到web_server.py顶部 +# 任务管理系统 +scan_tasks = {} # 存储扫描任务的状态和结果 +task_lock = threading.Lock() # 用于线程安全操作 + + +# 自定义异常,用于任务取消 +class TaskCancelledException(Exception): + pass + +# 任务状态常量 +TASK_PENDING = 'pending' +TASK_RUNNING = 'running' +TASK_COMPLETED = 'completed' +TASK_FAILED = 'failed' +TASK_CANCELLED = 'cancelled' + + +def generate_task_id(): + """生成唯一的任务ID""" + import uuid + return str(uuid.uuid4()) + + +def start_market_scan_task_status(task_id, status, progress=None, result=None, error=None): + """更新任务状态 - 保持原有签名""" + with task_lock: + if task_id in scan_tasks: + task = scan_tasks[task_id] + task['status'] = status + if progress is not None: + task['progress'] = progress + if result is not None: + task['result'] = result + if error is not None: + task['error'] = error + task['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + + +def update_task_status(task_type, task_id, status, progress=None, result=None, error=None): + """更新任务状态""" + with task_lock: + task = None + if task_type == 'agent_analysis': + task = agent_session_manager.load_task(task_id) + else: + store = get_task_store(task_type) + if task_id in store: + task = store.get(task_id) + + if not task: + app.logger.warning(f"更新任务状态时未找到任务: {task_id} (类型: {task_type})") + return + + # 更新任务属性 + task['status'] = status + if progress is not None: + task['progress'] = progress + if result is not None: + if 'result' not in task or not isinstance(task['result'], dict): + task['result'] = {} + task['result'].update(result) + if error is not None: + task['error'] = error + task['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + + # 保存更新后的任务 + if task_type == 'agent_analysis': + agent_session_manager.save_task(task) + else: + # 更新键索引的任务 (如果适用) + store = get_task_store(task_type) + if 'key' in task and task.get('key') and task['key'] in store: + store[task['key']] = task + store[task_id] = task # also save by id + + +analysis_tasks = {} + + +def get_or_create_analysis_task(stock_code, market_type='A'): + """获取或创建个股分析任务""" + # 创建一个键,用于查找现有任务 + task_key = f"{stock_code}_{market_type}" + + with task_lock: + # 检查是否有现有任务 + for task_id, task in analysis_tasks.items(): + if task.get('key') == task_key: + # 检查任务是否仍然有效 + if task['status'] in [TASK_PENDING, TASK_RUNNING]: + return task_id, task, False + if task['status'] == TASK_COMPLETED and 'result' in task: + # 任务已完成且有结果,重用它 + return task_id, task, False + + # 创建新任务 + task_id = generate_task_id() + task = { + 'id': task_id, + 'key': task_key, + 'status': TASK_PENDING, + 'progress': 0, + 'created_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), + 'updated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), + 'params': { + 'stock_code': stock_code, + 'market_type': market_type + } + } + + analysis_tasks[task_id] = task + + return task_id, task, True + + +def update_analysis_task(task_id, status, progress=None, result=None, error=None): + """更新个股分析任务状态""" + with task_lock: + if task_id in analysis_tasks: + task = analysis_tasks[task_id] + task['status'] = status + if progress is not None: + task['progress'] = progress + if result is not None: + task['result'] = result + if error is not None: + task['error'] = error + task['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + + +# 定义自定义JSON编码器 + + +# 在web_server.py中,更新convert_numpy_types函数以处理NaN值 + +# 将NumPy类型转换为Python原生类型的函数 +def convert_numpy_types(obj): + """递归地将字典和列表中的NumPy类型转换为Python原生类型""" + try: + import numpy as np + import math + + if isinstance(obj, dict): + return {convert_numpy_types(key): convert_numpy_types(value) for key, value in obj.items()} + elif isinstance(obj, list): + return [convert_numpy_types(item) for item in obj] + elif isinstance(obj, np.integer): + return int(obj) + elif isinstance(obj, np.floating): + # Handle NaN and Infinity specifically + if np.isnan(obj): + return None + elif np.isinf(obj): + return None if obj < 0 else 1e308 # Use a very large number for +Infinity + return float(obj) + elif isinstance(obj, np.ndarray): + return obj.tolist() + elif isinstance(obj, np.bool_): + return bool(obj) + # Handle Python's own float NaN and Infinity + elif isinstance(obj, float): + if math.isnan(obj): + return None + elif math.isinf(obj): + return None + return obj + # 添加对date和datetime类型的处理 + elif isinstance(obj, (date, datetime)): + return obj.isoformat() + else: + return obj + except ImportError: + # 如果没有安装numpy,但需要处理date和datetime + import math + if isinstance(obj, dict): + return {convert_numpy_types(key): convert_numpy_types(value) for key, value in obj.items()} + elif isinstance(obj, list): + return [convert_numpy_types(item) for item in obj] + elif isinstance(obj, (date, datetime)): + return obj.isoformat() + # Handle Python's own float NaN and Infinity + elif isinstance(obj, float): + if math.isnan(obj): + return None + elif math.isinf(obj): + return None + return obj + return obj + + +# 同样更新 NumpyJSONEncoder 类 +class NumpyJSONEncoder(json.JSONEncoder): + def default(self, obj): + # Handle LangChain message objects first + try: + from langchain_core.messages import BaseMessage + if isinstance(obj, BaseMessage): + return {"type": obj.__class__.__name__, "content": str(obj.content)} + except ImportError: + pass # If langchain is not installed, just proceed + + # For NumPy data types + try: + import numpy as np + import math + if isinstance(obj, np.integer): + return int(obj) + elif isinstance(obj, np.floating): + # Handle NaN and Infinity specifically + if np.isnan(obj): + return None + elif np.isinf(obj): + return None + return float(obj) + elif isinstance(obj, np.ndarray): + return obj.tolist() + elif isinstance(obj, np.bool_): + return bool(obj) + # Handle Python's own float NaN and Infinity + elif isinstance(obj, float): + if math.isnan(obj): + return None + elif math.isinf(obj): + return None + return obj + except ImportError: + # Handle Python's own float NaN and Infinity if numpy is not available + import math + if isinstance(obj, float): + if math.isnan(obj): + return None + elif math.isinf(obj): + return None + + # 添加对date和datetime类型的处理 + if isinstance(obj, (date, datetime)): + return obj.isoformat() + + # Fallback for other non-serializable types + try: + return super(NumpyJSONEncoder, self).default(obj) + except TypeError: + # For LangChain messages or other complex objects, convert to string + return str(obj) + + +# Helper to convert LangChain messages to JSON serializable format +def convert_messages_to_dict(obj): + """Recursively convert LangChain message objects to dictionaries.""" + # Check if langchain_core is available and if the object is a message + try: + from langchain_core.messages import BaseMessage + is_message = isinstance(obj, BaseMessage) + except ImportError: + is_message = False + + if is_message: + # Base case: convert message object to dict + return {"type": obj.__class__.__name__, "content": str(obj.content)} + elif isinstance(obj, dict): + # Recursive step for dictionaries + return {k: convert_messages_to_dict(v) for k, v in obj.items()} + elif isinstance(obj, list): + # Recursive step for lists + return [convert_messages_to_dict(elem) for elem in obj] + else: + # Return the object as is if no conversion is needed + return obj + + +# 使用我们的编码器的自定义 jsonify 函数 +def custom_jsonify(data): + return app.response_class( + json.dumps(convert_numpy_types(data), cls=NumpyJSONEncoder), + mimetype='application/json' + ) + + +# 保持API兼容的路由 +@app.route('/') +def index(): + return render_template('index.html') + + +@app.route('/analyze', methods=['POST']) +def analyze(): + try: + data = request.json + stock_codes = data.get('stock_codes', []) + market_type = data.get('market_type', 'A') + + if not stock_codes: + return jsonify({'error': '请输入代码'}), 400 + + app.logger.info(f"分析股票请求: {stock_codes}, 市场类型: {market_type}") + + # 设置最大处理时间,每只股票10秒 + max_time_per_stock = 10 # 秒 + max_total_time = max(30, min(60, len(stock_codes) * max_time_per_stock)) # 至少30秒,最多60秒 + + start_time = time.time() + results = [] + + for stock_code in stock_codes: + try: + # 检查是否已超时 + if time.time() - start_time > max_total_time: + app.logger.warning(f"分析股票请求已超过{max_total_time}秒,提前返回已处理的{len(results)}只股票") + break + + # 使用线程本地缓存的分析器实例 + current_analyzer = get_analyzer() + result = current_analyzer.quick_analyze_stock(stock_code.strip(), market_type) + + app.logger.info( + f"分析结果: 股票={stock_code}, 名称={result.get('stock_name', '未知')}, 行业={result.get('industry', '未知')}") + results.append(result) + except Exception as e: + app.logger.error(f"分析股票 {stock_code} 时出错: {str(e)}") + results.append({ + 'stock_code': stock_code, + 'error': str(e), + 'stock_name': '分析失败', + 'industry': '未知' + }) + + return jsonify({'results': results}) + except Exception as e: + app.logger.error(f"分析股票时出错: {traceback.format_exc()}") + return jsonify({'error': str(e)}), 500 + + +@app.route('/api/north_flow_history', methods=['POST']) +def api_north_flow_history(): + try: + data = request.json + stock_code = data.get('stock_code') + days = data.get('days', 10) # 默认为10天,对应前端的默认选项 + + # 计算 end_date 为当前时间 + end_date = datetime.now().strftime('%Y%m%d') + + # 计算 start_date 为 end_date 减去指定的天数 + start_date = (datetime.now() - timedelta(days=int(days))).strftime('%Y%m%d') + + if not stock_code: + return jsonify({'error': '请提供股票代码'}), 400 + + # 调用北向资金历史数据方法 + + analyzer = CapitalFlowAnalyzer() + result = analyzer.get_north_flow_history(stock_code, start_date, end_date) + + return custom_jsonify(result) + except Exception as e: + app.logger.error(f"获取北向资金历史数据出错: {traceback.format_exc()}") + return jsonify({'error': str(e)}), 500 + + +@app.route('/search_us_stocks', methods=['GET']) +def search_us_stocks(): + try: + keyword = request.args.get('keyword', '') + if not keyword: + return jsonify({'error': '请输入搜索关键词'}), 400 + + results = us_stock_service.search_us_stocks(keyword) + return jsonify({'results': results}) + + except Exception as e: + app.logger.error(f"搜索美股代码时出错: {str(e)}") + return jsonify({'error': str(e)}), 500 + + +# 新增可视化分析页面路由 +@app.route('/dashboard') +def dashboard(): + return render_template('dashboard.html') + + +@app.route('/stock_detail/') +def stock_detail(stock_code): + market_type = request.args.get('market_type', 'A') + return render_template('stock_detail.html', stock_code=stock_code, market_type=market_type) + + +@app.route('/portfolio') +def portfolio(): + return render_template('portfolio.html') + + +@app.route('/market_scan') +def market_scan(): + return render_template('market_scan.html') + + +# 基本面分析页面 +@app.route('/fundamental') +def fundamental(): + return render_template('fundamental.html') + + +# 资金流向页面 +@app.route('/capital_flow') +def capital_flow(): + return render_template('capital_flow.html') + + +# 情景预测页面 +@app.route('/scenario_predict') +def scenario_predict(): + return render_template('scenario_predict.html') + + +# 风险监控页面 +@app.route('/risk_monitor') +def risk_monitor_page(): + return render_template('risk_monitor.html') + + +# 智能问答页面 +@app.route('/qa') +def qa_page(): + return render_template('qa.html') + + +# 行业分析页面 +@app.route('/industry_analysis') +def industry_analysis(): + return render_template('industry_analysis.html') + + + +# 智能体分析页面 +@app.route('/agent_analysis') +def agent_analysis_page(): + return render_template('agent_analysis.html') + + +@app.route('/etf_analysis') +def etf_analysis_page(): + return render_template('etf_analysis.html') + + + + + +def make_cache_key_with_stock(): + """创建包含股票代码的自定义缓存键""" + path = request.path + + # 从请求体中获取股票代码 + stock_code = None + if request.is_json: + stock_code = request.json.get('stock_code') + + # 构建包含股票代码的键 + if stock_code: + return f"{path}_{stock_code}" + else: + return path + + +@app.route('/api/start_stock_analysis', methods=['POST']) +def start_stock_analysis(): + """启动个股分析任务""" + try: + data = request.json + stock_code = data.get('stock_code') + market_type = data.get('market_type', 'A') + + if not stock_code: + return jsonify({'error': '请输入股票代码'}), 400 + + app.logger.info(f"准备分析股票: {stock_code}") + + # 获取或创建任务 + task_id, task, is_new = get_or_create_task( + 'stock_analysis', + stock_code=stock_code, + market_type=market_type + ) + + # 如果是已完成的任务,直接返回结果 + if task['status'] == TASK_COMPLETED and 'result' in task: + app.logger.info(f"使用缓存的分析结果: {stock_code}") + return jsonify({ + 'task_id': task_id, + 'status': task['status'], + 'result': task['result'] + }) + + # 如果是新创建的任务,启动后台处理 + if is_new: + app.logger.info(f"创建新的分析任务: {task_id}") + + # 启动后台线程执行分析 + def run_analysis(): + try: + update_task_status('stock_analysis', task_id, TASK_RUNNING, progress=10) + + # 执行分析 + result = analyzer.perform_enhanced_analysis(stock_code, market_type) + + # 更新任务状态为完成 + update_task_status('stock_analysis', task_id, TASK_COMPLETED, progress=100, result=result) + app.logger.info(f"分析任务 {task_id} 完成") + + except Exception as e: + app.logger.error(f"分析任务 {task_id} 失败: {str(e)}") + app.logger.error(traceback.format_exc()) + update_task_status('stock_analysis', task_id, TASK_FAILED, error=str(e)) + + # 启动后台线程 + thread = threading.Thread(target=run_analysis) + thread.daemon = True + thread.start() + + # 返回任务ID和状态 + return jsonify({ + 'task_id': task_id, + 'status': task['status'], + 'message': f'已启动分析任务: {stock_code}' + }) + + except Exception as e: + app.logger.error(f"启动个股分析任务时出错: {traceback.format_exc()}") + return jsonify({'error': str(e)}), 500 + + +@app.route('/api/analysis_status/', methods=['GET']) +def get_analysis_status(task_id): + """获取个股分析任务状态""" + store = get_task_store('stock_analysis') + with task_lock: + if task_id not in store: + return jsonify({'error': '找不到指定的分析任务'}), 404 + + task = store[task_id] + + # 基本状态信息 + status = { + 'id': task['id'], + 'status': task['status'], + 'progress': task.get('progress', 0), + 'created_at': task['created_at'], + 'updated_at': task['updated_at'] + } + + # 如果任务完成,包含结果 + if task['status'] == TASK_COMPLETED and 'result' in task: + status['result'] = task['result'] + + # 如果任务失败,包含错误信息 + if task['status'] == TASK_FAILED and 'error' in task: + status['error'] = task['error'] + + return custom_jsonify(status) + + +@app.route('/api/cancel_analysis/', methods=['POST']) +def cancel_analysis(task_id): + """取消个股分析任务""" + store = get_task_store('stock_analysis') + with task_lock: + if task_id not in store: + return jsonify({'error': '找不到指定的分析任务'}), 404 + + task = store[task_id] + + if task['status'] in [TASK_COMPLETED, TASK_FAILED]: + return jsonify({'message': '任务已完成或失败,无法取消'}) + + # 更新状态为失败 + task['status'] = TASK_FAILED + task['error'] = '用户取消任务' + task['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + + # 更新键索引的任务 + if 'key' in task and task['key'] in store: + store[task['key']] = task + + return jsonify({'message': '任务已取消'}) + + +# ETF 分析路由 +@app.route('/api/start_etf_analysis', methods=['POST']) +def start_etf_analysis(): + """启动ETF分析任务""" + try: + data = request.json + etf_code = data.get('etf_code') + + if not etf_code: + return jsonify({'error': '请输入ETF代码'}), 400 + + app.logger.info(f"准备分析ETF: {etf_code}") + + task_id, task, is_new = get_or_create_task( + 'etf_analysis', + etf_code=etf_code + ) + + if task['status'] == TASK_COMPLETED and 'result' in task: + app.logger.info(f"使用缓存的ETF分析结果: {etf_code}") + return jsonify({ + 'task_id': task_id, + 'status': task['status'], + 'result': task['result'] + }) + + if is_new: + app.logger.info(f"创建新的ETF分析任务: {task_id}") + + def run_etf_analysis(): + try: + update_task_status('etf_analysis', task_id, TASK_RUNNING, progress=10) + + # 使用一个新的 EtfAnalyzer 实例, 并传入stock_analyzer + etf_analyzer_instance = EtfAnalyzer(etf_code, analyzer) + result = etf_analyzer_instance.run_analysis() + + update_task_status('etf_analysis', task_id, TASK_COMPLETED, progress=100, result=result) + app.logger.info(f"ETF分析任务 {task_id} 完成") + + except Exception as e: + app.logger.error(f"ETF分析任务 {task_id} 失败: {str(e)}") + app.logger.error(traceback.format_exc()) + update_task_status('etf_analysis', task_id, TASK_FAILED, error=str(e)) + + thread = threading.Thread(target=run_etf_analysis) + thread.daemon = True + thread.start() + + return jsonify({ + 'task_id': task_id, + 'status': task['status'], + 'message': f'已启动ETF分析任务: {etf_code}' + }) + + except Exception as e: + app.logger.error(f"启动ETF分析任务时出错: {traceback.format_exc()}") + return jsonify({'error': str(e)}), 500 + + +@app.route('/api/etf_analysis_status/', methods=['GET']) +def get_etf_analysis_status(task_id): + """获取ETF分析任务状态""" + store = get_task_store('etf_analysis') + with task_lock: + if task_id not in store: + return jsonify({'error': '找不到指定的ETF分析任务'}), 404 + + task = store[task_id] + + status = { + 'id': task['id'], + 'status': task['status'], + 'progress': task.get('progress', 0), + 'created_at': task['created_at'], + 'updated_at': task['updated_at'] + } + + if task['status'] == TASK_COMPLETED and 'result' in task: + status['result'] = task['result'] + + if task['status'] == TASK_FAILED and 'error' in task: + status['error'] = task['error'] + + return custom_jsonify(status) + + +# 保留原有API用于向后兼容 +@app.route('/api/enhanced_analysis', methods=['POST']) +def enhanced_analysis(): + """原增强分析API的向后兼容版本""" + try: + data = request.json + stock_code = data.get('stock_code') + market_type = data.get('market_type', 'A') + + if not stock_code: + return custom_jsonify({'error': '请输入股票代码'}), 400 + + # 调用新的任务系统,但模拟同步行为 + # 这会导致和之前一样的超时问���,但保持兼容 + timeout = 300 + start_time = time.time() + + # 获取或创建任务 + task_id, task, is_new = get_or_create_task( + 'stock_analysis', + stock_code=stock_code, + market_type=market_type + ) + + # 如果是已完成的任务,直接返回结果 + if task['status'] == TASK_COMPLETED and 'result' in task: + app.logger.info(f"使用缓存的分析结果: {stock_code}") + return custom_jsonify({'result': task['result']}) + + # 启动分析(如果是新任务) + if is_new: + # 同步执行分析 + try: + result = analyzer.perform_enhanced_analysis(stock_code, market_type) + update_task_status('stock_analysis', task_id, TASK_COMPLETED, progress=100, result=result) + app.logger.info(f"分析完成: {stock_code},耗时 {time.time() - start_time:.2f} 秒") + return custom_jsonify({'result': result}) + except Exception as e: + app.logger.error(f"分析过程中出错: {str(e)}") + update_task_status('stock_analysis', task_id, TASK_FAILED, error=str(e)) + return custom_jsonify({'error': f'分析过程中出错: {str(e)}'}), 500 + else: + # 已存在正在处理的任务,等待其完成 + max_wait = timeout - (time.time() - start_time) + wait_interval = 0.5 + waited = 0 + + while waited < max_wait: + with task_lock: + current_task = store[task_id] + if current_task['status'] == TASK_COMPLETED and 'result' in current_task: + return custom_jsonify({'result': current_task['result']}) + if current_task['status'] == TASK_FAILED: + error = current_task.get('error', '任务失败,无详细信息') + return custom_jsonify({'error': error}), 500 + + time.sleep(wait_interval) + waited += wait_interval + + # 超时 + return custom_jsonify({'error': '处理超时,请稍后重试'}), 504 + + except Exception as e: + app.logger.error(f"执行增强版分析时出错: {traceback.format_exc()}") + return custom_jsonify({'error': str(e)}), 500 + + +# 添加在web_server.py主代码中 +@app.errorhandler(404) +def not_found(error): + """处理404错误""" + if request.path.startswith('/api/'): + # 为API请求返回JSON格式的错误 + return jsonify({ + 'error': '找不到请求的API端点', + 'path': request.path, + 'method': request.method + }), 404 + # 为网页请求返回HTML错误页 + return render_template('error.html', error_code=404, message="找不到请求的页面"), 404 + + +@app.errorhandler(500) +def server_error(error): + """处理500错误""" + app.logger.error(f"服务器错误: {str(error)}") + if request.path.startswith('/api/'): + # 为API请求返回JSON格式的错误 + return jsonify({ + 'error': '服务器内部错误', + 'message': str(error) + }), 500 + # 为网页请求返回HTML错误页 + return render_template('error.html', error_code=500, message="服务器内部错误"), 500 + + +# Update the get_stock_data function in web_server.py to handle date formatting properly +@app.route('/api/stock_data', methods=['GET']) +@cache.cached(timeout=300, query_string=True) +def get_stock_data(): + try: + stock_code = request.args.get('stock_code') + market_type = request.args.get('market_type', 'A') + period = request.args.get('period', '1y') # 默认1年 + + if not stock_code: + return custom_jsonify({'error': '请提供股票代码'}), 400 + + # 根据period计算start_date + end_date = datetime.now().strftime('%Y%m%d') + if period == '1m': + start_date = (datetime.now() - timedelta(days=30)).strftime('%Y%m%d') + elif period == '3m': + start_date = (datetime.now() - timedelta(days=90)).strftime('%Y%m%d') + elif period == '6m': + start_date = (datetime.now() - timedelta(days=180)).strftime('%Y%m%d') + elif period == '1y': + start_date = (datetime.now() - timedelta(days=365)).strftime('%Y%m%d') + else: + start_date = (datetime.now() - timedelta(days=365)).strftime('%Y%m%d') + + # 获取股票历史数据 + app.logger.info( + f"获取股票 {stock_code} 的历史数据,市场: {market_type}, 起始日期: {start_date}, 结束日期: {end_date}") + df = analyzer.get_stock_data(stock_code, market_type, start_date, end_date) + + # 检查数据是否为空 + if df.empty: + app.logger.warning(f"股票 {stock_code} 的数据为空") + return custom_jsonify({'error': '未找到股票数据'}), 404 + + # 计算技术指标 + app.logger.info(f"计算股票 {stock_code} 的技术指标") + df = analyzer.calculate_indicators(df) + + # 将DataFrame转为JSON格式 + app.logger.info(f"将数据转换为JSON格式,行数: {len(df)}") + + # 确保日期列是字符串格式 - 修复缓存问题 + if 'date' in df.columns: + try: + if pd.api.types.is_datetime64_any_dtype(df['date']): + df['date'] = df['date'].dt.strftime('%Y-%m-%d') + else: + df = df.copy() + df['date'] = pd.to_datetime(df['date'], errors='coerce') + df['date'] = df['date'].dt.strftime('%Y-%m-%d') + except Exception as e: + app.logger.error(f"处理日期列时出错: {str(e)}") + df['date'] = df['date'].astype(str) + + # 将NaN值替换为None + df = df.replace({np.nan: None, np.inf: None, -np.inf: None}) + + records = df.to_dict('records') + + app.logger.info(f"数据处理完成,返回 {len(records)} 条记录") + return custom_jsonify({'data': records}) + except Exception as e: + app.logger.error(f"获取股票数据时出错: {str(e)}") + app.logger.error(traceback.format_exc()) + return custom_jsonify({'error': str(e)}), 500 + + +# @app.route('/api/market_scan', methods=['POST']) +# def api_market_scan(): +# try: +# data = request.json +# stock_list = data.get('stock_list', []) +# min_score = data.get('min_score', 60) +# market_type = data.get('market_type', 'A') + +# if not stock_list: +# return jsonify({'error': '请提供股票列表'}), 400 + +# # 限制股票数量,避免过长处理时间 +# if len(stock_list) > 100: +# app.logger.warning(f"股票列表过长 ({len(stock_list)}只),截取前100只") +# stock_list = stock_list[:100] + +# # 执行市场扫描 +# app.logger.info(f"开始扫描 {len(stock_list)} 只股票,最低分数: {min_score}") + +# # 使用线程池优化处理 +# results = [] +# max_workers = min(10, len(stock_list)) # 最多10个工作线程 + +# # 设置较长的超时时间 +# timeout = 300 # 5分钟 + +# def scan_thread(): +# try: +# return analyzer.scan_market(stock_list, min_score, market_type) +# except Exception as e: +# app.logger.error(f"扫描线程出错: {str(e)}") +# return [] + +# thread = threading.Thread(target=lambda: results.append(scan_thread())) +# thread.start() +# thread.join(timeout) + +# if thread.is_alive(): +# app.logger.error(f"市场扫描超时,已扫描 {len(stock_list)} 只股票超过 {timeout} 秒") +# return custom_jsonify({'error': '扫描超时,请减少股票数量或稍后再试'}), 504 + +# if not results or not results[0]: +# app.logger.warning("扫描结果为空") +# return custom_jsonify({'results': []}) + +# scan_results = results[0] +# app.logger.info(f"扫描完成,找到 {len(scan_results)} 只符合条件的股票") + +# # 使用自定义JSON格式处理NumPy数据类型 +# return custom_jsonify({'results': scan_results}) +# except Exception as e: +# app.logger.error(f"执行市场扫描时出错: {traceback.format_exc()}") +# return custom_jsonify({'error': str(e)}), 500 + +@app.route('/api/start_market_scan', methods=['POST']) +def start_market_scan(): + """启动市场扫描任务""" + try: + data = request.json + stock_list = data.get('stock_list', []) + min_score = data.get('min_score', 60) + market_type = data.get('market_type', 'A') + + if not stock_list: + return jsonify({'error': '请提供股票列表'}), 400 + + # 限制股票数量,避免过长处理时间 + if len(stock_list) > 100: + app.logger.warning(f"股票列表过长 ({len(stock_list)}只),截取前100只") + stock_list = stock_list[:100] + + # 创建新任务 + task_id = generate_task_id() + task = { + 'id': task_id, + 'status': TASK_PENDING, + 'progress': 0, + 'total': len(stock_list), + 'created_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), + 'updated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), + 'params': { + 'stock_list': stock_list, + 'min_score': min_score, + 'market_type': market_type + } + } + + with task_lock: + scan_tasks[task_id] = task + + # 启动后台线程执行扫描 + def run_scan(): + try: + start_market_scan_task_status(task_id, TASK_RUNNING) + + # 执行分批处理 + results = [] + total = len(stock_list) + batch_size = 10 + + for i in range(0, total, batch_size): + if task_id not in scan_tasks or scan_tasks[task_id]['status'] != TASK_RUNNING: + # 任务被取消 + app.logger.info(f"扫描任务 {task_id} 被取消") + return + + batch = stock_list[i:i + batch_size] + batch_results = [] + + for stock_code in batch: + try: + report = analyzer.quick_analyze_stock(stock_code, market_type) + if report['score'] >= min_score: + batch_results.append(report) + except Exception as e: + app.logger.error(f"分析股票 {stock_code} 时出错: {str(e)}") + continue + + results.extend(batch_results) + + # 更新进度 + progress = min(100, int((i + len(batch)) / total * 100)) + start_market_scan_task_status(task_id, TASK_RUNNING, progress=progress) + + # 按得分排序 + results.sort(key=lambda x: x['score'], reverse=True) + + # 更新任务状态为完成 + start_market_scan_task_status(task_id, TASK_COMPLETED, progress=100, result=results) + app.logger.info(f"扫描任务 {task_id} 完成,找到 {len(results)} 只符合条件的股票") + + except Exception as e: + app.logger.error(f"扫描任务 {task_id} 失败: {str(e)}") + app.logger.error(traceback.format_exc()) + start_market_scan_task_status(task_id, TASK_FAILED, error=str(e)) + + # 启动后台线程 + thread = threading.Thread(target=run_scan) + thread.daemon = True + thread.start() + + return jsonify({ + 'task_id': task_id, + 'status': 'pending', + 'message': f'已启动扫描任务,正在处理 {len(stock_list)} 只股票' + }) + + except Exception as e: + app.logger.error(f"启动市场扫描任务时出错: {traceback.format_exc()}") + return jsonify({'error': str(e)}), 500 + + +@app.route('/api/scan_status/', methods=['GET']) +def get_scan_status(task_id): + """获取扫描任务状态""" + with task_lock: + if task_id not in scan_tasks: + return jsonify({'error': '找不到指定的扫描任务'}), 404 + + task = scan_tasks[task_id] + + # 基本状态信息 + status = { + 'id': task['id'], + 'status': task['status'], + 'progress': task.get('progress', 0), + 'total': task.get('total', 0), + 'created_at': task['created_at'], + 'updated_at': task['updated_at'] + } + + # 如果任务完成,包含结果 + if task['status'] == TASK_COMPLETED and 'result' in task: + status['result'] = task['result'] + + # 如果任务失败,包含错误信息 + if task['status'] == TASK_FAILED and 'error' in task: + status['error'] = task['error'] + + return custom_jsonify(status) + + +@app.route('/api/cancel_scan/', methods=['POST']) +def cancel_scan(task_id): + """取消扫描任务""" + with task_lock: + if task_id not in scan_tasks: + return jsonify({'error': '找不到指定的扫描任务'}), 404 + + task = scan_tasks[task_id] + + if task['status'] in [TASK_COMPLETED, TASK_FAILED]: + return jsonify({'message': '任务已完成或失败,无法取消'}) + + # 更新状态为失败 + task['status'] = TASK_FAILED + task['error'] = '用户取消任务' + task['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + + return jsonify({'message': '任务已取消'}) + + +@app.route('/api/index_stocks', methods=['GET']) +def get_index_stocks(): + """获取指数成分股""" + try: + import akshare as ak + index_code = request.args.get('index_code', '000300') # 默认沪深300 + + # 获取指数成分股 + app.logger.info(f"获取指数 {index_code} 成分股") + if index_code == '000300': + # 沪深300成分股 + stocks = ak.index_stock_cons_weight_csindex(symbol="000300") + elif index_code == '000905': + # 中证500成分股 + stocks = ak.index_stock_cons_weight_csindex(symbol="000905") + elif index_code == '000852': + # 中证1000成分股 + stocks = ak.index_stock_cons_weight_csindex(symbol="000852") + elif index_code == '000001': + # 上证指数 + stocks = ak.index_stock_cons_weight_csindex(symbol="000001") + else: + return jsonify({'error': '不支持的指数代码'}), 400 + + # 提取股票代码列表 + stock_list = stocks['成分券代码'].tolist() if '成分券代码' in stocks.columns else [] + app.logger.info(f"找到 {len(stock_list)} 只成分股") + + return jsonify({'stock_list': stock_list}) + except Exception as e: + app.logger.error(f"获取指数成分股时出错: {traceback.format_exc()}") + return jsonify({'error': str(e)}), 500 + + +@app.route('/api/industry_stocks', methods=['GET']) +def get_industry_stocks(): + """获取行业成分股""" + try: + import akshare as ak + industry = request.args.get('industry', '') + + if not industry: + return jsonify({'error': '请提供行业名称'}), 400 + + # 获取行业成分股 + app.logger.info(f"获取 {industry} 行业成分股") + stocks = ak.stock_board_industry_cons_em(symbol=industry) + + # 提取股票代码列表 + stock_list = stocks['代码'].tolist() if '代码' in stocks.columns else [] + app.logger.info(f"找到 {len(stock_list)} 只 {industry} 行业股票") + + return jsonify({'stock_list': stock_list}) + except Exception as e: + app.logger.error(f"获取行业成分股时出错: {traceback.format_exc()}") + return jsonify({'error': str(e)}), 500 + + +# 添加到web_server.py +def clean_old_tasks(): + """清理旧的扫描任务""" + with task_lock: + now = datetime.now() + to_delete = [] + + for task_id, task in scan_tasks.items(): + # 解析更新时间 + try: + updated_at = datetime.strptime(task['updated_at'], '%Y-%m-%d %H:%M:%S') + # 如果任务完成或失败且超过1小时,或者任务状态异常且超过3小时,清理它 + if ((task['status'] in [TASK_COMPLETED, TASK_FAILED] and + (now - updated_at).total_seconds() > 3600) or + ((now - updated_at).total_seconds() > 10800)): + to_delete.append(task_id) + except: + # 日期解析错误,添加到删除列表 + to_delete.append(task_id) + + # 删除旧任务 + for task_id in to_delete: + del scan_tasks[task_id] + + return len(to_delete) + + +# 修改 run_task_cleaner 函数,使其每 5 分钟运行一次并在 16:30 左右清理所有缓存 +def run_task_cleaner(): + """定期运行任务清理,并在每天 16:30 左右清理所有缓存""" + while True: + try: + now = datetime.now() + # 判断是否在收盘时间附近(16:25-16:35) + is_market_close_time = (now.hour == 16 and 25 <= now.minute <= 35) + + cleaned = clean_old_tasks() + + # 如果是收盘时间,清理所有缓存 + if is_market_close_time: + # 清理分析器的数据缓存 + analyzer.data_cache.clear() + + # 清理 Flask 缓存 + cache.clear() + + # 清理任务存储 + with task_lock: + for task_type in tasks: + task_store = tasks[task_type] + completed_tasks = [task_id for task_id, task in task_store.items() + if task['status'] == TASK_COMPLETED] + for task_id in completed_tasks: + del task_store[task_id] + + app.logger.info("市场收盘时间检测到,已清理所有缓存数据") + + if cleaned > 0: + app.logger.info(f"清理了 {cleaned} 个旧的扫描任务") + except Exception as e: + app.logger.error(f"任务清理出错: {str(e)}") + + # 每 5 分钟运行一次,而不是每小时 + time.sleep(600) + + +# 基本面分析路由 +@app.route('/api/fundamental_analysis', methods=['POST']) +def api_fundamental_analysis(): + try: + data = request.json + stock_code = data.get('stock_code') + + if not stock_code: + return jsonify({'error': '请提供股票代码'}), 400 + + # 获取基本面分析结果 + result = fundamental_analyzer.calculate_fundamental_score(stock_code) + + return custom_jsonify(result) + except Exception as e: + app.logger.error(f"基本面分析出错: {traceback.format_exc()}") + return jsonify({'error': str(e)}), 500 + + +# 资金流向分析路由 +# Add to web_server.py + +# 获取概念资金流向的API端点 +@app.route('/api/concept_fund_flow', methods=['GET']) +def api_concept_fund_flow(): + try: + period = request.args.get('period', '10日排行') # Default to 10-day ranking + + # Get concept fund flow data + result = capital_flow_analyzer.get_concept_fund_flow(period) + + return custom_jsonify(result) + except Exception as e: + app.logger.error(f"Error getting concept fund flow: {traceback.format_exc()}") + return jsonify({'error': str(e)}), 500 + + +# 获取个股资金流向排名的API端点 +@app.route('/api/individual_fund_flow_rank', methods=['GET']) +def api_individual_fund_flow_rank(): + try: + period = request.args.get('period', '10日') # Default to today + + # Get individual fund flow ranking data + result = capital_flow_analyzer.get_individual_fund_flow_rank(period) + + return custom_jsonify(result) + except Exception as e: + app.logger.error(f"Error getting individual fund flow ranking: {traceback.format_exc()}") + return jsonify({'error': str(e)}), 500 + + +# 获取个股资金流向的API端点 +@app.route('/api/individual_fund_flow', methods=['GET']) +def api_individual_fund_flow(): + try: + stock_code = request.args.get('stock_code') + market_type = request.args.get('market_type', '') # Auto-detect if not provided + re_date = request.args.get('period-select') + + if not stock_code: + return jsonify({'error': 'Stock code is required'}), 400 + + # Get individual fund flow data + result = capital_flow_analyzer.get_individual_fund_flow(stock_code, market_type, re_date) + return custom_jsonify(result) + except Exception as e: + app.logger.error(f"Error getting individual fund flow: {traceback.format_exc()}") + return jsonify({'error': str(e)}), 500 + + +# 获取板块内股票的API端点 +@app.route('/api/sector_stocks', methods=['GET']) +def api_sector_stocks(): + try: + sector = request.args.get('sector') + + if not sector: + return jsonify({'error': 'Sector name is required'}), 400 + + # Get sector stocks data + result = capital_flow_analyzer.get_sector_stocks(sector) + + return custom_jsonify(result) + except Exception as e: + app.logger.error(f"Error getting sector stocks: {traceback.format_exc()}") + return jsonify({'error': str(e)}), 500 + + +# Update the existing capital flow API endpoint +@app.route('/api/capital_flow', methods=['POST']) +def api_capital_flow(): + try: + data = request.json + stock_code = data.get('stock_code') + market_type = data.get('market_type', '') # Auto-detect if not provided + + if not stock_code: + return jsonify({'error': 'Stock code is required'}), 400 + + # Calculate capital flow score + result = capital_flow_analyzer.calculate_capital_flow_score(stock_code, market_type) + + return custom_jsonify(result) + except Exception as e: + app.logger.error(f"Error calculating capital flow score: {traceback.format_exc()}") + return jsonify({'error': str(e)}), 500 + + +# 情景预测路由 +@app.route('/api/scenario_predict', methods=['POST']) +def api_scenario_predict(): + try: + data = request.json + stock_code = data.get('stock_code') + market_type = data.get('market_type', 'A') + days = data.get('days', 60) + + if not stock_code: + return jsonify({'error': '请提供股票代码'}), 400 + + # 获取情景预测结果 + result = scenario_predictor.generate_scenarios(stock_code, market_type, days) + + return custom_jsonify(result) + except Exception as e: + app.logger.error(f"情景预测出错: {traceback.format_exc()}") + return jsonify({'error': str(e)}), 500 + + +# 智能问答路由 +@app.route('/api/qa', methods=['POST']) +def api_qa(): + try: + data = request.json + stock_code = data.get('stock_code') + question = data.get('question') + market_type = data.get('market_type', 'A') + + if not stock_code or not question: + return jsonify({'error': '请提供股票代码和问题'}), 400 + + # 获取智能问答结果 + result = stock_qa.answer_question(stock_code, question, market_type) + + return custom_jsonify(result) + except Exception as e: + app.logger.error(f"智能问答出错: {traceback.format_exc()}") + return jsonify({'error': str(e)}), 500 + + +# 风险分析路由 +@app.route('/api/risk_analysis', methods=['POST']) +def api_risk_analysis(): + try: + data = request.json + stock_code = data.get('stock_code') + market_type = data.get('market_type', 'A') + + if not stock_code: + return jsonify({'error': '请提供股票代码'}), 400 + + # 获取风险分析结果 + result = risk_monitor.analyze_stock_risk(stock_code, market_type) + + return custom_jsonify(result) + except Exception as e: + app.logger.error(f"风险分析出错: {traceback.format_exc()}") + return jsonify({'error': str(e)}), 500 + + +# 投资组合风险分析路由 +@app.route('/api/portfolio_risk', methods=['POST']) +def api_portfolio_risk(): + try: + data = request.json + portfolio = data.get('portfolio', []) + + if not portfolio: + return jsonify({'error': '请提供投资组合'}), 400 + + # 获取投资组合风险分析结果 + result = risk_monitor.analyze_portfolio_risk(portfolio) + + return custom_jsonify(result) + except Exception as e: + app.logger.error(f"投资组合风险分析出错: {traceback.format_exc()}") + return jsonify({'error': str(e)}), 500 + + +# 指数分析路由 +@app.route('/api/index_analysis', methods=['GET']) +def api_index_analysis(): + try: + index_code = request.args.get('index_code') + limit = int(request.args.get('limit', 30)) + + if not index_code: + return jsonify({'error': '请提供指数代码'}), 400 + + # 获取指数分析结果 + result = index_industry_analyzer.analyze_index(index_code, limit) + + return custom_jsonify(result) + except Exception as e: + app.logger.error(f"指数分析出错: {traceback.format_exc()}") + return jsonify({'error': str(e)}), 500 + + +# 行业分析路由 +@app.route('/api/industry_analysis', methods=['GET']) +def api_industry_analysis(): + try: + industry = request.args.get('industry') + limit = int(request.args.get('limit', 30)) + + if not industry: + return jsonify({'error': '请提供行业名称'}), 400 + + # 获取行业分析结果 + result = index_industry_analyzer.analyze_industry(industry, limit) + + return custom_jsonify(result) + except Exception as e: + app.logger.error(f"行业分析出错: {traceback.format_exc()}") + return jsonify({'error': str(e)}), 500 + + +@app.route('/api/industry_fund_flow', methods=['GET']) +def api_industry_fund_flow(): + """获取行业资金流向数据""" + try: + symbol = request.args.get('symbol', '即时') + + result = industry_analyzer.get_industry_fund_flow(symbol) + + return custom_jsonify(result) + except Exception as e: + app.logger.error(f"获取行业资金流向数据出错: {traceback.format_exc()}") + return jsonify({'error': str(e)}), 500 + + +@app.route('/api/industry_detail', methods=['GET']) +def api_industry_detail(): + """获取行业详细信息""" + try: + industry = request.args.get('industry') + + if not industry: + return jsonify({'error': '请提供行业名称'}), 400 + + result = industry_analyzer.get_industry_detail(industry) + + app.logger.info(f"返回前 (result):{result}") + if not result: + return jsonify({'error': f'未找到行业 {industry} 的详细信息'}), 404 + + return custom_jsonify(result) + except Exception as e: + app.logger.error(f"获取行业详细信息出错: {traceback.format_exc()}") + return jsonify({'error': str(e)}), 500 + + +# 行业比较路由 +@app.route('/api/industry_compare', methods=['GET']) +def api_industry_compare(): + try: + limit = int(request.args.get('limit', 10)) + + # 获取行业比较结果 + result = index_industry_analyzer.compare_industries(limit) + + return custom_jsonify(result) + except Exception as e: + app.logger.error(f"行业比较出错: {traceback.format_exc()}") + return jsonify({'error': str(e)}), 500 + + +# 保存股票分析结果到数据库 +def save_analysis_result(stock_code, market_type, result): + """保存分析结果到数据库""" + if not USE_DATABASE: + return + + try: + session = get_session() + + # 创建新的分析结果记录 + analysis = AnalysisResult( + stock_code=stock_code, + market_type=market_type, + score=result.get('scores', {}).get('total', 0), + recommendation=result.get('recommendation', {}).get('action', ''), + technical_data=result.get('technical_analysis', {}), + fundamental_data=result.get('fundamental_data', {}), + capital_flow_data=result.get('capital_flow_data', {}), + ai_analysis=result.get('ai_analysis', '') + ) + + session.add(analysis) + session.commit() + + except Exception as e: + app.logger.error(f"保存分析结果到数据库时出错: {str(e)}") + if session: + session.rollback() + finally: + if session: + session.close() + + +# 从数据库获取历史分析结果 +@app.route('/api/history_analysis', methods=['GET']) +def get_history_analysis(): + """获取股票的历史分析结果""" + if not USE_DATABASE: + return jsonify({'error': '数据库功能未启用'}), 400 + + stock_code = request.args.get('stock_code') + limit = int(request.args.get('limit', 10)) + + if not stock_code: + return jsonify({'error': '请提供股票代码'}), 400 + + try: + session = get_session() + + # 查询历史分析结果 + results = session.query(AnalysisResult) \ + .filter(AnalysisResult.stock_code == stock_code) \ + .order_by(AnalysisResult.analysis_date.desc()) \ + .limit(limit) \ + .all() + + # 转换为字典列表 + history = [result.to_dict() for result in results] + + return jsonify({'history': history}) + + except Exception as e: + app.logger.error(f"获取历史分析结果时出错: {str(e)}") + return jsonify({'error': str(e)}), 500 + finally: + if session: + session.close() + +# 添加新闻API端点 +# 添加到web_server.py文件中 +@app.route('/api/latest_news', methods=['GET']) +def get_latest_news(): + try: + days = int(request.args.get('days', 1)) # 默认获取1天的新闻 + limit = int(request.args.get('limit', 1000)) # 默认最多获取1000条 + only_important = request.args.get('important', '0') == '1' # 是否只看重要新闻 + news_type = request.args.get('type', 'all') # 新闻类型,可选值: all, hotspot + + # 从news_fetcher模块获取新闻数据 + news_data = news_fetcher.get_latest_news(days=days, limit=limit) + + # 过滤新闻 + if only_important: + # 根据关键词过滤重要新闻 + important_keywords = ['重要', '利好', '重磅', '突发', '关注'] + news_data = [news for news in news_data if + any(keyword in (news.get('content', '') or '') for keyword in important_keywords)] + + if news_type == 'hotspot': + # 过滤舆情热点相关新闻 + hotspot_keywords = [ + # 舆情直接相关词 + '舆情', '舆论', '热点', '热议', '热搜', '话题', + + # 关注度相关词 + '关注度', '高度关注', '引发关注', '市场关注', '持续关注', '重点关注', + '密切关注', '广泛关注', '集中关注', '投资者关注', + + # 传播相关词 + '爆文', '刷屏', '刷爆', '冲上热搜', '纷纷转发', '广泛传播', + '热传', '病毒式传播', '迅速扩散', '高度转发', + + # 社交媒体相关词 + '微博热搜', '微博话题', '知乎热议', '抖音热门', '今日头条', '朋友圈热议', + '微信热文', '社交媒体热议', 'APP热榜', + + # 情绪相关词 + '情绪高涨', '市场情绪', '投资情绪', '恐慌情绪', '亢奋情绪', + '乐观情绪', '悲观情绪', '投资者情绪', '公众情绪', + + # 突发事件相关 + '突发', '紧急', '爆发', '突现', '紧急事态', '快讯', '突发事件', + '重大事件', '意外事件', '突发新闻', + + # 行业动态相关 + '行业动向', '市场动向', '板块轮动', '资金流向', '产业趋势', + '政策导向', '监管动态', '风口', '市场风向', + + # 舆情分析相关 + '舆情分析', '舆情监测', '舆情报告', '舆情数据', '舆情研判', + '舆情趋势', '舆情预警', '舆情通报', '舆情简报', + + # 市场焦点相关 + '市场焦点', '焦点话题', '焦点股', '焦点事件', '投资焦点', + '关键词', '今日看点', '重点关切', '核心议题', + + # 传统媒体相关 + '头版头条', '财经头条', '要闻', '重磅新闻', '独家报道', + '深度报道', '特别关注', '重点报道', '专题报道', + + # 特殊提示词 + '投资舆情', '今日舆情', '今日热点', '投资热点', '市场热点', + '每日热点', '关注要点', '交易热点', '今日重点', + + # AI基础技术 + '人工智能', 'AI', '机器学习', '深度学习', '神经网络', '大模型', + 'LLM', '大语言模型', '生成式AI', '生成式人工智能', '算法', + + # AI细分技术 + '自然语言处理', 'NLP', '计算机视觉', 'CV', '语音识别', + '图像生成', '多模态', '强化学习', '联邦学习', '知识图谱', + '边缘计算', '量子计算', '类脑计算', '神经形态计算', + + # 热门AI模型/产品 + 'GPT', 'GPT-4', 'GPT-5', 'GPT-4o', 'ChatGPT', 'Claude', + 'Gemini', 'Llama', 'Llama3', 'Stable Diffusion', 'DALL-E', + 'Midjourney', 'Sora', 'Anthropic', 'Runway', 'Copilot', + 'Bard', 'GLM', 'Ernie', '文心一言', '通义千问', '讯飞星火','DeepSeek', + + # AI应用领域 + 'AIGC', '智能驾驶', '自动驾驶', '智能助手', '智能医疗', + '智能制造', '智能客服', '智能金融', '智能教育', + '智能家居', '机器人', 'RPA', '数字人', '虚拟人', + '智能安防', '计算机辅助', + + # AI硬件 + 'AI芯片', 'GPU', 'TPU', 'NPU', 'FPGA', '算力', '推理芯片', + '训练芯片', 'NVIDIA', '英伟达', 'AMD', '高性能计算', + + # AI企业 + 'OpenAI', '微软AI', '谷歌AI', 'Google DeepMind', 'Meta AI', + '百度智能云', '阿里云AI', '腾讯AI', '华为AI', '商汤科技', + '旷视科技', '智源人工智能', '云从科技', '科大讯飞', + + # AI监管/伦理 + 'AI监管', 'AI伦理', 'AI安全', 'AI风险', 'AI治理', + 'AI对齐', 'AI偏见', 'AI隐私', 'AGI', '通用人工智能', + '超级智能', 'AI法规', 'AI责任', 'AI透明度', + + # AI市场趋势 + 'AI创业', 'AI投资', 'AI融资', 'AI估值', 'AI泡沫', + 'AI风口', 'AI赛道', 'AI产业链', 'AI应用落地', 'AI转型', + 'AI红利', 'AI市值', 'AI概念股', + + # 新兴AI概念 + 'AI Agent', 'AI智能体', '多智能体', '自主AI', + 'AI搜索引擎', 'RAG', '检索增强生成', '思维链', 'CoT', + '大模型微调', '提示工程', 'Prompt Engineering', + '基础模型', 'Foundation Model', '小模型', '专用模型', + + # 人工智能舆情专用 + 'AI热点', 'AI风潮', 'AI革命', 'AI热议', 'AI突破', + 'AI进展', 'AI挑战', 'AI竞赛', 'AI战略', 'AI政策', + 'AI风险', 'AI恐慌', 'AI威胁', 'AI机遇' + ] + + # 在API处理中使用 + if news_type == 'hotspot': + # 过滤舆情热点相关新闻 + def has_keyword(item): + title = item.get('title', '') + content = item.get('content', '') + return any(keyword in title for keyword in hotspot_keywords) or \ + any(keyword in content for keyword in hotspot_keywords) + + news_data = [news for news in news_data if has_keyword(news)] + + return jsonify({'success': True, 'news': news_data}) + except Exception as e: + app.logger.error(f"获取最新新闻数据时出错: {str(e)}") + return jsonify({'success': False, 'error': str(e)}), 500 + + + +# --- Start of new FileSessionManager implementation --- +class FileSessionManager: + """A Flask-compatible file-based session manager for agent tasks.""" + def __init__(self, data_dir): + self.data_dir = Path(data_dir) + self.data_dir.mkdir(parents=True, exist_ok=True) + + def _get_task_path(self, task_id): + return self.data_dir / f"{task_id}.json" + + def save_task(self, task_data): + if 'id' not in task_data: + app.logger.error("Attempted to save task without an 'id'") + return + task_id = task_data['id'] + task_file = self._get_task_path(task_id) + with open(task_file, 'w', encoding='utf-8') as f: + json.dump(task_data, f, ensure_ascii=False, indent=4, cls=NumpyJSONEncoder) + + def load_task(self, task_id): + task_file = self._get_task_path(task_id) + if not task_file.exists(): + return None + with open(task_file, 'r', encoding='utf-8') as f: + try: + return json.load(f) + except json.JSONDecodeError: + app.logger.error(f"Failed to decode JSON for task {task_id}") + return None + + def get_all_tasks(self): + tasks = [] + for task_file in self.data_dir.glob("*.json"): + with open(task_file, 'r', encoding='utf-8') as f: + try: + tasks.append(json.load(f)) + except json.JSONDecodeError: + app.logger.warning(f"Skipping corrupted task file: {task_file.name}") + continue + return tasks + + def cleanup_stale_tasks(self, timeout_hours=2): + """Clean up stale 'running' tasks that have exceeded a timeout.""" + app.logger.info("开始清理过时的任务...") + cleaned_count = 0 + now = datetime.now() + + tasks = self.get_all_tasks() + for task in tasks: + if task.get('status') == TASK_RUNNING: + try: + updated_at = datetime.strptime(task.get('updated_at'), '%Y-%m-%d %H:%M:%S') + if (now - updated_at).total_seconds() > timeout_hours * 3600: + task['status'] = TASK_FAILED + task['error'] = '任务因服务器重启或超时而中止' + task['updated_at'] = now.strftime('%Y-%m-%d %H:%M:%S') + self.save_task(task) + cleaned_count += 1 + app.logger.warning(f"清理了过时的任务 {task.get('id')},该任务已运行超过 {timeout_hours} 小时。") + except (ValueError, TypeError) as e: + app.logger.error(f"解析任务 {task.get('id')} 的 updated_at 时出错: {e}") + continue + + if cleaned_count > 0: + app.logger.info(f"清理完成,共处理了 {cleaned_count} 个过时的任务。") + else: + app.logger.info("没有发现需要清理的过时任务。") + + def delete_task(self, task_id): + """Safely delete a task file.""" + try: + task_file = self._get_task_path(task_id) + if task_file.exists(): + task_file.unlink() + return True + except Exception as e: + app.logger.error(f"Failed to delete task {task_id}: {e}") + return False + +# Instantiate the manager +AGENT_SESSIONS_DIR = os.path.join(os.path.dirname(__file__), '../../data/agent_sessions') +agent_session_manager = FileSessionManager(AGENT_SESSIONS_DIR) +agent_session_manager.cleanup_stale_tasks() +# --- End of new FileSessionManager implementation --- + + +# 智能体分析路由 +@app.route('/api/start_agent_analysis', methods=['POST']) +def start_agent_analysis(): + """启动智能体分析任务""" + try: + data = request.json + stock_code = data.get('stock_code') + research_depth = data.get('research_depth', 3) + market_type = data.get('market_type', 'A') + selected_analysts = data.get('selected_analysts', ["market", "social", "news", "fundamentals"]) + analysis_date = data.get('analysis_date') + enable_memory = data.get('enable_memory', True) + max_output_length = data.get('max_output_length', 2048) + + if not stock_code: + return jsonify({'error': '请提供股票代码'}), 400 + + # 创建新任务 + task_id = generate_task_id() + task = { + 'id': task_id, + 'status': TASK_PENDING, + 'progress': 0, + 'current_step': '任务已创建', + 'created_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), + 'updated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), + 'params': { + 'stock_code': stock_code, + 'research_depth': research_depth, + 'market_type': market_type, + 'selected_analysts': selected_analysts, + 'analysis_date': analysis_date, + 'enable_memory': enable_memory, + 'max_output_length': max_output_length + } + } + + # 为任务创建取消事件 + task['cancel_event'] = threading.Event() + agent_session_manager.save_task(task) + + def run_agent_analysis(): + """在后台线程中运行智能体分析""" + try: + from tradingagents.graph.trading_graph import TradingAgentsGraph + from tradingagents.default_config import DEFAULT_CONFIG + + update_task_status('agent_analysis', task_id, TASK_RUNNING, progress=5, result={'current_step': '正在初始化智能体...'}) + + # --- 修复 Start: 强制使用主应用的OpenAI代理配置 --- + config = DEFAULT_CONFIG.copy() + config['llm_provider'] = 'openai' + config['backend_url'] = os.getenv('OPENAI_API_URL') + main_model = os.getenv('OPENAI_API_MODEL', 'gpt-4o') + config['deep_think_llm'] = main_model + config['quick_think_llm'] = main_model + config['memory_enabled'] = enable_memory + config['max_tokens'] = max_output_length + + if not os.getenv('OPENAI_API_KEY'): + raise ValueError("主应用的 OPENAI_API_KEY 未在.env文件中设置") + + app.logger.info(f"强制使用主应用代理配置进行智能体分析: provider={config['llm_provider']}, url={config['backend_url']}, model={config['deep_think_llm']}") + + ta = TradingAgentsGraph( + selected_analysts=selected_analysts, + debug=True, + config=config + ) + # --- 修复 End --- + + def progress_callback(progress, step): + current_task = agent_session_manager.load_task(task_id) + if not current_task or current_task.get('status') == TASK_CANCELLED: + raise TaskCancelledException(f"任务 {task_id} 已被用户取消") + update_task_status('agent_analysis', task_id, TASK_RUNNING, progress=progress, result={'current_step': step}) + + today = analysis_date or datetime.now().strftime('%Y-%m-%d') + state, decision = ta.propagate(stock_code, today, market_type=market_type, progress_callback=progress_callback) + + # 修复:在任务完成时,获取并添加公司名称到最终结果中 + try: + stock_info = analyzer.get_stock_info(stock_code) + stock_name = stock_info.get('股票名称', '未知') + # 将公司名称添加到 state 字典中,前端将从这里读取 + if isinstance(state, dict): + state['company_name'] = stock_name + except Exception as e: + app.logger.error(f"为 {stock_code} 获取公司名称时出错: {e}") + if isinstance(state, dict): + state['company_name'] = '名称获取失败' + + update_task_status('agent_analysis', task_id, TASK_COMPLETED, progress=100, result={'decision': decision, 'final_state': state, 'current_step': '分析完成'}) + app.logger.info(f"智能体分析任务 {task_id} 完成") + + except TaskCancelledException as e: + app.logger.info(str(e)) + update_task_status('agent_analysis', task_id, TASK_FAILED, error='任务已被用户取消', result={'current_step': '任务已被用户取消'}) + except Exception as e: + app.logger.error(f"智能体分析任务 {task_id} 失败: {str(e)}") + app.logger.error(traceback.format_exc()) + update_task_status('agent_analysis', task_id, TASK_FAILED, error=str(e), result={'current_step': f'分析失败: {e}'}) + + thread = threading.Thread(target=run_agent_analysis) + thread.daemon = True + thread.start() + + return jsonify({ + 'task_id': task_id, + 'status': 'pending', + 'message': f'已启动对 {stock_code} 的智能体分析' + }) + + except Exception as e: + app.logger.error(f"启动智能体分析时出错: {traceback.format_exc()}") + return jsonify({'error': str(e)}), 500 + +@app.route('/api/agent_analysis_status/', methods=['GET']) +def get_agent_analysis_status(task_id): + """获取智能体分析任务的状态""" + task = agent_session_manager.load_task(task_id) + + if not task: + return jsonify({'error': '找不到指定的智能体分析任务'}), 404 + + # 准备要返回的数据 + response_data = { + 'id': task['id'], + 'status': task['status'], + 'progress': task.get('progress', 0), + 'created_at': task['created_at'], + 'updated_at': task['updated_at'], + 'params': task.get('params', {}) + } + + if 'result' in task: + response_data['result'] = convert_messages_to_dict(task['result']) + if 'error' in task: + response_data['error'] = task['error'] + + return custom_jsonify(response_data) + + +@app.route('/api/agent_analysis_history', methods=['GET']) +def get_agent_analysis_history(): + """获取已完成的智能体分析任务历史""" + try: + all_tasks = agent_session_manager.get_all_tasks() + history = [ + task for task in all_tasks + if task.get('status') in [TASK_COMPLETED, TASK_FAILED] + ] + # 按更新时间排序,最新的在前 + history.sort(key=lambda x: x.get('updated_at', ''), reverse=True) + return custom_jsonify({'history': history}) + except Exception as e: + app.logger.error(f"获取分析历史时出错: {traceback.format_exc()}") + return jsonify({'error': str(e)}), 500 + + +@app.route('/api/delete_agent_analysis', methods=['POST']) +def delete_agent_analysis(): + """Cancel and/or delete one or more agent analysis tasks.""" + try: + data = request.json + task_ids = data.get('task_ids', []) + if not isinstance(task_ids, list): + return jsonify({'error': 'task_ids 必须是一个列表'}), 400 + + if not task_ids: + return jsonify({'error': '请提供要删除的任务ID'}), 400 + + deleted_count = 0 + cancelled_count = 0 + + for task_id in task_ids: + task = agent_session_manager.load_task(task_id) + if not task: + app.logger.warning(f"尝试删除一个不存在的任务: {task_id}") + continue + + # If the task is running, mark it as cancelled + if task.get('status') == TASK_RUNNING: + task['status'] = TASK_CANCELLED + task['updated_at'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') + task['error'] = '任务已被用户取消' + agent_session_manager.save_task(task) + cancelled_count += 1 + app.logger.info(f"任务 {task_id} 已被标记为取消。") + + # For all other states (or after cancelling), delete the task file + if agent_session_manager.delete_task(task_id): + deleted_count += 1 + + message = f"请求处理 {len(task_ids)} 个任务。已取消 {cancelled_count} 个运行中的任务,并删除了 {deleted_count} 个任务文件。" + app.logger.info(message) + return jsonify({'success': True, 'message': message}) + + except Exception as e: + app.logger.error(f"删除分析历史时出错: {traceback.format_exc()}") + return jsonify({'error': str(e)}), 500 + + + +@app.route('/api/active_tasks', methods=['GET']) +def get_active_tasks(): + """获取所有正在进行的智能体分析任务""" + try: + all_tasks = agent_session_manager.get_all_tasks() + active_tasks_list = [] + for task in all_tasks: + if task.get('status') == TASK_RUNNING: + task_info = { + 'task_id': task['id'], + 'stock_code': task.get('params', {}).get('stock_code'), + 'progress': task.get('progress', 0), + 'current_step': task.get('result', {}).get('current_step', '加载中...') + } + active_tasks_list.append(task_info) + # 按创建时间排序,最新的在前 + active_tasks_list.sort(key=lambda x: x.get('created_at', ''), reverse=True) + return custom_jsonify({'active_tasks': active_tasks_list}) + except Exception as e: + app.logger.error(f"获取活动任务时出错: {traceback.format_exc()}") + return jsonify({'error': str(e)}), 500 + + +# 在应用启动时启动清理线程(保持原有代码不变) +cleaner_thread = threading.Thread(target=run_task_cleaner) +cleaner_thread.daemon = True +cleaner_thread.start() + +if __name__ == '__main__': + # 强制禁用Flask的调试模式,以确保日志配置生效 app.run(host='0.0.0.0', port=int(os.getenv("PORT", "8888")), debug=False) \ No newline at end of file