from __future__ import annotations
from trenches_env.env import FogOfWarDiplomacyEnv
from trenches_env.models import LiveControlRequest
from trenches_env.source_ingestion import SourceHarvester
class FakeFetcher:
    """Stand-in fetcher that fabricates a response from the URL alone.

    It is handed to ``SourceHarvester`` via ``fetcher=`` in
    ``build_warm_env`` so the tests in this module get deterministic
    content for every URL.
    """

    def fetch(self, url: str) -> tuple[str, str]:
        """Return a ``(body, content_type)`` pair for *url*.

        The body is a short text snapshot that mentions *url* twice; the
        content type is always ``"text/html; charset=utf-8"``.
        """
        content_type = "text/html; charset=utf-8"
        # The literal's lines stay flush-left on purpose: any indentation
        # placed inside the triple quotes would become part of the body.
        body = f"""
Snapshot for {url}
Snapshot for {url}
"""
        return body, content_type
def build_warm_env() -> FogOfWarDiplomacyEnv:
    """Build the environment used by the source-monitor tests.

    A ``SourceHarvester`` is created with a ``FakeFetcher`` and
    ``auto_start=False``, passed to ``FogOfWarDiplomacyEnv`` as its
    ``source_harvester``, and the value returned by
    ``enable_source_warm_start()`` on that environment is handed back.
    """
    fake_fetcher = FakeFetcher()
    source_harvester = SourceHarvester(fetcher=fake_fetcher, auto_start=False)
    environment = FogOfWarDiplomacyEnv(source_harvester=source_harvester)
    return environment.enable_source_warm_start()
def test_source_monitor_reports_training_delivery_per_agent() -> None:
    """Check the source monitor's per-agent training figures on a warm env.

    A session is created with ``seed=7`` and its sources are force-refreshed
    without configuring live mode. The report must show live mode off and
    zero blocked agents, and every agent entry must have at least one
    configured training source, one available training packet and one
    delivered training brief, a status of "healthy" or "degraded", and no
    issue carrying the "none reached the model brief" training message.
    """
    undelivered_message = "Training-source packets are available but none reached the model brief."
    acceptable_statuses = {"healthy", "degraded"}

    warm_env = build_warm_env()
    fresh_session = warm_env.create_session(seed=7)
    refreshed_session = warm_env.refresh_session_sources(fresh_session, force=True)
    monitor_report = warm_env.source_monitor(refreshed_session)

    assert monitor_report.live_enabled is False
    assert monitor_report.summary.blocked_agents == 0

    # NOTE(review): if ``monitor_report.agents`` were empty this loop would
    # assert nothing - confirm the environment always reports agents.
    for agent_report in monitor_report.agents:
        assert agent_report.configured_training_sources >= 1
        assert agent_report.available_training_packet_count >= 1
        assert agent_report.delivered_training_brief_count >= 1
        assert agent_report.status in acceptable_statuses
        issue_messages = [issue.message for issue in agent_report.issues]
        assert undelivered_message not in issue_messages
def test_source_monitor_reports_live_delivery_per_agent() -> None:
    """Check the source monitor's per-agent live figures on a warm env.

    A session is created with ``seed=7``, switched to live mode
    (``enabled=True``, ``auto_step=False``, ``poll_interval_ms=15_000``) and
    its sources are force-refreshed. The report must show live mode on and
    zero blocked agents, and every agent entry must have at least one
    configured live source, one available live packet and one delivered
    live brief, a status of "healthy" or "degraded", and no issue carrying
    the "none reached the model brief" live message.
    """
    undelivered_message = "Live-source packets are available but none reached the model brief."
    acceptable_statuses = {"healthy", "degraded"}

    warm_env = build_warm_env()
    fresh_session = warm_env.create_session(seed=7)

    live_request = LiveControlRequest(enabled=True, auto_step=False, poll_interval_ms=15_000)
    live_session = warm_env.configure_live_session(fresh_session, live_request)

    refreshed_session = warm_env.refresh_session_sources(live_session, force=True)
    monitor_report = warm_env.source_monitor(refreshed_session)

    assert monitor_report.live_enabled is True
    assert monitor_report.summary.blocked_agents == 0

    # NOTE(review): if ``monitor_report.agents`` were empty this loop would
    # assert nothing - confirm the environment always reports agents.
    for agent_report in monitor_report.agents:
        assert agent_report.configured_live_sources >= 1
        assert agent_report.available_live_packet_count >= 1
        assert agent_report.delivered_live_brief_count >= 1
        assert agent_report.status in acceptable_statuses
        issue_messages = [issue.message for issue in agent_report.issues]
        assert undelivered_message not in issue_messages