Spaces:
Sleeping
Sleeping
| import pytest | |
| from fastapi.testclient import TestClient | |
| import hf_unified_server | |
| client = TestClient(hf_unified_server.app) | |
| def test_local_resource_service_exposes_assets(): | |
| """Loader should expose all symbols from the canonical registry.""" | |
| service = hf_unified_server.local_resource_service | |
| service.refresh() | |
| symbols = service.get_supported_symbols() | |
| assert "BTC" in symbols | |
| assert len(symbols) >= 5 | |
| def test_top_prices_endpoint_uses_local_fallback(monkeypatch): | |
| """/api/crypto/prices/top should gracefully fall back to the local registry.""" | |
| async def fail_get_top_coins(*_args, **_kwargs): | |
| raise hf_unified_server.CollectorError("coingecko unavailable") | |
| monkeypatch.setattr(hf_unified_server.market_collector, "get_top_coins", fail_get_top_coins) | |
| hf_unified_server.local_resource_service.refresh() | |
| response = client.get("/api/crypto/prices/top?limit=4") | |
| assert response.status_code == 200 | |
| payload = response.json() | |
| assert payload["source"] == "local-fallback" | |
| assert payload["count"] == 4 | |
| def test_market_prices_endpoint_survives_provider_failure(monkeypatch): | |
| """Critical market endpoints must respond even when live providers fail.""" | |
| async def fail_coin_details(*_args, **_kwargs): | |
| raise hf_unified_server.CollectorError("binance unavailable") | |
| async def fail_top_coins(*_args, **_kwargs): | |
| raise hf_unified_server.CollectorError("coingecko unavailable") | |
| monkeypatch.setattr(hf_unified_server.market_collector, "get_coin_details", fail_coin_details) | |
| monkeypatch.setattr(hf_unified_server.market_collector, "get_top_coins", fail_top_coins) | |
| hf_unified_server.local_resource_service.refresh() | |
| response = client.get("/api/market/prices?symbols=BTC,ETH,SOL") | |
| assert response.status_code == 200 | |
| payload = response.json() | |
| assert payload["source"] == "local-fallback" | |
| assert payload["count"] == 3 | |