""" HuggingFace Spaces 监控系统单元测试 """ import pytest import asyncio from unittest.mock import Mock, AsyncMock, patch from datetime import datetime import json import os from config import ConfigManager, APIConfig from data_models import ( SpaceInfo, SpaceStatus, SpaceStatusInfo, SpaceRuntime, MonitorEvent, EventType, AlertLevel, AlertRule ) from huggingface_client_v2 import HuggingFaceClient, RetryClient, WebhookHandler from monitor_engine import MonitorEngine, HealthChecker, SpaceMonitor class TestConfigManager: def test_load_default_config(self): with patch.dict(os.environ, {'HF_TOKEN': 'test-token'}): manager = ConfigManager() config = manager.get_config() assert config.api.token == 'test-token' assert config.api.base_url == 'https://huggingface.co/api' def test_validate_config(self): with patch.dict(os.environ, {'HF_TOKEN': 'test-token'}): manager = ConfigManager() errors = manager.validate_config() assert len(errors) == 0 def test_validate_missing_token(self): manager = ConfigManager() manager.config = None errors = manager.validate_config() assert any('HF_TOKEN' in error for error in errors) class TestHuggingFaceClient: @pytest.fixture def client(self): return HuggingFaceClient(token="test-token") @pytest.fixture def mock_session(self): session = AsyncMock() return session @pytest.mark.asyncio async def test_get_space_info_success(self, client, mock_session): mock_response = { 'id': 'test-space', 'url': 'https://huggingface.co/spaces/test-space', 'author': 'test-user', 'description': 'Test space', 'sdk': 'gradio', 'lastModified': '2024-01-01T00:00:00.000Z' } with patch.object(client, '_get_session', return_value=mock_session): with patch.object(client, '_make_request', return_value=mock_response): space_info = await client.get_space_info('test-space') assert space_info.space_id == 'test-space' assert space_info.author == 'test-user' assert space_info.sdk == 'gradio' @pytest.mark.asyncio async def test_get_space_status_success(self, client, mock_session): with patch.object(client, '_get_session', return_value=mock_session): with patch.object(client, 'get_space_info', return_value=Mock()): with patch.object(client, 'get_space_runtime', return_value=SpaceRuntime( stage='RUNNING', state='RUNNING' )): status = await client.get_space_status('test-space') assert status.space_id == 'test-space' assert status.status == SpaceStatus.RUNNING @pytest.mark.asyncio async def test_rate_limit(self, client): client.config.rate_limit_per_minute = 2 start_time = asyncio.get_event_loop().time() for i in range(3): with patch.object(client, '_get_session', return_value AsyncMock()): with patch.object(client, '_make_request', return_value={}): await client.get_space_info('test-space') elapsed = asyncio.get_event_loop().time() - start_time assert elapsed >= 60 class TestRetryClient: @pytest.fixture def retry_client(self): base_client = Mock() return RetryClient(base_client, max_retries=2, base_delay=0.1) @pytest.mark.asyncio async def test_success_on_first_try(self, retry_client): retry_client.client.get_space_status = AsyncMock(return_value=Mock()) result = await retry_client.get_space_status('test-space') assert result is not None retry_client.client.get_space_status.assert_called_once() @pytest.mark.asyncio async def test_retry_on_failure(self, retry_client): retry_client.client.get_space_status = AsyncMock( side_effect=[Exception("First failure"), Mock(success=True)] ) result = await retry_client.get_space_status('test-space') assert result is not None assert retry_client.client.get_space_status.call_count == 2 @pytest.mark.asyncio async def test_max_retries_exceeded(self, retry_client): retry_client.client.get_space_status = AsyncMock( side_effect=Exception("Persistent failure") ) with pytest.raises(Exception): await retry_client.get_space_status('test-space') assert retry_client.client.get_space_status.call_count == 3 class TestWebhookHandler: @pytest.fixture def webhook_handler(self): client = Mock() return WebhookHandler(client, secret="test-secret") @pytest.mark.asyncio async def test_handle_valid_webhook(self, webhook_handler): payload = { 'event': 'space.status_updated', 'space': { 'id': 'test-space', 'runtime': {'stage': 'RUNNING', 'state': 'RUNNING'} } } with patch.object(webhook_handler, '_verify_signature'): event = await webhook_handler.handle_webhook(payload, {}) assert event.space_id == 'test-space' assert event.processed @pytest.mark.asyncio async def test_handle_unknown_event(self, webhook_handler): payload = { 'event': 'unknown.event', 'space': {'id': 'test-space'} } with patch.object(webhook_handler, '_verify_signature'): event = await webhook_handler.handle_webhook(payload, {}) assert not event.processed class TestMonitorEngine: @pytest.fixture def engine(self): return MonitorEngine() @pytest.mark.asyncio async def test_add_space(self, engine): with patch.object(engine.client.client, 'get_space_info', return_value=SpaceInfo( space_id='test-space', name='test-space' )): with patch.object(engine.client, 'get_space_status', return_value=SpaceStatusInfo( space_id='test-space', status=SpaceStatus.RUNNING, runtime=SpaceRuntime(stage='RUNNING', state='RUNNING'), timestamp=datetime.now() )): with patch.object(engine.db_manager, 'save_space_info'): with patch.object(engine, '_emit_event'): await engine.add_space('test-space') assert 'test-space' in engine.monitored_spaces @pytest.mark.asyncio async def test_remove_space(self, engine): monitor = SpaceMonitor(space_id='test-space', config={}) engine.monitored_spaces['test-space'] = monitor with patch.object(engine, '_emit_event'): await engine.remove_space('test-space') assert 'test-space' not in engine.monitored_spaces @pytest.mark.asyncio async def test_status_change_event(self, engine): monitor = SpaceMonitor( space_id='test-space', config={}, last_status=SpaceStatus.BUILDING ) engine.monitored_spaces['test-space'] = monitor with patch.object(engine.client, 'get_space_status', return_value=SpaceStatusInfo( space_id='test-space', status=SpaceStatus.RUNNING, runtime=SpaceRuntime(stage='RUNNING', state='RUNNING'), timestamp=datetime.now() )): with patch.object(engine.db_manager, 'save_status_history'): with patch.object(engine, '_handle_status_change') as mock_handler: await engine._check_space('test-space', monitor) mock_handler.assert_called_once() @pytest.mark.asyncio async def test_error_threshold_trigger(self, engine): monitor = SpaceMonitor( space_id='test-space', config={'error_threshold': 2}, consecutive_errors=1 ) engine.monitored_spaces['test-space'] = monitor with patch.object(engine.client, 'get_space_status', side_effect=Exception("API Error")): with patch.object(engine, '_trigger_error_alert') as mock_alert: await engine._check_space('test-space', monitor) mock_alert.assert_called_once() def test_register_event_callback(self, engine): callback = Mock() engine.register_event_callback(EventType.ERROR_DETECTED, callback) assert callback in engine.event_callbacks[EventType.ERROR_DETECTED] def test_unregister_event_callback(self, engine): callback = Mock() engine.event_callbacks[EventType.ERROR_DETECTED].append(callback) engine.unregister_event_callback(EventType.ERROR_DETECTED, callback) assert callback not in engine.event_callbacks[EventType.ERROR_DETECTED] class TestHealthChecker: @pytest.fixture def health_checker(self): engine = Mock() return HealthChecker(engine) @pytest.mark.asyncio async def test_healthy_status(self, health_checker): health_checker.engine.get_stats = AsyncMock(return_value={ 'state': 'running' }) health_checker.engine.client.client.validate_token = AsyncMock(return_value=True) with patch.object(health_checker.engine.db_manager, '_init_database'): status = await health_checker.check_health() assert status['status'] == 'healthy' @pytest.mark.asyncio async def test_unhealthy_engine(self, health_checker): health_checker.engine.get_stats = AsyncMock(return_value={ 'state': 'error' }) status = await health_checker.check_health() assert status['status'] == 'unhealthy' assert 'engine' in status['checks'] class TestDataModels: def test_space_info_creation(self): space_info = SpaceInfo( space_id='test-space', name='Test Space', author='test-user', sdk='gradio' ) assert space_info.space_id == 'test-space' assert space_info.author == 'test-user' assert space_info.sdk == 'gradio' def test_monitor_event_creation(self): event = MonitorEvent( space_id='test-space', event_type=EventType.ERROR_DETECTED, timestamp=datetime.now(), message='Test error', severity=AlertLevel.HIGH ) assert event.space_id == 'test-space' assert event.event_type == EventType.ERROR_DETECTED assert event.severity == AlertLevel.HIGH def test_alert_rule_creation(self): rule = AlertRule( name='Test Rule', condition={'event_type': 'error'}, severity=AlertLevel.MEDIUM, cooldown_minutes=30 ) assert rule.name == 'Test Rule' assert rule.severity == AlertLevel.MEDIUM assert rule.cooldown_minutes == 30 class TestIntegration: @pytest.mark.asyncio async def test_full_monitoring_cycle(self): with patch('config.get_config') as mock_config: mock_config.return_value.monitoring.default_check_interval = 1 engine = MonitorEngine() with patch.object(engine.client.client, 'validate_token', return_value=True): with patch.object(engine.client.client, 'get_space_info', return_value=SpaceInfo( space_id='test-space', name='test-space' )): with patch.object(engine.client, 'get_space_status', return_value=SpaceStatusInfo( space_id='test-space', status=SpaceStatus.RUNNING, runtime=SpaceRuntime(stage='RUNNING', state='RUNNING'), timestamp=datetime.now() )): with patch.object(engine.db_manager, 'save_space_info'): with patch.object(engine.db_manager, 'save_status_history'): with patch.object(engine, '_emit_event'): await engine.start() await engine.add_space('test-space') await asyncio.sleep(2) stats = await engine.get_stats() assert stats['total_checks'] > 0 await engine.stop() if __name__ == "__main__": pytest.main([__file__, "-v"])