Spaces:
Build error
Build error
| import asyncio | |
| import logging | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Optional, Set, Callable, Any | |
| from dataclasses import dataclass, field | |
| from enum import Enum | |
| import json | |
| import time | |
| from concurrent.futures import ThreadPoolExecutor | |
| import threading | |
| from collections import defaultdict, deque | |
| from config import get_config, MonitoringConfig | |
| from data_models import ( | |
| SpaceInfo, SpaceStatusInfo, SpaceStatus, MonitorEvent, EventType, | |
| AlertLevel, DatabaseManager, BuildLog, WebhookEvent, Alert, AlertRule | |
| ) | |
| from huggingface_client_v2 import HuggingFaceClient, RetryClient, WebhookHandler | |
class MonitorState(Enum):
    """Lifecycle states of the monitoring engine.

    Member values are the plain strings reported by ``get_stats()['state']``.
    """

    # NOTE(review): not referenced anywhere in this file; kept for
    # compatibility with any external users of the enum.
    MONITOR_STATE = "monitor_state"
    STARTING = "starting"
    RUNNING = "running"
    PAUSED = "paused"
    STOPPING = "stopping"
    STOPPED = "stopped"
    ERROR = "error"
@dataclass
class SpaceMonitor:
    """Per-Space bookkeeping record kept by the monitoring engine.

    ``@dataclass`` is required: the engine builds instances with keyword
    arguments, and ``events`` relies on ``field(default_factory=list)`` so
    that every instance gets its own list.
    """

    space_id: str  # HuggingFace Space identifier
    config: Dict[str, Any]  # per-Space monitoring settings (read via .get)
    last_check: Optional[datetime] = None  # time of the most recent check
    last_status: Optional[SpaceStatus] = None  # status seen at that check
    error_count: int = 0  # total number of ERROR statuses observed
    consecutive_errors: int = 0  # current run of ERROR statuses / failed checks
    events: List[MonitorEvent] = field(default_factory=list)
    active: bool = True  # False while monitoring of this Space is paused
    task: Optional[asyncio.Task] = None  # optional per-Space task, cancelled on removal/stop
