File size: 1,257 Bytes
1794757 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 | from __future__ import annotations
import os
from concurrent.futures import ThreadPoolExecutor
import pytest
from trenches_env.source_catalog import get_all_sources
from trenches_env.source_ingestion import HttpSourceFetcher, SourceHarvester
# Live probes hit real network endpoints, so the whole module is opt-in:
# it only runs when TRENCHES_RUN_LIVE_SOURCE_TESTS is exactly "1".
_LIVE_SOURCE_TESTS_ENABLED = os.environ.get("TRENCHES_RUN_LIVE_SOURCE_TESTS") == "1"

pytestmark = pytest.mark.skipif(
    not _LIVE_SOURCE_TESTS_ENABLED,
    reason="set TRENCHES_RUN_LIVE_SOURCE_TESTS=1 to run live network probes",
)
def test_all_sources_have_a_live_accessible_probe() -> None:
    """Probe every catalogued source and fail if any probe is not "ok".

    Each source returned by ``get_all_sources()`` is passed to
    ``SourceHarvester.probe_source`` on a pool of 12 worker threads, using an
    ``HttpSourceFetcher`` with a 10 second timeout. Every source whose packet
    status is not ``"ok"`` contributes one ``"<name>: <error>"`` line to the
    assertion message, so a single run reports all failing sources at once.
    """
    harvester = SourceHarvester(fetcher=HttpSourceFetcher(timeout_seconds=10.0), auto_start=False)
    sources = get_all_sources()

    def probe(source) -> tuple[str, str, str | None]:
        """Probe one source and return ``(name, status, error)``."""
        packet = harvester.probe_source(source)
        return source.name, packet.status, packet.error

    try:
        with ThreadPoolExecutor(max_workers=12) as executor:
            # Map over the source objects themselves rather than their ids:
            # this avoids a linear re-lookup per probe and guarantees that
            # every catalogue entry is probed exactly once, even if two
            # entries happen to share an id.
            failures = [
                f"{source_name}: {error or 'probe failed'}"
                for source_name, status, error in executor.map(probe, sources)
                if status != "ok"
            ]
    finally:
        # Always stop the harvester, including when a probe raised.
        harvester.stop()

    assert not failures, "\n".join(failures)
|