Spaces:
Build error
Build error
| """ | |
| Unit tests for the HuggingFace Spaces monitoring system. | |
| """ | |
| import pytest | |
| import asyncio | |
| from unittest.mock import Mock, AsyncMock, patch | |
| from datetime import datetime | |
| import json | |
| import os | |
| from config import ConfigManager, APIConfig | |
| from data_models import ( | |
| SpaceInfo, SpaceStatus, SpaceStatusInfo, SpaceRuntime, | |
| MonitorEvent, EventType, AlertLevel, AlertRule | |
| ) | |
| from huggingface_client_v2 import HuggingFaceClient, RetryClient, WebhookHandler | |
| from monitor_engine import MonitorEngine, HealthChecker, SpaceMonitor | |
class TestConfigManager:
    """Checks for ``ConfigManager`` loading and validation."""

    def test_load_default_config(self):
        """A manager built with HF_TOKEN in the environment exposes it."""
        fake_env = {'HF_TOKEN': 'test-token'}
        with patch.dict(os.environ, fake_env):
            loaded = ConfigManager().get_config()
            assert loaded.api.token == 'test-token'
            assert loaded.api.base_url == 'https://huggingface.co/api'

    def test_validate_config(self):
        """Validation reports no errors when HF_TOKEN is present."""
        fake_env = {'HF_TOKEN': 'test-token'}
        with patch.dict(os.environ, fake_env):
            problems = ConfigManager().validate_config()
            assert len(problems) == 0

    def test_validate_missing_token(self):
        """Validation mentions HF_TOKEN once the config has been cleared."""
        cfg_manager = ConfigManager()
        cfg_manager.config = None
        problems = cfg_manager.validate_config()
        assert any('HF_TOKEN' in problem for problem in problems)
class TestHuggingFaceClient:
    """Tests for ``HuggingFaceClient`` with its request layer patched out."""

    @pytest.fixture
    def client(self):
        """Provide a client constructed with a dummy token."""
        return HuggingFaceClient(token="test-token")

    @pytest.fixture
    def mock_session(self):
        """Provide an ``AsyncMock`` that stands in for the client session."""
        return AsyncMock()

    @pytest.mark.asyncio
    async def test_get_space_info_success(self, client, mock_session):
        """``get_space_info`` maps the raw API payload onto the result object."""
        mock_response = {
            'id': 'test-space',
            'url': 'https://huggingface.co/spaces/test-space',
            'author': 'test-user',
            'description': 'Test space',
            'sdk': 'gradio',
            'lastModified': '2024-01-01T00:00:00.000Z',
        }
        with patch.object(client, '_get_session', return_value=mock_session):
            with patch.object(client, '_make_request', return_value=mock_response):
                space_info = await client.get_space_info('test-space')

        assert space_info.space_id == 'test-space'
        assert space_info.author == 'test-user'
        assert space_info.sdk == 'gradio'

    @pytest.mark.asyncio
    async def test_get_space_status_success(self, client, mock_session):
        """``get_space_status`` reports RUNNING when the runtime says RUNNING."""
        runtime = SpaceRuntime(stage='RUNNING', state='RUNNING')
        with patch.object(client, '_get_session', return_value=mock_session):
            with patch.object(client, 'get_space_info', return_value=Mock()):
                with patch.object(client, 'get_space_runtime', return_value=runtime):
                    status = await client.get_space_status('test-space')

        assert status.space_id == 'test-space'
        assert status.status == SpaceStatus.RUNNING

    @pytest.mark.asyncio
    async def test_rate_limit(self, client):
        """Three calls under a 2-per-minute limit take at least 60 seconds.

        NOTE(review): the assertion measures event-loop time, so this test
        presumably waits a real minute if the client throttles with a real
        sleep -- confirm, and consider patching the clock to keep the suite fast.
        """
        client.config.rate_limit_per_minute = 2
        # get_running_loop() is the supported way to reach the loop from
        # inside a coroutine; get_event_loop() is deprecated in this context.
        loop = asyncio.get_running_loop()
        start_time = loop.time()
        for _ in range(3):
            # Fixed: the original line lacked "=" after return_value, which
            # made the whole module fail to parse.
            with patch.object(client, '_get_session', return_value=AsyncMock()):
                with patch.object(client, '_make_request', return_value={}):
                    await client.get_space_info('test-space')
        elapsed = loop.time() - start_time
        assert elapsed >= 60
