| from __future__ import annotations | |
| import os | |
| from concurrent.futures import ThreadPoolExecutor | |
| import pytest | |
| from trenches_env.source_catalog import get_all_sources | |
| from trenches_env.source_ingestion import HttpSourceFetcher, SourceHarvester | |
| pytestmark = pytest.mark.skipif( | |
| os.getenv("TRENCHES_RUN_LIVE_SOURCE_TESTS") != "1", | |
| reason="set TRENCHES_RUN_LIVE_SOURCE_TESTS=1 to run live network probes", | |
| ) | |
| def test_all_sources_have_a_live_accessible_probe() -> None: | |
| harvester = SourceHarvester(fetcher=HttpSourceFetcher(timeout_seconds=10.0), auto_start=False) | |
| sources = get_all_sources() | |
| def probe(source_id: str) -> tuple[str, str, str | None]: | |
| source = next(source for source in sources if source.id == source_id) | |
| packet = harvester.probe_source(source) | |
| return source.name, packet.status, packet.error | |
| failures: list[str] = [] | |
| try: | |
| with ThreadPoolExecutor(max_workers=12) as executor: | |
| for source_name, status, error in executor.map(probe, [source.id for source in sources]): | |
| if status != "ok": | |
| failures.append(f"{source_name}: {error or 'probe failed'}") | |
| finally: | |
| harvester.stop() | |
| assert not failures, "\n".join(failures) | |