hfproxydemo / core_system.py
OpenCode Deployer
update
4ca5973
"""
HuggingFace Spaces 自动化监控修复系统
核心系统架构和主要类定义
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any, Tuple, Union
from enum import Enum
import asyncio
import logging
from datetime import datetime
import json
import sqlite3
import os
from pathlib import Path
# ============================================================================
# 数据模型和枚举
# ============================================================================
class SpaceStatus(Enum):
"""Space 状态枚举"""
BUILDING = "building"
RUNNING = "running"
STOPPED = "stopped"
ERROR = "error"
UNKNOWN = "unknown"
class ErrorType(Enum):
"""错误类型枚举"""
DOCKERFILE_SYNTAX = "dockerfile_syntax"
DEPENDENCY_INSTALL = "dependency_install"
ENVIRONMENT_CONFIG = "environment_config"
PORT_CONFLICT = "port_conflict"
PERMISSION_ERROR = "permission_error"
NETWORK_CONNECTION = "network_connection"
TIMEOUT_ERROR = "timeout_error"
RESOURCE_EXCEEDED = "resource_exceeded"
UNKNOWN_ERROR = "unknown_error"
class RepairAction(Enum):
"""修复动作枚举"""
MODIFY_DOCKERFILE = "modify_dockerfile"
UPDATE_DEPENDENCIES = "update_dependencies"
FIX_ENVIRONMENT = "fix_environment"
CHANGE_PORT = "change_port"
SET_PERMISSIONS = "set_permissions"
UPDATE_SOURCES = "update_sources"
ADJUST_RESOURCES = "adjust_resources"
RETRY_BUILD = "retry_build"
@dataclass
class SpaceInfo:
"""Space 信息"""
space_id: str
name: str
repository_url: str
current_status: SpaceStatus
last_updated: datetime
dockerfile_path: str = "Dockerfile"
local_path: str = ""
@dataclass
class ErrorInfo:
"""错误信息"""
error_type: ErrorType
message: str
log_snippet: str
line_number: Optional[int] = None
confidence: float = 0.0
context: Dict[str, Any] = field(default_factory=dict)
@dataclass
class RepairStrategy:
"""修复策略"""
action: RepairAction
description: str
modifications: Dict[str, Any]
risk_level: str # low, medium, high
success_rate: float = 0.0
estimated_time: int = 0 # 秒
@dataclass
class RepairHistory:
"""修复历史"""
id: int
space_id: str
timestamp: datetime
error_info: ErrorInfo
strategy: RepairStrategy
success: bool
git_commit: Optional[str] = None
rollback_data: Optional[str] = None
# ============================================================================
# 核心接口定义
# ============================================================================
class HuggingFaceAPI(ABC):
"""HuggingFace API 接口"""
@abstractmethod
async def get_space_status(self, space_id: str) -> SpaceStatus:
"""获取 Space 状态"""
pass
@abstractmethod
async def get_space_logs(self, space_id: str, lines: int = 100) -> str:
"""获取 Space 日志"""
pass
@abstractmethod
async def trigger_rebuild(self, space_id: str) -> bool:
"""触发重新构建"""
pass
@abstractmethod
async def get_space_info(self, space_id: str) -> SpaceInfo:
"""获取 Space 详细信息"""
pass
class ErrorAnalyzer(ABC):
"""错误分析器接口"""
@abstractmethod
async def analyze_logs(self, logs: str) -> List[ErrorInfo]:
"""分析日志并识别错误"""
pass
@abstractmethod
async def classify_error(self, error_message: str) -> ErrorType:
"""分类错误类型"""
pass
class RepairStrategyEngine(ABC):
"""修复策略引擎接口"""
@abstractmethod
async def generate_strategy(self, error: ErrorInfo, space_info: SpaceInfo) -> Optional[RepairStrategy]:
"""生成修复策略"""
pass
@abstractmethod
async def estimate_success(self, strategy: RepairStrategy) -> float:
"""估算成功概率"""
pass
class FileModifier(ABC):
"""文件修改器接口"""
@abstractmethod
async def apply_modifications(self, file_path: str, modifications: Dict[str, Any]) -> bool:
"""应用修改"""
pass
@abstractmethod
async def backup_file(self, file_path: str) -> str:
"""备份文件"""
pass
# ============================================================================
# 核心系统类
# ============================================================================
class HFSpaceMonitor:
"""HuggingFace Space 监控器"""
def __init__(self, hf_api: HuggingFaceAPI, check_interval: int = 60):
self.hf_api = hf_api
self.check_interval = check_interval
self.logger = logging.getLogger(__name__)
self._running = False
async def start_monitoring(self, space_ids: List[str]) -> None:
"""开始监控 Spaces"""
self._running = True
self.logger.info(f"开始监控 {len(space_ids)} 个 Space")
while self._running:
try:
await self._check_spaces(space_ids)
await asyncio.sleep(self.check_interval)
except Exception as e:
self.logger.error(f"监控过程出错: {e}")
await asyncio.sleep(5)
async def _check_spaces(self, space_ids: List[str]) -> None:
"""检查所有 Space 状态"""
tasks = [self._check_single_space(space_id) for space_id in space_ids]
await asyncio.gather(*tasks, return_exceptions=True)
async def _check_single_space(self, space_id: str) -> None:
"""检查单个 Space 状态"""
try:
status = await self.hf_api.get_space_status(space_id)
self.logger.info(f"Space {space_id} 状态: {status.value}")
if status == SpaceStatus.ERROR:
logs = await self.hf_api.get_space_logs(space_id)
# 触发错误分析和修复流程
await self._handle_error(space_id, logs)
except Exception as e:
self.logger.error(f"检查 Space {space_id} 失败: {e}")
async def _handle_error(self, space_id: str, logs: str) -> None:
"""处理错误"""
# 这里会调用错误分析器和修复引擎
pass
def stop(self) -> None:
"""停止监控"""
self._running = False
class IntelligentErrorAnalyzer:
"""智能错误分析器"""
def __init__(self):
self.logger = logging.getLogger(__name__)
self.error_patterns = self._load_error_patterns()
async def analyze_logs(self, logs: str) -> List[ErrorInfo]:
"""分析日志并识别错误"""
errors = []
# 分行分析日志
for line_num, line in enumerate(logs.split('\n'), 1):
for error_type, patterns in self.error_patterns.items():
for pattern in patterns:
if pattern['regex'].search(line):
error_info = ErrorInfo(
error_type=ErrorType(error_type),
message=line.strip(),
log_snippet=line.strip(),
line_number=line_num,
confidence=pattern['confidence'],
context=self._extract_context(line, logs, line_num)
)
errors.append(error_info)
break
return errors
def _load_error_patterns(self) -> Dict[str, List[Dict]]:
"""加载错误模式"""
return {
"dockerfile_syntax": [
{
"regex": re.compile(r"ERROR:.*failed to solve|failed to compute cache key"),
"confidence": 0.9
}
],
"dependency_install": [
{
"regex": re.compile(r"ERROR:.*Could not find a version|No matching distribution"),
"confidence": 0.85
}
],
"environment_config": [
{
"regex": re.compile(r"ERROR:.*environment variable|ENV not found"),
"confidence": 0.8
}
]
# 更多模式...
}
def _extract_context(self, error_line: str, logs: str, line_num: int) -> Dict[str, Any]:
"""提取错误上下文"""
lines = logs.split('\n')
start = max(0, line_num - 3)
end = min(len(lines), line_num + 3)
return {
"before": lines[start:line_num],
"after": lines[line_num + 1:end],
"full_context": lines[start:end]
}
class SmartRepairEngine:
"""智能修复引擎"""
def __init__(self):
self.logger = logging.getLogger(__name__)
self.repair_rules = self._load_repair_rules()
async def generate_strategy(self, error: ErrorInfo, space_info: SpaceInfo) -> Optional[RepairStrategy]:
"""生成修复策略"""
error_type = error.error_type.value
if error_type in self.repair_rules:
rules = self.repair_rules[error_type]
# 选择最适合的规则
best_rule = max(rules, key=lambda r: r['success_rate'])
return RepairStrategy(
action=RepairAction(best_rule['action']),
description=best_rule['description'],
modifications=best_rule['modifications'],
risk_level=best_rule['risk_level'],
success_rate=best_rule['success_rate'],
estimated_time=best_rule['estimated_time']
)
return None
def _load_repair_rules(self) -> Dict[str, List[Dict]]:
"""加载修复规则"""
return {
"dockerfile_syntax": [
{
"action": "modify_dockerfile",
"description": "修复 Dockerfile 语法错误",
"modifications": {
"type": "syntax_fix",
"target": error.line_number
},
"risk_level": "medium",
"success_rate": 0.7,
"estimated_time": 120
}
],
"dependency_install": [
{
"action": "update_dependencies",
"description": "更新依赖版本或更换源地址",
"modifications": {
"type": "dependency_update",
"strategy": "version_bump_or_source_change"
},
"risk_level": "low",
"success_rate": 0.8,
"estimated_time": 300
}
]
# 更多规则...
}
class AutoRepairSystem:
"""自动修复系统主类"""
def __init__(self, config_path: str = "config.json"):
self.config = self._load_config(config_path)
self.logger = self._setup_logging()
# 初始化各个组件
self.hf_api = HuggingFaceAPIClient(self.config['hf_token'])
self.error_analyzer = IntelligentErrorAnalyzer()
self.repair_engine = SmartRepairEngine()
self.file_modifier = DockerfileModifier()
self.state_manager = StateManager(self.config['db_path'])
# 监控器
self.monitor = HFSpaceMonitor(self.hf_api, self.config['check_interval'])
# 修复队列
self.repair_queue = asyncio.Queue()
async def start(self, space_ids: List[str]) -> None:
"""启动系统"""
self.logger.info("启动 HuggingFace Spaces 自动修复系统")
# 启动监控任务
monitor_task = asyncio.create_task(self.monitor.start_monitoring(space_ids))
# 启动修复任务
repair_task = asyncio.create_task(self._process_repair_queue())
# 等待任务完成(正常情况下不会完成)
await asyncio.gather(monitor_task, repair_task)
async def _process_repair_queue(self) -> None:
"""处理修复队列"""
while True:
try:
repair_job = await self.repair_queue.get()
await self._execute_repair(repair_job)
except Exception as e:
self.logger.error(f"修复任务执行失败: {e}")
async def _execute_repair(self, job: Dict[str, Any]) -> None:
"""执行修复任务"""
space_id = job['space_id']
error_info = job['error_info']
self.logger.info(f"开始修复 Space {space_id}")
# 获取 Space 信息
space_info = await self.hf_api.get_space_info(space_id)
# 生成修复策略
strategy = await self.repair_engine.generate_strategy(error_info, space_info)
if strategy:
try:
# 备份原文件
backup_path = await self.file_modifier.backup_file(space_info.dockerfile_path)
# 应用修改
success = await self.file_modifier.apply_modifications(
space_info.dockerfile_path,
strategy.modifications
)
if success:
# 提交到 Git
git_commit = await self._commit_changes(space_id, strategy)
# 触发重新构建
await self.hf_api.trigger_rebuild(space_id)
# 记录历史
await self.state_manager.record_repair(
space_id, error_info, strategy, True, git_commit
)
self.logger.info(f"Space {space_id} 修复完成")
else:
# 回滚
await self._rollback(backup_path, space_info.dockerfile_path)
except Exception as e:
self.logger.error(f"修复失败: {e}")
await self._rollback(backup_path, space_info.dockerfile_path)
def _load_config(self, config_path: str) -> Dict[str, Any]:
"""加载配置"""
default_config = {
"hf_token": os.getenv("HF_TOKEN", ""),
"check_interval": 60,
"db_path": "repair_system.db",
"max_retry": 3,
"log_level": "INFO"
}
if os.path.exists(config_path):
with open(config_path, 'r') as f:
user_config = json.load(f)
default_config.update(user_config)
return default_config
def _setup_logging(self) -> logging.Logger:
"""设置日志"""
logger = logging.getLogger(__name__)
logger.setLevel(getattr(logging, self.config['log_level']))
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
return logger
# ============================================================================
# 具体实现类
# ============================================================================
class HuggingFaceAPIClient(HuggingFaceAPI):
"""HuggingFace API 客户端实现"""
def __init__(self, token: str):
self.token = token
self.base_url = "https://huggingface.co/api"
self.headers = {"Authorization": f"Bearer {token}"}
async def get_space_status(self, space_id: str) -> SpaceStatus:
"""获取 Space 状态"""
# 实现具体的 API 调用逻辑
pass
async def get_space_logs(self, space_id: str, lines: int = 100) -> str:
"""获取 Space 日志"""
# 实现具体的 API 调用逻辑
pass
async def trigger_rebuild(self, space_id: str) -> bool:
"""触发重新构建"""
# 实现具体的 API 调用逻辑
pass
async def get_space_info(self, space_id: str) -> SpaceInfo:
"""获取 Space 详细信息"""
# 实现具体的 API 调用逻辑
pass
class DockerfileModifier(FileModifier):
"""Dockerfile 修改器实现"""
async def apply_modifications(self, file_path: str, modifications: Dict[str, Any]) -> bool:
"""应用修改"""
# 实现具体的 Dockerfile 修改逻辑
pass
async def backup_file(self, file_path: str) -> str:
"""备份文件"""
# 实现文件备份逻辑
pass
class StateManager:
"""状态管理器"""
def __init__(self, db_path: str):
self.db_path = db_path
self._init_database()
def _init_database(self) -> None:
"""初始化数据库"""
# 创建数据库表结构
pass
async def record_repair(self, space_id: str, error_info: ErrorInfo,
strategy: RepairStrategy, success: bool,
git_commit: Optional[str] = None) -> None:
"""记录修复历史"""
# 实现修复历史记录逻辑
pass
async def get_repair_history(self, space_id: str) -> List[RepairHistory]:
"""获取修复历史"""
# 实现历史查询逻辑
pass
if __name__ == "__main__":
# 系统启动示例
system = AutoRepairSystem()
# 要监控的 Space ID 列表
space_ids = [
"your-username/your-space-1",
"your-username/your-space-2"
]
# 启动系统
asyncio.run(system.start(space_ids))