"""Error analyzer implementation.

Analyzes logs, identifies error types and likely root causes.
"""

import re
import asyncio
import logging
from abc import ABC
from typing import List, Dict, Any, Tuple, Optional
from dataclasses import dataclass
from datetime import datetime

from core_system import ErrorAnalyzer, ErrorInfo, ErrorType


@dataclass
class ErrorPattern:
    """Definition of a single regex-based error pattern."""

    regex: re.Pattern           # compiled pattern searched against each log line
    error_type: ErrorType       # category assigned when the pattern matches
    confidence: float           # base confidence attached to a match
    description: str            # human-readable description of the error
    common_causes: List[str]    # typical causes, copied into ErrorInfo.context
    suggested_fixes: List[str]  # typical fixes, copied into ErrorInfo.context


class LogAnalyzer:
    """Low-level helpers for slicing and scanning raw log text."""

    # Number of lines inspected on each side of the main error when looking
    # for related errors.
    _RELATED_WINDOW = 10

    # Substrings (compared case-insensitively) that mark a line as part of
    # an error sequence.
    _ERROR_KEYWORDS = ('error', 'failed', 'exception', 'traceback')

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def extract_error_context(self, logs: str, error_line: int,
                              context_size: int = 5) -> Dict[str, Any]:
        """Return the lines surrounding an error line.

        Args:
            logs: Full log text, lines separated by ``\\n``.
            error_line: 0-based index of the error line within ``logs``.
            context_size: Number of lines to include on each side.

        Returns:
            A dict with the keys ``before``, ``error_line``, ``after``,
            ``full_context`` and ``relative_line`` (the offset of the error
            line inside ``full_context``). ``error_line`` is an empty string
            when the index is past the end of the log.
        """
        lines = logs.split('\n')
        start = max(0, error_line - context_size)
        end = min(len(lines), error_line + context_size + 1)

        return {
            "before": lines[start:error_line],
            "error_line": lines[error_line] if error_line < len(lines) else "",
            "after": lines[error_line + 1:end],
            "full_context": lines[start:end],
            "relative_line": error_line - start
        }

    def detect_error_sequence(self, logs: str) -> List[str]:
        """Return every log line (stripped) that contains an error keyword."""
        return [
            line.strip()
            for line in logs.split('\n')
            if any(keyword in line.lower() for keyword in self._ERROR_KEYWORDS)
        ]

    def find_related_errors(self, logs: str, main_error: ErrorInfo) -> List[ErrorInfo]:
        """Find lines near the main error that also mention an error.

        ``main_error.line_number`` is interpreted as a 1-based line number,
        which is what ``IntelligentErrorAnalyzer._pattern_matching`` produces.
        The line numbers of the returned errors are 1-based as well. When the
        main error has no line number, an empty list is returned.
        """
        related_errors: List[ErrorInfo] = []
        main_line = main_error.line_number
        if not main_line:
            return related_errors

        lines = logs.split('\n')
        main_index = main_line - 1  # convert to a 0-based index into ``lines``
        start = max(0, main_index - self._RELATED_WINDOW)
        end = min(len(lines), main_index + self._RELATED_WINDOW + 1)

        # Enumerate with 1-based numbers so the comparison with the main
        # error's line number uses the same convention.
        for line_no, line in enumerate(lines[start:end], start + 1):
            if line_no == main_line or 'error' not in line.lower():
                continue
            stripped = line.strip()
            related_errors.append(ErrorInfo(
                error_type=ErrorType.UNKNOWN_ERROR,
                message=stripped,
                log_snippet=stripped,
                line_number=line_no,
                confidence=0.5
            ))

        return related_errors


