trenches / backend /tests /test_live_source_delivery.py
Codex
sync main snapshot for HF Space
1794757
from __future__ import annotations
import os
import pytest
from trenches_env.env import FogOfWarDiplomacyEnv
from trenches_env.models import LiveControlRequest
from trenches_env.source_ingestion import HttpSourceFetcher, SourceHarvester
pytestmark = pytest.mark.skipif(
os.getenv("TRENCHES_RUN_LIVE_SOURCE_TESTS") != "1",
reason="set TRENCHES_RUN_LIVE_SOURCE_TESTS=1 to run live network probes",
)
def test_live_source_monitor_delivers_packets_and_briefs_per_agent() -> None:
harvester = SourceHarvester(fetcher=HttpSourceFetcher(timeout_seconds=10.0), auto_start=False)
env = FogOfWarDiplomacyEnv(source_harvester=harvester).enable_source_warm_start()
try:
session = env.create_session(seed=7)
session = env.configure_live_session(
session,
LiveControlRequest(enabled=True, auto_step=False, poll_interval_ms=15_000),
)
report = env.source_monitor(session)
failures: list[str] = []
for agent in report.agents:
if agent.available_training_packet_count == 0:
failures.append(f"{agent.agent_id}: no healthy training packet")
if agent.delivered_training_brief_count == 0:
failures.append(f"{agent.agent_id}: no training brief visible to the model")
if agent.configured_live_sources > 0 and agent.available_live_packet_count == 0:
failures.append(f"{agent.agent_id}: no healthy live packet")
if agent.configured_live_sources > 0 and agent.delivered_live_brief_count == 0:
failures.append(f"{agent.agent_id}: no live brief visible to the model")
assert not failures, "\n".join(failures)
finally:
env.shutdown()