# hfproxydemo / usage_examples_v2.py
# OpenCode Deployer
# 监控系统开发 (monitoring system development): 2026-02-01 15:40:53
# 14f6b4f
"""
HuggingFace Spaces 监控系统使用示例
演示如何使用监控系统的各种功能
"""
import asyncio
import logging
import os
from datetime import datetime
from typing import List
from config import ConfigManager, setup_logging, create_sample_config
from data_models import SpaceStatus, EventType, AlertLevel, AlertRule
from huggingface_client_v2 import HuggingFaceClient, RetryClient, WebhookHandler
from monitor_engine import MonitorEngine, HealthChecker
async def example_basic_monitoring():
    """Query a single Space once through the retry-wrapped client.

    Prints the Space's status plus its runtime stage/state, then requests
    logs with ``lines=10`` and prints how many entries came back. Any
    exception is reported on stdout instead of propagating, and
    ``client.client`` (presumably the wrapped HuggingFaceClient -- confirm
    against RetryClient) is closed in all cases.
    """
    divider = "=" * 50
    print(divider)
    print("基础监控示例")
    print(divider)

    target_space = "meta-llama/Llama-2-7b-chat-hf"
    client = RetryClient(HuggingFaceClient())
    try:
        status_info = await client.get_space_status(target_space)
        print(f"Space 状态: {status_info.status.value}")
        runtime = status_info.runtime
        print(f"运行时阶段: {runtime.stage}")
        print(f"运行时状态: {runtime.state}")

        log_batch = await client.get_space_logs(target_space, lines=10)
        entry_count = len(log_batch.entries)
        print(f"获取到 {entry_count} 条日志")
    except Exception as exc:
        print(f"监控失败: {exc}")
    finally:
        # Release the connection held by the object behind the retry wrapper.
        await client.client.close()
async def example_monitor_engine():
    """Run the monitor engine against one Space for 30 seconds.

    Registers print-only callbacks for status-change and error events,
    starts the engine, adds one Space and a "3 consecutive errors" alert
    rule, sleeps 30 seconds, then prints the engine statistics. Failures
    are printed rather than raised, and the engine is stopped in all cases.
    """
    divider = "=" * 50
    print(divider)
    print("监控引擎示例")
    print(divider)

    engine = MonitorEngine()

    def report_status_change(event):
        print(f"状态变化事件: {event.space_id} - {event.message}")

    def report_error(event):
        print(f"错误事件: {event.space_id} - {event.message}")

    # Same registration order as the event types are listed here.
    for event_type, callback in (
        (EventType.STATUS_CHANGE, report_status_change),
        (EventType.ERROR_DETECTED, report_error),
    ):
        engine.register_event_callback(event_type, callback)

    try:
        await engine.start()
        await engine.add_space("meta-llama/Llama-2-7b-chat-hf")
        await engine.add_alert_rule(
            AlertRule(
                name="连续错误告警",
                description="当 Space 连续 3 次检查失败时触发告警",
                condition={"consecutive_errors": 3},
                severity=AlertLevel.HIGH,
                cooldown_minutes=30,
            )
        )

        print("监控运行中,等待 30 秒...")
        await asyncio.sleep(30)

        engine_stats = await engine.get_stats()
        print(f"监控统计: {engine_stats}")
    except Exception as exc:
        print(f"监控引擎异常: {exc}")
    finally:
        await engine.stop()
async def example_webhook_handling():
    """Feed a hand-built webhook payload through WebhookHandler.

    The payload mimics a ``space.status_updated`` event and is submitted
    together with a placeholder ``X-Hub-Signature-256`` header. On success
    the resulting event type is printed; any exception is printed instead
    of raised, and the client is closed afterwards.
    """
    divider = "=" * 50
    print(divider)
    print("Webhook 处理示例")
    print(divider)

    client = HuggingFaceClient()
    handler = WebhookHandler(client, secret="test-secret")

    runtime_info = {"stage": "RUNNING", "state": "RUNNING"}
    payload = {
        "event": "space.status_updated",
        "space": {"id": "test-space", "runtime": runtime_info},
    }
    # Placeholder value, not a signature computed from the payload.
    signature_headers = {"X-Hub-Signature-256": "sha256=fake-signature"}

    try:
        handled = await handler.handle_webhook(payload, signature_headers)
        print(f"Webhook 事件处理完成: {handled.event_type.value}")
    except Exception as exc:
        print(f"Webhook 处理失败: {exc}")
    finally:
        await client.close()
