Spaces:
Build error
Build error
| #!/usr/bin/env python3 | |
| # -*- coding: utf-8 -*- | |
| """ | |
| 替代方案批量股票分析程序 | |
| 尝试不同的请求参数组合来避免API服务端错误 | |
| """ | |
| import pandas as pd | |
| import requests | |
| import json | |
| import time | |
| from datetime import datetime | |
| from typing import Dict, List, Optional | |
| class AlternativeBatchAnalyzer: | |
| """替代方案批量分析器""" | |
| def __init__(self): | |
| self.api_url = "https://fromozu-stock-analysis.hf.space/api/v1/stock/analyze" | |
| self.api_key = "UZXJfw3YNX80DLfN" | |
| self.headers = { | |
| 'Content-Type': 'application/json', | |
| 'X-API-Key': self.api_key | |
| } | |
| # 不同的请求参数组合 | |
| self.request_variants = [ | |
| { | |
| "name": "完整分析", | |
| "params": { | |
| "analysis_depth": "full", | |
| "include_ai_analysis": True | |
| } | |
| }, | |
| { | |
| "name": "基础分析", | |
| "params": { | |
| "analysis_depth": "basic", | |
| "include_ai_analysis": False | |
| } | |
| }, | |
| { | |
| "name": "简化分析", | |
| "params": { | |
| "analysis_depth": "simple", | |
| "include_ai_analysis": False | |
| } | |
| }, | |
| { | |
| "name": "最小分析", | |
| "params": {} | |
| } | |
| ] | |
| def convert_stock_code(self, sec_id: str) -> str: | |
| """转换股票代码格式""" | |
| if pd.isna(sec_id) or not isinstance(sec_id, str): | |
| return "" | |
| if sec_id.endswith('.XSHE'): | |
| return sec_id.replace('.XSHE', '.SZ') | |
| elif sec_id.endswith('.XSHG'): | |
| return sec_id.replace('.XSHG', '.SH') | |
| else: | |
| return sec_id | |
| def test_api_variants(self, stock_code: str) -> Optional[Dict]: | |
| """测试不同的API请求参数组合""" | |
| print(f" 测试不同的请求参数组合...") | |
| for i, variant in enumerate(self.request_variants, 1): | |
| print(f" 尝试 {i}/{len(self.request_variants)}: {variant['name']}") | |
| # 构建请求参数 | |
| payload = { | |
| "stock_code": stock_code, | |
| "market_type": "A" | |
| } | |
| payload.update(variant['params']) | |
| try: | |
| response = requests.post( | |
| self.api_url, | |
| json=payload, | |
| headers=self.headers, | |
| timeout=30 | |
| ) | |
| print(f" 状态码: {response.status_code}") | |
| if response.status_code == 200: | |
| result = response.json() | |
| if result.get('success', False): | |
| print(f" ✅ 成功! 使用参数: {variant['name']}") | |
| return { | |
| 'success': True, | |
| 'data': result.get('data', {}), | |
| 'variant': variant['name'], | |
| 'params': variant['params'] | |
| } | |
| else: | |
| print(f" ❌ API返回失败: {result.get('message', '未知错误')}") | |
| elif response.status_code == 500: | |
| error_info = self._parse_500_error(response) | |
| print(f" ❌ 500错误: {error_info['message']}") | |
| else: | |
| print(f" ❌ HTTP错误: {response.status_code}") | |
| except Exception as e: | |
| print(f" ❌ 异常: {e}") | |
| # 短暂延迟 | |
| time.sleep(1) | |
| print(f" ❌ 所有参数组合都失败") | |
| return None | |
| def _parse_500_error(self, response) -> Dict: | |
| """解析500错误的详细信息""" | |
| try: | |
| error_data = response.json() | |
| error_details = error_data.get('error', {}) | |
| return { | |
| 'message': error_details.get('message', '服务器内部错误'), | |
| 'code': error_details.get('code', 'INTERNAL_SERVER_ERROR'), | |
| 'details': error_details.get('details', {}) | |
| } | |
| except: | |
| return { | |
| 'message': '服务器内部错误(无法解析错误信息)', | |
| 'code': 'PARSE_ERROR', | |
| 'details': {} | |
| } | |
| def analyze_with_fallback(self, stock_code: str, original_code: str) -> Dict: | |
| """使用降级策略分析股票""" | |
| # 首先尝试标准请求 | |
| standard_payload = { | |
| "stock_code": stock_code, | |
| "market_type": "A", | |
| "analysis_depth": "full", | |
| "include_ai_analysis": True | |
| } | |
| try: | |
| print(f" 尝试标准请求...") | |
| response = requests.post( | |
| self.api_url, | |
| json=standard_payload, | |
| headers=self.headers, | |
| timeout=30 | |
| ) | |
| if response.status_code == 200: | |
| result = response.json() | |
| if result.get('success', False): | |
| print(f" ✅ 标准请求成功") | |
| return self._extract_success_data(result, original_code, stock_code, "标准分析") | |
| print(f" ⚠️ 标准请求失败 (状态码: {response.status_code})") | |
| except Exception as e: | |
| print(f" ⚠️ 标准请求异常: {e}") | |
| # 如果标准请求失败,尝试其他参数组合 | |
| variant_result = self.test_api_variants(stock_code) | |
| if variant_result and variant_result['success']: | |
| return self._extract_success_data( | |
| {'data': variant_result['data']}, | |
| original_code, | |
| stock_code, | |
| variant_result['variant'] | |
| ) | |
| # 所有尝试都失败,返回失败记录 | |
| return self._create_failed_record(original_code, stock_code, "所有API请求都失败") | |
| def _extract_success_data(self, result: Dict, original_code: str, | |
| converted_code: str, analysis_type: str) -> Dict: | |
| """从成功的API响应中提取数据""" | |
| try: | |
| data = result.get('data', {}) | |
| basic_info = data.get('basic_info', {}) | |
| scores = data.get('scores', {}) | |
| risk_assessment = data.get('risk_assessment', {}) | |
| return { | |
| 'original_code': original_code, | |
| 'converted_code': converted_code, | |
| 'stock_name': basic_info.get('name', ''), | |
| 'current_price': basic_info.get('current_price', 0), | |
| 'change_percent': basic_info.get('change_percent', 0), | |
| 'overall_score': scores.get('overall_score', 0), | |
| 'technical_score': scores.get('technical_score', 0), | |
| 'fundamental_score': scores.get('fundamental_score', 0), | |
| 'risk_score': scores.get('risk_score', 0), | |
| 'risk_level': risk_assessment.get('risk_level', ''), | |
| 'analysis_type': analysis_type, | |
| 'analysis_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), | |
| 'status': 'success' | |
| } | |
| except Exception as e: | |
| print(f" ⚠️ 数据提取失败: {e}") | |
| return self._create_failed_record(original_code, converted_code, f"数据提取失败: {str(e)}") | |
| def _create_failed_record(self, original_code: str, converted_code: str, error_msg: str) -> Dict: | |
| """创建失败记录""" | |
| return { | |
| 'original_code': original_code, | |
| 'converted_code': converted_code, | |
| 'stock_name': '分析失败', | |
| 'current_price': 0, | |
| 'change_percent': 0, | |
| 'overall_score': 0, | |
| 'technical_score': 0, | |
| 'fundamental_score': 0, | |
| 'risk_score': 0, | |
| 'risk_level': '未知', | |
| 'analysis_type': '失败', | |
| 'analysis_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), | |
| 'status': 'failed', | |
| 'error_message': error_msg | |
| } | |
| def process_csv_file(self, csv_file: str) -> bool: | |
| """处理CSV文件""" | |
| print(f"=== 替代方案批量股票分析程序 ===") | |
| print(f"开始时间: {datetime.now()}") | |
| print(f"策略: 尝试多种请求参数组合") | |
| # 读取CSV文件 | |
| try: | |
| df = pd.read_csv(csv_file) | |
| print(f"✅ 成功读取CSV文件: {len(df)} 行") | |
| if 'secID' not in df.columns: | |
| print("❌ 错误: 未找到 'secID' 列") | |
| return False | |
| stocks = df[df['secID'].notna() & (df['secID'] != '')]['secID'].tolist() | |
| print(f"有效股票代码: {len(stocks)} 个") | |
| except Exception as e: | |
| print(f"❌ CSV文件读取失败: {e}") | |
| return False | |
| if not stocks: | |
| print("❌ 未找到有效的股票代码") | |
| return False | |
| # 批量分析 | |
| print(f"\n开始批量分析...") | |
| results = [] | |
| success_count = 0 | |
| for i, original_code in enumerate(stocks, 1): | |
| print(f"\n[{i}/{len(stocks)}] 处理: {original_code}") | |
| # 转换代码 | |
| converted_code = self.convert_stock_code(original_code) | |
| print(f" 转换为: {converted_code}") | |
| if not converted_code: | |
| print(" ❌ 代码转换失败") | |
| results.append(self._create_failed_record(original_code, "", "代码转换失败")) | |
| continue | |
| # 分析股票 | |
| result = self.analyze_with_fallback(converted_code, original_code) | |
| results.append(result) | |
| # 显示结果 | |
| if result['status'] == 'success': | |
| success_count += 1 | |
| print(f" ✅ 成功 - 类型: {result['analysis_type']}, 评分: {result['overall_score']}") | |
| else: | |
| print(f" ❌ 失败 - {result['error_message']}") | |
| # 延迟避免限流 | |
| if i < len(stocks): | |
| time.sleep(3) # 增加延迟 | |
| # 保存结果 | |
| self._save_results(results) | |
| # 显示统计 | |
| total = len(results) | |
| success_rate = (success_count / total * 100) if total > 0 else 0 | |
| print(f"\n=== 分析完成 ===") | |
| print(f"总股票数: {total}") | |
| print(f"成功: {success_count}") | |
| print(f"失败: {total - success_count}") | |
| print(f"成功率: {success_rate:.1f}%") | |
| return True | |
| def _save_results(self, results: List[Dict]): | |
| """保存分析结果""" | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| # 保存所有结果 | |
| output_file = f"alternative_batch_analysis_{timestamp}.csv" | |
| results_df = pd.DataFrame(results) | |
| results_df.to_csv(output_file, index=False, encoding='utf-8-sig') | |
| print(f"\n✅ 结果已保存: {output_file}") | |
| # 分析成功的分析类型 | |
| success_results = [r for r in results if r['status'] == 'success'] | |
| if success_results: | |
| analysis_types = {} | |
| for r in success_results: | |
| analysis_type = r.get('analysis_type', '未知') | |
| analysis_types[analysis_type] = analysis_types.get(analysis_type, 0) + 1 | |
| print(f"\n📊 成功分析类型统计:") | |
| for analysis_type, count in analysis_types.items(): | |
| print(f" {analysis_type}: {count} 只股票") | |
| def main(): | |
| """主函数""" | |
| analyzer = AlternativeBatchAnalyzer() | |
| # 检查CSV文件 | |
| csv_file = "list3.csv" | |
| if not pd.io.common.file_exists(csv_file): | |
| print(f"❌ CSV文件不存在: {csv_file}") | |
| return | |
| # 开始分析 | |
| success = analyzer.process_csv_file(csv_file) | |
| if success: | |
| print(f"\n🎉 替代方案分析完成!") | |
| else: | |
| print(f"\n❌ 替代方案分析失败!") | |
| if __name__ == "__main__": | |
| main() | |