# Spaces: Build error
"""
Error analyzer implementation.

Responsible for analyzing logs and identifying error types and root causes.
"""
import asyncio
import logging
import re
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

from core_system import ErrorAnalyzer, ErrorInfo, ErrorType
@dataclass
class ErrorPattern:
    """Error pattern definition: a compiled regex plus metadata about a match.

    The ``@dataclass`` decorator generates the keyword-argument ``__init__``
    that ``_initialize_patterns`` relies on
    (``ErrorPattern(regex=..., error_type=..., ...)``). Without it the bare
    annotations below declare no constructor and that call raises
    ``TypeError``.
    """

    # Compiled expression searched against log text.
    regex: re.Pattern
    # Category assigned to a match; quoted so the name is resolved lazily.
    error_type: "ErrorType"
    # Base confidence reported when the pattern matches.
    confidence: float
    # Short human-readable label for the pattern.
    description: str
    # Typical reasons this kind of error occurs.
    common_causes: List[str]
    # Remediation hints attached to a match.
    suggested_fixes: List[str]
class LogAnalyzer:
    """Log analyzer: line-oriented helpers for pulling error evidence out of raw log text."""

    # Substrings (matched case-insensitively) that mark a log line as error-related.
    _ERROR_KEYWORDS = ('error', 'failed', 'exception', 'traceback')

    # Number of lines inspected on each side of the main error.
    _RELATED_WINDOW = 10

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def extract_error_context(self, logs: str, error_line: int, context_size: int = 5) -> Dict[str, Any]:
        """Extract the lines surrounding an error.

        Args:
            logs: Full log text; lines are separated by ``'\\n'``.
            error_line: 0-based index of the error line within ``logs``.
                Negative values are not supported.
            context_size: Number of lines to include on each side.

        Returns:
            A dict with ``before`` / ``after`` (lists of neighbouring lines),
            ``error_line`` (the line itself, or ``""`` when the index is past
            the end), ``full_context`` (the whole window) and
            ``relative_line`` (position of the error line inside the window).
        """
        lines = logs.split('\n')
        start = max(0, error_line - context_size)
        end = min(len(lines), error_line + context_size + 1)
        return {
            "before": lines[start:error_line],
            "error_line": lines[error_line] if error_line < len(lines) else "",
            "after": lines[error_line + 1:end],
            "full_context": lines[start:end],
            "relative_line": error_line - start
        }

    def detect_error_sequence(self, logs: str) -> List[str]:
        """Return, in order, every stripped log line that contains an error keyword."""
        error_sequence = []
        for line in logs.split('\n'):
            lowered = line.lower()  # lower-case once per line, not once per keyword
            if any(keyword in lowered for keyword in self._ERROR_KEYWORDS):
                error_sequence.append(line.strip())
        return error_sequence

    def find_related_errors(self, logs: str, main_error: "ErrorInfo") -> "List[ErrorInfo]":
        """Find other error lines close to the main error.

        ``main_error.line_number`` is treated as 1-based, matching the numbers
        produced by ``IntelligentErrorAnalyzer._pattern_matching``. Lines are
        therefore enumerated 1-based as well, so the main error's own line is
        excluded and every reported ``line_number`` uses the same convention.

        Args:
            logs: Full log text.
            main_error: The error whose neighbourhood is searched.

        Returns:
            One ``ErrorInfo`` (type ``UNKNOWN_ERROR``, confidence 0.5) for each
            line within ten lines of the main error that contains "error".
            Empty when the main error has no line number.
        """
        main_line = main_error.line_number
        if not main_line:
            return []

        lines = logs.split('\n')
        # Convert the 1-based line number to a 0-based slice centred on it.
        first_index = max(0, main_line - 1 - self._RELATED_WINDOW)
        stop_index = min(len(lines), main_line + self._RELATED_WINDOW)

        related_errors = []
        for line_number, line in enumerate(lines[first_index:stop_index], first_index + 1):
            if line_number == main_line or 'error' not in line.lower():
                continue
            text = line.strip()
            related_errors.append(
                ErrorInfo(
                    error_type=ErrorType.UNKNOWN_ERROR,
                    message=text,
                    log_snippet=text,
                    line_number=line_number,
                    confidence=0.5
                )
            )
        return related_errors
