from __future__ import annotations from trenches_env.env import FogOfWarDiplomacyEnv from trenches_env.models import BlackSwanEvent, LiveControlRequest from trenches_env.source_bundles import AGENT_LIVE_SOURCE_BUNDLES, AGENT_TRAINING_SOURCE_BUNDLES from trenches_env.source_ingestion import SourceHarvester class FakeFetcher: def fetch(self, url: str) -> tuple[str, str]: html = f""" Snapshot for {url}

Snapshot for {url}

""" return html, "text/html; charset=utf-8" def test_every_source_has_probe_target() -> None: harvester = SourceHarvester(fetcher=FakeFetcher(), auto_start=False) assert harvester.all_sources_have_probe_targets() is True def test_training_source_packets_are_wired_into_observations() -> None: harvester = SourceHarvester(fetcher=FakeFetcher(), auto_start=False) env = FogOfWarDiplomacyEnv(source_harvester=harvester) session = env.create_session(seed=7) session = env.refresh_session_sources(session, force=True) for agent_id, observation in session.observations.items(): assert len(observation.training_source_packets) == len(AGENT_TRAINING_SOURCE_BUNDLES[agent_id]) assert len(observation.source_packets) == len(AGENT_TRAINING_SOURCE_BUNDLES[agent_id]) assert all(packet.status == "ok" for packet in observation.training_source_packets) assert any(brief.category == "training_source" for brief in observation.private_brief) def test_live_source_packets_are_wired_when_live_mode_is_enabled() -> None: harvester = SourceHarvester(fetcher=FakeFetcher(), auto_start=False) env = FogOfWarDiplomacyEnv(source_harvester=harvester) session = env.create_session(seed=7) session = env.configure_live_session( session, LiveControlRequest(enabled=True, auto_step=False, poll_interval_ms=15_000), ) session = env.refresh_session_sources(session, force=True) for agent_id, observation in session.observations.items(): assert len(observation.live_source_packets) == len(AGENT_LIVE_SOURCE_BUNDLES[agent_id]) assert len(observation.source_packets) == ( len(AGENT_TRAINING_SOURCE_BUNDLES[agent_id]) + len(AGENT_LIVE_SOURCE_BUNDLES[agent_id]) ) def test_source_monitor_confirms_each_entity_can_receive_source_news() -> None: harvester = SourceHarvester(fetcher=FakeFetcher(), auto_start=False) env = FogOfWarDiplomacyEnv(source_harvester=harvester) session = env.create_session(seed=7) session = env.configure_live_session( session, LiveControlRequest(enabled=True, auto_step=False, poll_interval_ms=15_000), ) session = env.refresh_session_sources(session, force=True) report = env.source_monitor(session) assert report.summary.blocked_agents == 0 assert report.summary.degraded_agents == 0 for agent in report.agents: assert agent.configured_training_sources > 0 assert agent.configured_live_sources > 0 assert not agent.missing_training_sources assert not agent.missing_live_sources assert not agent.unbundled_training_sources assert not agent.unbundled_live_sources assert not agent.missing_packet_sources assert not agent.sources_without_probe_targets assert agent.delivered_training_brief_count > 0 assert agent.delivered_live_brief_count > 0 def test_source_briefs_survive_heavy_event_pressure() -> None: harvester = SourceHarvester(fetcher=FakeFetcher(), auto_start=False) env = FogOfWarDiplomacyEnv(source_harvester=harvester) session = env.create_session(seed=7) session.world.active_events = [ BlackSwanEvent( id=f"evt-{index}", summary=f"Shipping disruption report {index}", source="stress-test", severity=0.8, public=True, affected_agents=["us"], ) for index in range(6) ] session = env.refresh_session_sources(session, force=True) us_private_brief = session.observations["us"].private_brief assert len(us_private_brief) == 6 assert any(item.category == "training_source" for item in us_private_brief)