class MonitorEngine:
    """HuggingFace Spaces monitoring engine.

    Periodically polls every monitored Space, persists status history,
    events and alerts through ``DatabaseManager``, dispatches events to
    registered callbacks and evaluates alert rules against each event.
    """

    def __init__(self, config: Optional[MonitoringConfig] = None):
        """Create an engine in the STOPPED state.

        Args:
            config: Monitoring settings; ``get_config().monitoring`` is used
                when omitted.
        """
        self.config = config or get_config().monitoring
        self.logger = logging.getLogger(__name__)
        self.client = RetryClient(HuggingFaceClient())
        self.db_manager = DatabaseManager()
        self.webhook_handler = None
        self.monitored_spaces: Dict[str, SpaceMonitor] = {}
        self.state = MonitorState.STOPPED
        self.monitor_task: Optional[asyncio.Task] = None
        # NOTE(review): _emit_event puts every event on this queue, but nothing
        # in this class consumes it; without an external consumer it grows
        # without bound.
        self.event_queue = asyncio.Queue()
        self.event_callbacks: Dict[EventType, List[Callable]] = defaultdict(list)
        self.alert_rules: Dict[str, AlertRule] = {}
        self._shutdown_event = asyncio.Event()
        # Runs synchronous (non-coroutine) event callbacks off the event loop.
        self._executor = ThreadPoolExecutor(max_workers=4)
        self.stats = {
            'total_checks': 0,
            'successful_checks': 0,
            'failed_checks': 0,
            'events_generated': 0,
            'alerts_triggered': 0,
            'start_time': None,
            'last_check_time': None
        }

    async def start(self) -> None:
        """Validate the API token and launch the background monitor loop.

        Starting is allowed from STOPPED and also from ERROR, so that a failed
        start/stop can be retried. In any other state a warning is logged and
        the call returns.

        Raises:
            Exception: anything raised while starting; the state is set to
                ERROR before re-raising.
        """
        if self.state not in (MonitorState.STOPPED, MonitorState.ERROR):
            self.logger.warning("监控引擎已在运行中")
            return
        try:
            self.state = MonitorState.STARTING
            self.logger.info("启动监控引擎...")
            await self.client.client.validate_token()
            self._shutdown_event.clear()
            self.monitor_task = asyncio.create_task(self._monitor_loop())
            self.state = MonitorState.RUNNING
            self.stats['start_time'] = datetime.now()
            self.logger.info("监控引擎启动成功")
        except Exception as e:
            self.state = MonitorState.ERROR
            self.logger.error(f"启动监控引擎失败: {e}")
            raise

    async def stop(self) -> None:
        """Cancel the monitor loop and per-Space tasks, then release resources.

        Closes the HTTP client and shuts down the callback executor. Does
        nothing if already STOPPED.

        Raises:
            Exception: anything raised during shutdown; the state is set to
                ERROR before re-raising.
        """
        if self.state == MonitorState.STOPPED:
            return
        try:
            self.state = MonitorState.STOPPING
            self.logger.info("停止监控引擎...")
            self._shutdown_event.set()
            if self.monitor_task:
                self.monitor_task.cancel()
                try:
                    await self.monitor_task
                except asyncio.CancelledError:
                    pass
            for space_monitor in self.monitored_spaces.values():
                if space_monitor.task:
                    space_monitor.task.cancel()
                    try:
                        await space_monitor.task
                    except asyncio.CancelledError:
                        pass
            await self.client.client.close()
            # NOTE(review): blocks the event loop until running sync callbacks
            # finish; a later start() cannot dispatch sync callbacks any more.
            self._executor.shutdown(wait=True)
            self.state = MonitorState.STOPPED
            self.logger.info("监控引擎已停止")
        except Exception as e:
            self.state = MonitorState.ERROR
            self.logger.error(f"停止监控引擎失败: {e}")
            raise

    async def add_space(self, space_id: str, config: Optional[Dict[str, Any]] = None) -> None:
        """Start monitoring ``space_id``.

        Fetches the Space info and current status, registers a SpaceMonitor,
        persists the info and emits a BUILD_STARTED event. Already-monitored
        Spaces are ignored with a warning.

        Args:
            space_id: Space identifier.
            config: Per-Space settings; defaults to ``self.config.model_dump()``.

        Raises:
            Exception: any client/database error, re-raised after logging.
        """
        if space_id in self.monitored_spaces:
            self.logger.warning(f"Space {space_id} 已在监控中")
            return
        monitor_config = config or self.config.model_dump()
        try:
            space_info = await self.client.client.get_space_info(space_id)
            initial_status = await self.client.get_space_status(space_id)
            space_monitor = SpaceMonitor(
                space_id=space_id,
                config=monitor_config,
                last_check=datetime.now(),
                last_status=initial_status.status
            )
            self.monitored_spaces[space_id] = space_monitor
            await self.db_manager.save_space_info(space_info)
            event = MonitorEvent(
                space_id=space_id,
                event_type=EventType.BUILD_STARTED,
                timestamp=datetime.now(),
                message=f"开始监控 Space {space_id},当前状态: {initial_status.status.value}",
                current_status=initial_status.status
            )
            await self._emit_event(event)
            self.logger.info(f"已添加 Space {space_id} 到监控列表")
        except Exception as e:
            self.logger.error(f"添加 Space {space_id} 失败: {e}")
            raise

    async def remove_space(self, space_id: str) -> None:
        """Stop monitoring ``space_id`` and emit a SPACE_STOPPED event.

        Unknown Spaces are ignored with a warning.
        """
        if space_id not in self.monitored_spaces:
            self.logger.warning(f"Space {space_id} 不在监控中")
            return
        space_monitor = self.monitored_spaces[space_id]
        if space_monitor.task:
            space_monitor.task.cancel()
            try:
                await space_monitor.task
            except asyncio.CancelledError:
                pass
        del self.monitored_spaces[space_id]
        event = MonitorEvent(
            space_id=space_id,
            event_type=EventType.SPACE_STOPPED,
            timestamp=datetime.now(),
            message=f"停止监控 Space {space_id}"
        )
        await self._emit_event(event)
        self.logger.info(f"已从监控列表移除 Space {space_id}")

    async def get_monitored_spaces(self) -> List[str]:
        """Return the ids of all monitored Spaces (active or paused)."""
        return list(self.monitored_spaces.keys())

    async def get_space_status(self, space_id: str) -> Optional[SpaceStatusInfo]:
        """Fetch the live status of a monitored Space, or None if not monitored."""
        if space_id not in self.monitored_spaces:
            return None
        return await self.client.get_space_status(space_id)

    async def get_space_events(self, space_id: str, limit: int = 100) -> List[MonitorEvent]:
        """Return up to ``limit`` recent events for ``space_id`` from the database."""
        return await self.db_manager.get_recent_events(space_id, limit)

    async def add_alert_rule(self, rule: AlertRule) -> None:
        """Register ``rule`` and persist an Alert record noting its creation.

        A rule without ``space_id`` is recorded against ``"*"``.
        """
        self.alert_rules[rule.rule_id] = rule
        await self.db_manager.save_alert(Alert(
            rule_id=rule.rule_id,
            space_id=rule.space_id or "*",
            severity=rule.severity,
            title=f"告警规则创建: {rule.name}",
            message=f"告警规则 '{rule.name}' 已创建",
            timestamp=datetime.now()
        ))
        self.logger.info(f"已添加告警规则: {rule.name}")

    async def remove_alert_rule(self, rule_id: str) -> None:
        """Remove the rule with ``rule_id``; unknown ids are ignored."""
        if rule_id in self.alert_rules:
            del self.alert_rules[rule_id]
            self.logger.info(f"已移除告警规则: {rule_id}")

    def register_event_callback(self, event_type: EventType, callback: Callable) -> None:
        """Call ``callback(event)`` for every emitted event of ``event_type``.

        Coroutine functions are awaited; plain callables run in the executor.
        """
        self.event_callbacks[event_type].append(callback)

    def unregister_event_callback(self, event_type: EventType, callback: Callable) -> None:
        """Remove a previously registered callback; unknown callbacks are ignored."""
        if callback in self.event_callbacks[event_type]:
            self.event_callbacks[event_type].remove(callback)

    async def _monitor_loop(self) -> None:
        """Check due Spaces about once per second until shutdown or cancellation.

        Unexpected errors are logged and followed by a 5 s back-off.
        Cancellation propagates (``stop()`` expects and swallows it).
        """
        self.logger.info("监控循环开始")
        try:
            while not self._shutdown_event.is_set():
                try:
                    if self.monitored_spaces:
                        await self._check_all_spaces()
                    await asyncio.sleep(1)
                except Exception as e:
                    self.logger.error(f"监控循环异常: {e}")
                    await asyncio.sleep(5)
        finally:
            self.logger.info("监控循环结束")

    async def _check_all_spaces(self) -> None:
        """Concurrently check every active Space whose interval has elapsed."""
        now = datetime.now()
        pending: List[tuple] = []  # (space_id, task) pairs, kept aligned for error reporting
        # Snapshot so add/remove during the checks cannot disturb iteration.
        for space_id, monitor in list(self.monitored_spaces.items()):
            if not monitor.active:
                continue
            interval = monitor.config.get('check_interval_seconds', self.config.default_check_interval)
            if monitor.last_check and (now - monitor.last_check).total_seconds() < interval:
                continue
            pending.append((space_id, asyncio.create_task(self._check_space(space_id, monitor))))
        if not pending:
            return
        results = await asyncio.gather(*(task for _, task in pending), return_exceptions=True)
        # Pair each result with the Space it was created for; skipped Spaces
        # mean positions in monitored_spaces do not line up with results.
        for (space_id, _), result in zip(pending, results):
            if isinstance(result, Exception):
                self.logger.error(f"检查 Space {space_id} 失败: {result}")

    async def _check_space(self, space_id: str, monitor: SpaceMonitor) -> None:
        """Poll one Space once and react to what was observed.

        Updates stats and ``monitor``, emits status-change events, inspects
        logs and raises a threshold alert while the Space is in ERROR, and
        saves the status history. Failures are counted, logged and emitted
        as ERROR_DETECTED events instead of being raised.
        """
        try:
            self.stats['total_checks'] += 1
            status_info = await self.client.get_space_status(space_id)
            monitor.last_check = datetime.now()
            self.stats['last_check_time'] = monitor.last_check
            status_changed = monitor.last_status != status_info.status
            if status_changed:
                await self._handle_status_change(space_id, monitor.last_status, status_info)
                monitor.last_status = status_info.status
            if status_info.status == SpaceStatus.ERROR:
                monitor.error_count += 1
                monitor.consecutive_errors += 1
                await self._handle_error_state(space_id, status_info, monitor)
                threshold = monitor.config.get('error_threshold', self.config.error_threshold)
                if monitor.consecutive_errors >= threshold:
                    await self._trigger_error_alert(space_id, monitor)
            else:
                monitor.consecutive_errors = 0
            await self.db_manager.save_status_history(status_info)
            self.stats['successful_checks'] += 1
        except Exception as e:
            self.stats['failed_checks'] += 1
            monitor.consecutive_errors += 1
            # Record the attempt so a failing Space is retried on its normal
            # interval rather than on every ~1 s tick of the monitor loop.
            monitor.last_check = datetime.now()
            self.logger.error(f"检查 Space {space_id} 异常: {e}")
            event = MonitorEvent(
                space_id=space_id,
                event_type=EventType.ERROR_DETECTED,
                timestamp=datetime.now(),
                message=f"检查失败: {str(e)}",
                severity=AlertLevel.HIGH
            )
            await self._emit_event(event)

    async def _handle_status_change(self, space_id: str, old_status: Optional[SpaceStatus], new_status: SpaceStatusInfo) -> None:
        """Emit an event describing a status transition.

        BUILDING, RUNNING, STOPPED and ERROR map to BUILD_STARTED,
        SPACE_STARTED, SPACE_STOPPED and BUILD_FAILED. Any other new status
        emits nothing.
        """
        event_type = None
        message = f"状态变化: {old_status.value if old_status else 'UNKNOWN'} → {new_status.status.value}"
        if new_status.status == SpaceStatus.BUILDING:
            event_type = EventType.BUILD_STARTED
        elif new_status.status == SpaceStatus.RUNNING:
            event_type = EventType.SPACE_STARTED
        elif new_status.status == SpaceStatus.STOPPED:
            event_type = EventType.SPACE_STOPPED
        elif new_status.status == SpaceStatus.ERROR:
            event_type = EventType.BUILD_FAILED
        if event_type:
            event = MonitorEvent(
                space_id=space_id,
                event_type=event_type,
                timestamp=datetime.now(),
                previous_status=old_status,
                current_status=new_status.status,
                message=message
            )
            await self._emit_event(event)

    async def _handle_error_state(self, space_id: str, status_info: SpaceStatusInfo, monitor: SpaceMonitor) -> None:
        """Scan recent logs for lines containing 'error' and emit an ERROR_DETECTED event.

        The event carries the last five matching lines in
        ``data['error_logs']``. Failures are logged, never raised.
        """
        try:
            logs = await self.client.get_space_logs(space_id, lines=monitor.config.get('log_lines_count', 50))
            error_entries = [entry for entry in logs.entries if 'error' in entry.message.lower()]
            if error_entries:
                latest_error = error_entries[-1].message
                event = MonitorEvent(
                    space_id=space_id,
                    event_type=EventType.ERROR_DETECTED,
                    timestamp=datetime.now(),
                    current_status=SpaceStatus.ERROR,
                    message=f"检测到错误: {latest_error[:200]}...",
                    severity=AlertLevel.HIGH,
                    data={'error_logs': [entry.message for entry in error_entries[-5:]]}
                )
                await self._emit_event(event)
        except Exception as e:
            self.logger.error(f"处理错误状态失败: {e}")

    async def _trigger_error_alert(self, space_id: str, monitor: SpaceMonitor) -> None:
        """Persist a CRITICAL alert for a Space whose consecutive errors reached the threshold."""
        alert = Alert(
            rule_id="auto_error_threshold",
            space_id=space_id,
            severity=AlertLevel.CRITICAL,
            title=f"Space {space_id} 连续错误",
            message=f"Space {space_id} 连续 {monitor.consecutive_errors} 次检查失败,超过阈值 {monitor.config.get('error_threshold', self.config.error_threshold)}",
            timestamp=datetime.now()
        )
        await self.db_manager.save_alert(alert)
        self.stats['alerts_triggered'] += 1
        self.logger.critical(f"触发严重告警: {alert.title}")

    async def _emit_event(self, event: MonitorEvent) -> None:
        """Persist, enqueue and dispatch ``event``, then evaluate alert rules.

        Callback failures are logged and do not stop other callbacks.
        """
        self.stats['events_generated'] += 1
        await self.db_manager.save_monitor_event(event)
        await self.event_queue.put(event)
        # Iterate over snapshots: callbacks and alert creation await, and a
        # callback may (un)register callbacks or rules in the meantime.
        for callback in list(self.event_callbacks[event.event_type]):
            try:
                if asyncio.iscoroutinefunction(callback):
                    await callback(event)
                else:
                    # executor.submit() returns a concurrent.futures.Future,
                    # which is not awaitable; run_in_executor wraps it in an
                    # awaitable asyncio future.
                    loop = asyncio.get_running_loop()
                    await loop.run_in_executor(self._executor, callback, event)
            except Exception as e:
                self.logger.error(f"事件回调执行失败: {e}")
        for rule in list(self.alert_rules.values()):
            if await self._should_trigger_alert(rule, event):
                await self._create_alert(rule, event)

    async def _should_trigger_alert(self, rule: AlertRule, event: MonitorEvent) -> bool:
        """Return True if ``rule`` is enabled, matches ``event`` and is off cooldown.

        Supported ``rule.condition`` keys: ``event_type`` (compared with
        ``event.event_type.value``) and ``severity`` (minimum AlertLevel).
        """
        if not rule.enabled:
            return False
        if rule.space_id and rule.space_id != event.space_id:
            return False
        if rule.last_triggered:
            cooldown = timedelta(minutes=rule.cooldown_minutes)
            if datetime.now() - rule.last_triggered < cooldown:
                return False
        condition = rule.condition
        if 'event_type' in condition and condition['event_type'] != event.event_type.value:
            return False
        if 'severity' in condition:
            required_severity = AlertLevel(condition['severity'])
            # Events built without a severity cannot satisfy a severity
            # condition (and previously raised AttributeError here).
            # NOTE(review): `<` on AlertLevel values assumes they are ordered
            # by severity (e.g. ints); string values compare lexically — confirm.
            if event.severity is None or event.severity.value < required_severity.value:
                return False
        return True

    async def _create_alert(self, rule: AlertRule, event: MonitorEvent) -> None:
        """Persist an alert for ``rule`` triggered by ``event`` and start its cooldown."""
        alert = Alert(
            rule_id=rule.rule_id,
            space_id=event.space_id,
            severity=rule.severity,
            title=f"告警: {rule.name}",
            message=f"{rule.description or ''} - {event.message}",
            timestamp=datetime.now(),
            metadata={'event_id': event.event_id, 'rule_name': rule.name}
        )
        await self.db_manager.save_alert(alert)
        self.stats['alerts_triggered'] += 1
        rule.last_triggered = datetime.now()
        self.logger.warning(f"触发告警: {alert.title}")

    async def get_stats(self) -> Dict[str, Any]:
        """Return counters plus state, Space counts, uptime and rule count."""
        uptime = None
        if self.stats['start_time']:
            uptime = (datetime.now() - self.stats['start_time']).total_seconds()
        return {
            **self.stats,
            'state': self.state.value,
            'monitored_spaces_count': len(self.monitored_spaces),
            'active_spaces_count': sum(1 for m in self.monitored_spaces.values() if m.active),
            'uptime_seconds': uptime,
            'alerts_rules_count': len(self.alert_rules)
        }

    async def set_webhook_handler(self, handler: WebhookHandler) -> None:
        """Install the handler used by ``handle_webhook``."""
        self.webhook_handler = handler

    async def handle_webhook(self, payload: Dict[str, Any], headers: Dict[str, str]) -> WebhookEvent:
        """Parse and persist a webhook, then immediately re-check its Space if monitored.

        Raises:
            ValueError: if no webhook handler has been set.
        """
        if not self.webhook_handler:
            raise ValueError("Webhook 处理器未设置")
        webhook_event = await self.webhook_handler.handle_webhook(payload, headers)
        await self.db_manager.save_webhook_event(webhook_event)
        space_id = webhook_event.space_id
        if space_id in self.monitored_spaces:
            await self._check_space(space_id, self.monitored_spaces[space_id])
        return webhook_event

    async def pause_monitoring(self, space_id: Optional[str] = None) -> None:
        """Pause one Space, or all Spaces when ``space_id`` is omitted.

        A global pause moves the engine from RUNNING to PAUSED. Other states
        are left alone, so that e.g. a STOPPED engine can still be started.
        """
        if space_id:
            if space_id in self.monitored_spaces:
                self.monitored_spaces[space_id].active = False
                self.logger.info(f"已暂停监控 Space {space_id}")
        else:
            if self.state == MonitorState.RUNNING:
                self.state = MonitorState.PAUSED
            for monitor in self.monitored_spaces.values():
                monitor.active = False
            self.logger.info("已暂停所有监控")

    async def resume_monitoring(self, space_id: Optional[str] = None) -> None:
        """Resume one Space, or all Spaces when ``space_id`` is omitted.

        A global resume moves the engine from PAUSED back to RUNNING only.
        A stopped engine is not marked RUNNING, which would otherwise block
        ``start()``.
        """
        if space_id:
            if space_id in self.monitored_spaces:
                self.monitored_spaces[space_id].active = True
                self.logger.info(f"已恢复监控 Space {space_id}")
        else:
            if self.state == MonitorState.PAUSED:
                self.state = MonitorState.RUNNING
            for monitor in self.monitored_spaces.values():
                monitor.active = True
            self.logger.info("已恢复所有监控")
