""" HuggingFace Spaces 监控系统使用示例 演示如何使用监控系统的各种功能 """ import asyncio import logging import os from datetime import datetime from typing import List from config import ConfigManager, setup_logging, create_sample_config from data_models import SpaceStatus, EventType, AlertLevel, AlertRule from huggingface_client_v2 import HuggingFaceClient, RetryClient, WebhookHandler from monitor_engine import MonitorEngine, HealthChecker async def example_basic_monitoring(): 基础监控示例 print("=" * 50) print("基础监控示例") print("=" * 50) client = RetryClient(HuggingFaceClient()) try: space_status = await client.get_space_status("meta-llama/Llama-2-7b-chat-hf") print(f"Space 状态: {space_status.status.value}") print(f"运行时阶段: {space_status.runtime.stage}") print(f"运行时状态: {space_status.runtime.state}") logs = await client.get_space_logs("meta-llama/Llama-2-7b-chat-hf", lines=10) print(f"获取到 {len(logs.entries)} 条日志") except Exception as e: print(f"监控失败: {e}") finally: await client.client.close() async def example_monitor_engine(): 监控引擎示例 print("=" * 50) print("监控引擎示例") print("=" * 50) engine = MonitorEngine() def on_status_change(event): print(f"状态变化事件: {event.space_id} - {event.message}") def on_error(event): print(f"错误事件: {event.space_id} - {event.message}") engine.register_event_callback(EventType.STATUS_CHANGE, on_status_change) engine.register_event_callback(EventType.ERROR_DETECTED, on_error) try: await engine.start() await engine.add_space("meta-llama/Llama-2-7b-chat-hf") alert_rule = AlertRule( name="连续错误告警", description="当 Space 连续 3 次检查失败时触发告警", condition={"consecutive_errors": 3}, severity=AlertLevel.HIGH, cooldown_minutes=30 ) await engine.add_alert_rule(alert_rule) print("监控运行中,等待 30 秒...") await asyncio.sleep(30) stats = await engine.get_stats() print(f"监控统计: {stats}") except Exception as e: print(f"监控引擎异常: {e}") finally: await engine.stop() async def example_webhook_handling(): Webhook 处理示例 print("=" * 50) print("Webhook 处理示例") print("=" * 50) client = HuggingFaceClient() handler = WebhookHandler(client, secret="test-secret") sample_webhook = { "event": "space.status_updated", "space": { "id": "test-space", "runtime": { "stage": "RUNNING", "state": "RUNNING" } } } headers = { "X-Hub-Signature-256": "sha256=fake-signature" } try: event = await handler.handle_webhook(sample_webhook, headers) print(f"Webhook 事件处理完成: {event.event_type.value}") except Exception as e: print(f"Webhook 处理失败: {e}") finally: await client.close() async def example_search_spaces(): 搜索 Spaces 示例 print("=" * 50) print("搜索 Spaces 示例") print("=" * 50) client = HuggingFaceClient() try: spaces = await client.search_spaces("text-generation", limit=5) print(f"找到 {len(spaces)} 个 Spaces:") for space in spaces: print(f" - {space.space_id} by {space.author}") print(f" SDK: {space.sdk}") print(f" 描述: {space.description[:100]}...") print() except Exception as e: print(f"搜索失败: {e}") finally: await client.close() async def example_user_spaces(): 获取用户 Spaces 示例 print("=" * 50) print("获取用户 Spaces 示例") print("=" * 50) client = HuggingFaceClient() try: spaces = await client.get_user_spaces() print(f"用户有 {len(spaces)} 个 Spaces:") for space in spaces[:10]: print(f" - {space.space_id}") print(f" 状态: {space.last_modified}") print() except Exception as e: print(f"获取用户 Spaces 失败: {e}") finally: await client.close() async def example_health_check(): 健康检查示例 print("=" * 50) print("健康检查示例") print("=" * 50) engine = MonitorEngine() health_checker = HealthChecker(engine) try: health_status = await health_checker.check_health() print("健康检查结果:") print(f" 总体状态: {health_status['status']}") for check_name, check_result in health_status['checks'].items(): print(f" {check_name}: {check_result['status']}") if 'details' in check_result: print(f" 详情: {check_result['details']}") except Exception as e: print(f"健康检查失败: {e}") async def example_configuration(): 配置管理示例 print("=" * 50) print("配置管理示例") print("=" * 50) print("1. 创建示例配置文件...") create_sample_config("example_config.json") print("2. 加载配置...") config_manager = ConfigManager("example_config.json") config = config_manager.get_config() print(f"API 配置: {config.api.base_url}") print(f"监控间隔: {config.monitoring.default_check_interval} 秒") print(f"日志级别: {config.logging.level}") print("3. 验证配置...") errors = config_manager.validate_config() if errors: print("配置错误:") for error in errors: print(f" - {error}") else: print("配置验证通过") print("4. 设置日志...") setup_logging(config.logging) logger = logging.getLogger(__name__) logger.info("日志系统已初始化") async def example_batch_monitoring(): 批量监控示例 print("=" * 50) print("批量监控示例") print("=" * 50) engine = MonitorEngine() space_ids = [ "meta-llama/Llama-2-7b-chat-hf", "stabilityai/stable-diffusion", "microsoft/DialoGPT-medium" ] try: await engine.start() print(f"添加 {len(space_ids)} 个 Spaces 到监控列表...") for space_id in space_ids: try: await engine.add_space(space_id) print(f" ✓ {space_id}") except Exception as e: print(f" ✗ {space_id}: {e}") print("监控运行 60 秒...") await asyncio.sleep(60) monitored_spaces = await engine.get_monitored_spaces() print(f"当前监控的 Spaces: {monitored_spaces}") for space_id in monitored_spaces: events = await engine.get_space_events(space_id, limit=5) print(f"{space_id}: {len(events)} 个事件") for event in events: print(f" - {event.event_type.value}: {event.message}") except Exception as e: print(f"批量监控异常: {e}") finally: await engine.stop() async def example_advanced_features(): 高级功能示例 print("=" * 50) print("高级功能示例") print("=" * 50) engine = MonitorEngine() async def advanced_event_handler(event): print(f"高级事件处理器: {event.event_type.value} - {event.space_id}") if event.event_type == EventType.ERROR_DETECTED: print(" 检测到错误,可以执行自动修复逻辑") elif event.event_type == EventType.SPACE_STARTED: print(" Space 启动,可以发送通知") engine.register_event_callback(EventType.ERROR_DETECTED, advanced_event_handler) engine.register_event_callback(EventType.SPACE_STARTED, advanced_event_handler) try: await engine.start() await engine.add_space("meta-llama/Llama-2-7b-chat-hf") custom_alert_rule = AlertRule( name="自定义状态变化告警", description="当 Space 从运行状态变为错误状态时触发", condition={ "event_type": "status_change", "from_status": "running", "to_status": "error" }, severity=AlertLevel.MEDIUM, cooldown_minutes=15 ) await engine.add_alert_rule(custom_alert_rule) print("运行高级功能演示 45 秒...") await asyncio.sleep(45) await engine.pause_monitoring() print("监控已暂停 10 秒...") await asyncio.sleep(10) await engine.resume_monitoring() print("监控已恢复") await asyncio.sleep(10) except Exception as e: print(f"高级功能演示异常: {e}") finally: await engine.stop() async def main(): print("HuggingFace Spaces 监控系统 - 使用示例") print("=" * 60) if not os.getenv("HF_TOKEN"): print("警告: 未设置 HF_TOKEN 环境变量") print("请设置有效的 HuggingFace 访问令牌以运行完整示例") print() examples = [ ("配置管理", example_configuration), ("基础监控", example_basic_monitoring), ("搜索 Spaces", example_search_spaces), ("Webhook 处理", example_webhook_handling), ("健康检查", example_health_check), ("监控引擎", example_monitor_engine), ("批量监控", example_batch_monitoring), ("高级功能", example_advanced_features), ] for name, example_func in examples: print(f"\n运行示例: {name}") try: await example_func() except Exception as e: print(f"示例 {name} 执行失败: {e}") print() if __name__ == "__main__": asyncio.run(main())