async def example_search_spaces():
    """Search Spaces for "text-generation" and print up to five results.

    For each result the Space id, author, SDK and the first 100 characters
    of the description are printed. A missing (``None``) description is
    printed as empty text instead of raising. Any exception from the search
    itself is printed rather than raised, and the client is always closed.
    """
    print("=" * 50)
    print("搜索 Spaces 示例")
    print("=" * 50)
    client = HuggingFaceClient()
    try:
        spaces = await client.search_spaces("text-generation", limit=5)
        print(f"找到 {len(spaces)} 个 Spaces:")
        for space in spaces:
            # Slicing None raises TypeError, which the outer except would
            # turn into "搜索失败" and cut the listing short; fall back to "".
            # NOTE(review): assumes description can be None -- confirm
            # against the data model.
            description = space.description or ""
            print(f" - {space.space_id} by {space.author}")
            print(f" SDK: {space.sdk}")
            print(f" 描述: {description[:100]}...")
            print()
    except Exception as e:
        print(f"搜索失败: {e}")
    finally:
        await client.close()
async def example_user_spaces():
    """List the Spaces returned by ``get_user_spaces()`` and print the first ten.

    Prints the total count, then for at most ten Spaces the Space id and its
    ``last_modified`` value. Any exception is printed rather than raised,
    and the client is always closed.
    """
    print("=" * 50)
    print("获取用户 Spaces 示例")
    print("=" * 50)
    client = HuggingFaceClient()
    try:
        spaces = await client.get_user_spaces()
        print(f"用户有 {len(spaces)} 个 Spaces:")
        for space in spaces[:10]:
            print(f" - {space.space_id}")
            # The value shown is last_modified, so label it as such rather
            # than as a status.
            print(f" 最后修改: {space.last_modified}")
            print()
    except Exception as e:
        print(f"获取用户 Spaces 失败: {e}")
    finally:
        await client.close()
async def example_health_check():
    """Run HealthChecker once and print the overall and per-check status.

    The MonitorEngine is constructed here but not started within this
    function. For every entry under ``checks`` the status is printed,
    followed by its ``details`` when that key is present. Any exception is
    printed rather than raised.
    """
    divider = "=" * 50
    print(divider)
    print("健康检查示例")
    print(divider)

    engine = MonitorEngine()
    checker = HealthChecker(engine)
    try:
        report = await checker.check_health()
        print("健康检查结果:")
        print(f" 总体状态: {report['status']}")
        for check_name, outcome in report["checks"].items():
            print(f" {check_name}: {outcome['status']}")
            if "details" not in outcome:
                continue
            print(f" 详情: {outcome['details']}")
    except Exception as exc:
        print(f"健康检查失败: {exc}")
async def example_configuration():
    """Walk through creating, loading, validating and applying a config.

    Writes a sample config to ``example_config.json``, loads it through
    ConfigManager, prints a few settings, prints validation errors (or a
    success message when there are none), then configures logging from the
    loaded config and emits one info record.
    """
    divider = "=" * 50
    print(divider)
    print("配置管理示例")
    print(divider)

    config_path = "example_config.json"

    print("1. 创建示例配置文件...")
    create_sample_config(config_path)

    print("2. 加载配置...")
    manager = ConfigManager(config_path)
    config = manager.get_config()
    print(f"API 配置: {config.api.base_url}")
    print(f"监控间隔: {config.monitoring.default_check_interval} 秒")
    print(f"日志级别: {config.logging.level}")

    print("3. 验证配置...")
    problems = manager.validate_config()
    if not problems:
        print("配置验证通过")
    else:
        print("配置错误:")
        for problem in problems:
            print(f" - {problem}")

    print("4. 设置日志...")
    setup_logging(config.logging)
    logging.getLogger(__name__).info("日志系统已初始化")