class IntelligentErrorAnalyzer(ErrorAnalyzer):
    """Error analyzer combining regex patterns with per-type context analyzers."""

    # Keywords that make an error message look more specific; each one found
    # in the message adds 0.1 to the confidence.
    _TECH_KEYWORDS = ('docker', 'pip', 'npm', 'apt', 'python', 'node')

    # Maximum confidence bonus granted to an error on the last log line.
    _RECENCY_WEIGHT = 0.2

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.log_analyzer = LogAnalyzer()
        self.error_patterns = self._initialize_patterns()
        self.context_analyzers = {
            ErrorType.DOCKERFILE_SYNTAX: DockerfileSyntaxAnalyzer(),
            ErrorType.DEPENDENCY_INSTALL: DependencyErrorAnalyzer(),
            ErrorType.ENVIRONMENT_CONFIG: EnvironmentErrorAnalyzer(),
            ErrorType.PORT_CONFLICT: PortErrorAnalyzer(),
            ErrorType.PERMISSION_ERROR: PermissionErrorAnalyzer(),
            ErrorType.NETWORK_CONNECTION: NetworkErrorAnalyzer(),
            ErrorType.TIMEOUT_ERROR: TimeoutErrorAnalyzer(),
            ErrorType.RESOURCE_EXCEEDED: ResourceErrorAnalyzer()
        }

    async def analyze_logs(self, logs: str) -> List[ErrorInfo]:
        """Analyze a log and return the errors found in it.

        Runs line-by-line pattern matching, then the context analyzers,
        de-duplicates the combined result and adjusts the confidences.
        """
        errors: List[ErrorInfo] = []

        # Fast pass: regex patterns applied to each line.
        errors.extend(await self._pattern_matching(logs))

        # Deeper pass: context analyzers applied to the whole log.
        errors.extend(await self._context_analysis(logs))

        # Drop duplicates, then compute the final confidence values.
        deduplicated_errors = self._deduplicate_errors(errors)
        return self._calculate_final_confidence(deduplicated_errors, logs)

    async def classify_error(self, error_message: str) -> ErrorType:
        """Return the type of the highest-confidence pattern matching the message.

        Returns ``ErrorType.UNKNOWN_ERROR`` when no pattern matches.
        """
        max_confidence = 0.0
        best_type = ErrorType.UNKNOWN_ERROR

        for pattern in self.error_patterns:
            if pattern.confidence > max_confidence and pattern.regex.search(error_message):
                max_confidence = pattern.confidence
                best_type = pattern.error_type

        return best_type

    async def _pattern_matching(self, logs: str) -> List[ErrorInfo]:
        """Match every known pattern against every log line.

        Line numbers in the returned errors are 1-based.
        """
        errors: List[ErrorInfo] = []

        for line_num, line in enumerate(logs.split('\n'), 1):
            stripped = line.strip()
            for pattern in self.error_patterns:
                if not pattern.regex.search(line):
                    continue
                errors.append(ErrorInfo(
                    error_type=pattern.error_type,
                    message=stripped,
                    log_snippet=stripped,
                    line_number=line_num,
                    confidence=pattern.confidence,
                    context={
                        "description": pattern.description,
                        "common_causes": pattern.common_causes,
                        "suggested_fixes": pattern.suggested_fixes
                    }
                ))

        return errors

    async def _context_analysis(self, logs: str) -> List[ErrorInfo]:
        """Run every context analyzer over the whole log.

        A failing analyzer is logged and skipped so that one broken analyzer
        does not discard the results of the others.
        """
        errors: List[ErrorInfo] = []

        for error_type, analyzer in self.context_analyzers.items():
            try:
                type_errors = await analyzer.analyze(logs)
                errors.extend(type_errors)
            except Exception as e:
                self.logger.error(f"上下文分析器 {error_type} 执行失败: {e}")

        return errors

    def _deduplicate_errors(self, errors: List[ErrorInfo]) -> List[ErrorInfo]:
        """Remove duplicate errors, keeping the first occurrence of each.

        Errors with a line number are keyed by (line number, error type).
        Errors without one (the context analyzers in this module do not pass
        a line number; presumably ``ErrorInfo.line_number`` then defaults to
        ``None``) are additionally keyed by message, so that distinct
        findings of the same type are not merged into one.
        """
        seen = set()
        deduplicated: List[ErrorInfo] = []

        for error in errors:
            message_key = error.message if error.line_number is None else None
            key = (error.line_number, error.error_type, message_key)
            if key not in seen:
                seen.add(key)
                deduplicated.append(error)

        return deduplicated

    def _calculate_final_confidence(self, errors: List[ErrorInfo],
                                    logs: str) -> List[ErrorInfo]:
        """Adjust each error's confidence in place and return the list.

        The final confidence is the base confidence plus 0.1 for every
        technical keyword found in the message plus a recency bonus that
        grows towards the end of the log, capped at 1.0.
        """
        # str.split always returns at least one element, so this is >= 1.
        total_lines = len(logs.split('\n'))

        for error in errors:
            message = error.message.lower()
            keyword_boost = sum(0.1 for keyword in self._TECH_KEYWORDS if keyword in message)

            if error.line_number:
                # Errors closer to the end of the log are the most recent ones
                # and get the larger share of the bonus.
                position_factor = min(1.0, error.line_number / total_lines)
            else:
                # Position unknown: use a neutral value.
                position_factor = 0.5
            recent_boost = position_factor * self._RECENCY_WEIGHT

            error.confidence = min(1.0, error.confidence + keyword_boost + recent_boost)

        return errors

    def _initialize_patterns(self) -> List[ErrorPattern]:
        """Build the list of regex patterns used for line-level matching."""
        patterns = [
            # Dockerfile syntax errors
            ErrorPattern(
                regex=re.compile(r"failed to solve:.*syntax error|Dockerfile:\d+"),
                error_type=ErrorType.DOCKERFILE_SYNTAX,
                confidence=0.9,
                description="Dockerfile 语法错误",
                common_causes=["命令格式错误", "参数缺失", "缩进问题"],
                suggested_fixes=["检查命令语法", "验证参数", "修复格式"]
            ),
            # Python dependency installation failures
            ErrorPattern(
                regex=re.compile(r"ERROR: Could not find a version|No matching distribution|pip install failed"),
                error_type=ErrorType.DEPENDENCY_INSTALL,
                confidence=0.85,
                description="Python 依赖安装失败",
                common_causes=["版本不存在", "网络问题", "依赖冲突"],
                suggested_fixes=["检查版本", "更换源", "解决冲突"]
            ),
            # Node.js dependency installation failures
            ErrorPattern(
                regex=re.compile(r"npm ERR!|yarn error|failed to install node packages"),
                error_type=ErrorType.DEPENDENCY_INSTALL,
                confidence=0.85,
                description="Node.js 依赖安装失败",
                common_causes=["版本冲突", "网络问题", "缓存问题"],
                suggested_fixes=["清理缓存", "检查版本", "使用国内源"]
            ),
            # Environment variable configuration problems
            ErrorPattern(
                regex=re.compile(r"Environment variable.*not found|ENV.*undefined|getenv.*None"),
                error_type=ErrorType.ENVIRONMENT_CONFIG,
                confidence=0.8,
                description="环境变量配置问题",
                common_causes=["变量未设置", "配置文件缺失", "权限问题"],
                suggested_fixes=["设置环境变量", "创建配置文件", "检查权限"]
            ),
            # Port conflicts
            ErrorPattern(
                regex=re.compile(r"Address already in use|Port.*already used|EADDRINUSE"),
                error_type=ErrorType.PORT_CONFLICT,
                confidence=0.95,
                description="端口冲突",
                common_causes=["端口被占用", "权限不足", "配置错误"],
                suggested_fixes=["更换端口", "杀死占用进程", "修改配置"]
            ),
            # Permission problems
            ErrorPattern(
                regex=re.compile(r"Permission denied|Operation not permitted|EACCES"),
                error_type=ErrorType.PERMISSION_ERROR,
                confidence=0.9,
                description="权限不足",
                common_causes=["文件权限", "用户权限", "目录权限"],
                suggested_fixes=["修改权限", "使用 sudo", "更改用户"]
            ),
            # Network connection problems
            ErrorPattern(
                regex=re.compile(r"Connection refused|Network unreachable|Timeout|DNS resolution failed"),
                error_type=ErrorType.NETWORK_CONNECTION,
                confidence=0.8,
                description="网络连接问题",
                common_causes=["网络不可达", "DNS问题", "防火墙限制"],
                suggested_fixes=["检查网络", "配置DNS", "调整防火墙"]
            ),
            # Timeouts
            ErrorPattern(
                regex=re.compile(r"timeout|timed out|deadline exceeded"),
                error_type=ErrorType.TIMEOUT_ERROR,
                confidence=0.75,
                description="操作超时",
                common_causes=["操作时间过长", "资源不足", "网络延迟"],
                suggested_fixes=["增加超时时间", "优化性能", "检查资源"]
            ),
            # Resource limits exceeded
            ErrorPattern(
                regex=re.compile(r"out of memory|disk full|CPU limit exceeded|resource exceeded"),
                error_type=ErrorType.RESOURCE_EXCEEDED,
                confidence=0.9,
                description="资源超限",
                common_causes=["内存不足", "磁盘满", "CPU限制"],
                suggested_fixes=["清理资源", "增加配额", "优化代码"]
            )
        ]

        return patterns


