hfproxydemo / test_monitor_system.py
OpenCode Deployer
监控系统开发: 2026-02-01 15:40:53
14f6b4f
"""
HuggingFace Spaces 监控系统单元测试
"""
import pytest
import asyncio
from unittest.mock import Mock, AsyncMock, patch
from datetime import datetime
import json
import os
from config import ConfigManager, APIConfig
from data_models import (
SpaceInfo, SpaceStatus, SpaceStatusInfo, SpaceRuntime,
MonitorEvent, EventType, AlertLevel, AlertRule
)
from huggingface_client_v2 import HuggingFaceClient, RetryClient, WebhookHandler
from monitor_engine import MonitorEngine, HealthChecker, SpaceMonitor
class TestConfigManager:
def test_load_default_config(self):
with patch.dict(os.environ, {'HF_TOKEN': 'test-token'}):
manager = ConfigManager()
config = manager.get_config()
assert config.api.token == 'test-token'
assert config.api.base_url == 'https://huggingface.co/api'
def test_validate_config(self):
with patch.dict(os.environ, {'HF_TOKEN': 'test-token'}):
manager = ConfigManager()
errors = manager.validate_config()
assert len(errors) == 0
def test_validate_missing_token(self):
manager = ConfigManager()
manager.config = None
errors = manager.validate_config()
assert any('HF_TOKEN' in error for error in errors)
class TestHuggingFaceClient:
@pytest.fixture
def client(self):
return HuggingFaceClient(token="test-token")
@pytest.fixture
def mock_session(self):
session = AsyncMock()
return session
@pytest.mark.asyncio
async def test_get_space_info_success(self, client, mock_session):
mock_response = {
'id': 'test-space',
'url': 'https://huggingface.co/spaces/test-space',
'author': 'test-user',
'description': 'Test space',
'sdk': 'gradio',
'lastModified': '2024-01-01T00:00:00.000Z'
}
with patch.object(client, '_get_session', return_value=mock_session):
with patch.object(client, '_make_request', return_value=mock_response):
space_info = await client.get_space_info('test-space')
assert space_info.space_id == 'test-space'
assert space_info.author == 'test-user'
assert space_info.sdk == 'gradio'
@pytest.mark.asyncio
async def test_get_space_status_success(self, client, mock_session):
with patch.object(client, '_get_session', return_value=mock_session):
with patch.object(client, 'get_space_info', return_value=Mock()):
with patch.object(client, 'get_space_runtime', return_value=SpaceRuntime(
stage='RUNNING', state='RUNNING'
)):
status = await client.get_space_status('test-space')
assert status.space_id == 'test-space'
assert status.status == SpaceStatus.RUNNING
@pytest.mark.asyncio
async def test_rate_limit(self, client):
client.config.rate_limit_per_minute = 2
start_time = asyncio.get_event_loop().time()
for i in range(3):
with patch.object(client, '_get_session', return_value AsyncMock()):
with patch.object(client, '_make_request', return_value={}):
await client.get_space_info('test-space')
elapsed = asyncio.get_event_loop().time() - start_time
assert elapsed >= 60
class TestRetryClient:
@pytest.fixture
def retry_client(self):
base_client = Mock()
return RetryClient(base_client, max_retries=2, base_delay=0.1)
@pytest.mark.asyncio
async def test_success_on_first_try(self, retry_client):
retry_client.client.get_space_status = AsyncMock(return_value=Mock())
result = await retry_client.get_space_status('test-space')
assert result is not None
retry_client.client.get_space_status.assert_called_once()
@pytest.mark.asyncio
async def test_retry_on_failure(self, retry_client):
retry_client.client.get_space_status = AsyncMock(
side_effect=[Exception("First failure"), Mock(success=True)]
)
result = await retry_client.get_space_status('test-space')
assert result is not None
assert retry_client.client.get_space_status.call_count == 2
@pytest.mark.asyncio
async def test_max_retries_exceeded(self, retry_client):
retry_client.client.get_space_status = AsyncMock(
side_effect=Exception("Persistent failure")
)
with pytest.raises(Exception):
await retry_client.get_space_status('test-space')
assert retry_client.client.get_space_status.call_count == 3
class TestWebhookHandler:
@pytest.fixture
def webhook_handler(self):
client = Mock()
return WebhookHandler(client, secret="test-secret")
@pytest.mark.asyncio
async def test_handle_valid_webhook(self, webhook_handler):
payload = {
'event': 'space.status_updated',
'space': {
'id': 'test-space',
'runtime': {'stage': 'RUNNING', 'state': 'RUNNING'}
}
}
with patch.object(webhook_handler, '_verify_signature'):
event = await webhook_handler.handle_webhook(payload, {})
assert event.space_id == 'test-space'
assert event.processed
@pytest.mark.asyncio
async def test_handle_unknown_event(self, webhook_handler):
payload = {
'event': 'unknown.event',
'space': {'id': 'test-space'}
}
with patch.object(webhook_handler, '_verify_signature'):
event = await webhook_handler.handle_webhook(payload, {})
assert not event.processed
class TestMonitorEngine:
@pytest.fixture
def engine(self):
return MonitorEngine()
@pytest.mark.asyncio
async def test_add_space(self, engine):
with patch.object(engine.client.client, 'get_space_info', return_value=SpaceInfo(
space_id='test-space', name='test-space'
)):
with patch.object(engine.client, 'get_space_status', return_value=SpaceStatusInfo(
space_id='test-space', status=SpaceStatus.RUNNING,
runtime=SpaceRuntime(stage='RUNNING', state='RUNNING'),
timestamp=datetime.now()
)):
with patch.object(engine.db_manager, 'save_space_info'):
with patch.object(engine, '_emit_event'):
await engine.add_space('test-space')
assert 'test-space' in engine.monitored_spaces
@pytest.mark.asyncio
async def test_remove_space(self, engine):
monitor = SpaceMonitor(space_id='test-space', config={})
engine.monitored_spaces['test-space'] = monitor
with patch.object(engine, '_emit_event'):
await engine.remove_space('test-space')
assert 'test-space' not in engine.monitored_spaces
@pytest.mark.asyncio
async def test_status_change_event(self, engine):
monitor = SpaceMonitor(
space_id='test-space',
config={},
last_status=SpaceStatus.BUILDING
)
engine.monitored_spaces['test-space'] = monitor
with patch.object(engine.client, 'get_space_status', return_value=SpaceStatusInfo(
space_id='test-space', status=SpaceStatus.RUNNING,
runtime=SpaceRuntime(stage='RUNNING', state='RUNNING'),
timestamp=datetime.now()
)):
with patch.object(engine.db_manager, 'save_status_history'):
with patch.object(engine, '_handle_status_change') as mock_handler:
await engine._check_space('test-space', monitor)
mock_handler.assert_called_once()
@pytest.mark.asyncio
async def test_error_threshold_trigger(self, engine):
monitor = SpaceMonitor(
space_id='test-space',
config={'error_threshold': 2},
consecutive_errors=1
)
engine.monitored_spaces['test-space'] = monitor
with patch.object(engine.client, 'get_space_status', side_effect=Exception("API Error")):
with patch.object(engine, '_trigger_error_alert') as mock_alert:
await engine._check_space('test-space', monitor)
mock_alert.assert_called_once()
def test_register_event_callback(self, engine):
callback = Mock()
engine.register_event_callback(EventType.ERROR_DETECTED, callback)
assert callback in engine.event_callbacks[EventType.ERROR_DETECTED]
def test_unregister_event_callback(self, engine):
callback = Mock()
engine.event_callbacks[EventType.ERROR_DETECTED].append(callback)
engine.unregister_event_callback(EventType.ERROR_DETECTED, callback)
assert callback not in engine.event_callbacks[EventType.ERROR_DETECTED]
class TestHealthChecker:
@pytest.fixture
def health_checker(self):
engine = Mock()
return HealthChecker(engine)
@pytest.mark.asyncio
async def test_healthy_status(self, health_checker):
health_checker.engine.get_stats = AsyncMock(return_value={
'state': 'running'
})
health_checker.engine.client.client.validate_token = AsyncMock(return_value=True)
with patch.object(health_checker.engine.db_manager, '_init_database'):
status = await health_checker.check_health()
assert status['status'] == 'healthy'
@pytest.mark.asyncio
async def test_unhealthy_engine(self, health_checker):
health_checker.engine.get_stats = AsyncMock(return_value={
'state': 'error'
})
status = await health_checker.check_health()
assert status['status'] == 'unhealthy'
assert 'engine' in status['checks']
class TestDataModels:
def test_space_info_creation(self):
space_info = SpaceInfo(
space_id='test-space',
name='Test Space',
author='test-user',
sdk='gradio'
)
assert space_info.space_id == 'test-space'
assert space_info.author == 'test-user'
assert space_info.sdk == 'gradio'
def test_monitor_event_creation(self):
event = MonitorEvent(
space_id='test-space',
event_type=EventType.ERROR_DETECTED,
timestamp=datetime.now(),
message='Test error',
severity=AlertLevel.HIGH
)
assert event.space_id == 'test-space'
assert event.event_type == EventType.ERROR_DETECTED
assert event.severity == AlertLevel.HIGH
def test_alert_rule_creation(self):
rule = AlertRule(
name='Test Rule',
condition={'event_type': 'error'},
severity=AlertLevel.MEDIUM,
cooldown_minutes=30
)
assert rule.name == 'Test Rule'
assert rule.severity == AlertLevel.MEDIUM
assert rule.cooldown_minutes == 30
class TestIntegration:
@pytest.mark.asyncio
async def test_full_monitoring_cycle(self):
with patch('config.get_config') as mock_config:
mock_config.return_value.monitoring.default_check_interval = 1
engine = MonitorEngine()
with patch.object(engine.client.client, 'validate_token', return_value=True):
with patch.object(engine.client.client, 'get_space_info', return_value=SpaceInfo(
space_id='test-space', name='test-space'
)):
with patch.object(engine.client, 'get_space_status', return_value=SpaceStatusInfo(
space_id='test-space', status=SpaceStatus.RUNNING,
runtime=SpaceRuntime(stage='RUNNING', state='RUNNING'),
timestamp=datetime.now()
)):
with patch.object(engine.db_manager, 'save_space_info'):
with patch.object(engine.db_manager, 'save_status_history'):
with patch.object(engine, '_emit_event'):
await engine.start()
await engine.add_space('test-space')
await asyncio.sleep(2)
stats = await engine.get_stats()
assert stats['total_checks'] > 0
await engine.stop()
if __name__ == "__main__":
pytest.main([__file__, "-v"])