class IntelligentErrorAnalyzer(ErrorAnalyzer):
    """Intelligent error analyzer.

    Combines a table of regex patterns (fast, per-line) with per-category
    context analyzers (whole-log), then de-duplicates the findings and
    adjusts their confidence.
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.log_analyzer = LogAnalyzer()
        self.error_patterns = self._initialize_patterns()
        self.context_analyzers = {
            ErrorType.DOCKERFILE_SYNTAX: DockerfileSyntaxAnalyzer(),
            ErrorType.DEPENDENCY_INSTALL: DependencyErrorAnalyzer(),
            ErrorType.ENVIRONMENT_CONFIG: EnvironmentErrorAnalyzer(),
            ErrorType.PORT_CONFLICT: PortErrorAnalyzer(),
            ErrorType.PERMISSION_ERROR: PermissionErrorAnalyzer(),
            ErrorType.NETWORK_CONNECTION: NetworkErrorAnalyzer(),
            ErrorType.TIMEOUT_ERROR: TimeoutErrorAnalyzer(),
            ErrorType.RESOURCE_EXCEEDED: ResourceErrorAnalyzer()
        }

    async def analyze_logs(self, logs: str) -> List[ErrorInfo]:
        """Analyze the logs and return the identified errors.

        Args:
            logs: Full log text.

        Returns:
            De-duplicated errors with their final confidence applied.
        """
        errors = []
        # Fast pass: per-line regex matching.
        errors.extend(await self._pattern_matching(logs))
        # Deep pass: category-specific context analyzers.
        errors.extend(await self._context_analysis(logs))
        # Drop duplicates, then compute the final confidence.
        deduplicated_errors = self._deduplicate_errors(errors)
        return self._calculate_final_confidence(deduplicated_errors, logs)

    async def classify_error(self, error_message: str) -> ErrorType:
        """Classify a single error message.

        Returns the type of the highest-confidence pattern that matches, or
        ``ErrorType.UNKNOWN_ERROR`` when nothing matches.
        """
        max_confidence = 0.0
        best_type = ErrorType.UNKNOWN_ERROR
        for pattern in self.error_patterns:
            if pattern.confidence > max_confidence and pattern.regex.search(error_message):
                max_confidence = pattern.confidence
                best_type = pattern.error_type
        return best_type

    async def _pattern_matching(self, logs: str) -> List[ErrorInfo]:
        """Match every log line against every pattern.

        Line numbers in the returned errors are 1-based.
        """
        errors = []
        for line_num, line in enumerate(logs.split('\n'), 1):
            stripped = line.strip()
            for pattern in self.error_patterns:
                if not pattern.regex.search(line):
                    continue
                errors.append(
                    ErrorInfo(
                        error_type=pattern.error_type,
                        message=stripped,
                        log_snippet=stripped,
                        line_number=line_num,
                        confidence=pattern.confidence,
                        context={
                            "description": pattern.description,
                            "common_causes": pattern.common_causes,
                            "suggested_fixes": pattern.suggested_fixes
                        }
                    )
                )
        return errors

    async def _context_analysis(self, logs: str) -> List[ErrorInfo]:
        """Run every context analyzer over the whole log.

        A failing analyzer is logged and skipped so that one broken analyzer
        does not discard the results of the others.
        """
        errors = []
        for error_type, analyzer in self.context_analyzers.items():
            try:
                type_errors = await analyzer.analyze(logs)
                errors.extend(type_errors)
            except Exception as e:
                self.logger.error(f"上下文分析器 {error_type} 执行失败: {e}")
        return errors

    def _deduplicate_errors(self, errors: List[ErrorInfo]) -> List[ErrorInfo]:
        """Remove duplicate errors, keeping the first occurrence.

        Errors are keyed by line number and error type. Errors without a line
        number (as produced by the context analyzers) are additionally keyed
        by message; otherwise distinct findings of the same type would all
        share the same key and collapse into one.
        """
        seen = set()
        deduplicated = []
        for error in errors:
            message_key = None if error.line_number else error.message
            key = (error.line_number, error.error_type, message_key)
            if key not in seen:
                seen.add(key)
                deduplicated.append(error)
        return deduplicated

    def _calculate_final_confidence(self, errors: List[ErrorInfo], logs: str) -> List[ErrorInfo]:
        """Adjust each error's confidence in place and return the same list.

        Two boosts are added to the base confidence, capped at 1.0:

        * 0.1 for each technical keyword found in the message;
        * up to 0.2 proportional to how late in the log the error appears, so
          the most recent errors receive the largest boost. Errors without a
          line number receive no positional boost.
        """
        tech_keywords = ('docker', 'pip', 'npm', 'apt', 'python', 'node')
        # str.split always returns at least one element, so this is never zero.
        total_lines = len(logs.split('\n'))
        for error in errors:
            message = error.message.lower()
            keyword_boost = sum(0.1 for keyword in tech_keywords if keyword in message)
            # 0.0 at the top of the log, 1.0 at the last line.
            position_factor = min(1.0, (error.line_number or 0) / total_lines)
            recent_boost = position_factor * 0.2
            error.confidence = min(1.0, error.confidence + keyword_boost + recent_boost)
        return errors

    def _initialize_patterns(self) -> List[ErrorPattern]:
        """Build the table of regex error patterns."""
        patterns = [
            # Dockerfile syntax errors
            ErrorPattern(
                regex=re.compile(r"failed to solve:.*syntax error|Dockerfile:\d+"),
                error_type=ErrorType.DOCKERFILE_SYNTAX,
                confidence=0.9,
                description="Dockerfile 语法错误",
                common_causes=["命令格式错误", "参数缺失", "缩进问题"],
                suggested_fixes=["检查命令语法", "验证参数", "修复格式"]
            ),
            # Python dependency installation failures
            ErrorPattern(
                regex=re.compile(r"ERROR: Could not find a version|No matching distribution|pip install failed"),
                error_type=ErrorType.DEPENDENCY_INSTALL,
                confidence=0.85,
                description="Python 依赖安装失败",
                common_causes=["版本不存在", "网络问题", "依赖冲突"],
                suggested_fixes=["检查版本", "更换源", "解决冲突"]
            ),
            # Node.js dependency installation failures
            ErrorPattern(
                regex=re.compile(r"npm ERR!|yarn error|failed to install node packages"),
                error_type=ErrorType.DEPENDENCY_INSTALL,
                confidence=0.85,
                description="Node.js 依赖安装失败",
                common_causes=["版本冲突", "网络问题", "缓存问题"],
                suggested_fixes=["清理缓存", "检查版本", "使用国内源"]
            ),
            # Environment variable configuration problems
            ErrorPattern(
                regex=re.compile(r"Environment variable.*not found|ENV.*undefined|getenv.*None"),
                error_type=ErrorType.ENVIRONMENT_CONFIG,
                confidence=0.8,
                description="环境变量配置问题",
                common_causes=["变量未设置", "配置文件缺失", "权限问题"],
                suggested_fixes=["设置环境变量", "创建配置文件", "检查权限"]
            ),
            # Port conflicts
            ErrorPattern(
                regex=re.compile(r"Address already in use|Port.*already used|EADDRINUSE"),
                error_type=ErrorType.PORT_CONFLICT,
                confidence=0.95,
                description="端口冲突",
                common_causes=["端口被占用", "权限不足", "配置错误"],
                suggested_fixes=["更换端口", "杀死占用进程", "修改配置"]
            ),
            # Permission problems
            ErrorPattern(
                regex=re.compile(r"Permission denied|Operation not permitted|EACCES"),
                error_type=ErrorType.PERMISSION_ERROR,
                confidence=0.9,
                description="权限不足",
                common_causes=["文件权限", "用户权限", "目录权限"],
                suggested_fixes=["修改权限", "使用 sudo", "更改用户"]
            ),
            # Network connection problems
            ErrorPattern(
                regex=re.compile(r"Connection refused|Network unreachable|Timeout|DNS resolution failed"),
                error_type=ErrorType.NETWORK_CONNECTION,
                confidence=0.8,
                description="网络连接问题",
                common_causes=["网络不可达", "DNS问题", "防火墙限制"],
                suggested_fixes=["检查网络", "配置DNS", "调整防火墙"]
            ),
            # Timeouts
            ErrorPattern(
                regex=re.compile(r"timeout|timed out|deadline exceeded"),
                error_type=ErrorType.TIMEOUT_ERROR,
                confidence=0.75,
                description="操作超时",
                common_causes=["操作时间过长", "资源不足", "网络延迟"],
                suggested_fixes=["增加超时时间", "优化性能", "检查资源"]
            ),
            # Resource limits exceeded
            ErrorPattern(
                regex=re.compile(r"out of memory|disk full|CPU limit exceeded|resource exceeded"),
                error_type=ErrorType.RESOURCE_EXCEEDED,
                confidence=0.9,
                description="资源超限",
                common_causes=["内存不足", "磁盘满", "CPU限制"],
                suggested_fixes=["清理资源", "增加配额", "优化代码"]
            )
        ]
        return patterns
class ContextAnalyzer(ABC):
    """Base class for context analyzers.

    Each subclass inspects the whole log for one category of problem and
    reports what it finds.
    """

    @abstractmethod
    async def analyze(self, logs: str) -> "List[ErrorInfo]":
        """Analyze the logs.

        Declared abstract so that a subclass which forgets to override it
        fails at instantiation instead of silently returning ``None``.

        Args:
            logs: Full log text.

        Returns:
            The errors detected by this analyzer (possibly empty).
        """
class DockerfileSyntaxAnalyzer(ContextAnalyzer):
    """Dockerfile syntax analyzer."""

    async def analyze(self, logs: str) -> List[ErrorInfo]:
        """Report one error per Dockerfile-specific signature found in the logs.

        Each signature is searched case-insensitively over the whole log text.
        """
        signatures = (
            (r"FROM.*invalid", "FROM 指令格式错误"),
            (r"RUN.*command not found", "RUN 命令执行失败"),
            (r"COPY.*No such file", "COPY 源文件不存在"),
            (r"EXPOSE.*invalid port", "EXPOSE 端口格式错误"),
            (r"ENV.*invalid format", "ENV 环境变量格式错误"),
        )
        return [
            ErrorInfo(
                error_type=ErrorType.DOCKERFILE_SYNTAX,
                message=label,
                log_snippet="",
                confidence=0.8,
                context={"analysis_type": "dockerfile_syntax"},
            )
            for regex, label in signatures
            if re.search(regex, logs, re.IGNORECASE)
        ]
class DependencyErrorAnalyzer(ContextAnalyzer):
    """Dependency error analyzer."""

    async def analyze(self, logs: str) -> List[ErrorInfo]:
        """Report one error per pip / npm dependency signature found in the logs.

        Python signatures are checked first, then Node.js ones; each is
        searched case-insensitively over the whole log text.
        """
        signatures = (
            # Python dependency problems
            (r"pip.*Requirement already satisfied", "依赖重复安装"),
            (r"pip.*Could not find.*version", "依赖版本不存在"),
            (r"pip.*incompatible dependencies", "依赖版本冲突"),
            # Node.js dependency problems
            (r"npm.*peer dependency", "peer 依赖问题"),
            (r"npm.*version mismatch", "版本不匹配"),
            (r"npm.*cache problem", "npm 缓存问题"),
        )
        found = []
        for regex, label in signatures:
            if re.search(regex, logs, re.IGNORECASE) is None:
                continue
            found.append(
                ErrorInfo(
                    error_type=ErrorType.DEPENDENCY_INSTALL,
                    message=label,
                    log_snippet="",
                    confidence=0.75,
                    context={"analysis_type": "dependency"},
                )
            )
        return found
class EnvironmentErrorAnalyzer(ContextAnalyzer):
    """Environment error analyzer."""

    async def analyze(self, logs: str) -> List[ErrorInfo]:
        """Report a PATH configuration problem when the logs mention one.

        Returns a single-element list on a match, otherwise an empty list.
        """
        # Guard clause: nothing to report unless the PATH signature appears.
        if not re.search(r"PATH.*not found", logs, re.IGNORECASE):
            return []
        return [
            ErrorInfo(
                error_type=ErrorType.ENVIRONMENT_CONFIG,
                message="PATH 环境变量配置问题",
                log_snippet="",
                confidence=0.8,
                context={"analysis_type": "environment", "var_type": "PATH"},
            )
        ]
class PortErrorAnalyzer(ContextAnalyzer):
    """Port error analyzer."""

    async def analyze(self, logs: str) -> List[ErrorInfo]:
        """Report a problem with port 7860 (the HuggingFace Spaces default).

        An error is reported only when the logs both mention port 7860 and
        contain "error" or "failed" somewhere (case-insensitive).
        """
        if not re.search(r"port.*7860", logs, re.IGNORECASE):
            return []
        if not re.search(r"error|failed", logs, re.IGNORECASE):
            return []
        return [
            ErrorInfo(
                error_type=ErrorType.PORT_CONFLICT,
                message="HuggingFace Spaces 默认端口 7860 问题",
                log_snippet="",
                confidence=0.9,
                context={"analysis_type": "port", "port": "7860"},
            )
        ]
class PermissionErrorAnalyzer(ContextAnalyzer):
    """Permission error analyzer."""

    async def analyze(self, logs: str) -> List[ErrorInfo]:
        """Report a permission problem on a script file.

        Matches "permission denied" followed on the same line by a ``.py``,
        ``.js`` or ``.sh`` name (case-insensitive).

        Returns a single-element list on a match, otherwise an empty list.
        """
        errors = []
        # The extensions are grouped so that "permission denied" applies to all
        # of them. Ungrouped, `a.*\.py|\.js|\.sh` matches any ".js" or ".sh"
        # anywhere in the logs, because `|` has the lowest precedence.
        if re.search(r"permission denied.*\.(?:py|js|sh)", logs, re.IGNORECASE):
            errors.append(
                ErrorInfo(
                    error_type=ErrorType.PERMISSION_ERROR,
                    message="脚本文件权限问题",
                    log_snippet="",
                    confidence=0.8,
                    context={"analysis_type": "permission", "file_type": "script"}
                )
            )
        return errors
class NetworkErrorAnalyzer(ContextAnalyzer):
    """Network error analyzer."""

    async def analyze(self, logs: str) -> List[ErrorInfo]:
        """Report connection problems with well-known package hosts.

        Each indicator is searched case-insensitively over the whole log text.
        The matching service name is stored under ``context["service"]``.
        """
        # The service name is listed explicitly. Deriving it from the regex
        # with split('.') leaves the escaping backslash in place ("github\").
        network_indicators = [
            ("github", r"github\.com.*timeout", "GitHub 连接超时"),
            ("pypi", r"pypi\.org.*failed", "PyPI 连接失败"),
            # The public npm registry host is registry.npmjs.org; the original
            # "npm.registry" spelling is still accepted.
            ("npm", r"(?:registry\.npmjs\.org|npm\.registry).*error", "npm registry 连接错误"),
        ]
        errors = []
        for service, pattern, description in network_indicators:
            if re.search(pattern, logs, re.IGNORECASE):
                errors.append(
                    ErrorInfo(
                        error_type=ErrorType.NETWORK_CONNECTION,
                        message=description,
                        log_snippet="",
                        confidence=0.7,
                        context={"analysis_type": "network", "service": service}
                    )
                )
        return errors
class TimeoutErrorAnalyzer(ContextAnalyzer):
    """Timeout error analyzer."""

    async def analyze(self, logs: str) -> List[ErrorInfo]:
        """Report build, install and download timeouts found in the logs.

        For each operation the logs are searched case-insensitively for the
        operation name followed later on the same line by "timeout". The
        operation name is stored under ``context["operation"]``.
        """
        operations = (
            ("build", "构建超时"),
            ("install", "安装超时"),
            ("download", "下载超时"),
        )
        found = []
        for operation, label in operations:
            regex = operation + ".*timeout"
            if re.search(regex, logs, re.IGNORECASE) is None:
                continue
            found.append(
                ErrorInfo(
                    error_type=ErrorType.TIMEOUT_ERROR,
                    message=label,
                    log_snippet="",
                    confidence=0.8,
                    context={"analysis_type": "timeout", "operation": operation},
                )
            )
        return found
class ResourceErrorAnalyzer(ContextAnalyzer):
    """Resource error analyzer."""

    async def analyze(self, logs: str) -> List[ErrorInfo]:
        """Report memory, disk and CPU limit problems found in the logs.

        Each signature is a resource name followed later on the same line by a
        qualifier word, searched case-insensitively. The resource name is
        stored under ``context["resource_type"]``.
        """
        signatures = (
            ("memory", "limit", "内存限制"),
            ("disk", "space", "磁盘空间不足"),
            ("cpu", "quota", "CPU 配额限制"),
        )
        return [
            ErrorInfo(
                error_type=ErrorType.RESOURCE_EXCEEDED,
                message=label,
                log_snippet="",
                confidence=0.8,
                context={"analysis_type": "resource", "resource_type": resource},
            )
            for resource, qualifier, label in signatures
            if re.search(resource + ".*" + qualifier, logs, re.IGNORECASE)
        ]