class HealthChecker:
    """Health checker that probes a MonitorEngine and its dependencies."""

    def __init__(self, engine: MonitorEngine):
        self.engine = engine
        self.logger = logging.getLogger(__name__)

    async def check_health(self) -> Dict[str, Any]:
        """Run the engine, token and database probes and aggregate the result.

        Each probe is isolated: a probe that raises is reported as unhealthy
        under its own key, and the remaining probes still run.

        Returns:
            A dict with ``status`` ('healthy' only if every probe is healthy),
            ``timestamp`` (local time, ISO-8601) and ``checks``. ``checks``
            maps 'engine', 'token' and 'database' to
            ``{'status': ..., 'details': {...}}``.
        """
        health_status = {
            'status': 'healthy',
            'timestamp': datetime.now().isoformat(),
            'checks': {}
        }
        probes = (
            ('engine', self._probe_engine),
            ('token', self._probe_token),
            ('database', self._probe_database),
        )
        for name, probe in probes:
            health_status['checks'][name] = await self._run_probe(name, probe)
        all_healthy = all(
            check['status'] == 'healthy' for check in health_status['checks'].values()
        )
        health_status['status'] = 'healthy' if all_healthy else 'unhealthy'
        return health_status

    async def _run_probe(self, name: str, probe: Callable) -> Dict[str, Any]:
        """Await ``probe()``; convert any exception into an unhealthy entry."""
        try:
            return await probe()
        except Exception as e:
            self.logger.warning(f"健康检查 {name} 失败: {e}")
            return {
                'status': 'unhealthy',
                'details': {'error': str(e)}
            }

    async def _probe_engine(self) -> Dict[str, Any]:
        """Report the engine as healthy only while its state is 'running'."""
        stats = await self.engine.get_stats()
        return {
            'status': 'healthy' if stats['state'] == 'running' else 'unhealthy',
            'details': stats
        }

    async def _probe_token(self) -> Dict[str, Any]:
        """Report whether the HuggingFace client's token validates (truthy result)."""
        token_valid = await self.engine.client.client.validate_token()
        return {
            'status': 'healthy' if token_valid else 'unhealthy',
            'details': {'valid': token_valid}
        }

    async def _probe_database(self) -> Dict[str, Any]:
        """Treat a successful database (re)initialisation as a healthy connection."""
        # NOTE(review): this re-runs DatabaseManager's private initialisation
        # as a connectivity probe; a lightweight public ping would be
        # preferable if DatabaseManager offers one.
        await self.engine.db_manager._init_database()
        return {
            'status': 'healthy',
            'details': {'connection': 'ok'}
        }