stock-analysis / api_integration.py
fromozu's picture
Auto-deploy from GitHub Actions - 56b35f1
03591d9 verified
# -*- coding: utf-8 -*-
"""
API集成模块
将新的API功能集成到现有的Flask应用中
"""
import logging
import time
from typing import Dict, Any
from flask import Flask
# 导入API模块
from api_endpoints import api_v1, init_analyzers
from rate_limiter import start_cleanup_scheduler
from api_cache_integration import preload_cache_for_popular_stocks
from auth_middleware import api_key_manager
logger = logging.getLogger(__name__)
def register_api_endpoints(app: Flask):
"""注册API端点到Flask应用"""
try:
# 注册API蓝图
app.register_blueprint(api_v1)
logger.info("API v1端点已注册")
# 初始化分析器 - 尝试从多个来源获取分析器实例
from stock_analyzer import StockAnalyzer
from risk_monitor import RiskMonitor
from fundamental_analyzer import FundamentalAnalyzer
# 尝试从app对象获取分析器实例
analyzer = getattr(app, 'analyzer', None)
risk_monitor_instance = getattr(app, 'risk_monitor', None)
fundamental_analyzer_instance = getattr(app, 'fundamental_analyzer', None)
# 如果app对象中没有,尝试从全局变量获取
if analyzer is None:
try:
import web_server
analyzer = getattr(web_server, 'analyzer', None)
risk_monitor_instance = getattr(web_server, 'risk_monitor', None)
fundamental_analyzer_instance = getattr(web_server, 'fundamental_analyzer', None)
logger.info("从web_server模块获取分析器实例")
except Exception as e:
logger.warning(f"无法从web_server模块获取分析器: {e}")
# 如果仍然没有,创建新的实例
if analyzer is None:
analyzer = StockAnalyzer()
logger.info("创建新的StockAnalyzer实例")
if risk_monitor_instance is None:
risk_monitor_instance = RiskMonitor(analyzer)
logger.info("创建新的RiskMonitor实例")
if fundamental_analyzer_instance is None:
fundamental_analyzer_instance = FundamentalAnalyzer()
logger.info("创建新的FundamentalAnalyzer实例")
# 初始化API端点的分析器
init_analyzers(analyzer, risk_monitor_instance, fundamental_analyzer_instance)
logger.info("API分析器已初始化")
return True
except Exception as e:
logger.error(f"注册API端点失败: {e}")
import traceback
logger.error(f"错误堆栈: {traceback.format_exc()}")
return False
def setup_api_middleware(app: Flask):
"""设置API中间件"""
try:
# 启动限流器清理调度器
start_cleanup_scheduler()
logger.info("限流器清理调度器已启动")
# 移除自动预加载热门股票缓存,避免系统启动时的API调用
# 如需预加载,可通过API手动触发:POST /api/v1/cache/preload
# import threading
# def preload_cache():
# try:
# preload_cache_for_popular_stocks()
# except Exception as e:
# logger.error(f"预加载缓存失败: {e}")
#
# cache_thread = threading.Thread(target=preload_cache, daemon=True)
# cache_thread.start()
logger.info("缓存预加载已禁用,系统启动更快")
return True
except Exception as e:
logger.error(f"设置API中间件失败: {e}")
return False
def add_api_routes_to_existing_app(app: Flask):
"""将API路由添加到现有应用"""
# 添加API状态检查端点
@app.route('/api/v1/health', methods=['GET'])
def api_health_check():
"""API健康检查"""
from api_response import APIResponse
from api_cache_integration import get_cache_statistics
try:
# 检查各个组件状态
status = {
'api_version': '1.0.0',
'status': 'healthy',
'components': {
'database': check_database_status(),
'cache': check_cache_status(),
'analyzers': check_analyzers_status()
},
'cache_statistics': get_cache_statistics()
}
return APIResponse.success(data=status)
except Exception as e:
logger.error(f"API健康检查失败: {e}")
from api_response import ErrorCodes
return APIResponse.error(
code=ErrorCodes.INTERNAL_SERVER_ERROR,
message='健康检查失败',
details={'error': str(e)},
status_code=500
)
# 添加API密钥管理端点(仅限管理员)
@app.route('/api/v1/admin/api-keys', methods=['POST'])
def generate_api_key():
"""生成新的API密钥(管理员功能)"""
from flask import request
from api_response import APIResponse, ErrorCodes
from auth_middleware import require_api_key
# 这里应该添加管理员权限检查
admin_key = request.headers.get('X-Admin-Key')
if admin_key != app.config.get('ADMIN_KEY'):
return APIResponse.error(
code=ErrorCodes.INSUFFICIENT_PERMISSIONS,
message='需要管理员权限',
status_code=403
)
try:
data = request.get_json() or {}
tier = data.get('tier', 'free')
permissions = data.get('permissions', ['stock_analysis'])
new_api_key = api_key_manager.generate_api_key(tier, permissions)
return APIResponse.success(data={
'api_key': new_api_key,
'tier': tier,
'permissions': permissions,
'created_at': time.strftime('%Y-%m-%dT%H:%M:%SZ')
})
except Exception as e:
logger.error(f"生成API密钥失败: {e}")
return APIResponse.error(
code=ErrorCodes.INTERNAL_SERVER_ERROR,
message='生成API密钥失败',
details={'error': str(e)},
status_code=500
)
# 添加缓存管理端点
@app.route('/api/v1/admin/cache/clear', methods=['POST'])
def clear_api_cache():
"""清除API缓存(管理员功能)"""
from flask import request
from api_response import APIResponse, ErrorCodes
from api_cache_integration import smart_cache_invalidation
admin_key = request.headers.get('X-Admin-Key')
if admin_key != app.config.get('ADMIN_KEY'):
return APIResponse.error(
code=ErrorCodes.INSUFFICIENT_PERMISSIONS,
message='需要管理员权限',
status_code=403
)
try:
data = request.get_json() or {}
stock_codes = data.get('stock_codes')
cache_types = data.get('cache_types')
smart_cache_invalidation(stock_codes, cache_types)
return APIResponse.success(data={
'message': '缓存清除完成',
'cleared_stock_codes': stock_codes,
'cleared_cache_types': cache_types
})
except Exception as e:
logger.error(f"清除缓存失败: {e}")
return APIResponse.error(
code=ErrorCodes.INTERNAL_SERVER_ERROR,
message='清除缓存失败',
details={'error': str(e)},
status_code=500
)
logger.info("API管理端点已添加")
def check_database_status() -> Dict:
"""检查数据库状态"""
try:
from database import test_connection, USE_DATABASE
if USE_DATABASE:
connected = test_connection()
return {'status': 'connected' if connected else 'disconnected', 'enabled': True}
else:
return {'status': 'disabled', 'enabled': False}
except Exception as e:
return {'status': 'error', 'error': str(e), 'enabled': False}
def check_cache_status() -> Dict:
"""检查缓存状态"""
try:
from api_cache_integration import get_cache_statistics
stats = get_cache_statistics()
return {'status': 'active', 'statistics': stats}
except Exception as e:
return {'status': 'error', 'error': str(e)}
def check_analyzers_status() -> Dict:
"""检查分析器状态"""
try:
# 简单测试分析器是否可用
from stock_analyzer import StockAnalyzer
analyzer = StockAnalyzer()
# 尝试获取一个简单的股票信息
test_result = analyzer.get_stock_info('000001.SZ')
return {
'status': 'active',
'test_result': bool(test_result)
}
except Exception as e:
return {'status': 'error', 'error': str(e)}
def integrate_api_with_existing_app(app: Flask) -> bool:
"""将API功能集成到现有应用"""
try:
logger.info("开始集成API功能到现有应用")
# 1. 注册API端点
if not register_api_endpoints(app):
logger.error("注册API端点失败")
return False
# 2. 设置中间件
if not setup_api_middleware(app):
logger.error("设置API中间件失败")
return False
# 3. 添加管理端点
add_api_routes_to_existing_app(app)
# 4. 更新应用配置
app.config.setdefault('API_ENABLED', True)
app.config.setdefault('API_VERSION', '1.0.0')
logger.info("API功能集成完成")
return True
except Exception as e:
logger.error(f"API功能集成失败: {e}")
return False
# 使用示例
def example_usage():
"""使用示例"""
print("""
# 在现有的web_server.py中添加以下代码来集成API功能:
from api_integration import integrate_api_with_existing_app
# 在创建Flask应用后
app = Flask(__name__)
# 集成API功能
if integrate_api_with_existing_app(app):
print("API功能集成成功")
else:
print("API功能集成失败")
# API端点将在以下路径可用:
# POST /api/v1/portfolio/analyze - 投资组合分析
# POST /api/v1/stock/analyze - 个股分析
# POST /api/v1/stocks/batch-score - 批量股票评分
# POST /api/v1/tasks - 创建异步任务
# GET /api/v1/tasks/{task_id} - 查询任务状态
# GET /api/v1/tasks/{task_id}/result - 获取任务结果
# GET /api/v1/health - API健康检查
""")