class TestRetryClient:
    """Tests for ``RetryClient`` wrapping a mocked base client."""

    @pytest.fixture
    def retry_client(self):
        """Provide a retry wrapper (2 retries, 0.1 base delay) over a ``Mock``."""
        base_client = Mock()
        return RetryClient(base_client, max_retries=2, base_delay=0.1)

    @pytest.mark.asyncio
    async def test_success_on_first_try(self, retry_client):
        """A call that succeeds immediately reaches the base client once."""
        retry_client.client.get_space_status = AsyncMock(return_value=Mock())
        result = await retry_client.get_space_status('test-space')
        assert result is not None
        retry_client.client.get_space_status.assert_called_once()

    @pytest.mark.asyncio
    async def test_retry_on_failure(self, retry_client):
        """One failure followed by a success results in exactly two calls."""
        retry_client.client.get_space_status = AsyncMock(
            side_effect=[Exception("First failure"), Mock(success=True)]
        )
        result = await retry_client.get_space_status('test-space')
        assert result is not None
        assert retry_client.client.get_space_status.call_count == 2

    @pytest.mark.asyncio
    async def test_max_retries_exceeded(self, retry_client):
        """A persistent failure propagates after the initial call plus 2 retries."""
        retry_client.client.get_space_status = AsyncMock(
            side_effect=Exception("Persistent failure")
        )
        with pytest.raises(Exception):
            await retry_client.get_space_status('test-space')
        # Checked outside the raises block: statements after the raising
        # await inside the block would never execute.
        assert retry_client.client.get_space_status.call_count == 3
class TestWebhookHandler:
    """Tests for ``WebhookHandler`` with signature verification patched out."""

    @pytest.fixture
    def webhook_handler(self):
        """Provide a handler built over a ``Mock`` client and a dummy secret."""
        client = Mock()
        return WebhookHandler(client, secret="test-secret")

    @pytest.mark.asyncio
    async def test_handle_valid_webhook(self, webhook_handler):
        """A ``space.status_updated`` payload yields a processed event."""
        payload = {
            'event': 'space.status_updated',
            'space': {
                'id': 'test-space',
                'runtime': {'stage': 'RUNNING', 'state': 'RUNNING'},
            },
        }
        with patch.object(webhook_handler, '_verify_signature'):
            event = await webhook_handler.handle_webhook(payload, {})

        assert event.space_id == 'test-space'
        assert event.processed

    @pytest.mark.asyncio
    async def test_handle_unknown_event(self, webhook_handler):
        """A payload with an unrecognised event name is left unprocessed."""
        payload = {
            'event': 'unknown.event',
            'space': {'id': 'test-space'},
        }
        with patch.object(webhook_handler, '_verify_signature'):
            event = await webhook_handler.handle_webhook(payload, {})

        assert not event.processed
class TestMonitorEngine:
    """Tests for ``MonitorEngine`` with its client and database patched out."""

    @pytest.fixture
    def engine(self):
        """Provide a freshly constructed engine for each test."""
        return MonitorEngine()

    @pytest.mark.asyncio
    async def test_add_space(self, engine):
        """Adding a space registers it in ``monitored_spaces``."""
        space_info = SpaceInfo(space_id='test-space', name='test-space')
        status_info = SpaceStatusInfo(
            space_id='test-space',
            status=SpaceStatus.RUNNING,
            runtime=SpaceRuntime(stage='RUNNING', state='RUNNING'),
            timestamp=datetime.now(),
        )
        with patch.object(engine.client.client, 'get_space_info', return_value=space_info):
            with patch.object(engine.client, 'get_space_status', return_value=status_info):
                with patch.object(engine.db_manager, 'save_space_info'):
                    with patch.object(engine, '_emit_event'):
                        await engine.add_space('test-space')

        assert 'test-space' in engine.monitored_spaces

    @pytest.mark.asyncio
    async def test_remove_space(self, engine):
        """Removing a space drops it from ``monitored_spaces``."""
        monitor = SpaceMonitor(space_id='test-space', config={})
        engine.monitored_spaces['test-space'] = monitor
        with patch.object(engine, '_emit_event'):
            await engine.remove_space('test-space')

        assert 'test-space' not in engine.monitored_spaces

    @pytest.mark.asyncio
    async def test_status_change_event(self, engine):
        """A BUILDING -> RUNNING transition invokes ``_handle_status_change`` once."""
        monitor = SpaceMonitor(
            space_id='test-space',
            config={},
            last_status=SpaceStatus.BUILDING,
        )
        engine.monitored_spaces['test-space'] = monitor
        status_info = SpaceStatusInfo(
            space_id='test-space',
            status=SpaceStatus.RUNNING,
            runtime=SpaceRuntime(stage='RUNNING', state='RUNNING'),
            timestamp=datetime.now(),
        )
        with patch.object(engine.client, 'get_space_status', return_value=status_info):
            with patch.object(engine.db_manager, 'save_status_history'):
                with patch.object(engine, '_handle_status_change') as mock_handler:
                    await engine._check_space('test-space', monitor)
                    mock_handler.assert_called_once()

    @pytest.mark.asyncio
    async def test_error_threshold_trigger(self, engine):
        """A failing check on a monitor one error below threshold raises an alert."""
        monitor = SpaceMonitor(
            space_id='test-space',
            config={'error_threshold': 2},
            consecutive_errors=1,
        )
        engine.monitored_spaces['test-space'] = monitor
        with patch.object(engine.client, 'get_space_status', side_effect=Exception("API Error")):
            with patch.object(engine, '_trigger_error_alert') as mock_alert:
                await engine._check_space('test-space', monitor)
                mock_alert.assert_called_once()

    def test_register_event_callback(self, engine):
        """A registered callback appears in the list for its event type."""
        callback = Mock()
        engine.register_event_callback(EventType.ERROR_DETECTED, callback)
        assert callback in engine.event_callbacks[EventType.ERROR_DETECTED]

    def test_unregister_event_callback(self, engine):
        """An unregistered callback is removed from the list for its event type."""
        callback = Mock()
        engine.event_callbacks[EventType.ERROR_DETECTED].append(callback)
        engine.unregister_event_callback(EventType.ERROR_DETECTED, callback)
        assert callback not in engine.event_callbacks[EventType.ERROR_DETECTED]
