File size: 4,411 Bytes
1794757
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
from __future__ import annotations

from trenches_env.env import FogOfWarDiplomacyEnv
from trenches_env.models import BlackSwanEvent, LiveControlRequest
from trenches_env.source_bundles import AGENT_LIVE_SOURCE_BUNDLES, AGENT_TRAINING_SOURCE_BUNDLES
from trenches_env.source_ingestion import SourceHarvester


class FakeFetcher:
    def fetch(self, url: str) -> tuple[str, str]:
        html = f"""
        <html>
          <head>
            <title>Snapshot for {url}</title>
            <meta name="description" content="Latest collected snapshot from {url}" />
          </head>
          <body>
            <h1>Snapshot for {url}</h1>
          </body>
        </html>
        """
        return html, "text/html; charset=utf-8"


def test_every_source_has_probe_target() -> None:
    harvester = SourceHarvester(fetcher=FakeFetcher(), auto_start=False)
    assert harvester.all_sources_have_probe_targets() is True


def test_training_source_packets_are_wired_into_observations() -> None:
    harvester = SourceHarvester(fetcher=FakeFetcher(), auto_start=False)
    env = FogOfWarDiplomacyEnv(source_harvester=harvester)
    session = env.create_session(seed=7)
    session = env.refresh_session_sources(session, force=True)

    for agent_id, observation in session.observations.items():
        assert len(observation.training_source_packets) == len(AGENT_TRAINING_SOURCE_BUNDLES[agent_id])
        assert len(observation.source_packets) == len(AGENT_TRAINING_SOURCE_BUNDLES[agent_id])
        assert all(packet.status == "ok" for packet in observation.training_source_packets)
        assert any(brief.category == "training_source" for brief in observation.private_brief)


def test_live_source_packets_are_wired_when_live_mode_is_enabled() -> None:
    harvester = SourceHarvester(fetcher=FakeFetcher(), auto_start=False)
    env = FogOfWarDiplomacyEnv(source_harvester=harvester)
    session = env.create_session(seed=7)
    session = env.configure_live_session(
        session,
        LiveControlRequest(enabled=True, auto_step=False, poll_interval_ms=15_000),
    )
    session = env.refresh_session_sources(session, force=True)

    for agent_id, observation in session.observations.items():
        assert len(observation.live_source_packets) == len(AGENT_LIVE_SOURCE_BUNDLES[agent_id])
        assert len(observation.source_packets) == (
            len(AGENT_TRAINING_SOURCE_BUNDLES[agent_id]) + len(AGENT_LIVE_SOURCE_BUNDLES[agent_id])
        )


def test_source_monitor_confirms_each_entity_can_receive_source_news() -> None:
    harvester = SourceHarvester(fetcher=FakeFetcher(), auto_start=False)
    env = FogOfWarDiplomacyEnv(source_harvester=harvester)
    session = env.create_session(seed=7)
    session = env.configure_live_session(
        session,
        LiveControlRequest(enabled=True, auto_step=False, poll_interval_ms=15_000),
    )
    session = env.refresh_session_sources(session, force=True)
    report = env.source_monitor(session)

    assert report.summary.blocked_agents == 0
    assert report.summary.degraded_agents == 0

    for agent in report.agents:
        assert agent.configured_training_sources > 0
        assert agent.configured_live_sources > 0
        assert not agent.missing_training_sources
        assert not agent.missing_live_sources
        assert not agent.unbundled_training_sources
        assert not agent.unbundled_live_sources
        assert not agent.missing_packet_sources
        assert not agent.sources_without_probe_targets
        assert agent.delivered_training_brief_count > 0
        assert agent.delivered_live_brief_count > 0


def test_source_briefs_survive_heavy_event_pressure() -> None:
    harvester = SourceHarvester(fetcher=FakeFetcher(), auto_start=False)
    env = FogOfWarDiplomacyEnv(source_harvester=harvester)
    session = env.create_session(seed=7)
    session.world.active_events = [
        BlackSwanEvent(
            id=f"evt-{index}",
            summary=f"Shipping disruption report {index}",
            source="stress-test",
            severity=0.8,
            public=True,
            affected_agents=["us"],
        )
        for index in range(6)
    ]

    session = env.refresh_session_sources(session, force=True)

    us_private_brief = session.observations["us"].private_brief
    assert len(us_private_brief) == 6
    assert any(item.category == "training_source" for item in us_private_brief)