class ContextAnalyzer(ABC):
    """Base class for context analyzers that inspect a whole log at once."""

    async def analyze(self, logs: str) -> List[ErrorInfo]:
        """Analyze the log text and return the errors found.

        The base implementation finds nothing; subclasses override it.
        """
        return []


class DockerfileSyntaxAnalyzer(ContextAnalyzer):
    """Detects Dockerfile-specific syntax problems."""

    # (pattern, message) pairs.
    _PATTERNS: Tuple[Tuple[re.Pattern, str], ...] = (
        (re.compile(r"FROM.*invalid", re.IGNORECASE), "FROM 指令格式错误"),
        (re.compile(r"RUN.*command not found", re.IGNORECASE), "RUN 命令执行失败"),
        (re.compile(r"COPY.*No such file", re.IGNORECASE), "COPY 源文件不存在"),
        (re.compile(r"EXPOSE.*invalid port", re.IGNORECASE), "EXPOSE 端口格式错误"),
        (re.compile(r"ENV.*invalid format", re.IGNORECASE), "ENV 环境变量格式错误"),
    )

    async def analyze(self, logs: str) -> List[ErrorInfo]:
        """Return one error for every Dockerfile pattern found in the log."""
        return [
            ErrorInfo(
                error_type=ErrorType.DOCKERFILE_SYNTAX,
                message=description,
                log_snippet="",
                confidence=0.8,
                context={"analysis_type": "dockerfile_syntax"}
            )
            for regex, description in self._PATTERNS
            if regex.search(logs)
        ]