async def example_batch_monitoring():
    """Monitor three Spaces together for 60 seconds and dump recent events.

    Each Space is added individually so that one failing ``add_space`` call
    is reported and skipped without stopping the rest. After sleeping 60
    seconds the monitored Space list is printed, followed by up to five
    events per Space. Other failures are printed rather than raised, and
    the engine is stopped in all cases.
    """
    divider = "=" * 50
    print(divider)
    print("批量监控示例")
    print(divider)

    engine = MonitorEngine()
    targets = (
        "meta-llama/Llama-2-7b-chat-hf",
        "stabilityai/stable-diffusion",
        "microsoft/DialoGPT-medium",
    )

    try:
        await engine.start()

        print(f"添加 {len(targets)} 个 Spaces 到监控列表...")
        for target in targets:
            try:
                await engine.add_space(target)
                print(f" ✓ {target}")
            except Exception as add_error:
                print(f" ✗ {target}: {add_error}")

        print("监控运行 60 秒...")
        await asyncio.sleep(60)

        monitored = await engine.get_monitored_spaces()
        print(f"当前监控的 Spaces: {monitored}")
        for monitored_id in monitored:
            recent_events = await engine.get_space_events(monitored_id, limit=5)
            print(f"{monitored_id}: {len(recent_events)} 个事件")
            for recent in recent_events:
                print(f" - {recent.event_type.value}: {recent.message}")
    except Exception as exc:
        print(f"批量监控异常: {exc}")
    finally:
        await engine.stop()
async def example_advanced_features():
    """Show an async event callback, a custom alert rule and pause/resume.

    One coroutine callback is registered for both error and space-started
    events. The engine then monitors one Space for 45 seconds with a custom
    status-change alert rule, is paused for 10 seconds, resumed, and left
    running a further 10 seconds. Failures are printed rather than raised,
    and the engine is stopped in all cases.
    """
    divider = "=" * 50
    print(divider)
    print("高级功能示例")
    print(divider)

    engine = MonitorEngine()

    async def advanced_event_handler(event):
        kind = event.event_type
        print(f"高级事件处理器: {kind.value} - {event.space_id}")
        if kind == EventType.ERROR_DETECTED:
            print(" 检测到错误,可以执行自动修复逻辑")
            return
        if kind == EventType.SPACE_STARTED:
            print(" Space 启动,可以发送通知")

    # One handler serves both event types.
    for handled_type in (EventType.ERROR_DETECTED, EventType.SPACE_STARTED):
        engine.register_event_callback(handled_type, advanced_event_handler)

    try:
        await engine.start()
        await engine.add_space("meta-llama/Llama-2-7b-chat-hf")

        transition = {
            "event_type": "status_change",
            "from_status": "running",
            "to_status": "error",
        }
        await engine.add_alert_rule(
            AlertRule(
                name="自定义状态变化告警",
                description="当 Space 从运行状态变为错误状态时触发",
                condition=transition,
                severity=AlertLevel.MEDIUM,
                cooldown_minutes=15,
            )
        )

        print("运行高级功能演示 45 秒...")
        await asyncio.sleep(45)

        await engine.pause_monitoring()
        print("监控已暂停 10 秒...")
        await asyncio.sleep(10)

        await engine.resume_monitoring()
        print("监控已恢复")
        await asyncio.sleep(10)
    except Exception as exc:
        print(f"高级功能演示异常: {exc}")
    finally:
        await engine.stop()
async def main():
    """Run every usage example in sequence.

    Prints a warning when the ``HF_TOKEN`` environment variable is unset,
    then awaits each example in turn. Each example runs inside its own
    try/except so that one failing example is reported and the remaining
    ones still run.
    """
    print("HuggingFace Spaces 监控系统 - 使用示例")
    print("=" * 60)
    if not os.getenv("HF_TOKEN"):
        print("警告: 未设置 HF_TOKEN 环境变量")
        print("请设置有效的 HuggingFace 访问令牌以运行完整示例")
        print()
    examples = [
        ("配置管理", example_configuration),
        ("基础监控", example_basic_monitoring),
        ("搜索 Spaces", example_search_spaces),
        # Include the user-Spaces example so every example defined in this
        # module is actually exercised; it handles its own failures.
        ("获取用户 Spaces", example_user_spaces),
        ("Webhook 处理", example_webhook_handling),
        ("健康检查", example_health_check),
        ("监控引擎", example_monitor_engine),
        ("批量监控", example_batch_monitoring),
        ("高级功能", example_advanced_features),
    ]
    for name, example_func in examples:
        print(f"\n运行示例: {name}")
        try:
            await example_func()
        except Exception as e:
            # Top-level boundary: report and move on to the next example.
            print(f"示例 {name} 执行失败: {e}")
        print()
# Script entry point: run the async demo driver only when this file is
# executed directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())