hfproxydemo / data_models.py
OpenCode Deployer
监控系统开发: 2026-02-01 15:40:53
14f6b4f
"""
HuggingFace Spaces 监控数据模型
定义所有数据结构和数据库表结构
"""
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any, Union
from enum import Enum
from datetime import datetime
import json
import sqlite3
import asyncio
from pathlib import Path
import uuid
from pydantic import BaseModel, Field
import aiohttp
# ============================================================================
# 基础枚举类型
# ============================================================================
class SpaceStatus(Enum):
"""Space 状态枚举"""
BUILDING = "building"
RUNNING = "running"
STOPPED = "stopped"
ERROR = "error"
UNKNOWN = "unknown"
PAUSED = "paused"
SLEEPING = "sleeping"
class LogLevel(Enum):
"""日志级别枚举"""
DEBUG = "debug"
INFO = "info"
WARNING = "warning"
ERROR = "error"
CRITICAL = "critical"
class EventType(Enum):
"""事件类型枚举"""
STATUS_CHANGE = "status_change"
BUILD_STARTED = "build_started"
BUILD_COMPLETED = "build_completed"
BUILD_FAILED = "build_failed"
SPACE_STARTED = "space_started"
SPACE_STOPPED = "space_stopped"
ERROR_DETECTED = "error_detected"
WEBHOOK_RECEIVED = "webhook_received"
class AlertLevel(Enum):
"""告警级别枚举"""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
# ============================================================================
# Pydantic 数据模型
# ============================================================================
class SpaceInfo(BaseModel):
"""Space 基本信息"""
space_id: str = Field(..., description="Space ID")
name: str = Field(..., description="Space 名称")
repository_url: str = Field("", description="仓库 URL")
description: Optional[str] = Field(None, description="描述")
author: Optional[str] = Field(None, description="作者")
tags: List[str] = Field(default_factory=list, description="标签")
sdk: Optional[str] = Field(None, description="SDK 类型")
python_version: Optional[str] = Field(None, description="Python 版本")
dockerfile_path: str = Field("Dockerfile", description="Dockerfile 路径")
local_path: Optional[str] = Field(None, description="本地路径")
created_at: Optional[datetime] = Field(None, description="创建时间")
last_modified: Optional[datetime] = Field(None, description="最后修改时间")
class SpaceRuntime(BaseModel):
"""Space 运行时信息"""
stage: str = Field(..., description="运行阶段")
state: str = Field(..., description="运行状态")
hardware: Optional[Dict[str, Any]] = Field(default_factory=dict, description="硬件配置")
replicas: Optional[int] = Field(None, description="副本数量")
requested_hardware: Optional[Dict[str, Any]] = Field(default_factory=dict, description="请求的硬件")
acs_type: Optional[str] = Field(None, description="ACS 类型")
storage: Optional[str] = Field(None, description="存储信息")
sha: Optional[str] = Field(None, description="Git SHA")
class SpaceStatusInfo(BaseModel):
"""Space 状态信息"""
space_id: str
status: SpaceStatus
runtime: SpaceRuntime
timestamp: datetime
url: Optional[str] = None
emoji: Optional[str] = None
color: Optional[str] = None
likes: Optional[int] = None
tags: List[str] = Field(default_factory=list)
class BuildLogEntry(BaseModel):
"""构建日志条目"""
timestamp: datetime
level: LogLevel
message: str
source: Optional[str] = None # 日志来源
line_number: Optional[int] = None
class BuildLog(BaseModel):
"""构建日志集合"""
space_id: str
build_id: Optional[str] = None
entries: List[BuildLogEntry] = Field(default_factory=list)
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
status: Optional[SpaceStatus] = None
total_lines: int = 0
class WebhookEvent(BaseModel):
"""Webhook 事件"""
event_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
event_type: EventType
space_id: str
timestamp: datetime
payload: Dict[str, Any]
processed: bool = Field(default=False, description="是否已处理")
retry_count: int = Field(0, description="重试次数")
error_message: Optional[str] = None
class MonitorEvent(BaseModel):
"""监控事件"""
event_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
space_id: str
event_type: EventType
timestamp: datetime
data: Dict[str, Any] = Field(default_factory=dict)
previous_status: Optional[SpaceStatus] = None
current_status: Optional[SpaceStatus] = None
severity: AlertLevel = AlertLevel.LOW
message: str = ""
resolved: bool = Field(default=False, description="是否已解决")
class AlertRule(BaseModel):
"""告警规则"""
rule_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
name: str
description: Optional[str] = None
space_id: Optional[str] = None # None 表示适用于所有 Space
condition: Dict[str, Any] # 触发条件
severity: AlertLevel
enabled: bool = Field(True, description="是否启用")
cooldown_minutes: int = Field(15, description="冷却时间(分钟)")
last_triggered: Optional[datetime] = None
notification_channels: List[str] = Field(default_factory=list)
class Alert(BaseModel):
"""告警记录"""
alert_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
rule_id: str
space_id: str
severity: AlertLevel
title: str
message: str
timestamp: datetime
acknowledged: bool = Field(False, description="是否已确认")
resolved: bool = Field(False, description="是否已解决")
resolved_at: Optional[datetime] = None
acknowledged_by: Optional[str] = None
metadata: Dict[str, Any] = Field(default_factory=dict)
class MonitorConfig(BaseModel):
"""监控配置"""
space_id: str
enabled: bool = Field(True, description="是否启用监控")
check_interval_seconds: int = Field(60, description="检查间隔(秒)")
retry_attempts: int = Field(3, description="重试次数")
retry_delay_seconds: int = Field(30, description="重试延迟(秒)")
log_lines_count: int = Field(100, description="获取日志行数")
error_threshold: int = Field(5, description="错误阈值")
webhook_enabled: bool = Field(False, description="是否启用 Webhook")
webhook_url: Optional[str] = None
notification_channels: List[str] = Field(default_factory=list)
custom_rules: Dict[str, Any] = Field(default_factory=dict)
# ============================================================================
# 数据库操作类
# ============================================================================
class DatabaseManager:
"""数据库管理器"""
def __init__(self, db_path: str = "monitoring.db"):
self.db_path = db_path
self._init_database()
def _init_database(self):
"""初始化数据库表"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# Spaces 表
cursor.execute("""
CREATE TABLE IF NOT EXISTS spaces (
space_id TEXT PRIMARY KEY,
name TEXT NOT NULL,
repository_url TEXT,
description TEXT,
author TEXT,
tags TEXT, -- JSON 格式
sdk TEXT,
python_version TEXT,
dockerfile_path TEXT,
local_path TEXT,
created_at TEXT,
last_modified TEXT,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Status History 表
cursor.execute("""
CREATE TABLE IF NOT EXISTS status_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
space_id TEXT NOT NULL,
status TEXT NOT NULL,
runtime TEXT, -- JSON 格式
timestamp TEXT NOT NULL,
url TEXT,
emoji TEXT,
color TEXT,
likes INTEGER,
tags TEXT, -- JSON 格式
FOREIGN KEY (space_id) REFERENCES spaces (space_id)
)
""")
# Build Logs 表
cursor.execute("""
CREATE TABLE IF NOT EXISTS build_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
space_id TEXT NOT NULL,
build_id TEXT,
entries TEXT, -- JSON 格式
start_time TEXT,
end_time TEXT,
status TEXT,
total_lines INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (space_id) REFERENCES spaces (space_id)
)
""")
# Monitor Events 表
cursor.execute("""
CREATE TABLE IF NOT EXISTS monitor_events (
event_id TEXT PRIMARY KEY,
space_id TEXT NOT NULL,
event_type TEXT NOT NULL,
timestamp TEXT NOT NULL,
data TEXT, -- JSON 格式
previous_status TEXT,
current_status TEXT,
severity TEXT NOT NULL,
message TEXT,
resolved BOOLEAN DEFAULT FALSE,
FOREIGN KEY (space_id) REFERENCES spaces (space_id)
)
""")
# Webhook Events 表
cursor.execute("""
CREATE TABLE IF NOT EXISTS webhook_events (
event_id TEXT PRIMARY KEY,
event_type TEXT NOT NULL,
space_id TEXT NOT NULL,
timestamp TEXT NOT NULL,
payload TEXT, -- JSON 格式
processed BOOLEAN DEFAULT FALSE,
retry_count INTEGER DEFAULT 0,
error_message TEXT,
FOREIGN KEY (space_id) REFERENCES spaces (space_id)
)
""")
# Alert Rules 表
cursor.execute("""
CREATE TABLE IF NOT EXISTS alert_rules (
rule_id TEXT PRIMARY KEY,
name TEXT NOT NULL,
description TEXT,
space_id TEXT,
condition TEXT NOT NULL, -- JSON 格式
severity TEXT NOT NULL,
enabled BOOLEAN DEFAULT TRUE,
cooldown_minutes INTEGER DEFAULT 15,
last_triggered TEXT,
notification_channels TEXT, -- JSON 格式
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (space_id) REFERENCES spaces (space_id)
)
""")
# Alerts 表
cursor.execute("""
CREATE TABLE IF NOT EXISTS alerts (
alert_id TEXT PRIMARY KEY,
rule_id TEXT NOT NULL,
space_id TEXT NOT NULL,
severity TEXT NOT NULL,
title TEXT NOT NULL,
message TEXT NOT NULL,
timestamp TEXT NOT NULL,
acknowledged BOOLEAN DEFAULT FALSE,
resolved BOOLEAN DEFAULT FALSE,
resolved_at TEXT,
acknowledged_by TEXT,
metadata TEXT, -- JSON 格式
FOREIGN KEY (rule_id) REFERENCES alert_rules (rule_id),
FOREIGN KEY (space_id) REFERENCES spaces (space_id)
)
""")
# Monitor Config 表
cursor.execute("""
CREATE TABLE IF NOT EXISTS monitor_config (
space_id TEXT PRIMARY KEY,
enabled BOOLEAN DEFAULT TRUE,
check_interval_seconds INTEGER DEFAULT 60,
retry_attempts INTEGER DEFAULT 3,
retry_delay_seconds INTEGER DEFAULT 30,
log_lines_count INTEGER DEFAULT 100,
error_threshold INTEGER DEFAULT 5,
webhook_enabled BOOLEAN DEFAULT FALSE,
webhook_url TEXT,
notification_channels TEXT, -- JSON 格式
custom_rules TEXT, -- JSON 格式
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (space_id) REFERENCES spaces (space_id)
)
""")
conn.commit()
async def save_space_info(self, space_info: SpaceInfo) -> None:
"""保存 Space 信息"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT OR REPLACE INTO spaces
(space_id, name, repository_url, description, author, tags, sdk,
python_version, dockerfile_path, local_path, created_at, last_modified)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
space_info.space_id,
space_info.name,
space_info.repository_url,
space_info.description,
space_info.author,
json.dumps(space_info.tags),
space_info.sdk,
space_info.python_version,
space_info.dockerfile_path,
space_info.local_path,
space_info.created_at.isoformat() if space_info.created_at else None,
space_info.last_modified.isoformat() if space_info.last_modified else None
))
conn.commit()
async def save_status_history(self, status_info: SpaceStatusInfo) -> None:
"""保存状态历史"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO status_history
(space_id, status, runtime, timestamp, url, emoji, color, likes, tags)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
status_info.space_id,
status_info.status.value,
status_info.runtime.model_dump_json(),
status_info.timestamp.isoformat(),
status_info.url,
status_info.emoji,
status_info.color,
status_info.likes,
json.dumps(status_info.tags)
))
conn.commit()
async def save_monitor_event(self, event: MonitorEvent) -> None:
"""保存监控事件"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT OR REPLACE INTO monitor_events
(event_id, space_id, event_type, timestamp, data, previous_status,
current_status, severity, message, resolved)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
event.event_id,
event.space_id,
event.event_type.value,
event.timestamp.isoformat(),
json.dumps(event.data),
event.previous_status.value if event.previous_status else None,
event.current_status.value if event.current_status else None,
event.severity.value,
event.message,
event.resolved
))
conn.commit()
async def save_webhook_event(self, event: WebhookEvent) -> None:
"""保存 Webhook 事件"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT OR REPLACE INTO webhook_events
(event_id, event_type, space_id, timestamp, payload, processed,
retry_count, error_message)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
event.event_id,
event.event_type.value,
event.space_id,
event.timestamp.isoformat(),
json.dumps(event.payload),
event.processed,
event.retry_count,
event.error_message
))
conn.commit()
async def save_alert(self, alert: Alert) -> None:
"""保存告警"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT OR REPLACE INTO alerts
(alert_id, rule_id, space_id, severity, title, message, timestamp,
acknowledged, resolved, resolved_at, acknowledged_by, metadata)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
alert.alert_id,
alert.rule_id,
alert.space_id,
alert.severity.value,
alert.title,
alert.message,
alert.timestamp.isoformat(),
alert.acknowledged,
alert.resolved,
alert.resolved_at.isoformat() if alert.resolved_at else None,
alert.acknowledged_by,
json.dumps(alert.metadata)
))
conn.commit()
async def get_space_config(self, space_id: str) -> Optional[MonitorConfig]:
"""获取 Space 监控配置"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT space_id, enabled, check_interval_seconds, retry_attempts,
retry_delay_seconds, log_lines_count, error_threshold,
webhook_enabled, webhook_url, notification_channels, custom_rules
FROM monitor_config WHERE space_id = ?
""", (space_id,))
row = cursor.fetchone()
if row:
return MonitorConfig(
space_id=row[0],
enabled=bool(row[1]),
check_interval_seconds=row[2],
retry_attempts=row[3],
retry_delay_seconds=row[4],
log_lines_count=row[5],
error_threshold=row[6],
webhook_enabled=bool(row[7]),
webhook_url=row[8],
notification_channels=json.loads(row[9]) if row[9] else [],
custom_rules=json.loads(row[10]) if row[10] else {}
)
return None
async def get_recent_events(self, space_id: str, limit: int = 100) -> List[MonitorEvent]:
"""获取最近的监控事件"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT event_id, space_id, event_type, timestamp, data, previous_status,
current_status, severity, message, resolved
FROM monitor_events
WHERE space_id = ?
ORDER BY timestamp DESC
LIMIT ?
""", (space_id, limit))
events = []
for row in cursor.fetchall():
events.append(MonitorEvent(
event_id=row[0],
space_id=row[1],
event_type=EventType(row[2]),
timestamp=datetime.fromisoformat(row[3]),
data=json.loads(row[4]) if row[4] else {},
previous_status=SpaceStatus(row[5]) if row[5] else None,
current_status=SpaceStatus(row[6]) if row[6] else None,
severity=AlertLevel(row[7]),
message=row[8] or "",
resolved=bool(row[9])
))
return events
async def cleanup_old_data(self, days: int = 30) -> None:
"""清理旧数据"""
cutoff_date = (datetime.now() - timedelta(days=days)).isoformat()
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# 清理旧的状态历史
cursor.execute("DELETE FROM status_history WHERE timestamp < ?", (cutoff_date,))
# 清理旧的构建日志
cursor.execute("DELETE FROM build_logs WHERE created_at < ?", (cutoff_date,))
# 清理已解决的旧事件
cursor.execute("DELETE FROM monitor_events WHERE resolved = TRUE AND timestamp < ?", (cutoff_date,))
# 清理已解决的旧告警
cursor.execute("DELETE FROM alerts WHERE resolved = TRUE AND timestamp < ?", (cutoff_date,))
conn.commit()
# ============================================================================
# 工具函数
# ============================================================================
def parse_hf_space_data(data: Dict[str, Any]) -> SpaceInfo:
"""解析 HuggingFace Space 数据"""
return SpaceInfo(
space_id=data.get('id', ''),
name=data.get('id', ''),
repository_url=data.get('url', ''),
description=data.get('description'),
author=data.get('author', ''),
tags=data.get('tags', []),
sdk=data.get('sdk'),
python_version=data.get('pythonVersion'),
last_modified=datetime.fromisoformat(data['lastModified'].replace('Z', '+00:00')) if data.get('lastModified') else None
)
def parse_hf_runtime_data(data: Dict[str, Any]) -> SpaceRuntime:
"""解析 HuggingFace Runtime 数据"""
return SpaceRuntime(
stage=data.get('stage', ''),
state=data.get('state', ''),
hardware=data.get('hardware', {}),
replicas=data.get('replicas'),
requested_hardware=data.get('requestedHardware', {}),
acs_type=data.get('acsType'),
storage=data.get('storage'),
sha=data.get('sha')
)
# ============================================================================
# 错误和修复相关模型
# ============================================================================
class ErrorType(Enum):
"""错误类型枚举"""
DEPENDENCY_INSTALL = "dependency_install"
DOCKER_BUILD_ERROR = "docker_build_error"
PORT_BINDING_ERROR = "port_binding_error"
PERMISSION_ERROR = "permission_error"
MEMORY_ERROR = "memory_error"
DISK_SPACE_ERROR = "disk_space_error"
TIMEOUT_ERROR = "timeout_error"
NETWORK_ERROR = "network_error"
CONFIGURATION_ERROR = "configuration_error"
RUNTIME_ERROR = "runtime_error"
UNKNOWN_ERROR = "unknown_error"
class RepairAction(Enum):
"""修复动作枚举"""
MODIFY_DOCKERFILE = "modify_dockerfile"
UPDATE_DEPENDENCIES = "update_dependencies"
CHANGE_PORT = "change_port"
FIX_ENVIRONMENT = "fix_environment"
SET_PERMISSIONS = "set_permissions"
UPDATE_SOURCES = "update_sources"
INCREASE_RESOURCES = "increase_resources"
CLEANUP_DISK = "cleanup_disk"
RESTART_SERVICE = "restart_service"
GENERAL_FIX = "general_fix"
@dataclass
class ErrorInfo:
"""错误信息"""
error_type: ErrorType
message: str
log_snippet: Optional[str] = None
confidence: float = 0.0
severity: AlertLevel = AlertLevel.MEDIUM
occurred_at: datetime = field(default_factory=datetime.now)
additional_data: Dict[str, Any] = field(default_factory=dict)
suggested_fixes: List[str] = field(default_factory=list)
@dataclass
class RepairStrategy:
"""修复策略"""
action: RepairAction
description: str
modifications: Dict[str, Any] = field(default_factory=dict)
risk_level: str = "medium" # low, medium, high, critical
success_rate: float = 0.5
estimated_time: int = 300 # 秒
prerequisites: List[str] = field(default_factory=list)
side_effects: List[str] = field(default_factory=list)
rollback_possible: bool = True
manual_review_required: bool = False
@dataclass
class RepairHistory:
"""修复历史记录"""
repair_id: str = field(default_factory=lambda: str(uuid.uuid4()))
space_id: str = ""
error_info: Optional[ErrorInfo] = None
strategy: Optional[RepairStrategy] = None
executed_at: datetime = field(default_factory=datetime.now)
success: bool = False
commit_sha: Optional[str] = None
execution_time: int = 0 # 秒
error_message: Optional[str] = None
rollback_performed: bool = False
rollback_reason: Optional[str] = None
verification_passed: bool = False
notes: str = ""