class DependencyErrorAnalyzer(ContextAnalyzer):
    """Detects Python and Node.js dependency problems."""

    # (pattern, message) pairs: Python patterns first, then Node.js.
    _PATTERNS: Tuple[Tuple[re.Pattern, str], ...] = (
        (re.compile(r"pip.*Requirement already satisfied", re.IGNORECASE), "依赖重复安装"),
        (re.compile(r"pip.*Could not find.*version", re.IGNORECASE), "依赖版本不存在"),
        (re.compile(r"pip.*incompatible dependencies", re.IGNORECASE), "依赖版本冲突"),
        (re.compile(r"npm.*peer dependency", re.IGNORECASE), "peer 依赖问题"),
        (re.compile(r"npm.*version mismatch", re.IGNORECASE), "版本不匹配"),
        (re.compile(r"npm.*cache problem", re.IGNORECASE), "npm 缓存问题"),
    )

    async def analyze(self, logs: str) -> List[ErrorInfo]:
        """Return one error for every dependency pattern found in the log."""
        return [
            ErrorInfo(
                error_type=ErrorType.DEPENDENCY_INSTALL,
                message=description,
                log_snippet="",
                confidence=0.75,
                context={"analysis_type": "dependency"}
            )
            for regex, description in self._PATTERNS
            if regex.search(logs)
        ]


class EnvironmentErrorAnalyzer(ContextAnalyzer):
    """Detects environment configuration problems."""

    _PATH_PATTERN = re.compile(r"PATH.*not found", re.IGNORECASE)

    async def analyze(self, logs: str) -> List[ErrorInfo]:
        """Report a PATH problem when the log mentions PATH followed by 'not found'."""
        errors: List[ErrorInfo] = []

        if self._PATH_PATTERN.search(logs):
            errors.append(ErrorInfo(
                error_type=ErrorType.ENVIRONMENT_CONFIG,
                message="PATH 环境变量配置问题",
                log_snippet="",
                confidence=0.8,
                context={"analysis_type": "environment", "var_type": "PATH"}
            ))

        return errors


class PortErrorAnalyzer(ContextAnalyzer):
    """Detects problems with the HuggingFace Spaces default port."""

    _PORT_PATTERN = re.compile(r"port.*7860", re.IGNORECASE)
    _FAILURE_PATTERN = re.compile(r"error|failed", re.IGNORECASE)

    async def analyze(self, logs: str) -> List[ErrorInfo]:
        """Report a port problem when the log mentions port 7860 and a failure.

        The two conditions are checked independently over the whole log; they
        do not have to occur on the same line.
        """
        errors: List[ErrorInfo] = []

        if self._PORT_PATTERN.search(logs) and self._FAILURE_PATTERN.search(logs):
            errors.append(ErrorInfo(
                error_type=ErrorType.PORT_CONFLICT,
                message="HuggingFace Spaces 默认端口 7860 问题",
                log_snippet="",
                confidence=0.9,
                context={"analysis_type": "port", "port": "7860"}
            ))

        return errors


