| from __future__ import annotations | |
| import os | |
| import pytest | |
| from trenches_env.env import FogOfWarDiplomacyEnv | |
| from trenches_env.models import LiveControlRequest | |
| from trenches_env.source_ingestion import HttpSourceFetcher, SourceHarvester | |
# Live network probes are opt-in: the skip marker below applies to the whole
# module unless TRENCHES_RUN_LIVE_SOURCE_TESTS is set to exactly "1".
pytestmark = pytest.mark.skipif(
    os.environ.get("TRENCHES_RUN_LIVE_SOURCE_TESTS") != "1",
    reason="set TRENCHES_RUN_LIVE_SOURCE_TESTS=1 to run live network probes",
)
def test_live_source_monitor_delivers_packets_and_briefs_per_agent() -> None:
    """Check that the source monitor reports packets and briefs for every agent.

    Builds an environment around a harvester created with ``auto_start=False``
    and a fetcher timeout of 10 seconds, creates a seeded session, enables
    live mode without auto-stepping, and inspects the source monitor report.

    Per-agent shortfalls are collected before asserting so that a single
    failing run lists every problem instead of only the first one. Live
    packet/brief checks apply only to agents with configured live sources.
    """
    harvester = SourceHarvester(fetcher=HttpSourceFetcher(timeout_seconds=10.0), auto_start=False)
    env = FogOfWarDiplomacyEnv(source_harvester=harvester).enable_source_warm_start()
    try:
        session = env.create_session(seed=7)
        session = env.configure_live_session(
            session,
            LiveControlRequest(enabled=True, auto_step=False, poll_interval_ms=15_000),
        )
        report = env.source_monitor(session)

        agents = list(report.agents)
        # Without this guard an empty report would skip the loop entirely and
        # the test would pass without having verified anything.
        assert agents, "source monitor reported no agents"

        failures: list[str] = []
        for agent in agents:
            if agent.available_training_packet_count == 0:
                failures.append(f"{agent.agent_id}: no healthy training packet")
            if agent.delivered_training_brief_count == 0:
                failures.append(f"{agent.agent_id}: no training brief visible to the model")
            # Live expectations only make sense when live sources are configured.
            if agent.configured_live_sources > 0:
                if agent.available_live_packet_count == 0:
                    failures.append(f"{agent.agent_id}: no healthy live packet")
                if agent.delivered_live_brief_count == 0:
                    failures.append(f"{agent.agent_id}: no live brief visible to the model")

        assert not failures, "\n".join(failures)
    finally:
        # Always release the environment, even when an assertion fails.
        env.shutdown()