class TestHealthChecker:
    """Tests for ``HealthChecker`` running against a fully mocked engine."""

    @pytest.fixture
    def health_checker(self):
        """Provide a health checker whose engine is a ``Mock``."""
        engine = Mock()
        return HealthChecker(engine)

    @pytest.mark.asyncio
    async def test_healthy_status(self, health_checker):
        """A running engine with a valid token is reported as healthy."""
        health_checker.engine.get_stats = AsyncMock(return_value={
            'state': 'running',
        })
        health_checker.engine.client.client.validate_token = AsyncMock(return_value=True)
        with patch.object(health_checker.engine.db_manager, '_init_database'):
            status = await health_checker.check_health()

        assert status['status'] == 'healthy'

    @pytest.mark.asyncio
    async def test_unhealthy_engine(self, health_checker):
        """An engine in the 'error' state is reported as unhealthy."""
        health_checker.engine.get_stats = AsyncMock(return_value={
            'state': 'error',
        })
        status = await health_checker.check_health()

        assert status['status'] == 'unhealthy'
        assert 'engine' in status['checks']
class TestDataModels:
    """Construction checks for the plain data-model classes."""

    def test_space_info_creation(self):
        """``SpaceInfo`` keeps the identifier, author and SDK it was given."""
        given = {
            'space_id': 'test-space',
            'name': 'Test Space',
            'author': 'test-user',
            'sdk': 'gradio',
        }
        info = SpaceInfo(**given)
        for attr in ('space_id', 'author', 'sdk'):
            assert getattr(info, attr) == given[attr]

    def test_monitor_event_creation(self):
        """``MonitorEvent`` keeps its space id, event type and severity."""
        created = MonitorEvent(
            space_id='test-space',
            event_type=EventType.ERROR_DETECTED,
            timestamp=datetime.now(),
            message='Test error',
            severity=AlertLevel.HIGH,
        )
        expected = (
            ('space_id', 'test-space'),
            ('event_type', EventType.ERROR_DETECTED),
            ('severity', AlertLevel.HIGH),
        )
        for attr, value in expected:
            assert getattr(created, attr) == value

    def test_alert_rule_creation(self):
        """``AlertRule`` keeps its name, severity and cooldown."""
        created = AlertRule(
            name='Test Rule',
            condition={'event_type': 'error'},
            severity=AlertLevel.MEDIUM,
            cooldown_minutes=30,
        )
        expected = (
            ('name', 'Test Rule'),
            ('severity', AlertLevel.MEDIUM),
            ('cooldown_minutes', 30),
        )
        for attr, value in expected:
            assert getattr(created, attr) == value
class TestIntegration:
    """End-to-end check of the engine with all external calls patched."""

    @pytest.mark.asyncio
    async def test_full_monitoring_cycle(self):
        """Start the engine, add a space, and expect at least one check.

        The test sleeps for two seconds of real time with the check interval
        configured to 1, then reads ``total_checks`` from the engine stats.
        """
        # NOTE(review): this patches ``config.get_config``; if the engine
        # module imported the function by name, the patch target may need to
        # be the engine module instead -- confirm.
        with patch('config.get_config') as mock_config:
            mock_config.return_value.monitoring.default_check_interval = 1
            engine = MonitorEngine()

            space_info = SpaceInfo(space_id='test-space', name='test-space')
            status_info = SpaceStatusInfo(
                space_id='test-space',
                status=SpaceStatus.RUNNING,
                runtime=SpaceRuntime(stage='RUNNING', state='RUNNING'),
                timestamp=datetime.now(),
            )
            with patch.object(engine.client.client, 'validate_token', return_value=True):
                with patch.object(engine.client.client, 'get_space_info', return_value=space_info):
                    with patch.object(engine.client, 'get_space_status', return_value=status_info):
                        with patch.object(engine.db_manager, 'save_space_info'):
                            with patch.object(engine.db_manager, 'save_status_history'):
                                with patch.object(engine, '_emit_event'):
                                    await engine.start()
                                    try:
                                        await engine.add_space('test-space')
                                        await asyncio.sleep(2)
                                        stats = await engine.get_stats()
                                        assert stats['total_checks'] > 0
                                    finally:
                                        # Always stop the engine, even when an
                                        # assertion fails, so it is not left
                                        # running into later tests.
                                        await engine.stop()
if __name__ == "__main__":
    # Run this file's tests verbosely and propagate pytest's exit status so
    # that a failing run does not exit with status 0 when executed as a script.
    raise SystemExit(pytest.main([__file__, "-v"]))