class PermissionErrorAnalyzer(ContextAnalyzer):
    """Detects permission problems on script files."""

    # The extensions are grouped so that the alternation applies only to the
    # file suffix; ungrouped, any ".js" or ".sh" in the log would match.
    _SCRIPT_PATTERN = re.compile(r"permission denied.*\.(?:py|js|sh)", re.IGNORECASE)

    async def analyze(self, logs: str) -> List[ErrorInfo]:
        """Report a script permission problem when a .py/.js/.sh file is denied."""
        errors: List[ErrorInfo] = []

        if self._SCRIPT_PATTERN.search(logs):
            errors.append(ErrorInfo(
                error_type=ErrorType.PERMISSION_ERROR,
                message="脚本文件权限问题",
                log_snippet="",
                confidence=0.8,
                context={"analysis_type": "permission", "file_type": "script"}
            ))

        return errors


class NetworkErrorAnalyzer(ContextAnalyzer):
    """Detects connection problems with well-known services."""

    # (pattern, message, service) triples. The service name is stated
    # explicitly: deriving it by splitting the pattern text on "." leaves the
    # regex escape backslash in the name.
    _INDICATORS: Tuple[Tuple[re.Pattern, str, str], ...] = (
        (re.compile(r"github\.com.*timeout", re.IGNORECASE), "GitHub 连接超时", "github"),
        (re.compile(r"pypi\.org.*failed", re.IGNORECASE), "PyPI 连接失败", "pypi"),
        (re.compile(r"npm\.registry.*error", re.IGNORECASE), "npm registry 连接错误", "npm"),
    )

    async def analyze(self, logs: str) -> List[ErrorInfo]:
        """Return one error for every network indicator found in the log."""
        return [
            ErrorInfo(
                error_type=ErrorType.NETWORK_CONNECTION,
                message=description,
                log_snippet="",
                confidence=0.7,
                context={"analysis_type": "network", "service": service}
            )
            for regex, description, service in self._INDICATORS
            if regex.search(logs)
        ]


class TimeoutErrorAnalyzer(ContextAnalyzer):
    """Detects which kind of operation timed out."""

    # (pattern, message, operation) triples.
    _PATTERNS: Tuple[Tuple[re.Pattern, str, str], ...] = (
        (re.compile(r"build.*timeout", re.IGNORECASE), "构建超时", "build"),
        (re.compile(r"install.*timeout", re.IGNORECASE), "安装超时", "install"),
        (re.compile(r"download.*timeout", re.IGNORECASE), "下载超时", "download"),
    )

    async def analyze(self, logs: str) -> List[ErrorInfo]:
        """Return one error for every timeout pattern found in the log."""
        return [
            ErrorInfo(
                error_type=ErrorType.TIMEOUT_ERROR,
                message=description,
                log_snippet="",
                confidence=0.8,
                context={"analysis_type": "timeout", "operation": operation}
            )
            for regex, description, operation in self._PATTERNS
            if regex.search(logs)
        ]


class ResourceErrorAnalyzer(ContextAnalyzer):
    """Detects resource limit problems."""

    # (pattern, message, resource type) triples.
    _PATTERNS: Tuple[Tuple[re.Pattern, str, str], ...] = (
        (re.compile(r"memory.*limit", re.IGNORECASE), "内存限制", "memory"),
        (re.compile(r"disk.*space", re.IGNORECASE), "磁盘空间不足", "disk"),
        (re.compile(r"cpu.*quota", re.IGNORECASE), "CPU 配额限制", "cpu"),
    )

    async def analyze(self, logs: str) -> List[ErrorInfo]:
        """Return one error for every resource pattern found in the log."""
        return [
            ErrorInfo(
                error_type=ErrorType.RESOURCE_EXCEEDED,
                message=description,
                log_snippet="",
                confidence=0.8,
                context={"analysis_type": "resource", "resource_type": resource_type}
            )
            for regex, description, resource_type in self._PATTERNS
            if regex.search(logs)
        ]