from __future__ import annotations
from trenches_env.env import FogOfWarDiplomacyEnv
from trenches_env.models import BlackSwanEvent, LiveControlRequest
from trenches_env.source_bundles import AGENT_LIVE_SOURCE_BUNDLES, AGENT_TRAINING_SOURCE_BUNDLES
from trenches_env.source_ingestion import SourceHarvester
class FakeFetcher:
def fetch(self, url: str) -> tuple[str, str]:
html = f"""
Snapshot for {url}
Snapshot for {url}
"""
return html, "text/html; charset=utf-8"
def test_every_source_has_probe_target() -> None:
harvester = SourceHarvester(fetcher=FakeFetcher(), auto_start=False)
assert harvester.all_sources_have_probe_targets() is True
def test_training_source_packets_are_wired_into_observations() -> None:
harvester = SourceHarvester(fetcher=FakeFetcher(), auto_start=False)
env = FogOfWarDiplomacyEnv(source_harvester=harvester)
session = env.create_session(seed=7)
session = env.refresh_session_sources(session, force=True)
for agent_id, observation in session.observations.items():
assert len(observation.training_source_packets) == len(AGENT_TRAINING_SOURCE_BUNDLES[agent_id])
assert len(observation.source_packets) == len(AGENT_TRAINING_SOURCE_BUNDLES[agent_id])
assert all(packet.status == "ok" for packet in observation.training_source_packets)
assert any(brief.category == "training_source" for brief in observation.private_brief)
def test_live_source_packets_are_wired_when_live_mode_is_enabled() -> None:
harvester = SourceHarvester(fetcher=FakeFetcher(), auto_start=False)
env = FogOfWarDiplomacyEnv(source_harvester=harvester)
session = env.create_session(seed=7)
session = env.configure_live_session(
session,
LiveControlRequest(enabled=True, auto_step=False, poll_interval_ms=15_000),
)
session = env.refresh_session_sources(session, force=True)
for agent_id, observation in session.observations.items():
assert len(observation.live_source_packets) == len(AGENT_LIVE_SOURCE_BUNDLES[agent_id])
assert len(observation.source_packets) == (
len(AGENT_TRAINING_SOURCE_BUNDLES[agent_id]) + len(AGENT_LIVE_SOURCE_BUNDLES[agent_id])
)
def test_source_monitor_confirms_each_entity_can_receive_source_news() -> None:
harvester = SourceHarvester(fetcher=FakeFetcher(), auto_start=False)
env = FogOfWarDiplomacyEnv(source_harvester=harvester)
session = env.create_session(seed=7)
session = env.configure_live_session(
session,
LiveControlRequest(enabled=True, auto_step=False, poll_interval_ms=15_000),
)
session = env.refresh_session_sources(session, force=True)
report = env.source_monitor(session)
assert report.summary.blocked_agents == 0
assert report.summary.degraded_agents == 0
for agent in report.agents:
assert agent.configured_training_sources > 0
assert agent.configured_live_sources > 0
assert not agent.missing_training_sources
assert not agent.missing_live_sources
assert not agent.unbundled_training_sources
assert not agent.unbundled_live_sources
assert not agent.missing_packet_sources
assert not agent.sources_without_probe_targets
assert agent.delivered_training_brief_count > 0
assert agent.delivered_live_brief_count > 0
def test_source_briefs_survive_heavy_event_pressure() -> None:
harvester = SourceHarvester(fetcher=FakeFetcher(), auto_start=False)
env = FogOfWarDiplomacyEnv(source_harvester=harvester)
session = env.create_session(seed=7)
session.world.active_events = [
BlackSwanEvent(
id=f"evt-{index}",
summary=f"Shipping disruption report {index}",
source="stress-test",
severity=0.8,
public=True,
affected_agents=["us"],
)
for index in range(6)
]
session = env.refresh_session_sources(session, force=True)
us_private_brief = session.observations["us"].private_brief
assert len(us_private_brief) == 6
assert any(item.category == "training_source" for item in us_private_brief)