Spaces:
Paused
Paused
| from __future__ import annotations | |
| import hashlib | |
| import random | |
| from datetime import datetime, timedelta, timezone | |
| from uuid import uuid4 | |
| from trenches_env.agents import AGENT_IDS, AGENT_PROFILES | |
| from trenches_env.entity_knowledge import load_entity_pack | |
| from trenches_env.historical_replay import ( | |
| get_historical_replay, | |
| severity_distance, | |
| severity_score, | |
| ) | |
| from trenches_env.model_runtime import build_entity_model_bindings | |
| from trenches_env.models import ( | |
| ActionLogEntry, | |
| AgentAction, | |
| AgentBeliefEntry, | |
| AgentBeliefState, | |
| AgentObservation, | |
| AssetCondition, | |
| BlackSwanEvent, | |
| DataSourceContext, | |
| EpisodeMetadata, | |
| EntityModelBinding, | |
| ExternalSignal, | |
| HistoricalEvent, | |
| HistoricalReplayState, | |
| IntelSnippet, | |
| LatentEvent, | |
| LatentEventNarrative, | |
| LiveControlRequest, | |
| ObservationProjection, | |
| OversightIntervention, | |
| Prediction, | |
| PredictionAssessment, | |
| ProviderDiagnosticsResponse, | |
| ReactionActorOutcome, | |
| ReactionLogEntry, | |
| RewardBreakdown, | |
| SessionState, | |
| SourcePacket, | |
| SourceMonitorReport, | |
| StepTrace, | |
| StepSessionRequest, | |
| StepSessionResponse, | |
| WorldState, | |
| ) | |
| from trenches_env.provider_runtime import ProviderDecisionError, ProviderDecisionRequest, ProviderDecisionRuntime | |
| from trenches_env.rl import ( | |
| AGENT_ALLOWED_ACTIONS, | |
| AGENT_ACTION_ALIGNMENT, | |
| AGENT_ACTION_IMPACTS, | |
| AGENT_PREFERRED_COALITIONS, | |
| AGENT_REWARD_METRIC_CONFIGS, | |
| AGENT_STATE_ACTION_EFFECTS, | |
| AGENT_STATE_BASELINES, | |
| DEFAULT_ACTION_IMPACTS, | |
| DEFAULT_MAX_TURNS, | |
| DEFAULT_TRAINING_STAGE, | |
| TRAINING_STAGE_CONFIGS, | |
| ) | |
| from trenches_env.source_catalog import get_source_by_id, get_sources_for_agent | |
| from trenches_env.source_bundles import AGENT_LIVE_SOURCE_BUNDLES, AGENT_TRAINING_SOURCE_BUNDLES | |
| from trenches_env.source_ingestion import SourceHarvester, source_ttl_seconds | |
| from trenches_env.source_monitor import build_source_monitor_report | |
| from trenches_env.scenarios import ( | |
| ScenarioAssetImpact, | |
| ScenarioDefinition, | |
| ScenarioLatentEvent, | |
| ScenarioSignal, | |
| get_scenario_definition, | |
| list_scenario_definitions, | |
| scenario_signals_for_turn, | |
| ) | |
# Signed stance score per action type. Values run from -0.8 ("negotiate") up
# to 1.0 ("strike"); presumably negative means de-escalatory and positive
# means escalatory -- confirm at the use site, which is outside this section.
ACTION_STANCE_SCORES: dict[str, float] = {
    "hold": -0.4,
    "negotiate": -0.8,
    "sanction": 0.3,
    "strike": 1.0,
    "defend": -0.1,
    "intel_query": -0.5,
    "mobilize": 0.6,
    "deceive": 0.8,
    "oversight_review": -0.6,
}
# Lower-case marker strings for cooperative intent. Several entries are word
# stems ("de-escal", "stabil"), so they look intended for substring matching
# against free text -- NOTE(review): matching code is not shown here.
COOPERATIVE_INTENT_MARKERS = (
    "deconflict",
    "de-escal",
    "ceasefire",
    "stabil",
    "protect",
    "defend",
    "monitor",
    "assess",
    "review",
    "guarantee",
    "humanitarian",
    "contain",
    "hold",
    "query",
)
# Lower-case marker strings for escalatory intent; same stem convention as
# the cooperative markers (e.g. "retaliat").
ESCALATORY_INTENT_MARKERS = (
    "retaliat",
    "punish",
    "strike",
    "degrade",
    "launch",
    "coerce",
    "pressure",
    "mobilize",
    "disrupt",
    "sanction",
    "deceive",
)
# Size limits for briefs and summaries. Their consumers are outside this
# section; the names suggest item counts and a character budget.
MAX_PUBLIC_BRIEF_ITEMS = 4
MAX_PRIVATE_BRIEF_ITEMS = 6
MAX_INTEL_SUMMARY_CHARS = 220
MAX_TRAINING_SOURCE_BRIEFS = 2
MAX_LIVE_SOURCE_BRIEFS = 2
# Upper bound on how many source packets one live auto-step converts into
# external signals (see _collect_live_external_signals).
MAX_AUTO_REACTION_SIGNALS = 8
# Floor, in milliseconds, applied to the live poll interval both when it is
# configured and when auto-step timing is checked.
MIN_LIVE_AUTO_STEP_MS = 1_000
# NOTE(review): presumably the weight of forecast accuracy when blended into
# rewards -- the blending code is not visible here.
FORECAST_REWARD_BLEND = 0.35
# Source kinds treated as low fidelity.
LOW_FIDELITY_SOURCE_KINDS = {"telegram", "scrape", "video"}
# Base reliability per source kind, highest ("structured") to lowest ("video").
SOURCE_KIND_BASE_RELIABILITY = {
    "structured": 0.8,
    "api": 0.76,
    "rss": 0.7,
    "scrape": 0.58,
    "telegram": 0.46,
    "video": 0.38,
}
# Reliability term keyed by packet delivery channel; presumably added to the
# base reliability above -- TODO confirm where the two are combined.
SOURCE_DELIVERY_RELIABILITY = {
    "training_core": 0.06,
    "live_demo": 0.0,
}
# Human-readable label for each topic key. The same nine topic keys are used
# by LATENT_EVENT_TOPIC_KEYWORDS below.
CONTRADICTION_TOPIC_LABELS = {
    "shipping": "shipping disruption",
    "commodities": "commodity-market disruption",
    "border": "border escalation",
    "corridor": "corridor interdiction",
    "domestic": "domestic stability",
    "cyber": "cyber disruption",
    "market": "market dislocation",
    "humanitarian": "humanitarian fallout",
    "diplomacy": "diplomatic signaling",
}
# Lower-case keywords (some are stems such as "negotiat") associated with
# each topic. NOTE(review): these overlap with, but are not identical to, the
# keyword lists hard-coded in _classify_signal_categories.
LATENT_EVENT_TOPIC_KEYWORDS = {
    "shipping": ("shipping", "tanker", "hormuz", "oil", "maritime", "terminal", "seaport", "harbor", "ais", "vessel"),
    "commodities": (
        "gold",
        "silver",
        "copper",
        "lithium",
        "nickel",
        "uranium",
        "phosphate",
        "bauxite",
        "rare earth",
        "rare-earth",
        "commodity",
        "mineral",
        "metals",
        "natural gas",
        "lng",
    ),
    "border": ("rocket", "missile", "border", "galilee", "idf", "drone", "lebanon", "launch", "intercept"),
    "corridor": ("corridor", "bekaa", "syria", "transfer", "logistics", "interdiction", "sustainment"),
    "domestic": ("sanction", "unrest", "protest", "inflation", "currency", "domestic", "regime"),
    "cyber": ("cyber", "outage", "blackout", "cable", "internet", "network", "malware"),
    "market": ("market", "investor", "trade", "stocks", "shares", "bond", "premium", "insurance"),
    "humanitarian": ("humanitarian", "aid", "displacement", "relief", "civilian", "refugee", "shelter"),
    "diplomacy": ("ceasefire", "talk", "negotiat", "summit", "diplomat", "mediat", "channel"),
}
# Topic -> linked topics. Only five topics have entries; presumably an event
# on the key topic spills over into the listed ones -- confirm at use site.
LATENT_EVENT_LINKS = {
    "shipping": ("market",),
    "commodities": ("market", "shipping"),
    "border": ("humanitarian",),
    "corridor": ("border",),
    "cyber": ("market",),
}
# Per-agent prior weight for each topic the agent tracks. Topics that are
# absent for an agent simply have no entry.
BELIEF_TOPIC_PRIORS = {
    "us": {"shipping": 0.12, "commodities": 0.09, "diplomacy": 0.08, "market": 0.06, "border": 0.04},
    "israel": {"border": 0.14, "corridor": 0.08, "diplomacy": 0.04},
    "iran": {"corridor": 0.14, "domestic": 0.1, "shipping": 0.06, "commodities": 0.08},
    "hezbollah": {"border": 0.12, "corridor": 0.1, "domestic": 0.04},
    "gulf": {"shipping": 0.16, "commodities": 0.12, "market": 0.12, "diplomacy": 0.06},
    "oversight": {"cyber": 0.12, "shipping": 0.08, "commodities": 0.08, "border": 0.08, "domestic": 0.08},
}
# Belief-update tuning knobs. NOTE(review): the update code is outside this
# section; the names suggest a confidence floor, a staleness limit in turns,
# and additive adjustments for confirmation and contradiction.
BELIEF_PERSISTENCE_FLOOR = 0.12
BELIEF_MAX_STALE_TURNS = 4
BELIEF_CONFIRMATION_BONUS = 0.03
BELIEF_CONTRADICTION_PENALTY = 0.14
# Sync factor per metric family, with "default" as the fallback key.
# Presumably controls how strongly public state tracks the underlying value
# -- TODO confirm against the code that reads this table.
PUBLIC_STATE_SYNC_FACTORS = {
    "support": 0.62,
    "confidence": 0.6,
    "clarity": 0.58,
    "resilience": 0.66,
    "security": 0.74,
    "continuity": 0.74,
    "stability": 0.72,
    "default": 0.7,
}
# Size class -> pair of integer limits. NOTE(review): the meaning of each
# tuple slot is not visible in this section; both slots are equal today.
ASSET_DECISION_SOURCE_LIMITS: dict[str, tuple[int, int]] = {
    "large": (8, 8),
    "medium-large": (6, 6),
    "medium": (5, 5),
}
# Section names that hold physical assets (presumably keys of an entity
# pack -- confirm against load_entity_pack consumers).
PHYSICAL_ASSET_SECTIONS = (
    "locations",
    "fronts",
    "infrastructure",
    "strategic_sites",
    "chokepoints",
    "geospatial_anchors",
    "alliance_anchors",
)
# Numeric rank for each asset priority label; higher is more important.
# "tracked" and "linked" share the lowest rank.
ASSET_PRIORITY_SCORES = {
    "critical": 4,
    "high": 3,
    "medium": 2,
    "tracked": 1,
    "linked": 1,
}
# (threshold, status) pairs in ascending threshold order. Presumably the
# first threshold an asset's health falls under selects its status -- the
# comparison itself is outside this section.
ASSET_STATUS_DAMAGE_THRESHOLDS = (
    (25.0, "destroyed"),
    (55.0, "malfunctioning"),
    (85.0, "degraded"),
)
# Primary adversaries for each agent; "oversight" has none.
AGENT_PRIMARY_ADVERSARIES: dict[str, tuple[str, ...]] = {
    "us": ("iran",),
    "israel": ("hezbollah", "iran"),
    "iran": ("israel", "us", "gulf"),
    "hezbollah": ("israel",),
    "gulf": ("iran",),
    "oversight": (),
}
# Short prose description of each agent's toolset, ready to embed in text.
AGENT_TOOL_LABELS: dict[str, str] = {
    "us": "shipping security, coalition access, and force-posture tools",
    "israel": "air-defense, reserve, and northern-front tools",
    "iran": "proxy, chokepoint, and regime-security tools",
    "hezbollah": "launch-survivability, logistics, and deniable-pressure tools",
    "gulf": "shipping, infrastructure, and market-stability tools",
    "oversight": "trace, intervention, and autonomy-balancing tools",
}
# agent id -> action type -> asset categories associated with that action.
# Only some action types have entries per agent ("oversight" has "defend"
# only). NOTE(review): presumably used to bias target selection toward these
# categories, and tuple order may express preference -- confirm at use site.
ASSET_CATEGORY_BIAS: dict[str, dict[str, tuple[str, ...]]] = {
    "us": {
        "strike": ("energy", "port", "chokepoint", "command", "launch-zone"),
        "defend": ("airbase", "base", "naval", "logistics-port", "base-network", "command-system"),
        "deceive": ("command", "command-system", "radar", "air-defense", "theater-anchor"),
        "sanction": ("energy", "port", "logistics-network", "energy-network"),
    },
    "israel": {
        "strike": ("launch-network", "launch-zone", "corridor-node", "logistics-network", "logistics"),
        "defend": ("command", "front", "civil-core", "infrastructure-zone", "offshore-zone", "air-defense"),
        "deceive": ("launch-zone", "command-network", "corridor-node"),
    },
    "iran": {
        "strike": ("airbase", "base", "energy", "energy-port", "port", "chokepoint"),
        "defend": ("command", "energy-network", "maritime-control-zone", "command-and-industry-network"),
        "deceive": ("chokepoint", "port", "naval", "maritime-box", "maritime-access"),
    },
    "hezbollah": {
        "strike": ("front", "civil-core", "command", "offshore-zone", "infrastructure-zone"),
        "defend": ("launch-network", "front", "corridor-node", "command-network", "logistics-network"),
        "deceive": ("launch-zone", "command-network", "logistics-network"),
    },
    "gulf": {
        "strike": ("energy-network", "energy", "energy-port", "port", "chokepoint", "base"),
        "defend": ("port", "energy", "energy-port", "capital", "infrastructure-protection", "chokepoint"),
        "deceive": ("energy-port", "port", "chokepoint"),
        "sanction": ("energy", "energy-port", "port", "logistics-network"),
    },
    "oversight": {
        "defend": ("chokepoint", "theater", "civil-center"),
    },
}
class FogOfWarDiplomacyEnv:
    """OpenEnv-compatible scaffolding for the crisis simulator.

    The concrete OpenEnv inheritance point can be added once the package
    dependency is pinned. For now this class owns the transition logic, state
    construction, and observation projection needed by the session API.

    ``configure_live_session``, ``step_session`` and
    ``refresh_session_sources`` each work on a deep copy of the session they
    are given and return that copy; the caller's object is left untouched.
    """
def __init__(
    self,
    source_harvester: SourceHarvester | None = None,
    provider_runtime: ProviderDecisionRuntime | None = None,
) -> None:
    """Wire up the environment's collaborators.

    Args:
        source_harvester: Harvester used for every source refresh. When
            omitted (or falsy) a ``SourceHarvester(auto_start=False)`` is
            created.
        provider_runtime: Runtime asked for model-backed decisions. When
            omitted (or falsy) a default ``ProviderDecisionRuntime()`` is
            created.
    """
    harvester = source_harvester or SourceHarvester(auto_start=False)
    runtime = provider_runtime or ProviderDecisionRuntime()
    self._rng = random.Random()
    self._source_harvester = harvester
    self._provider_runtime = runtime
    # Warm starts are opt-in; see enable_source_warm_start().
    self._source_warm_start_enabled = False
def enable_source_warm_start(self) -> "FogOfWarDiplomacyEnv":
    """Opt in to harvester warm starts and return ``self`` for chaining.

    Once enabled, session creation, live configuration and non-forced source
    refreshes call ``warm_start_agents`` on the harvester.
    """
    self._source_warm_start_enabled = True
    return self
def create_session(
    self,
    seed: int | None = None,
    session_id: str | None = None,
    training_agent: str = "us",
    training_stage: str = DEFAULT_TRAINING_STAGE,
    max_turns: int | None = None,
    scenario_id: str | None = None,
    replay_id: str | None = None,
    replay_start_index: int | None = None,
) -> SessionState:
    """Build a fresh session with its world, episode, beliefs and observations.

    Args:
        seed: Forwarded to ``self._seed`` and recorded on the session.
        session_id: Identifier to use; a random UUID4 string when omitted
            or empty.
        training_agent: Agent id handed to the historical-replay setup.
        training_stage: Stage name used to build the episode metadata.
        max_turns: Optional turn limit forwarded to the episode metadata.
        scenario_id: Scenario to resolve and apply to the initial world.
        replay_id: Optional historical replay to attach.
        replay_start_index: Optional start index within that replay.

    Returns:
        The new ``SessionState``. Every agent starts with an empty
        ``RewardBreakdown``.
    """
    sid = session_id if session_id else str(uuid4())
    self._seed(seed)

    scenario_def = get_scenario_definition(scenario_id)
    world_state = self._initial_world()
    self._apply_scenario(world_state, scenario_def)

    replay_state = self._initialize_historical_replay(
        world=world_state,
        training_agent=training_agent,
        replay_id=replay_id,
        replay_start_index=replay_start_index,
    )
    episode_meta = self._build_episode_metadata(
        training_stage=training_stage,
        max_turns=max_turns,
        scenario=scenario_def,
        historical_replay=replay_state,
    )

    # Training-only warm start: live sources are excluded at creation time.
    if self._source_warm_start_enabled:
        self._source_harvester.warm_start_agents(
            include_live=False,
            max_training_sources=2,
            max_live_sources=0,
        )

    beliefs = self._initialize_belief_state(world_state, episode_meta)
    new_session = SessionState(
        session_id=sid,
        seed=seed,
        world=world_state,
        observations=self._build_observations(
            world_state,
            episode_meta,
            belief_state=beliefs,
            historical_replay=replay_state,
        ),
        belief_state=beliefs,
        rewards={agent_id: RewardBreakdown() for agent_id in AGENT_IDS},
        historical_replay=replay_state,
        model_bindings=self._build_model_bindings(),
        episode=episode_meta,
    )

    # Carry over the harvester's sync timestamp only when it has one.
    synced_at = self._source_harvester.last_sync_at()
    if synced_at is not None:
        new_session.live.last_source_sync_at = synced_at
    return new_session
def reset_session(
    self,
    session_id: str,
    seed: int | None = None,
    training_agent: str = "us",
    training_stage: str = DEFAULT_TRAINING_STAGE,
    max_turns: int | None = None,
    scenario_id: str | None = None,
    replay_id: str | None = None,
    replay_start_index: int | None = None,
) -> SessionState:
    """Recreate a session from scratch under an existing ``session_id``.

    This is a pure delegation to ``create_session``. The previous session
    object is not an input, so nothing from it is carried over.
    """
    return self.create_session(
        session_id=session_id,
        seed=seed,
        training_agent=training_agent,
        training_stage=training_stage,
        max_turns=max_turns,
        scenario_id=scenario_id,
        replay_id=replay_id,
        replay_start_index=replay_start_index,
    )
def configure_live_session(self, session: SessionState, request: LiveControlRequest) -> SessionState:
    """Enable or disable live mode on a copy of ``session``.

    Args:
        session: Session to reconfigure; it is not mutated.
        request: Desired live settings (``enabled``, ``auto_step``,
            ``poll_interval_ms``).

    Returns:
        The reconfigured copy after a source refresh via
        ``refresh_session_sources``.

    Raises:
        ValueError: If live mode is requested for an episode that is not
            live-mode capable.
    """
    # Reject unsupported requests before paying for the deep copy.
    if request.enabled and not session.episode.live_mode_capable:
        raise ValueError(
            f"Live mode is only supported for {DEFAULT_TRAINING_STAGE} sessions."
        )

    updated = session.model_copy(deep=True)
    # One timestamp for the whole reconfiguration so that started_at,
    # last_source_sync_at and updated_at agree exactly.
    now = datetime.now(timezone.utc)
    live = updated.live

    live.enabled = request.enabled
    live.auto_step = request.auto_step
    # Never poll faster than the configured minimum.
    live.poll_interval_ms = max(request.poll_interval_ms, MIN_LIVE_AUTO_STEP_MS)
    live.started_at = now if request.enabled else None
    live.last_source_sync_at = now if request.enabled else None
    # Any (re)configuration restarts auto-step bookkeeping.
    live.last_auto_step_at = None
    live.reacted_packet_fetched_at = {}
    live.source_queue_sizes = (
        {
            agent_id: len(AGENT_LIVE_SOURCE_BUNDLES.get(agent_id, []))
            for agent_id in AGENT_IDS
        }
        if request.enabled
        else {}
    )
    updated.updated_at = now

    if request.enabled:
        if self._source_warm_start_enabled:
            self._source_harvester.warm_start_agents(
                include_live=True,
                max_training_sources=1,
                max_live_sources=1,
            )
        else:
            self._source_harvester.refresh_due_batch(include_live=True)
    return self.refresh_session_sources(updated)
def step_session(self, session: SessionState, request: StepSessionRequest) -> StepSessionResponse:
    """Advance a deep copy of ``session`` by one turn.

    The turn proceeds in this order: bump the turn counter, refresh due
    sources, inject external signals, run oversight (when enabled) over the
    requested actions, apply the resolved actions, resync public events,
    advance the historical replay, compute rewards (plus forecast rewards
    when there are prediction assessments), then rebuild model bindings,
    beliefs and observations. Trace, action-log and reaction-log entries are
    appended and each list is trimmed to its most recent 25 / 250 / 100
    entries.

    Args:
        session: Session to advance; it is not mutated.
        request: Actions, predictions and external signals for this turn.

    Returns:
        A ``StepSessionResponse`` holding the advanced copy, the oversight
        intervention, and ``done``. ``done`` is set when the turn limit is
        reached, tension is at or above 95.0, or an enabled historical
        replay is on its final event.
    """
    nxt = session.model_copy(deep=True)
    tension_before = nxt.world.tension_level
    known_event_ids = {event.event_id for event in nxt.world.latent_events}
    signals = request.external_signals

    nxt.world.turn += 1
    # Live sources are refreshed only while live mode is on.
    self._source_harvester.refresh_due_batch(include_live=bool(nxt.live.enabled))
    self._inject_external_signals(nxt.world, signals)

    actions = dict(request.actions)
    intervention = OversightIntervention()
    if nxt.episode.oversight_enabled:
        intervention = self._compute_oversight(nxt.world, actions, signals)
        actions = self._resolve_oversight_actions(actions, intervention)

    nxt.world.last_actions = list(actions.values())
    self._apply_actions(nxt.world, actions)
    if nxt.episode.oversight_enabled:
        self._apply_oversight(nxt.world, intervention)
    self._resync_public_events(nxt.world)

    # Latent events that did not exist before this turn started.
    fresh_event_ids = [
        event.event_id
        for event in nxt.world.latent_events
        if event.event_id not in known_event_ids
    ]

    revealed_event, assessments = self._advance_historical_replay(nxt, request.predictions)
    nxt.rewards = self._compute_rewards(world=nxt.world, episode=nxt.episode)
    if assessments:
        self._apply_forecast_rewards(nxt.rewards, assessments)

    nxt.model_bindings = self._build_model_bindings()
    nxt.belief_state = self._update_belief_state(nxt)
    nxt.observations = self._build_observations(
        nxt.world,
        nxt.episode,
        include_live_sources=nxt.live.enabled,
        belief_state=nxt.belief_state,
        historical_replay=nxt.historical_replay,
    )

    trace = StepTrace(
        turn=nxt.world.turn,
        tension_before=tension_before,
        tension_after=nxt.world.tension_level,
        actions=actions,
        predictions=request.predictions,
        prediction_assessments=assessments,
        revealed_event=revealed_event,
        rewards=nxt.rewards,
        oversight=intervention,
    )
    nxt.recent_traces.append(trace)
    nxt.recent_traces = nxt.recent_traces[-25:]

    nxt.action_log.extend(self._build_action_log_entries(nxt, actions))
    nxt.action_log = nxt.action_log[-250:]

    # A reaction entry is only recorded for turns driven by external signals.
    if signals:
        reaction = self._build_reaction_log_entry(
            session=nxt,
            signals=signals,
            latent_event_ids=fresh_event_ids,
            actions=actions,
            oversight=intervention,
            tension_before=tension_before,
        )
        nxt.reaction_log.append(reaction)
        nxt.reaction_log = nxt.reaction_log[-100:]

    nxt.updated_at = datetime.now(timezone.utc)

    replay = nxt.historical_replay
    finished = (
        nxt.world.turn >= nxt.episode.max_turns
        or nxt.world.tension_level >= 95.0
        or (
            replay.enabled
            and replay.current_event_index >= len(replay.ground_truth_timeline) - 1
        )
    )
    return StepSessionResponse(session=nxt, oversight=intervention, done=finished)
def refresh_session_sources(self, session: SessionState, force: bool = False) -> SessionState:
    """Return a copy of ``session`` with bindings, beliefs and observations rebuilt.

    Args:
        session: Session to refresh; it is not mutated.
        force: When true, force the harvester to refresh agent sources.
            Otherwise the harvester is only touched (non-forced warm start)
            if warm starts are enabled on this environment.

    Returns:
        The refreshed copy with ``updated_at`` set to the current UTC time.
    """
    updated = session.model_copy(deep=True)
    live_on = updated.live.enabled

    if force:
        self._source_harvester.refresh_agents(include_live=live_on, force=True)
    elif self._source_warm_start_enabled:
        self._source_harvester.warm_start_agents(include_live=live_on, force=False)

    updated.model_bindings = self._build_model_bindings()
    updated.belief_state = self._update_belief_state(updated)
    updated.observations = self._build_observations(
        updated.world,
        updated.episode,
        include_live_sources=updated.live.enabled,
        belief_state=updated.belief_state,
        historical_replay=updated.historical_replay,
    )

    # Keep the previous sync timestamp when the harvester has none to report.
    synced_at = self._source_harvester.last_sync_at()
    if synced_at is not None:
        updated.live.last_source_sync_at = synced_at
    updated.updated_at = datetime.now(timezone.utc)
    return updated
def source_monitor(self, session: SessionState) -> SourceMonitorReport:
    """Build the source-monitor report for ``session`` using this env's harvester."""
    report = build_source_monitor_report(session, harvester=self._source_harvester)
    return report
def provider_diagnostics(self, session: SessionState) -> ProviderDiagnosticsResponse:
    """Report provider diagnostics for the session's model bindings.

    Freshly built bindings are used when the session has none (or an empty
    mapping).
    """
    if session.model_bindings:
        bindings = session.model_bindings
    else:
        bindings = self._build_model_bindings()
    diagnostics = self._provider_runtime.diagnostics(bindings)
    return ProviderDiagnosticsResponse(agents=diagnostics)
def list_scenarios(self) -> list[ScenarioDefinition]:
    """Return the scenario definitions reported by ``list_scenario_definitions``."""
    scenarios = list_scenario_definitions()
    return scenarios
def scenario_turn_signals(self, scenario_id: str | None, turn: int) -> list[ExternalSignal]:
    """Return the external signals ``scenario_id`` provides for ``turn``.

    Thin pass-through to ``scenario_signals_for_turn``.
    """
    turn_signals = scenario_signals_for_turn(scenario_id, turn)
    return turn_signals
def maybe_auto_step_live_session(self, session: SessionState) -> SessionState:
    """Refresh sources and, if live auto-stepping is due, take one step.

    A step is taken only when live mode and auto-step are both on, the poll
    interval has elapsed, and there is at least one new external signal.
    In every other case the refreshed (but un-stepped) session is returned.
    """
    refreshed = self.refresh_session_sources(session)
    if not (refreshed.live.enabled and refreshed.live.auto_step):
        return refreshed

    # Captured before collecting signals; this becomes last_auto_step_at.
    now = datetime.now(timezone.utc)
    if not self._live_auto_step_due(refreshed, now):
        return refreshed

    signals, reacted_packets = self._collect_live_external_signals(refreshed)
    if not signals:
        return refreshed

    step_request = StepSessionRequest(
        actions=self.resolve_policy_actions(refreshed, signals),
        external_signals=signals,
    )
    stepped = self.step_session(refreshed, step_request).session
    stepped.live.last_auto_step_at = now
    # Remember which packet fetches were reacted to so they are not reused.
    stepped.live.reacted_packet_fetched_at.update(reacted_packets)
    stepped.updated_at = datetime.now(timezone.utc)
    return stepped
def _live_auto_step_due(self, session: SessionState, now: datetime) -> bool:
    """Return whether enough time has passed since the last auto-step.

    A session that has never auto-stepped is always due. The poll interval
    is floored at ``MIN_LIVE_AUTO_STEP_MS``.
    """
    previous_step_at = session.live.last_auto_step_at
    if previous_step_at is None:
        return True
    interval_ms = max(session.live.poll_interval_ms, MIN_LIVE_AUTO_STEP_MS)
    elapsed = now - previous_step_at
    return elapsed >= timedelta(milliseconds=interval_ms)
def _collect_live_external_signals(
    self,
    session: SessionState,
) -> tuple[list[ExternalSignal], dict[str, datetime]]:
    """Turn the newest unreacted source packets into external signals.

    Packets are taken from every agent's observation. A packet is eligible
    when its status is ``"ok"``, it has a summary and a fetch time, it is
    fresh, and it was fetched after the last reaction recorded for its
    source. Only the newest packet per source is kept, and at most
    ``MAX_AUTO_REACTION_SIGNALS`` packets (newest first) are converted.

    Returns:
        The signals, plus a ``source_id -> fetched_at`` mapping for the
        packets they were built from.
    """
    epoch = datetime.fromtimestamp(0, tz=timezone.utc)
    already_reacted = session.live.reacted_packet_fetched_at
    latest_by_source: dict[str, tuple[str, SourcePacket]] = {}

    for agent_id, observation in session.observations.items():
        for packet in observation.source_packets:
            usable = packet.status == "ok" and packet.summary and packet.fetched_at is not None
            if not usable:
                continue
            if not self._is_source_packet_fresh(packet):
                continue
            reacted_at = already_reacted.get(packet.source_id)
            if reacted_at is not None and packet.fetched_at <= reacted_at:
                continue
            holder = latest_by_source.get(packet.source_id)
            if holder is None or packet.fetched_at > (holder[1].fetched_at or epoch):
                latest_by_source[packet.source_id] = (agent_id, packet)

    newest_first = sorted(
        latest_by_source.values(),
        key=lambda pair: pair[1].fetched_at or epoch,
        reverse=True,
    )
    chosen = newest_first[:MAX_AUTO_REACTION_SIGNALS]

    signals: list[ExternalSignal] = [
        self._packet_to_external_signal(agent_id, packet) for agent_id, packet in chosen
    ]
    reacted_packets: dict[str, datetime] = {
        packet.source_id: packet.fetched_at
        for _, packet in chosen
        if packet.fetched_at is not None
    }
    return signals, reacted_packets
def _packet_to_external_signal(self, agent_id: str, packet: SourcePacket) -> ExternalSignal:
    """Convert a source packet into an ``ExternalSignal`` for ``agent_id``.

    Severity starts at 0.35 and is raised to the highest floor among the
    categories detected in the packet text; ``live_demo`` packets get an
    extra 0.05. The result is capped at 0.95 and rounded to two decimals.
    The headline is the source name plus the first sample item (or the
    summary when there are none), clipped to 160 characters. Oversight
    signals carry no region.
    """
    severity_floors = (
        ("attack", 0.82),
        ("shipping", 0.72),
        ("commodities", 0.62),
        ("cyber", 0.64),
        ("unrest", 0.64),
        ("market", 0.56),
        ("humanitarian", 0.44),
        ("diplomacy", 0.44),
    )
    packet_text = " ".join([packet.source_name, packet.summary, *packet.sample_items]).lower()
    categories = self._classify_signal_categories(packet_text)

    severity = max([0.35, *(floor for name, floor in severity_floors if name in categories)])
    if packet.delivery == "live_demo":
        severity += 0.05

    if packet.sample_items:
        lead = packet.sample_items[0]
    else:
        lead = packet.summary
    region = agent_id if agent_id != "oversight" else None

    return ExternalSignal(
        source=packet.source_name,
        headline=self._clip_summary(f"{packet.source_name}: {lead}", limit=160),
        region=region,
        tags=sorted(categories | {agent_id, packet.kind, packet.delivery}),
        severity=round(min(severity, 0.95), 2),
    )
| def _classify_signal_categories(self, text: str) -> set[str]: | |
| categories: set[str] = set() | |
| if self._signal_mentions( | |
| text, | |
| "strike", | |
| "rocket", | |
| "missile", | |
| "drone", | |
| "attack", | |
| "raid", | |
| "blast", | |
| "explosion", | |
| "intercept", | |
| "retaliat", | |
| "launch", | |
| ): | |
| categories.add("attack") | |
| if self._signal_mentions( | |
| text, | |
| "shipping", | |
| "tanker", | |
| "vessel", | |
| "hormuz", | |
| "oil", | |
| "terminal", | |
| "seaport", | |
| "harbor", | |
| "maritime", | |
| "strait", | |
| "pipeline", | |
| "red sea", | |
| ): | |
| categories.add("shipping") | |
| if self._signal_mentions( | |
| text, | |
| "gold", | |
| "silver", | |
| "copper", | |
| "lithium", | |
| "nickel", | |
| "uranium", | |
| "phosphate", | |
| "bauxite", | |
| "rare earth", | |
| "rare-earth", | |
| "commodity", | |
| "mineral", | |
| "metals", | |
| "natural gas", | |
| "lng", | |
| ): | |
| categories.add("commodities") | |
| if self._signal_mentions(text, "ceasefire", "talk", "negotiat", "summit", "diplomat", "mediat"): | |
| categories.add("diplomacy") | |
| if self._signal_mentions(text, "humanitarian", "aid", "displacement", "relief", "civilian", "refugee"): | |
| categories.add("humanitarian") | |
| if self._signal_mentions(text, "cyber", "internet", "blackout", "outage", "malware", "network"): | |
| categories.add("cyber") | |
| if self._signal_mentions(text, "protest", "unrest", "sanction", "inflation", "currency", "black market"): | |
| categories.add("unrest") | |
| if self._signal_mentions(text, "market", "investor", "trade", "stocks", "shares", "bond", "price", "futures"): | |
| categories.add("market") | |
| return categories or {"general"} | |
def resolve_policy_actions(
    self,
    session: SessionState,
    signals: list[ExternalSignal],
    *,
    preset_actions: dict[str, AgentAction] | None = None,
    agent_ids: list[str] | None = None,
) -> dict[str, AgentAction]:
    """Choose one action per agent, preferring presets, then providers, then heuristics.

    Every preset action is validated first. For each target agent without
    a preset, the bound provider is asked for a decision. If none is
    available (or it fails), the highest-scoring allowed action is chosen
    heuristically and tagged with ``mode="heuristic_fallback"`` metadata,
    including the provider error when there was one.

    Args:
        session: Current session (observations, beliefs, world, bindings).
        signals: External signals driving this decision round.
        preset_actions: Actions fixed by the caller, keyed by agent id.
        agent_ids: Agents to resolve; ``None`` or an empty list means all
            of ``AGENT_IDS``.

    Returns:
        Mapping of agent id to the resolved action, presets included.

    Raises:
        ValueError: If a heuristic fallback is needed for an agent that has
            no allowed actions configured.
    """
    actions: dict[str, AgentAction] = dict(preset_actions or {})
    target_agent_ids = agent_ids or list(AGENT_IDS)

    for agent_id, action in actions.items():
        self._validate_action(agent_id, action)

    for agent_id in target_agent_ids:
        if agent_id in actions:
            continue

        provider_action, provider_error = self._resolve_provider_action(session, agent_id, signals)
        if provider_action is not None:
            actions[agent_id] = provider_action
            continue

        signal_context, top_headline = self._build_signal_context(agent_id, signals)
        allowed_actions = AGENT_ALLOWED_ACTIONS.get(agent_id, ())
        if not allowed_actions:
            # max() on an empty sequence raises an opaque ValueError; fail
            # with a message that names the offending agent instead.
            raise ValueError(f"No allowed actions are configured for agent '{agent_id}'.")
        action_type = max(
            allowed_actions,
            key=lambda candidate: self._score_live_action(
                agent_id=agent_id,
                action_type=candidate,
                session=session,
                signal_context=signal_context,
            ),
        )
        target = self._select_live_action_target(agent_id, action_type, session, signal_context)
        driver = self._event_driver_label(signal_context)

        metadata: dict[str, object] = {
            "mode": "heuristic_fallback",
            "driver": driver,
            "signal_count": int(signal_context["relevant_count"]),
        }
        if provider_error is not None:
            metadata["provider_error"] = provider_error
        if top_headline:
            metadata["trigger_headline"] = top_headline

        actions[agent_id] = AgentAction(
            actor=agent_id,
            type=action_type,
            target=target,
            summary=self._build_auto_action_summary(agent_id, action_type, target, driver),
            metadata=metadata,
        )
    return actions
def _resolve_provider_action(
    self,
    session: SessionState,
    agent_id: str,
    signals: list[ExternalSignal],
) -> tuple[AgentAction | None, str | None]:
    """Ask the agent's bound provider for an action.

    Returns:
        ``(action, None)`` on success; ``(None, None)`` when the agent has
        no binding or the binding is not ready for inference; and
        ``(None, message)`` when the provider raises
        ``ProviderDecisionError``.
    """
    binding = session.model_bindings.get(agent_id)
    provider_ready = binding is not None and binding.ready_for_inference
    if not provider_ready:
        return None, None

    # Only ProviderDecisionError is converted into a fallback; anything else
    # raised here (including by _validate_action) propagates to the caller.
    try:
        decision_request = ProviderDecisionRequest(
            agent_id=agent_id,
            binding=binding,
            observation=session.observations[agent_id],
            external_signals=signals,
        )
        decided = self._provider_runtime.decide_action(decision_request)
        self._validate_action(agent_id, decided)
    except ProviderDecisionError as exc:
        return None, str(exc)
    return decided, None
| def _select_live_actions( | |
| self, | |
| session: SessionState, | |
| signals: list[ExternalSignal], | |
| ) -> dict[str, AgentAction]: | |
| return self.resolve_policy_actions(session, signals) | |
| def _build_signal_context( | |
| self, | |
| agent_id: str, | |
| signals: list[ExternalSignal], | |
| ) -> tuple[dict[str, float], str]: | |
| signal_context = { | |
| "attack": 0.0, | |
| "shipping": 0.0, | |
| "commodities": 0.0, | |
| "diplomacy": 0.0, | |
| "humanitarian": 0.0, | |
| "cyber": 0.0, | |
| "unrest": 0.0, | |
| "market": 0.0, | |
| "general": 0.0, | |
| "pressure": 0.0, | |
| "relevant_count": 0.0, | |
| } | |
| top_headline = "" | |
| top_severity = 0.0 | |
| for signal in signals: | |
| if agent_id != "oversight": | |
| affected_agents = set(self._infer_affected_agents(signal)) | |
| if agent_id not in affected_agents and signal.region != agent_id: | |
| continue | |
| text = f"{signal.headline} {' '.join(signal.tags)} {(signal.region or '')}".lower() | |
| categories = self._classify_signal_categories(text) | |
| weight = max(0.2, signal.severity) | |
| signal_context["relevant_count"] += 1.0 | |
| for category in categories: | |
| signal_context[category] = signal_context.get(category, 0.0) + weight | |
| if signal.severity >= top_severity: | |
| top_severity = signal.severity | |
| top_headline = signal.headline | |
| signal_context["pressure"] = sum( | |
| signal_context[category] | |
| for category in ("attack", "shipping", "commodities", "diplomacy", "humanitarian", "cyber", "unrest", "market") | |
| ) + 0.3 * signal_context["general"] | |
| return signal_context, top_headline | |
def _score_live_action(
    self,
    *,
    agent_id: str,
    action_type: str,
    session: SessionState,
    signal_context: dict[str, float],
) -> float:
    """Score one candidate action for ``agent_id``; higher is better.

    The score combines:
      * the weighted change in reward-metric target scores that the
        action's state effects are projected to cause (scaled by 1.8),
      * the agent's doctrinal alignment with the action (scaled by 0.28),
      * a signal-driven bias, adjusted for pressure on the agent's assets,
      * a belief-driven bias,
      * a small bonus for repeating the agent's previous action type,
      * minus penalties for escalating in tense or diplomatic moments,
        for holding under attack, and similar situational mismatches.
    """
    observation = session.observations[agent_id]
    effects = AGENT_STATE_ACTION_EFFECTS[agent_id].get(action_type, {})

    # Projected improvement across the agent's reward metrics.
    metric_gain = 0.0
    for metric, config in AGENT_REWARD_METRIC_CONFIGS[agent_id].items():
        baseline = AGENT_STATE_BASELINES[agent_id].get(metric, 50.0)
        current = observation.strategic_state.get(metric, baseline)
        projected = self._clamp_percent(current + effects.get(metric, 0.0))
        score_now = self._target_score(current, config.target, config.tolerance)
        score_next = self._target_score(projected, config.target, config.tolerance)
        metric_gain += (score_next - score_now) * config.weight

    doctrinal_fit = AGENT_ACTION_ALIGNMENT[agent_id].get(action_type, 0.0)
    signal_bias = self._live_signal_action_bias(agent_id, action_type, signal_context)
    belief_bias = self._belief_action_bias(
        agent_id,
        action_type,
        session.belief_state.get(agent_id, AgentBeliefState(agent_id=agent_id)),
    )

    # Asset pressure favours protective actions and discourages holding.
    asset_pressure = self._asset_pressure(session.world, agent_id)
    pressure_weight = {
        "defend": 0.22,
        "intel_query": 0.22,
        "negotiate": 0.10,
        "hold": -0.14,
    }.get(action_type)
    if pressure_weight is not None:
        signal_bias += pressure_weight * asset_pressure

    repeats_last_action = any(
        previous.actor == agent_id and previous.type == action_type
        for previous in session.world.last_actions
    )
    continuity_bonus = 0.06 if repeats_last_action else 0.0

    kinetic_actions = {"strike", "mobilize", "deceive"}
    escalation_penalty = 0.0
    if session.world.tension_level >= 78.0 and action_type in kinetic_actions | {"sanction"}:
        escalation_penalty += 0.28
    if signal_context["diplomacy"] >= 0.6 and action_type in kinetic_actions:
        escalation_penalty += 0.18
    if signal_context["attack"] >= 0.65 and action_type == "hold":
        escalation_penalty += 0.18
    if signal_context["shipping"] >= 0.55 and agent_id in {"us", "gulf"} and action_type in {"strike", "deceive"}:
        escalation_penalty += 0.2
    if asset_pressure >= 0.35 and action_type in kinetic_actions:
        escalation_penalty += 0.16
    # Oversight reviews become more attractive as overall pressure rises.
    if agent_id == "oversight" and action_type == "oversight_review":
        escalation_penalty -= min(0.25, signal_context["pressure"] * 0.2)

    return metric_gain * 1.8 + doctrinal_fit * 0.28 + signal_bias + belief_bias + continuity_bonus - escalation_penalty
| def _belief_action_bias( | |
| agent_id: str, | |
| action_type: str, | |
| belief_state: AgentBeliefState, | |
| ) -> float: | |
| topic_weights = {belief.topic: belief.confidence for belief in belief_state.beliefs[:4]} | |
| shipping = topic_weights.get("shipping", 0.0) | |
| commodities = topic_weights.get("commodities", 0.0) | |
| border = topic_weights.get("border", 0.0) | |
| corridor = topic_weights.get("corridor", 0.0) | |
| diplomacy = topic_weights.get("diplomacy", 0.0) | |
| cyber = topic_weights.get("cyber", 0.0) | |
| domestic = topic_weights.get("domestic", 0.0) | |
| if agent_id in {"us", "gulf"}: | |
| return { | |
| "defend": 0.22 * shipping, | |
| "negotiate": 0.16 * diplomacy + 0.08 * shipping + 0.10 * commodities, | |
| "intel_query": 0.14 * cyber + 0.10 * commodities, | |
| "strike": 0.06 * border - 0.12 * diplomacy, | |
| }.get(action_type, 0.0) | |
| if agent_id == "israel": | |
| return { | |
| "defend": 0.2 * border, | |
| "strike": 0.12 * border, | |
| "intel_query": 0.1 * corridor + 0.08 * cyber, | |
| "negotiate": 0.1 * diplomacy - 0.08 * border, | |
| }.get(action_type, 0.0) | |
| if agent_id in {"iran", "hezbollah"}: | |
| return { | |
| "mobilize": 0.16 * corridor + 0.08 * border, | |
| "deceive": 0.12 * cyber + 0.08 * corridor + 0.06 * commodities, | |
| "defend": 0.1 * border + 0.08 * domestic, | |
| "negotiate": 0.1 * diplomacy - 0.08 * corridor - 0.04 * commodities, | |
| }.get(action_type, 0.0) | |
| return { | |
| "oversight_review": 0.18 * (shipping + commodities + border + corridor + cyber + domestic), | |
| "negotiate": 0.12 * diplomacy, | |
| "intel_query": 0.1 * cyber + 0.08 * commodities, | |
| }.get(action_type, 0.0) | |
| def _live_signal_action_bias( | |
| self, | |
| agent_id: str, | |
| action_type: str, | |
| signal_context: dict[str, float], | |
| ) -> float: | |
| attack = signal_context["attack"] | |
| shipping = signal_context["shipping"] | |
| commodities = signal_context["commodities"] | |
| diplomacy = signal_context["diplomacy"] | |
| humanitarian = signal_context["humanitarian"] | |
| cyber = signal_context["cyber"] | |
| unrest = signal_context["unrest"] | |
| market = signal_context["market"] | |
| pressure = signal_context["pressure"] | |
| if agent_id == "us": | |
| return { | |
| "defend": 0.34 * shipping + 0.22 * attack + 0.18 * cyber + 0.14 * market, | |
| "negotiate": 0.30 * diplomacy + 0.18 * humanitarian + 0.16 * market + 0.08 * attack + 0.10 * commodities, | |
| "intel_query": 0.24 * cyber + 0.18 * attack + 0.12 * unrest + 0.12 * commodities, | |
| "mobilize": 0.16 * attack + 0.12 * shipping + 0.06 * commodities, | |
| "sanction": 0.18 * unrest + 0.10 * cyber, | |
| "strike": 0.08 * attack - 0.18 * diplomacy - 0.12 * humanitarian, | |
| "deceive": 0.04 * attack - 0.10 * diplomacy, | |
| "hold": 0.10 * diplomacy - 0.10 * attack, | |
| }.get(action_type, 0.0) | |
| if agent_id == "israel": | |
| return { | |
| "defend": 0.38 * attack + 0.14 * cyber, | |
| "strike": 0.22 * attack - 0.12 * humanitarian, | |
| "mobilize": 0.18 * attack, | |
| "intel_query": 0.20 * cyber + 0.16 * attack, | |
| "negotiate": 0.16 * diplomacy + 0.08 * humanitarian, | |
| "hold": 0.08 * diplomacy - 0.12 * attack, | |
| "deceive": 0.12 * attack, | |
| "sanction": 0.06 * unrest, | |
| }.get(action_type, 0.0) | |
| if agent_id == "iran": | |
| return { | |
| "mobilize": 0.26 * shipping + 0.18 * attack + 0.12 * commodities, | |
| "deceive": 0.22 * attack + 0.18 * shipping + 0.14 * cyber + 0.08 * commodities, | |
| "defend": 0.26 * unrest + 0.12 * attack, | |
| "intel_query": 0.18 * cyber + 0.16 * unrest, | |
| "negotiate": 0.14 * diplomacy - 0.14 * attack - 0.06 * commodities, | |
| "strike": 0.12 * attack + 0.10 * shipping + 0.06 * commodities - 0.18 * diplomacy, | |
| "hold": 0.08 * diplomacy - 0.06 * shipping - 0.04 * commodities, | |
| "sanction": 0.08 * unrest, | |
| }.get(action_type, 0.0) | |
| if agent_id == "hezbollah": | |
| return { | |
| "defend": 0.28 * attack, | |
| "deceive": 0.24 * attack + 0.14 * cyber, | |
| "mobilize": 0.18 * attack, | |
| "strike": 0.14 * attack - 0.14 * humanitarian, | |
| "negotiate": 0.22 * humanitarian + 0.14 * diplomacy - 0.18 * attack, | |
| "hold": 0.18 * humanitarian + 0.10 * diplomacy - 0.12 * attack, | |
| "intel_query": 0.12 * cyber + 0.10 * attack, | |
| "sanction": 0.04 * unrest, | |
| }.get(action_type, 0.0) | |
| if agent_id == "gulf": | |
| return { | |
| "defend": 0.38 * shipping + 0.18 * attack + 0.16 * market, | |
| "negotiate": 0.28 * diplomacy + 0.24 * market + 0.14 * humanitarian + 0.14 * commodities, | |
| "intel_query": 0.22 * shipping + 0.14 * cyber + 0.18 * commodities, | |
| "mobilize": 0.16 * attack + 0.10 * shipping + 0.04 * commodities - 0.12 * market, | |
| "hold": 0.12 * diplomacy - 0.12 * shipping - 0.06 * commodities, | |
| "strike": 0.04 * attack + 0.04 * commodities - 0.24 * market, | |
| "sanction": 0.06 * unrest + 0.04 * commodities - 0.10 * market, | |
| "deceive": 0.04 * commodities - 0.12 * market, | |
| }.get(action_type, 0.0) | |
| return { | |
| "oversight_review": 0.34 * pressure + 0.20 * attack + 0.16 * shipping + 0.14 * commodities, | |
| "intel_query": 0.24 * cyber + 0.18 * attack + 0.10 * unrest + 0.14 * commodities, | |
| "negotiate": 0.20 * diplomacy + 0.12 * humanitarian, | |
| "defend": 0.16 * attack + 0.12 * shipping, | |
| "hold": 0.08 * diplomacy - 0.08 * pressure, | |
| }.get(action_type, 0.0) | |
| def _select_live_action_target( | |
| self, | |
| agent_id: str, | |
| action_type: str, | |
| session: SessionState, | |
| signal_context: dict[str, float], | |
| ) -> str | None: | |
| if action_type == "defend": | |
| return agent_id if agent_id != "oversight" else None | |
| if action_type == "negotiate": | |
| if agent_id == "us": | |
| return "gulf" if signal_context["shipping"] >= signal_context["attack"] else "israel" | |
| if agent_id == "israel": | |
| return "us" | |
| if agent_id == "iran": | |
| return "hezbollah" | |
| if agent_id == "hezbollah": | |
| return "iran" | |
| if agent_id == "gulf": | |
| return "us" | |
| if agent_id == "oversight": | |
| return max( | |
| ("us", "israel", "iran", "hezbollah", "gulf"), | |
| key=lambda candidate: session.world.risk_scores.get(candidate, 0.0), | |
| ) | |
| if action_type in {"strike", "sanction"}: | |
| adversaries = AGENT_PRIMARY_ADVERSARIES.get(agent_id, ()) | |
| if not adversaries: | |
| return None | |
| if agent_id == "iran" and signal_context["shipping"] > signal_context["attack"]: | |
| return "gulf" | |
| return adversaries[0] | |
| return None | |
| def _event_driver_label(self, signal_context: dict[str, float]) -> str: | |
| ranked_categories = sorted( | |
| ( | |
| (category, value) | |
| for category, value in signal_context.items() | |
| if category in {"attack", "shipping", "commodities", "diplomacy", "humanitarian", "cyber", "unrest", "market"} | |
| ), | |
| key=lambda item: item[1], | |
| reverse=True, | |
| ) | |
| if not ranked_categories or ranked_categories[0][1] <= 0.0: | |
| return "regional source refresh" | |
| top_category = ranked_categories[0][0] | |
| return { | |
| "attack": "cross-border attack reporting", | |
| "shipping": "shipping-lane disruption reporting", | |
| "commodities": "commodity-market disruption reporting", | |
| "diplomacy": "diplomatic movement", | |
| "humanitarian": "humanitarian pressure", | |
| "cyber": "cyber and trace uncertainty", | |
| "unrest": "domestic instability", | |
| "market": "market stress", | |
| }.get(top_category, "regional source refresh") | |
def _build_auto_action_summary(
    self,
    agent_id: str,
    action_type: str,
    target: str | None,
    driver: str,
) -> str:
    """Compose the one-sentence summary attached to an auto-selected action.

    The agent's tool label is looked up first (an unknown agent id raises
    ``KeyError``). Each known action type has its own sentence template; a
    falsy ``target`` is replaced by a generic phrase, and any unrecognised
    action type falls back to the oversight-review sentence.
    """
    tool_label = AGENT_TOOL_LABELS[agent_id]
    negotiate_target = target or "regional counterparts"
    sanction_target = target or "the pressure source"
    strike_target = target or "the active threat node"
    defend_target = target or agent_id

    sentences = {
        "hold": f"Hold with {tool_label} in reserve while {driver} clarifies.",
        "negotiate": f"Use {tool_label} to negotiate with {negotiate_target} around {driver}.",
        "sanction": f"Apply economic pressure tools against {sanction_target} after {driver}.",
        "strike": f"Use kinetic tools against {strike_target} in response to {driver}.",
        "defend": f"Harden {defend_target} with {tool_label} as {driver} comes in.",
        "intel_query": f"Pull more collection through {tool_label} before escalating beyond {driver}.",
        "mobilize": f"Shift readiness and posture with {tool_label} around {driver}.",
        "deceive": f"Use deniable signaling and masking tools while {driver} unfolds.",
    }
    fallback = f"Run an oversight review with {tool_label} against {driver}."
    return sentences.get(action_type, fallback)
def shutdown(self) -> None:
    """Stop the source harvester, then close the provider runtime."""
    harvester = self._source_harvester
    harvester.stop()
    runtime = self._provider_runtime
    runtime.close()
| def _seed(self, seed: int | None) -> None: | |
| if seed is not None: | |
| self._rng.seed(seed) | |
def _initial_world(self) -> WorldState:
    """Build the starting world state for a fresh episode.

    The latent (ground-truth) actor metrics are copied from the per-agent
    baselines and the public actor state starts as a copy of those. A single
    "baseline-posture" event is registered both as a latent event and as its
    public counterpart.
    """
    frontline_agents = ("us", "israel", "iran", "hezbollah", "gulf")
    posture_summary = "Regional alert posture is elevated after a contested strike window."

    latent_state = {}
    for agent_id, metrics in AGENT_STATE_BASELINES.items():
        latent_state[agent_id] = metrics.copy()

    public_narrative = LatentEventNarrative(
        framing="baseline",
        summary=posture_summary,
        confidence=0.74,
        public=True,
    )
    private_narrative = LatentEventNarrative(
        framing="concealed",
        summary="Privately, all major actors assess that deterrence signaling remains brittle and prone to misread escalation.",
        confidence=0.68,
        public=False,
    )
    baseline_event = LatentEvent(
        event_id="baseline-posture",
        topic="diplomacy",
        status="active",
        severity=0.45,
        visibility="public",
        reliability=0.74,
        origin="scenario",
        affected_agents=list(frontline_agents),
        started_at_turn=0,
        last_updated_turn=0,
        decay_rate=0.03,
        narratives=[public_narrative, private_narrative],
    )
    public_baseline = BlackSwanEvent(
        id="baseline-posture",
        summary=posture_summary,
        source="scenario",
        severity=0.45,
        public=True,
        affected_agents=list(frontline_agents),
    )

    coalition_graph = {
        "us": ["israel"],
        "israel": ["us"],
        "iran": ["hezbollah"],
        "hezbollah": ["iran"],
        "gulf": [],
        "oversight": [],
    }
    hidden_intents = {
        "us": "contain escalation while preserving deterrence",
        "israel": "degrade proxy launch capacity decisively",
        "iran": "raise cost through deniable pressure",
        "hezbollah": "probe for weak responses along the border",
        "gulf": "keep shipping lanes open and volatility contained",
        "oversight": "reduce misalignment without freezing autonomy",
    }

    return WorldState(
        tension_level=50.0,
        market_stress=28.0,
        oil_pressure=36.0,
        latent_state=latent_state,
        latent_events=[baseline_event],
        # Independent copies so public drift never aliases the latent truth.
        actor_state={agent_id: metrics.copy() for agent_id, metrics in latent_state.items()},
        asset_state=self._initial_asset_state(),
        coalition_graph=coalition_graph,
        active_events=[public_baseline],
        hidden_intents=hidden_intents,
        behavioral_consistency=dict.fromkeys(AGENT_IDS, 0.72),
        ema_tension=dict.fromkeys(AGENT_IDS, 50.0),
        risk_scores=dict.fromkeys(AGENT_IDS, 0.25),
    )
def _build_episode_metadata(
    self,
    training_stage: str,
    max_turns: int | None,
    *,
    scenario: ScenarioDefinition,
    historical_replay: HistoricalReplayState | None = None,
) -> EpisodeMetadata:
    """Assemble the episode metadata for a stage, scenario and optional replay.

    ``max_turns`` falls back to ``DEFAULT_MAX_TURNS`` when falsy. With an
    enabled replay the turn budget is additionally capped by the number of
    timeline events remaining after the start index (never below one).
    Raises ``KeyError`` for an unknown ``training_stage``.
    """
    stage_config = TRAINING_STAGE_CONFIGS[training_stage]
    turn_budget = max_turns or DEFAULT_MAX_TURNS

    timeline_owner = historical_replay or HistoricalReplayState()
    replay_event_count = len(timeline_owner.ground_truth_timeline)
    replay_active = historical_replay is not None and historical_replay.enabled

    if replay_active:
        events_left = max(1, replay_event_count - historical_replay.start_event_index - 1)
        turn_budget = min(turn_budget, events_left)

    return EpisodeMetadata(
        max_turns=turn_budget,
        training_stage=training_stage,
        scenario_id=scenario.id,
        scenario_name=scenario.name,
        scenario_description=scenario.description,
        scenario_tags=list(scenario.tags),
        dense_rewards=stage_config["dense_rewards"],
        sparse_rewards=not stage_config["dense_rewards"],
        fog_of_war=stage_config["fog_of_war"],
        oversight_enabled=stage_config["oversight_enabled"],
        live_mode_capable=stage_config["live_mode_capable"],
        replay_mode=replay_active,
        replay_id=historical_replay.replay_id if replay_active else None,
        replay_event_count=replay_event_count,
    )
def _initialize_historical_replay(
    self,
    *,
    world: WorldState,
    training_agent: str,
    replay_id: str | None,
    replay_start_index: int | None,
) -> HistoricalReplayState:
    """Create the replay state for an episode and reveal its first event.

    Without a ``replay_id`` a default (disabled) state is returned. Otherwise
    the replay must hold at least two events; the start index is drawn from
    the environment RNG when not supplied and validated when it is. The
    starting event is applied to ``world`` and recorded as visible.

    Raises:
        ValueError: the replay has fewer than two events, or the requested
            start index is outside ``0 .. len(events) - 2``.
    """
    if replay_id is None:
        return HistoricalReplayState()

    replay = get_historical_replay(replay_id)
    event_total = len(replay.events)
    if event_total < 2:
        raise ValueError(f"Replay {replay_id} must contain at least two events.")

    # The last event can never be a start: at least one must remain to reveal.
    max_start_index = event_total - 2
    if replay_start_index is None:
        start_index = self._rng.randint(0, max_start_index)
    elif 0 <= replay_start_index <= max_start_index:
        start_index = replay_start_index
    else:
        raise ValueError(
            f"Replay start index {replay_start_index} is outside the valid range 0-{max_start_index}."
        )

    timeline = [event.model_copy(deep=True) for event in replay.events]
    state = HistoricalReplayState(
        enabled=True,
        replay_id=replay.replay_id,
        replay_name=replay.name,
        training_agent=training_agent,
        start_event_index=start_index,
        current_event_index=start_index,
        ground_truth_timeline=timeline,
    )

    opening_event = state.ground_truth_timeline[start_index].model_copy(deep=True)
    self._apply_historical_event(world, opening_event)
    state.visible_event_ids.append(opening_event.event_id)
    state.last_revealed_event = opening_event
    return state
| def _advance_historical_replay( | |
| self, | |
| session: SessionState, | |
| predictions: dict[str, Prediction], | |
| ) -> tuple[HistoricalEvent | None, dict[str, PredictionAssessment]]: | |
| replay = session.historical_replay | |
| if not replay.enabled: | |
| return None, {} | |
| normalized_predictions = self._normalize_predictions(session, predictions) | |
| if normalized_predictions: | |
| session.prediction_log.extend(normalized_predictions.values()) | |
| next_index = replay.current_event_index + 1 | |
| if next_index >= len(replay.ground_truth_timeline): | |
| return None, {} | |
| revealed_event = replay.ground_truth_timeline[next_index].model_copy(deep=True) | |
| assessments = { | |
| agent_id: self._score_prediction(prediction=prediction, event=revealed_event) | |
| for agent_id, prediction in normalized_predictions.items() | |
| } | |
| replay.current_event_index = next_index | |
| replay.visible_event_ids.append(revealed_event.event_id) | |
| replay.last_revealed_event = revealed_event | |
| self._apply_historical_event(session.world, revealed_event) | |
| if assessments: | |
| session.prediction_assessments.extend(assessments.values()) | |
| return revealed_event, assessments | |
def _apply_historical_event(self, world: WorldState, event: HistoricalEvent) -> None:
    """Fold a historical replay event into the world state.

    Shifts the three global gauges and the listed actor metrics by the
    event's impact deltas, registers a public latent event for it (without
    spawning linked follow-on events), then resyncs the public event list
    and public actor state.
    """
    impact = event.impact
    world.tension_level = self._clamp_percent(world.tension_level + impact.tension_delta)
    world.market_stress = self._clamp_percent(world.market_stress + impact.market_stress_delta)
    world.oil_pressure = self._clamp_percent(world.oil_pressure + impact.oil_pressure_delta)

    for agent_id, metric_deltas in event.impact.actor_metric_deltas.items():
        for metric, delta in metric_deltas.items():
            self._bump_actor_metric(world, agent_id, metric, delta)

    affected_agents = self._historical_event_affected_agents(event)
    severity = severity_score(event.severity)
    # Confirmed events are trusted more, both overall and per narrative.
    reliability = 0.96 if event.confirmed else 0.72
    public_narrative = LatentEventNarrative(
        framing="baseline",
        summary=self._clip_summary(event.public_summary or event.summary),
        confidence=0.9 if event.confirmed else 0.68,
        public=True,
    )
    private_narrative = LatentEventNarrative(
        framing="concealed",
        summary=self._clip_summary(event.summary),
        confidence=0.82 if event.confirmed else 0.6,
        public=False,
    )
    latent_event = LatentEvent(
        event_id=f"historical-{event.event_id}",
        topic=event.topic,
        status="active",
        severity=severity,
        visibility="public",
        reliability=reliability,
        origin=f"historical:{event.source_type}",
        affected_agents=affected_agents,
        started_at_turn=world.turn,
        last_updated_turn=world.turn,
        decay_rate=0.04,
        narratives=[public_narrative, private_narrative],
    )
    self._register_latent_event(world, latent_event, spawn_links=False)

    self._resync_public_events(world)
    self._resync_public_actor_state(world)
| def _historical_event_affected_agents(event: HistoricalEvent) -> list[str]: | |
| affected = [ | |
| agent_id | |
| for agent_id in [*event.actors, *event.targets] | |
| if agent_id in AGENT_IDS | |
| ] | |
| return sorted(set(affected)) | |
| def _normalize_predictions( | |
| self, | |
| session: SessionState, | |
| predictions: dict[str, Prediction], | |
| ) -> dict[str, Prediction]: | |
| normalized: dict[str, Prediction] = {} | |
| for agent_id, prediction in predictions.items(): | |
| if prediction.agent_id != agent_id: | |
| raise ValueError( | |
| f"Prediction agent mismatch: payload agent={prediction.agent_id}, slot={agent_id}" | |
| ) | |
| if prediction.time_horizon_turns < 1: | |
| raise ValueError("Prediction time_horizon_turns must be at least 1.") | |
| normalized[agent_id] = prediction.model_copy( | |
| update={ | |
| "turn": max(0, session.world.turn - 1), | |
| "timestamp": ( | |
| session.historical_replay.last_revealed_event.timestamp | |
| if session.historical_replay.enabled and session.historical_replay.last_revealed_event is not None | |
| else prediction.timestamp | |
| ), | |
| }, | |
| deep=True, | |
| ) | |
| return normalized | |
def _score_prediction(self, *, prediction: Prediction, event: HistoricalEvent) -> PredictionAssessment:
    """Grade one prediction against the event that was actually revealed.

    Component scores (topic, actor, target, timing, severity) are combined
    twice: once, with negatives floored at zero, into a 0..1 ``correctness``
    used only for confidence calibration and the confident-but-wrong
    penalty; and once, signed and together with the penalties, into the
    final ``total``. All reported values are rounded to three decimals.
    """

    def _entity_score(predicted, observed, missing_score: float, wrong_score: float) -> float:
        # Full credit for a hit; a missing guess costs less than a wrong one.
        if predicted and predicted in observed:
            return 1.0
        return missing_score if predicted is None else wrong_score

    topic_score = 1.0 if prediction.topic == event.topic else -0.4
    actor_score = _entity_score(prediction.predicted_actor, event.actors, -0.2, -0.5)
    target_score = _entity_score(prediction.predicted_target, event.targets, -0.15, -0.45)
    # The reveal is always one turn ahead, so longer horizons lose credit.
    timing_score = self._clamp_unit(1.0 - abs(prediction.time_horizon_turns - 1) * 0.6)

    severity_gap = severity_distance(prediction.expected_severity, event.severity)
    severity_alignment = {0: 1.0, 1: 0.35, 2: -0.2, 3: -0.5}.get(severity_gap, -0.5)

    positive_blend = (
        0.38 * max(topic_score, 0.0)
        + 0.22 * max(actor_score, 0.0)
        + 0.16 * max(target_score, 0.0)
        + 0.12 * max(timing_score, 0.0)
        + 0.12 * max(severity_alignment, 0.0)
    )
    correctness = max(0.0, min(1.0, positive_blend))
    calibration = self._clamp_unit(1.0 - abs(prediction.confidence - correctness) * 2.0)

    vague_penalty = 0.0
    if prediction.predicted_actor is None and prediction.predicted_target is None:
        vague_penalty -= 0.18
    if len(prediction.summary.strip()) < 24:
        vague_penalty -= 0.12

    wrong_topic_and_actor = topic_score < 0.0 and actor_score < 0.0
    contradiction_penalty = -0.22 if wrong_topic_and_actor and prediction.confidence >= 0.55 else 0.0
    confident_false_penalty = -0.32 if correctness < 0.25 and prediction.confidence >= 0.7 else 0.0

    total = self._clamp_unit(
        0.28 * topic_score
        + 0.18 * actor_score
        + 0.14 * target_score
        + 0.12 * timing_score
        + 0.16 * severity_alignment
        + 0.12 * calibration
        + vague_penalty
        + contradiction_penalty
        + confident_false_penalty
    )

    return PredictionAssessment(
        prediction_id=prediction.prediction_id,
        agent_id=prediction.agent_id,
        turn=prediction.turn,
        evaluated_event_id=event.event_id,
        evaluated_event_summary=event.summary,
        topic_score=round(topic_score, 3),
        actor_score=round(actor_score, 3),
        target_score=round(target_score, 3),
        timing_score=round(timing_score, 3),
        severity_score=round(severity_alignment, 3),
        confidence_calibration=round(calibration, 3),
        vague_penalty=round(vague_penalty, 3),
        contradiction_penalty=round(contradiction_penalty, 3),
        confident_false_penalty=round(confident_false_penalty, 3),
        total=round(total, 3),
    )
def _apply_forecast_rewards(
    self,
    rewards: dict[str, RewardBreakdown],
    assessments: dict[str, PredictionAssessment],
) -> None:
    """Blend each agent's forecast assessment into its reward, in place.

    Agents without an existing reward entry receive a fresh
    ``RewardBreakdown``. The assessment's components are stored under
    ``forecast_terms`` and its total is added to the reward total, scaled by
    ``FORECAST_REWARD_BLEND``, clamped and rounded to three decimals.
    """
    for agent_id, assessment in assessments.items():
        breakdown = rewards.get(agent_id, RewardBreakdown())
        breakdown.forecast_terms = dict(
            topic=assessment.topic_score,
            actor=assessment.actor_score,
            target=assessment.target_score,
            timing=assessment.timing_score,
            severity=assessment.severity_score,
            confidence_calibration=assessment.confidence_calibration,
            vague_penalty=assessment.vague_penalty,
            contradiction_penalty=assessment.contradiction_penalty,
            confident_false_penalty=assessment.confident_false_penalty,
        )
        breakdown.forecast_total = assessment.total
        blended = self._clamp_unit(breakdown.total + FORECAST_REWARD_BLEND * assessment.total)
        breakdown.total = round(blended, 3)
        rewards[agent_id] = breakdown
def _apply_scenario(self, world: WorldState, scenario: ScenarioDefinition) -> None:
    """Overlay a scenario definition onto a freshly built world.

    Applied in order: the three global gauge overrides (other override
    fields are ignored), coalition and hidden-intent overrides, actor metric
    shifts, asset impacts, public events (converted through external
    signals) and scenario latent events. The public event list and public
    actor state are resynced at the end.
    """
    gauge_fields = {"tension_level", "market_stress", "oil_pressure"}
    for field_name, value in scenario.world_overrides.items():
        if field_name not in gauge_fields:
            continue
        setattr(world, field_name, round(self._clamp_percent(value), 2))

    for agent_id, allies in scenario.coalition_overrides.items():
        world.coalition_graph[agent_id] = list(allies)

    for agent_id, intent in scenario.hidden_intent_overrides.items():
        world.hidden_intents[agent_id] = intent

    for shift in scenario.metric_shifts:
        self._bump_actor_metric(world, shift.agent_id, shift.metric, shift.delta)

    for impact in scenario.asset_impacts:
        self._apply_scenario_asset_impact(world, impact)

    for public_event in scenario.public_events:
        signal = ExternalSignal(
            source=public_event.source,
            headline=public_event.headline,
            region=public_event.region,
            tags=list(public_event.tags),
            severity=public_event.severity,
        )
        self._register_latent_event(world, self._signal_to_latent_event(world, signal))

    for index, latent_spec in enumerate(scenario.latent_events):
        latent_event = self._scenario_latent_event_to_event(scenario.id, index, latent_spec)
        self._register_latent_event(world, latent_event)

    self._resync_public_events(world)
    self._resync_public_actor_state(world)
| def _apply_scenario_asset_impact(self, world: WorldState, impact: ScenarioAssetImpact) -> None: | |
| if impact.mode == "repair": | |
| self._restore_assets( | |
| world, | |
| owner=impact.owner, | |
| intensity=impact.intensity, | |
| reason=impact.reason, | |
| section_bias=impact.section_bias, | |
| max_assets=impact.max_assets, | |
| ) | |
| return | |
| self._damage_assets( | |
| world, | |
| owner=impact.owner, | |
| intensity=impact.intensity, | |
| reason=impact.reason, | |
| section_bias=impact.section_bias, | |
| max_assets=impact.max_assets, | |
| max_status=impact.max_status, | |
| ) | |
| def _register_latent_event( | |
| self, | |
| world: WorldState, | |
| event: LatentEvent, | |
| *, | |
| spawn_links: bool = True, | |
| ) -> LatentEvent: | |
| world.latent_events = [existing for existing in world.latent_events if existing.event_id != event.event_id] | |
| world.latent_events.append(event) | |
| if spawn_links: | |
| self._spawn_linked_latent_events(world, event) | |
| return event | |
def _resync_public_events(self, world: WorldState) -> None:
    """Rebuild ``world.active_events`` from the most recent latent events.

    Only the last 24 latent events are considered. Private events and
    resolved events whose severity has dropped below 0.18 are left out. The
    published severity is ``severity * reliability + 0.12`` clamped to
    0.22..0.95, and at most the 12 newest projections are kept.
    """

    def _is_publishable(event: LatentEvent) -> bool:
        faded_out = event.status == "resolved" and event.severity < 0.18
        return not faded_out and event.visibility != "private"

    projections: list[BlackSwanEvent] = [
        BlackSwanEvent(
            id=event.event_id,
            summary=self._latent_event_public_summary(event),
            source=event.origin,
            severity=round(max(0.22, min(0.95, event.severity * event.reliability + 0.12)), 3),
            public=True,
            affected_agents=list(event.affected_agents),
        )
        for event in world.latent_events[-24:]
        if _is_publishable(event)
    ]
    world.active_events = projections[-12:]
def _scenario_latent_event_to_event(
    self,
    scenario_id: str,
    index: int,
    event: ScenarioLatentEvent,
) -> LatentEvent:
    """Convert a scenario-declared latent event into a runtime ``LatentEvent``.

    The id is derived from the scenario id and the event's position. An
    empty ``affected_agents`` defaults to the five regional actors. A public
    baseline narrative is always produced; a concealed narrative is added
    only when the scenario supplies a private summary.
    """
    affected_agents = list(event.affected_agents) or ["us", "israel", "iran", "hezbollah", "gulf"]

    public_text = event.public_summary or event.summary
    narratives = [
        LatentEventNarrative(
            framing="baseline",
            summary=public_text,
            confidence=min(0.92, event.reliability + 0.06),
            public=True,
        )
    ]
    private_text = event.private_summary
    if private_text:
        narratives += [
            LatentEventNarrative(
                framing="concealed",
                summary=private_text,
                confidence=max(0.36, event.reliability),
                public=False,
            )
        ]

    return LatentEvent(
        event_id=f"scenario-latent-{scenario_id}-{index}",
        topic=event.topic,
        status="active",
        severity=event.severity,
        visibility=event.visibility,  # type: ignore[arg-type]
        reliability=event.reliability,
        origin=event.source,
        affected_agents=affected_agents,
        started_at_turn=0,
        last_updated_turn=0,
        decay_rate=event.decay_rate,
        narratives=narratives or self._default_latent_event_narratives(event.topic, event.summary),
    )
| def _spawn_linked_latent_events(self, world: WorldState, event: LatentEvent) -> None: | |
| if event.severity < 0.48: | |
| return | |
| for linked_topic in LATENT_EVENT_LINKS.get(event.topic, ()): | |
| if any( | |
| existing.topic == linked_topic | |
| and event.event_id in existing.linked_event_ids | |
| and existing.status != "resolved" | |
| for existing in world.latent_events | |
| ): | |
| continue | |
| linked_event = LatentEvent( | |
| event_id=f"{event.event_id}-{linked_topic}", | |
| topic=linked_topic, | |
| status="emerging", | |
| severity=round(max(0.24, min(0.82, event.severity * 0.68)), 3), | |
| visibility="public" if linked_topic in {"market", "humanitarian", "diplomacy"} else "mixed", | |
| reliability=max(0.42, round(event.reliability - 0.06, 3)), | |
| origin=event.origin, | |
| affected_agents=list(event.affected_agents), | |
| started_at_turn=world.turn, | |
| last_updated_turn=world.turn, | |
| decay_rate=min(0.16, event.decay_rate + 0.02), | |
| linked_event_ids=[event.event_id], | |
| narratives=self._default_latent_event_narratives( | |
| linked_topic, | |
| self._linked_event_summary(linked_topic, event), | |
| ), | |
| ) | |
| self._register_latent_event(world, linked_event, spawn_links=False) | |
def _signal_to_latent_event(self, world: WorldState, signal: ExternalSignal) -> LatentEvent:
    """Wrap an external signal in a public, active ``LatentEvent``.

    The topic is the first one inferred from the headline, tags and region
    text. Severity is clamped to 0.12..1.0 and rounded to three decimals;
    reliability and decay rate are fixed at 0.72 and 0.08.
    """
    tags_text = " ".join(signal.tags)
    region_text = signal.region or ""
    inferred_topics = self._infer_event_topics_from_text(
        f"{signal.headline} {tags_text} {region_text}"
    )
    topic = inferred_topics[0]
    affected_agents = self._infer_affected_agents(signal)
    current_turn = world.turn
    return LatentEvent(
        event_id=f"signal-{current_turn}-{len(world.latent_events)}",
        topic=topic,
        status="active",
        severity=round(max(0.12, min(1.0, signal.severity)), 3),
        visibility="public",
        reliability=0.72,
        origin=signal.source,
        affected_agents=affected_agents,
        started_at_turn=current_turn,
        last_updated_turn=current_turn,
        decay_rate=0.08,
        narratives=self._default_latent_event_narratives(topic, signal.headline),
    )
| def _action_to_latent_event(self, world: WorldState, agent_id: str, action: AgentAction) -> LatentEvent | None: | |
| if action.type == "hold": | |
| return None | |
| topic = self._infer_action_event_topic(agent_id, action) | |
| affected_agents = [agent_id] | |
| if action.target in AGENT_IDS: | |
| affected_agents.append(action.target) | |
| severity = { | |
| "strike": 0.72, | |
| "mobilize": 0.62, | |
| "deceive": 0.54, | |
| "sanction": 0.5, | |
| "defend": 0.44, | |
| "intel_query": 0.36, | |
| "negotiate": 0.42, | |
| "oversight_review": 0.4, | |
| }.get(action.type, 0.38) | |
| visibility = { | |
| "strike": "public", | |
| "sanction": "public", | |
| "negotiate": "public", | |
| "oversight_review": "public", | |
| "mobilize": "mixed", | |
| "defend": "mixed", | |
| "intel_query": "private", | |
| "deceive": "private", | |
| }.get(action.type, "mixed") | |
| summary = action.summary or f"{agent_id} executed {action.type}." | |
| return LatentEvent( | |
| event_id=f"action-{agent_id}-{world.turn}-{len(world.latent_events)}", | |
| topic=topic, | |
| status="active", | |
| severity=severity, | |
| visibility=visibility, | |
| reliability=0.66 if visibility == "public" else 0.58, | |
| origin=f"{agent_id}-action", | |
| affected_agents=sorted(set(affected_agents)), | |
| started_at_turn=world.turn, | |
| last_updated_turn=world.turn, | |
| decay_rate=0.07 if visibility == "public" else 0.1, | |
| narratives=self._default_latent_event_narratives(topic, summary), | |
| ) | |
def _default_latent_event_narratives(self, topic: str, summary: str) -> list[LatentEventNarrative]:
    """Produce the standard four narratives for a latent event.

    In order: the public baseline (the clipped summary itself), then three
    private framings — deteriorating, stabilizing and concealed — with
    decreasing confidence. The topic is shown through its display label when
    one exists, otherwise by its raw name.
    """
    topic_label = CONTRADICTION_TOPIC_LABELS.get(topic, topic)
    clipped = self._clip_summary(summary)
    deteriorating_text = self._clip_summary(
        f"Private reporting points to renewed deterioration in the broader {topic_label} picture. {clipped}"
    )
    stabilizing_text = self._clip_summary(
        f"Competing reporting suggests partial stabilization around the broader {topic_label} picture. {clipped}"
    )
    concealed_text = self._clip_summary(
        f"Privately, actors suspect additional hidden activity around {topic_label} beyond what is publicly released."
    )

    # (framing, summary, confidence, public)
    specs = (
        ("baseline", clipped, 0.72, True),
        ("deteriorating", deteriorating_text, 0.64, False),
        ("stabilizing", stabilizing_text, 0.58, False),
        ("concealed", concealed_text, 0.52, False),
    )
    return [
        LatentEventNarrative(framing=framing, summary=text, confidence=confidence, public=is_public)
        for framing, text, confidence, is_public in specs
    ]
def _linked_event_summary(self, topic: str, event: LatentEvent) -> str:
    """Describe a spillover from ``event``'s topic into ``topic`` in one sentence.

    Both topics are rendered through their display labels when available and
    by their raw names otherwise.
    """
    labels = CONTRADICTION_TOPIC_LABELS
    destination = labels.get(topic, topic)
    origin = labels.get(event.topic, event.topic)
    return f"Spillover from {origin} is now driving {destination} pressure."
@staticmethod
def _latent_event_public_summary(event: LatentEvent) -> str:
    """Return the summary text to show publicly for ``event``.

    Preference order: the first narrative flagged ``public``; otherwise the
    first narrative of any kind; otherwise the event's topic string.

    Declared as a static method because it takes no ``self`` yet is invoked
    as ``self._latent_event_public_summary(event)`` by its callers.
    """
    for narrative in event.narratives:
        if narrative.public:
            return narrative.summary
    if event.narratives:
        return event.narratives[0].summary
    return event.topic
def _infer_event_topics_from_text(self, text: str) -> list[str]:
    """Return every latent-event topic whose keywords occur in ``text``.

    Matching is a case-insensitive substring test against the keyword lists
    in ``LATENT_EVENT_TOPIC_KEYWORDS``. When nothing matches, the result is
    ``["diplomacy"]``.
    """
    haystack = text.lower()
    matched: list[str] = []
    for topic, keywords in LATENT_EVENT_TOPIC_KEYWORDS.items():
        for keyword in keywords:
            if keyword in haystack:
                matched.append(topic)
                break
    if not matched:
        return ["diplomacy"]
    return matched
def _infer_action_event_topic(self, agent_id: str, action: AgentAction) -> str:
    """Map an agent action onto the latent-event topic it feeds.

    Non-kinetic action types map to a fixed topic. For the kinetic types
    (strike / mobilize / defend): a mobilization by iran or hezbollah is
    "corridor"; otherwise a target of gulf or iran is "shipping"; anything
    else is "border". Unrecognised action types fall back to "diplomacy".
    """
    kind = action.type

    fixed_topics = {
        "negotiate": "diplomacy",
        "oversight_review": "diplomacy",
        "sanction": "domestic",
        "deceive": "cyber",
        "intel_query": "cyber",
    }
    if kind in fixed_topics:
        return fixed_topics[kind]

    if kind not in {"strike", "mobilize", "defend"}:
        return "diplomacy"

    # The corridor rule takes precedence over the target-based rules.
    if kind == "mobilize" and agent_id in {"iran", "hezbollah"}:
        return "corridor"
    return "shipping" if action.target in {"gulf", "iran"} else "border"
def _apply_latent_event_pressure(self, world: WorldState) -> None:
    """Push every unresolved latent event's pressure into the world metrics.

    Pressure for an event is ``severity * max(reliability, 0.35)``. Each topic
    contributes that pressure, scaled by a per-metric weight, to one or two of
    ``tension_level`` / ``market_stress`` / ``oil_pressure``; diplomacy events
    carry negative weights and therefore relieve pressure. Every update goes
    through ``self._clamp_percent``. Topics without an entry change nothing.
    """
    # topic -> ordered (world attribute, weight) pairs
    pressure_weights = {
        "shipping": (("market_stress", 0.8), ("oil_pressure", 1.1)),
        "commodities": (("market_stress", 0.9), ("oil_pressure", 0.35)),
        "border": (("tension_level", 0.9),),
        "corridor": (("tension_level", 0.7), ("oil_pressure", 0.25)),
        "cyber": (("tension_level", 0.35), ("market_stress", 0.45)),
        "domestic": (("market_stress", 0.4),),
        "humanitarian": (("tension_level", 0.25),),
        "diplomacy": (("tension_level", -0.35), ("market_stress", -0.18)),
    }

    for event in world.latent_events:
        if event.status == "resolved":
            continue
        pressure = event.severity * max(event.reliability, 0.35)
        for metric, weight in pressure_weights.get(event.topic, ()):
            shifted = getattr(world, metric) + pressure * weight
            setattr(world, metric, self._clamp_percent(shifted))
def _decay_latent_events(self, world: WorldState) -> None:
    """Age latent events in place and refresh their status.

    An event updated on the current turn (or later) is not decayed; it is only
    promoted to "active" when its severity is at least 0.66. Every other event
    loses ``decay_rate`` severity (floored at 0, rounded to 3 places) and is
    then classified: ``<= 0.12`` resolved, ``<= 0.35`` contained, else active.
    """
    current_turn = world.turn
    for event in world.latent_events:
        touched_this_turn = event.last_updated_turn >= current_turn
        if touched_this_turn:
            if event.severity >= 0.66:
                event.status = "active"
            continue

        event.severity = round(max(0.0, event.severity - event.decay_rate), 3)
        if event.severity > 0.35:
            event.status = "active"
        elif event.severity > 0.12:
            event.status = "contained"
        else:
            event.status = "resolved"
def _inject_external_signals(self, world: WorldState, signals: list[ExternalSignal]) -> None:
    """Fold a batch of external signals into ``world``.

    For each signal: register the latent event derived from it, raise tension
    (severity * 8) and market stress (severity * 6) capped at 100, raise oil
    pressure (severity * 10, capped at 100) when the headline mentions
    oil/gas/lng or the signal is tagged "shipping", then apply the
    signal-specific pressure and asset effects. The public event and actor
    views are resynchronised once after the whole batch.
    """
    for signal in signals:
        derived_event = self._signal_to_latent_event(world, signal)
        self._register_latent_event(world, derived_event)

        severity = signal.severity
        world.tension_level = min(100.0, world.tension_level + severity * 8.0)
        world.market_stress = min(100.0, world.market_stress + severity * 6.0)

        energy_related = (
            self._signal_mentions(signal.headline.lower(), "oil", "gas", "lng")
            or "shipping" in signal.tags
        )
        if energy_related:
            world.oil_pressure = min(100.0, world.oil_pressure + severity * 10.0)

        self._apply_signal_pressure(world, signal)
        self._apply_signal_asset_effects(world, signal)

    self._resync_public_events(world)
    self._resync_public_actor_state(world)
def _infer_affected_agents(self, signal: ExternalSignal) -> list[str]:
    """Guess which agents a signal concerns from its headline, tags and region.

    The three fields are joined and lower-cased, then scanned for each agent's
    keywords. A keyword only counts when it starts at a token boundary (not
    preceded by a letter or digit), and keywords of one or two characters must
    also end at a token boundary. Plain substring matching made the keyword
    "us" fire on words such as "status", "Russia" or "discuss", which flagged
    the US on nearly every headline.

    Returns the matching agent ids in mapping order, or all five agents when
    nothing matches.
    """
    import re

    text = f"{signal.headline} {' '.join(signal.tags)} {(signal.region or '')}".lower()
    mapping = {
        "us": ("us", "washington", "centcom", "poll"),
        "israel": ("israel", "idf", "oref", "northern front"),
        "iran": ("iran", "tehran", "hormuz", "proxy"),
        "hezbollah": ("hezbollah", "lebanon", "border", "drone"),
        "gulf": ("gulf", "saudi", "uae", "shipping", "oil", "gold", "silver", "commodity", "lng"),
    }

    def mentioned(keyword: str) -> bool:
        # Underscores and hyphens count as separators so tags like
        # "us_navy" or "anti-iran" still match.
        pattern = rf"(?<![a-z0-9]){re.escape(keyword)}"
        if len(keyword) <= 2:
            pattern += r"(?![a-z0-9])"
        return re.search(pattern, text) is not None

    affected = [
        agent_id
        for agent_id, keywords in mapping.items()
        if any(mentioned(keyword) for keyword in keywords)
    ]
    return affected or ["us", "israel", "iran", "hezbollah", "gulf"]
def _apply_actions(self, world: WorldState, actions: dict[str, AgentAction]) -> None:
    """Apply each agent's action to ``world``, then settle the turn.

    Per action: validate it, add its impact deltas (agent-specific table
    first, then the default table, then the default "hold" impact) to the
    world metrics and the actor's risk score, run the type-specific side
    effect, refresh the actor's behavioral consistency, apply actor-state and
    asset effects, and register any latent event the action produces.

    After all actions: round the three world metrics to 2 places, apply
    latent-event pressure, decay latent events, reconcile actor state and
    resync the public views.

    Raises:
        ValueError: propagated from ``_validate_action`` for a bad action.
    """
    for agent_id, action in actions.items():
        self._validate_action(agent_id, action)
        action_type = action.type

        default_impact = DEFAULT_ACTION_IMPACTS.get(action_type, DEFAULT_ACTION_IMPACTS["hold"])
        impact = AGENT_ACTION_IMPACTS.get(agent_id, {}).get(action_type, default_impact)

        world.tension_level = self._clamp_percent(world.tension_level + impact.tension_delta)
        world.market_stress = self._clamp_percent(world.market_stress + impact.market_delta)
        world.oil_pressure = self._clamp_percent(world.oil_pressure + impact.oil_delta)

        shifted_risk = world.risk_scores.get(agent_id, 0.25) + impact.risk_delta
        world.risk_scores[agent_id] = round(max(0.0, min(1.0, shifted_risk)), 3)

        if action_type == "negotiate":
            # Only a negotiation aimed at a known agent creates a coalition link.
            if action.target and action.target in AGENT_IDS:
                self._link_agents(world, agent_id, action.target)
        elif action_type == "hold":
            world.risk_scores[agent_id] = max(0.0, world.risk_scores.get(agent_id, 0.0) - 0.01)
        elif action_type == "deceive":
            world.hidden_intents[agent_id] = f"{agent_id} is masking operational intent behind ambiguous signaling."
        elif action_type == "oversight_review":
            # A review slightly lowers risk and raises consistency for every agent.
            for scored_agent in AGENT_IDS:
                world.risk_scores[scored_agent] = max(
                    0.0,
                    world.risk_scores.get(scored_agent, 0.0) - 0.015,
                )
                world.behavioral_consistency[scored_agent] = min(
                    1.0,
                    world.behavioral_consistency.get(scored_agent, 0.6) + 0.01,
                )

        world.behavioral_consistency[agent_id] = self._update_behavioral_consistency(
            world=world,
            agent_id=agent_id,
            action=action,
        )
        self._apply_actor_state_effects(world, agent_id, action)
        self._apply_asset_action_effects(world, agent_id, action)

        produced_event = self._action_to_latent_event(world, agent_id, action)
        if produced_event is not None:
            self._register_latent_event(world, produced_event)

    world.tension_level = round(world.tension_level, 2)
    world.market_stress = round(world.market_stress, 2)
    world.oil_pressure = round(world.oil_pressure, 2)

    self._apply_latent_event_pressure(world)
    self._decay_latent_events(world)
    self._reconcile_actor_state(world, actions)
    self._resync_public_events(world)
    self._resync_public_actor_state(world)
@staticmethod
def _validate_action(agent_id: str, action: AgentAction) -> None:
    """Reject an action that does not belong to, or is not allowed for, ``agent_id``.

    Declared as a static method because it takes no ``self`` yet is invoked
    as ``self._validate_action(...)`` by its callers.

    Raises:
        ValueError: if ``action.actor`` differs from ``agent_id``, or if
            ``action.type`` is not listed for the agent in
            ``AGENT_ALLOWED_ACTIONS``.
    """
    if action.actor != agent_id:
        raise ValueError(f"Action actor mismatch: payload actor={action.actor}, slot={agent_id}")
    if action.type not in AGENT_ALLOWED_ACTIONS.get(agent_id, ()):
        raise ValueError(f"Unsupported action for {agent_id}: {action.type}")
def _link_agents(self, world: WorldState, source: str, target: str) -> None:
    """Record a symmetric coalition link between ``source`` and ``target``.

    Both agents get an entry in ``world.coalition_graph`` (created if absent)
    and each is appended to the other's list unless already present.
    """
    graph = world.coalition_graph
    for member, partner in ((source, target), (target, source)):
        partners = graph.setdefault(member, [])
        if partner not in partners:
            partners.append(partner)
def _compute_oversight(
    self,
    world: WorldState,
    actions: dict[str, AgentAction],
    signals: list[ExternalSignal],
) -> OversightIntervention:
    """Score escalation risk for the turn and decide whether oversight steps in.

    The risk score is a weighted sum, clamped to [0, 1] and rounded to 3
    places, of: world tension (0.45), the share of escalatory actions out of
    four (0.25), summed signal severity out of three (0.15) and the shortfall
    of mean behavioral consistency from 1.0 (0.15). Strike, mobilize, deceive
    and sanction count as escalatory.

    A score of 0.5 or less returns a non-triggered intervention. Otherwise the
    intervention is triggered, with an override action for every agent that
    chose an escalatory action; when no agent did, all five agents are listed
    as affected and no overrides are produced.
    """
    escalatory_types = {"strike", "mobilize", "deceive", "sanction"}
    escalating_agents = [
        agent_id
        for agent_id, action in actions.items()
        if action.type in escalatory_types
    ]

    signal_pressure = sum(signal.severity for signal in signals)
    consistency = world.behavioral_consistency
    mean_consistency = sum(consistency.values()) / max(len(consistency), 1)

    tension_term = (world.tension_level / 100.0) * 0.45
    escalation_term = min(1.0, len(escalating_agents) / 4.0) * 0.25
    signal_term = min(1.0, signal_pressure / 3.0) * 0.15
    consistency_term = (1.0 - mean_consistency) * 0.15
    raw_risk = tension_term + escalation_term + signal_term + consistency_term
    risk_score = round(max(0.0, min(1.0, raw_risk)), 3)

    if risk_score <= 0.5:
        return OversightIntervention(risk_score=risk_score)

    overrides = {}
    for agent_id in escalating_agents:
        overrides[agent_id] = self._build_oversight_override_action(
            world=world,
            agent_id=agent_id,
            action=actions[agent_id],
            signals=signals,
        )

    return OversightIntervention(
        triggered=True,
        risk_score=risk_score,
        reason="Escalation probability exceeded the intervention threshold.",
        affected_agents=escalating_agents or ["us", "israel", "iran", "hezbollah", "gulf"],
        action_override=overrides,
    )
def _resolve_oversight_actions(
    self,
    actions: dict[str, AgentAction],
    oversight: OversightIntervention,
) -> dict[str, AgentAction]:
    """Return the actions to execute after applying any oversight overrides.

    When oversight did not trigger, or carries no overrides, the input mapping
    is returned as-is (same object). Otherwise a shallow copy is returned in
    which each overridden agent's action is replaced; every override is
    validated first, so an invalid one raises ``ValueError``.
    """
    if not (oversight.triggered and oversight.action_override):
        return actions

    merged = dict(actions)
    for agent_id, replacement in oversight.action_override.items():
        self._validate_action(agent_id, replacement)
        merged[agent_id] = replacement
    return merged
def _build_oversight_override_action(
    self,
    *,
    world: WorldState,
    agent_id: str,
    action: AgentAction,
    signals: list[ExternalSignal],
) -> AgentAction:
    """Choose the de-escalatory action that replaces ``action`` under oversight.

    Candidates are tried in order, each only if the agent is allowed to take it:

    1. negotiate - when the signals mention talks / ceasefire / mediation terms;
    2. defend - when the agent's asset pressure is at least 0.2 or the signals
       mention attack-type terms;
    3. intel_query;
    4. hold - the unconditional fallback.

    Every replacement records the action type it replaced in its metadata.
    """
    signal_text = " ".join(
        f"{signal.headline} {' '.join(signal.tags)} {(signal.region or '')}"
        for signal in signals
    ).lower()
    asset_pressure = self._asset_pressure(world, agent_id)
    permitted = AGENT_ALLOWED_ACTIONS.get(agent_id, ())

    def forced(action_type: str, summary: str, **extra: object) -> AgentAction:
        # Shared constructor so every override carries the same metadata shape.
        return AgentAction(
            actor=agent_id,
            type=action_type,
            summary=summary,
            metadata={"mode": "oversight_override", "replaces": action.type},
            **extra,
        )

    talks_signalled = self._signal_mentions(
        signal_text, "ceasefire", "talk", "negotiat", "summit", "mediat", "humanitarian"
    )
    if talks_signalled and "negotiate" in permitted:
        return forced(
            "negotiate",
            "Oversight forced a de-escalatory negotiation cycle after elevated intervention risk.",
            target=self._select_oversight_negotiation_target(agent_id),
        )

    under_threat = asset_pressure >= 0.2 or self._signal_mentions(
        signal_text, "attack", "strike", "rocket", "missile", "drone", "shipping", "cyber", "outage"
    )
    if under_threat and "defend" in permitted:
        return forced(
            "defend",
            "Oversight forced a defensive posture to absorb incoming risk instead of escalating further.",
            target=agent_id,
        )

    if "intel_query" in permitted:
        return forced(
            "intel_query",
            "Oversight forced an intelligence verification cycle before any further escalation.",
        )

    return forced(
        "hold",
        "Oversight forced a temporary operational hold to break an escalation spiral.",
    )
@staticmethod
def _select_oversight_negotiation_target(agent_id: str) -> str | None:
    """Return the counterpart an agent is steered toward in a forced negotiation.

    Returns ``None`` for "oversight" and for any agent id not in the table.

    Declared as a static method because it takes no ``self`` yet is invoked
    as ``self._select_oversight_negotiation_target(agent_id)``.
    """
    negotiation_partners = {
        "us": "gulf",
        "israel": "us",
        "iran": "hezbollah",
        "hezbollah": "iran",
        "gulf": "us",
        "oversight": None,
    }
    return negotiation_partners.get(agent_id)
def _apply_oversight(self, world: WorldState, oversight: OversightIntervention) -> None:
    """Apply a triggered oversight intervention to ``world``; no-op otherwise.

    A triggered intervention lowers tension by 4 and market stress by 2
    (floored at 0), registers a public "diplomacy" latent event whose severity
    is the intervention's risk score, resyncs public events, and for each
    affected agent sets the risk score to the intervention's risk score and
    raises behavioral consistency by 0.04 (capped at 1.0).
    """
    if not oversight.triggered:
        return

    world.tension_level = max(0.0, world.tension_level - 4.0)
    world.market_stress = max(0.0, world.market_stress - 2.0)

    pause_event = LatentEvent(
        event_id=f"oversight-{world.turn}-{len(world.latent_events)}",
        topic="diplomacy",
        status="active",
        severity=oversight.risk_score,
        visibility="public",
        reliability=0.78,
        origin="oversight-wrapper",
        affected_agents=list(oversight.affected_agents),
        started_at_turn=world.turn,
        last_updated_turn=world.turn,
        decay_rate=0.05,
        narratives=self._default_latent_event_narratives(
            "diplomacy",
            "Oversight injected a corrective diplomatic pause into the next state.",
        ),
    )
    self._register_latent_event(world, pause_event)
    self._resync_public_events(world)

    for agent_id in oversight.affected_agents:
        world.risk_scores[agent_id] = oversight.risk_score
        raised_consistency = world.behavioral_consistency.get(agent_id, 0.6) + 0.04
        world.behavioral_consistency[agent_id] = min(1.0, raised_consistency)
def _initialize_belief_state(
    self,
    world: WorldState,
    episode: EpisodeMetadata,
) -> dict[str, AgentBeliefState]:
    """Seed a belief state for every agent from the current latent events.

    Each agent gets one belief per latent event relevant to it. Dominant
    topics are computed over all of those beliefs, while only the first eight
    are stored on the state.
    """
    initial_state: dict[str, AgentBeliefState] = {}
    for agent_id in AGENT_IDS:
        seeded: list[AgentBeliefEntry] = []
        for event in self._relevant_latent_events_for_agent(world, agent_id):
            seeded.append(
                self._belief_entry_from_event(event, world=world, episode=episode, agent_id=agent_id)
            )
        initial_state[agent_id] = AgentBeliefState(
            agent_id=agent_id,
            dominant_topics=self._dominant_belief_topics(seeded),
            beliefs=seeded[:8],
            last_revision_turn=world.turn,
        )
    return initial_state
def _update_belief_state(self, session: SessionState) -> dict[str, AgentBeliefState]:
    """Recompute every agent's belief state for the session's current turn.

    For each agent, existing beliefs are deep-copied and then:

    * each currently relevant latent event yields a fresh belief, which is
      either added (new id) or merged into the prior one via
      ``_revise_belief_entry``;
    * beliefs with no matching event this turn are decayed via
      ``_decay_unseen_belief`` and dropped when that returns ``None``.

    The eight strongest beliefs are kept, ranked by confidence, then last
    update turn, then confirmation count (all descending).
    """
    world = session.world
    episode = session.episode
    refreshed: dict[str, AgentBeliefState] = {}

    for agent_id in AGENT_IDS:
        previous = session.belief_state.get(agent_id, AgentBeliefState(agent_id=agent_id))
        by_id = {entry.belief_id: entry.model_copy(deep=True) for entry in previous.beliefs}
        observed_ids: set[str] = set()

        for event in self._relevant_latent_events_for_agent(world, agent_id):
            candidate = self._belief_entry_from_event(event, world=world, episode=episode, agent_id=agent_id)
            observed_ids.add(candidate.belief_id)
            earlier = by_id.get(candidate.belief_id)
            if earlier is None:
                by_id[candidate.belief_id] = candidate
            else:
                by_id[candidate.belief_id] = self._revise_belief_entry(
                    earlier,
                    candidate,
                    agent_id=agent_id,
                    turn=world.turn,
                )

        # Iterate over a snapshot because entries may be removed while scanning.
        for belief_id, earlier in list(by_id.items()):
            if belief_id in observed_ids:
                continue
            faded = self._decay_unseen_belief(earlier, agent_id=agent_id, turn=world.turn)
            if faded is None:
                by_id.pop(belief_id, None)
            else:
                by_id[belief_id] = faded

        strongest = sorted(
            by_id.values(),
            key=lambda entry: (entry.confidence, entry.last_updated_turn, entry.confirmation_count),
            reverse=True,
        )[:8]
        refreshed[agent_id] = AgentBeliefState(
            agent_id=agent_id,
            dominant_topics=self._dominant_belief_topics(strongest),
            beliefs=strongest,
            last_revision_turn=world.turn,
        )
    return refreshed
@staticmethod
def _belief_doctrine_prior(agent_id: str, topic: str) -> float:
    """Return the agent's prior weight for ``topic`` from ``BELIEF_TOPIC_PRIORS``.

    Yields ``0.0`` when the agent or the topic has no entry.

    Declared as a static method because it takes no ``self`` yet is invoked
    as ``self._belief_doctrine_prior(agent_id, topic)`` by its callers.
    """
    return BELIEF_TOPIC_PRIORS.get(agent_id, {}).get(topic, 0.0)
def _revise_belief_entry(
    self,
    prior: AgentBeliefEntry,
    next_belief: AgentBeliefEntry,
    *,
    agent_id: str,
    turn: int,
) -> AgentBeliefEntry:
    """Merge a fresh belief into the prior one and return the revised copy.

    ``prior`` is not mutated. Source, suspected agents, related event ids and
    the last-updated turn are always refreshed. The new evidence counts as a
    contradiction when its status is contested/disconfirmed, or when it is
    merely suspected while the prior was active/confirmed.

    * Contradiction: the contradiction count rises, confidence is mostly
      retained minus ``BELIEF_CONTRADICTION_PENALTY``, the status becomes
      "contested" (or "disconfirmed" after two contradictions at confidence
      <= 0.5), and the summary is replaced only once confidence is below 0.46.
    * Confirmation: the confirmation count rises, confidence is blended toward
      the new evidence plus ``BELIEF_CONFIRMATION_BONUS``, summary and status
      follow the new belief, and an "active" belief is promoted to "confirmed"
      after two confirmations at confidence >= 0.74.

    Confidence is always clamped to [``BELIEF_PERSISTENCE_FLOOR``, 0.98] and
    rounded to 3 places.
    """

    def bounded(value: float) -> float:
        return round(max(BELIEF_PERSISTENCE_FLOOR, min(0.98, value)), 3)

    revised = prior.model_copy(deep=True)
    doctrine_prior = self._belief_doctrine_prior(agent_id, next_belief.topic)
    downgraded = next_belief.status == "suspected" and prior.status in {"active", "confirmed"}
    contradiction = next_belief.status in {"contested", "disconfirmed"} or downgraded

    revised.source = next_belief.source
    revised.suspected_agents = list(next_belief.suspected_agents)
    revised.related_event_ids = list({*prior.related_event_ids, *next_belief.related_event_ids})
    revised.last_updated_turn = turn

    if contradiction:
        revised.contradiction_count += 1
        retention = 0.78 + doctrine_prior * 0.5 + min(prior.confirmation_count, 3) * 0.02
        revised.confidence = bounded(
            prior.confidence * retention
            + next_belief.confidence * 0.12
            - BELIEF_CONTRADICTION_PENALTY
        )
        if revised.contradiction_count >= 2 and revised.confidence <= 0.5:
            revised.status = "disconfirmed"
        else:
            revised.status = "contested"
        if revised.confidence < 0.46:
            revised.summary = next_belief.summary
        return revised

    revised.confirmation_count += 1
    retention = 0.38 + doctrine_prior * 0.4
    revised.confidence = bounded(
        prior.confidence * retention
        + next_belief.confidence * (1.0 - retention)
        + BELIEF_CONFIRMATION_BONUS
    )
    revised.summary = next_belief.summary
    revised.status = next_belief.status
    if revised.confirmation_count >= 2 and revised.confidence >= 0.74 and revised.status == "active":
        revised.status = "confirmed"
    if revised.status in {"active", "confirmed"}:
        revised.last_confirmed_turn = turn
    return revised
def _decay_unseen_belief(
    self,
    prior: AgentBeliefEntry,
    *,
    agent_id: str,
    turn: int,
) -> AgentBeliefEntry | None:
    """Weaken a belief that received no supporting event this turn.

    Returns ``prior`` itself when it was updated on ``turn`` (or later),
    ``None`` when the belief should be forgotten (stale for more than
    ``BELIEF_MAX_STALE_TURNS`` turns with confidence <= 0.14), and otherwise a
    decayed deep copy.

    The per-call decay is 0.11, reduced by the agent's doctrine prior for the
    topic and by up to three prior confirmations, never below 0.04; confidence
    is floored at 0.06. Status is stepped down as staleness grows.
    """
    turns_stale = max(0, turn - prior.last_updated_turn)
    if turns_stale <= 0:
        return prior

    doctrine_prior = self._belief_doctrine_prior(agent_id, prior.topic)
    decay = max(
        0.04,
        0.11
        - doctrine_prior * 0.22
        - min(prior.confirmation_count, 3) * 0.01,
    )

    faded = prior.model_copy(deep=True)
    faded.confidence = round(max(0.06, prior.confidence - decay), 3)

    if turns_stale >= 1 and faded.status == "confirmed":
        faded.status = "active"

    if turns_stale >= 2:
        if faded.status in {"active", "confirmed"}:
            faded.status = "contested" if faded.confidence >= 0.42 else "suspected"
        elif faded.status == "contested" and faded.confidence < 0.34:
            faded.status = "suspected"

    mostly_contradicted = faded.contradiction_count > faded.confirmation_count
    if turns_stale >= 3 and mostly_contradicted and faded.confidence < 0.28:
        faded.status = "disconfirmed"

    if turns_stale > BELIEF_MAX_STALE_TURNS and faded.confidence <= 0.14:
        return None
    return faded
def _relevant_latent_events_for_agent(self, world: WorldState, agent_id: str) -> list[LatentEvent]:
    """List the unresolved latent events that ``agent_id`` can know about.

    The "oversight" agent sees every unresolved event. Any other agent sees
    events whose visibility is "public" or "mixed", plus events that list the
    agent among their affected agents. World order is preserved.
    """
    sees_everything = agent_id == "oversight"
    return [
        event
        for event in world.latent_events
        if event.status != "resolved"
        and (
            sees_everything
            or event.visibility in {"public", "mixed"}
            or agent_id in event.affected_agents
        )
    ]
def _belief_entry_from_event(
    self,
    event: LatentEvent,
    *,
    world: WorldState,
    episode: EpisodeMetadata,
    agent_id: str,
) -> AgentBeliefEntry:
    """Build the belief ``agent_id`` would form about ``event`` this turn.

    Summary, confidence and status come from the matching ``_belief_*``
    helpers. The belief id is ``"<agent_id>:<event_id>"``. An active/confirmed
    belief starts with one confirmation and the current turn as its last
    confirmed turn; a contested belief starts with one contradiction.
    """
    summary = self._belief_summary_for_agent(event, world=world, episode=episode, agent_id=agent_id)
    confidence = self._belief_confidence_for_agent(event, world=world, episode=episode, agent_id=agent_id)
    status = self._belief_status_for_agent(event, confidence=confidence, episode=episode, agent_id=agent_id)

    affirmed = status in {"active", "confirmed"}
    disputed = status == "contested"
    return AgentBeliefEntry(
        belief_id=f"{agent_id}:{event.event_id}",
        topic=event.topic,
        summary=summary,
        confidence=confidence,
        status=status,
        source=event.origin,
        suspected_agents=list(event.affected_agents),
        related_event_ids=[event.event_id, *event.linked_event_ids],
        confirmation_count=1 if affirmed else 0,
        contradiction_count=1 if disputed else 0,
        last_confirmed_turn=world.turn if affirmed else None,
        last_updated_turn=world.turn,
    )
def _belief_summary_for_agent(
    self,
    event: LatentEvent,
    *,
    world: WorldState,
    episode: EpisodeMetadata,
    agent_id: str,
) -> str:
    """Return the belief summary line ``agent_id`` holds for ``event``.

    The text is the narrative chosen by ``_select_private_event_narrative``,
    prefixed with "Belief" for private events and "Assessment" otherwise, and
    clipped to 180 characters.
    """
    chosen = self._select_private_event_narrative(event, world=world, episode=episode, agent_id=agent_id)
    if event.visibility == "private":
        label = "Belief"
    else:
        label = "Assessment"
    return self._clip_summary(f"{label}: {chosen.summary}", 180)
def _belief_confidence_for_agent(
    self,
    event: LatentEvent,
    *,
    world: WorldState,
    episode: EpisodeMetadata,
    agent_id: str,
) -> float:
    """Estimate how confident ``agent_id`` is in its belief about ``event``.

    Starts from the event's reliability plus the agent's doctrine prior for
    the topic, then adjusts: +0.12 for public events, -0.08 for private ones;
    a projection-derived shift of up to +/-0.11 when observation projection is
    enabled for the agent; -0.14 when a non-oversight agent is not among the
    affected agents of a non-public event. The result is clamped to
    [0.08, 0.96] and rounded to 3 places.
    """
    score = event.reliability
    score += self._belief_doctrine_prior(agent_id, event.topic)

    visibility = event.visibility
    if visibility == "public":
        score += 0.12
    elif visibility == "private":
        score -= 0.08

    if self._observation_projection_enabled(agent_id, episode):
        unit = self._projection_unit(world, episode, agent_id, f"belief:{event.event_id}")
        score += (unit - 0.5) * 0.22

    is_outsider = agent_id != "oversight" and agent_id not in event.affected_agents
    if is_outsider and event.visibility != "public":
        score -= 0.14

    return round(max(0.08, min(0.96, score)), 3)
@staticmethod
def _belief_status_for_agent(
    event: LatentEvent,
    *,
    confidence: float,
    episode: EpisodeMetadata,
    agent_id: str,
) -> str:
    """Classify the belief ``agent_id`` forms about ``event``.

    Rules, first match wins:

    1. A private event that does not list the (non-oversight) agent as
       affected is only "suspected".
    2. A contained event held at confidence below 0.45 is "contested".
    3. Without fog of war, or for the oversight agent, it is "confirmed".
    4. Otherwise confidence decides: at or above 0.7 is "active", at or below
       0.34 is "suspected", anything between is "contested". The agent's
       doctrine prior for the topic lowers both thresholds (by at most 0.08
       and 0.06 respectively).

    Declared as a static method because it takes no ``self`` yet is invoked
    as ``self._belief_status_for_agent(...)``. The doctrine prior is looked up
    only once the thresholds are actually needed.
    """
    is_outsider = agent_id not in event.affected_agents and agent_id != "oversight"
    if event.visibility == "private" and is_outsider:
        return "suspected"
    if event.status == "contained" and confidence < 0.45:
        return "contested"
    if not episode.fog_of_war or agent_id == "oversight":
        return "confirmed"

    doctrine_prior = BELIEF_TOPIC_PRIORS.get(agent_id, {}).get(event.topic, 0.0)
    if confidence >= 0.7 - min(0.08, doctrine_prior * 0.45):
        return "active"
    if confidence <= 0.34 - min(0.06, doctrine_prior * 0.2):
        return "suspected"
    return "contested"
@staticmethod
def _dominant_belief_topics(beliefs: list[AgentBeliefEntry]) -> list[str]:
    """Return up to three topics with the highest summed belief confidence.

    Topics are ordered by descending total confidence; ties keep the order in
    which the topics first appear in ``beliefs``.

    Declared as a static method because it takes no ``self`` yet is invoked
    as ``self._dominant_belief_topics(beliefs)`` by its callers.
    """
    totals: dict[str, float] = {}
    for belief in beliefs:
        totals[belief.topic] = totals.get(belief.topic, 0.0) + belief.confidence
    ranked = sorted(totals.items(), key=lambda item: item[1], reverse=True)
    return [topic for topic, _ in ranked[:3]]
def _build_observations(
    self,
    world: WorldState,
    episode: EpisodeMetadata,
    *,
    include_live_sources: bool = False,
    belief_state: dict[str, AgentBeliefState] | None = None,
    historical_replay: HistoricalReplayState | None = None,
) -> dict[str, AgentObservation]:
    """Assemble the per-agent observation for the current world state.

    The public brief and the public event log (last ``MAX_PUBLIC_BRIEF_ITEMS``
    public active events) are built once and shared by all agents. Everything
    else is built per agent: private briefs (baseline scenario intel, focused
    latent-event intel, training and optionally live source briefs), the
    projected strategic state and assets, projection diagnostics, asset
    alerts, the historical brief and the decision prompt.

    Args:
        world: Current world state.
        episode: Episode metadata; its ``fog_of_war`` flag feeds perceived tension.
        include_live_sources: When false, live packets are not turned into
            briefs and contribute nothing to the projection counters.
        belief_state: Optional per-agent beliefs; agents without an entry get
            an empty ``AgentBeliefState``.
        historical_replay: Optional replay state used for the historical
            brief and the decision prompt.

    Returns:
        Mapping of agent id to its ``AgentObservation``.
    """
    belief_lookup = belief_state or {}
    visible_event_log = [event for event in world.active_events if event.public][-MAX_PUBLIC_BRIEF_ITEMS:]
    shared_public_brief = self._build_public_brief_from_latent_events(world)

    observations: dict[str, AgentObservation] = {}
    for agent_id in AGENT_IDS:
        profile = AGENT_PROFILES[agent_id]
        entity_pack = load_entity_pack(agent_id)
        entity_profile = entity_pack.get("profile", {})
        strategic_assets = self._flatten_strategic_assets(
            agent_id=agent_id,
            entity_pack=entity_pack,
            world=world,
        )
        training_source_bundle = AGENT_TRAINING_SOURCE_BUNDLES.get(agent_id, [])
        live_source_bundle = AGENT_LIVE_SOURCE_BUNDLES.get(agent_id, [])
        available_data_sources = self._build_data_source_context(agent_id)
        projection_enabled = self._observation_projection_enabled(agent_id, episode)
        beliefs = belief_lookup.get(agent_id, AgentBeliefState(agent_id=agent_id))
        training_source_packets, live_source_packets = self._source_harvester.get_packets_for_agent(
            agent_id,
            include_live=include_live_sources,
        )

        baseline_private_brief = [
            IntelSnippet(
                source="scenario",
                category="private_intel",
                summary=self._clip_summary(intel_line),
                confidence=0.72,
            )
            for intel_line in profile.baseline_private_intel
        ]
        focused_private_brief = self._build_focused_private_brief(
            world=world,
            episode=episode,
            agent_id=agent_id,
            focus_terms=profile.intelligence_focus,
        )
        projected_state, obscured_metric_count = self._project_strategic_state(
            world=world,
            episode=episode,
            agent_id=agent_id,
        )
        projected_assets = self._project_strategic_assets(
            strategic_assets=strategic_assets,
            world=world,
            episode=episode,
            agent_id=agent_id,
        )

        training_source_brief, training_projection = self._source_packets_to_briefs(
            training_source_packets,
            category="training_source",
            world=world,
            episode=episode,
            agent_id=agent_id,
        )
        if include_live_sources:
            live_source_brief, live_projection = self._source_packets_to_briefs(
                live_source_packets,
                category="live_source",
                world=world,
                episode=episode,
                agent_id=agent_id,
            )
        else:
            # Neutral counters so the aggregation below needs no special case.
            live_source_brief = []
            live_projection = {
                "delayed": 0,
                "contested": 0,
                "contradiction_packets": 0,
                "confidence_sum": 0.0,
                "contradiction_topics": [],
            }

        private_brief = self._compose_private_brief(
            baseline_private_brief=baseline_private_brief,
            focused_private_brief=focused_private_brief,
            training_source_brief=training_source_brief,
            live_source_brief=live_source_brief,
        )

        combined_topics = sorted(
            {
                *training_projection["contradiction_topics"],
                *live_projection["contradiction_topics"],
            }
        )
        projection = self._build_observation_projection(
            agent_id=agent_id,
            projection_enabled=projection_enabled,
            obscured_metric_count=obscured_metric_count,
            delivered_brief_count=len(training_source_brief) + len(live_source_brief),
            delayed_source_count=int(training_projection["delayed"]) + int(live_projection["delayed"]),
            contested_source_count=int(training_projection["contested"]) + int(live_projection["contested"]),
            contradiction_packet_count=int(training_projection["contradiction_packets"])
            + int(live_projection["contradiction_packets"]),
            contradiction_topics=combined_topics,
            confidence_sum=float(training_projection["confidence_sum"]) + float(live_projection["confidence_sum"]),
        )

        asset_alerts = self._build_asset_alerts(projected_assets)
        historical_brief = self._build_historical_brief(historical_replay)
        decision_prompt = self._build_decision_prompt(
            agent_id=agent_id,
            entity_profile=entity_profile,
            strategic_assets=projected_assets,
            available_data_sources=available_data_sources,
            live_enabled=include_live_sources,
            projection=projection,
            belief_state=beliefs,
            historical_replay=historical_replay,
        )

        observations[agent_id] = AgentObservation(
            public_brief=shared_public_brief,
            private_brief=private_brief,
            belief_brief=[belief.summary for belief in beliefs.beliefs[:4]],
            belief_topics=list(beliefs.dominant_topics),
            perceived_tension=self._perceived_tension(world.tension_level, agent_id, episode.fog_of_war),
            known_coalitions=sorted(world.coalition_graph.get(agent_id, [])),
            event_log=visible_event_log,
            decision_prompt=decision_prompt,
            available_actions=list(AGENT_ALLOWED_ACTIONS.get(agent_id, ())),
            available_data_sources=available_data_sources,
            entity_profile=entity_profile,
            strategic_state=projected_state,
            strategic_assets=projected_assets,
            asset_alerts=asset_alerts,
            source_bundle=training_source_bundle,
            training_source_bundle=training_source_bundle,
            live_source_bundle=live_source_bundle,
            source_packets=training_source_packets + live_source_packets,
            training_source_packets=training_source_packets,
            live_source_packets=live_source_packets,
            historical_brief=historical_brief,
            projection=projection,
        )
    return observations
def _build_public_brief_from_latent_events(self, world: WorldState) -> list[IntelSnippet]:
    """Summarise the most recent publicly visible latent events as intel snippets.

    Considers unresolved events whose visibility is "public" or "mixed" and
    keeps the last ``MAX_PUBLIC_BRIEF_ITEMS`` of them. Snippet confidence is
    ``severity * reliability + 0.18`` clamped to [0.3, 0.92] and rounded to 3
    places.
    """
    visible = [
        event
        for event in world.latent_events
        if event.status != "resolved" and event.visibility in {"public", "mixed"}
    ]
    return [
        IntelSnippet(
            source=event.origin,
            category=f"latent_{event.topic}",
            summary=self._clip_summary(self._latent_event_public_summary(event)),
            confidence=round(max(0.3, min(0.92, event.severity * event.reliability + 0.18)), 3),
        )
        for event in visible[-MAX_PUBLIC_BRIEF_ITEMS:]
    ]
def _build_focused_private_brief(
    self,
    *,
    world: WorldState,
    episode: EpisodeMetadata,
    agent_id: str,
    focus_terms: tuple[str, ...],
) -> list[IntelSnippet]:
    """Build up to three private snippets from the eight newest latent events.

    Events are scanned newest first and resolved ones are skipped. An event is
    included when the agent is "oversight", is listed among the event's
    affected agents, or when any of ``focus_terms`` occurs in the lower-cased
    text of the event's narratives. Snippet confidence is the selected
    narrative's confidence times the event reliability, clamped to
    [0.28, 0.9] and rounded to 3 places.
    """
    snippets: list[IntelSnippet] = []
    recent_events = world.latent_events[-8:]
    for event in reversed(recent_events):
        if event.status == "resolved":
            continue

        directly_involved = agent_id == "oversight" or agent_id in event.affected_agents
        if not directly_involved:
            narrative_text = " ".join(entry.summary for entry in event.narratives).lower()
            if all(term not in narrative_text for term in focus_terms):
                continue

        chosen = self._select_private_event_narrative(event, world=world, episode=episode, agent_id=agent_id)
        snippets.append(
            IntelSnippet(
                source=event.origin,
                category=f"latent_{event.topic}",
                summary=self._clip_summary(chosen.summary),
                confidence=round(max(0.28, min(0.9, chosen.confidence * event.reliability)), 3),
            )
        )
    return snippets[:3]
def _source_packets_to_briefs(
    self,
    source_packets: list[SourcePacket],
    category: str,
    *,
    world: WorldState,
    episode: EpisodeMetadata,
    agent_id: str,
) -> tuple[list[IntelSnippet], dict[str, float]]:
    """Turn usable source packets into intel snippets plus projection counters.

    Only packets with status "ok", a non-empty summary and a fresh fetch time
    are considered, newest first. Packets the projection decides to delay are
    counted and skipped. For the rest, confidence is projected, a packet is
    marked contested when confidence is below 0.52 (or, for low-fidelity
    source kinds, when the projection unit is at least 0.72), and an enabled
    contradiction subtracts its penalty from confidence (floor 0.24).

    Returns:
        ``(briefs, counters)`` where ``counters`` has the keys "delayed",
        "contested", "contradiction_packets", "confidence_sum" and
        "contradiction_topics". The last one holds a sorted list of topic
        strings rather than a float.
    """
    epoch = datetime.fromtimestamp(0, tz=timezone.utc)
    usable_packets = [
        packet
        for packet in source_packets
        if packet.status == "ok"
        and packet.summary
        and FogOfWarDiplomacyEnv._is_source_packet_fresh(packet)
    ]
    usable_packets.sort(key=lambda packet: packet.fetched_at or epoch, reverse=True)

    briefs: list[IntelSnippet] = []
    delayed_total = 0
    contested_total = 0
    contradiction_total = 0
    contradiction_topics: set[str] = set()
    confidence_total = 0.0

    for packet in usable_packets:
        if self._should_delay_source_packet(packet, world=world, episode=episode, agent_id=agent_id):
            delayed_total += 1
            continue

        confidence = self._projected_source_confidence(packet, world=world, episode=episode, agent_id=agent_id)
        related_events = self._relevant_latent_events_for_packet(packet, world=world)
        event_context = None
        if related_events:
            # Use the weightiest related event as context for the summary.
            event_context = max(related_events, key=lambda event: event.severity * event.reliability)
        contradiction = self._latent_source_contradiction(
            packet,
            world=world,
            episode=episode,
            agent_id=agent_id,
        )

        # Contested status is decided before any contradiction penalty is applied.
        contested = confidence < 0.52 or (
            packet.kind in LOW_FIDELITY_SOURCE_KINDS
            and self._projection_unit(world, episode, agent_id, f"contested:{packet.source_id}") >= 0.72
        )

        if contradiction["enabled"]:
            contradiction_total += 1
            contradiction_topics.add(str(contradiction["topic"]))
            confidence = max(0.24, round(confidence - float(contradiction["confidence_penalty"]), 3))
        if contested:
            contested_total += 1

        projected_summary = self._project_source_summary(
            packet.summary,
            confidence=confidence,
            contested=contested,
            event_context=event_context,
            contradiction=contradiction,
        )
        briefs.append(
            IntelSnippet(
                source=packet.source_name,
                category=category,
                summary=projected_summary,
                confidence=confidence,
            )
        )
        confidence_total += confidence

    counters = {
        "delayed": float(delayed_total),
        "contested": float(contested_total),
        "contradiction_packets": float(contradiction_total),
        "confidence_sum": round(confidence_total, 3),
        "contradiction_topics": sorted(contradiction_topics),
    }
    return briefs, counters
def _build_historical_brief(self, historical_replay: HistoricalReplayState | None) -> list[str]:
    """Summarise the visible part of a historical replay as short text lines.

    Returns an empty list when there is no replay or it is disabled.
    Otherwise the timeline is cut after ``current_event_index`` and the last
    three visible events become one ``"<date> <topic>: <summary>"`` line each
    (summary clipped to 110 characters). If any event is visible, a closing
    line states where the visible history ends.
    """
    if historical_replay is None or not historical_replay.enabled:
        return []
    cutoff = historical_replay.current_event_index + 1
    revealed = historical_replay.ground_truth_timeline[:cutoff]
    brief_lines: list[str] = []
    for entry in revealed[-3:]:
        day = entry.timestamp.date().isoformat()
        # Prefer the public summary; fall back to the full one when it is empty.
        text = self._clip_summary(entry.public_summary or entry.summary, 110)
        brief_lines.append(f"{day} {entry.topic}: {text}")
    if not revealed:
        return brief_lines
    latest = revealed[-1].timestamp.isoformat()
    brief_lines.append(
        f"Visible replay history ends at {latest}. Predict the most likely next event over the next turn."
    )
    return brief_lines
def _clip_summary(summary: str, limit: int = MAX_INTEL_SUMMARY_CHARS) -> str:
    """Collapse whitespace runs in ``summary`` and shorten it when too long.

    Text whose collapsed length is within ``limit`` is returned as is.
    Longer text keeps its first ``limit - 3`` characters, with trailing
    whitespace stripped, followed by ``"..."``.

    Takes no ``self``; elsewhere in this file it is invoked as
    ``self._clip_summary(...)``, so it is presumably a static method whose
    decorator is not visible here.
    """
    normalized = " ".join(summary.split())
    if len(normalized) > limit:
        head = normalized[: limit - 3].rstrip()
        return f"{head}..."
    return normalized
def _is_source_packet_fresh(packet) -> bool:
    """Report whether ``packet`` was fetched within its source's TTL.

    Returns ``False`` when the packet has no ``fetched_at`` or when its
    ``source_id`` is unknown to ``get_source_by_id`` (``KeyError``).
    Otherwise the age relative to the current UTC time is compared with
    ``source_ttl_seconds(source)``.
    """
    fetched_at = packet.fetched_at
    if fetched_at is None:
        return False
    try:
        source = get_source_by_id(packet.source_id)
    except KeyError:
        return False
    # NOTE(review): subtracting from an aware "now" assumes fetched_at is
    # timezone-aware -- confirm against the SourcePacket model.
    age = datetime.now(timezone.utc) - fetched_at
    max_age = timedelta(seconds=source_ttl_seconds(source))
    return age <= max_age
def _compose_private_brief(
    *,
    baseline_private_brief: list[IntelSnippet],
    focused_private_brief: list[IntelSnippet],
    training_source_brief: list[IntelSnippet],
    live_source_brief: list[IntelSnippet],
    limit: int = MAX_PRIVATE_BRIEF_ITEMS,
) -> list[IntelSnippet]:
    """Merge the four brief streams into one list of at most ``limit`` items.

    Priority order: the first ``MAX_TRAINING_SOURCE_BRIEFS`` training items,
    the first ``MAX_LIVE_SOURCE_BRIEFS`` live items, one focused item and one
    baseline item. The leftovers of each stream follow in the same stream
    order. A non-positive ``limit`` yields an empty list.
    """
    # Reserve space for live/training source intel so fresh news always reaches the model.
    reserved = [
        *training_source_brief[:MAX_TRAINING_SOURCE_BRIEFS],
        *live_source_brief[:MAX_LIVE_SOURCE_BRIEFS],
        *focused_private_brief[:1],
        *baseline_private_brief[:1],
    ]
    leftovers = [
        *training_source_brief[MAX_TRAINING_SOURCE_BRIEFS:],
        *live_source_brief[MAX_LIVE_SOURCE_BRIEFS:],
        *focused_private_brief[1:],
        *baseline_private_brief[1:],
    ]
    ordered = reserved + leftovers
    # max(..., 0) keeps a negative limit from being read as a slice from the end.
    return ordered[: max(limit, 0)]
def _select_private_event_narrative(
    self,
    event: LatentEvent,
    *,
    world: WorldState,
    episode: EpisodeMetadata,
    agent_id: str,
) -> LatentEventNarrative:
    """Pick the narrative of ``event`` that ``agent_id`` gets to see.

    * No narratives: a synthetic ``"baseline"`` narrative built from the
      event topic and reliability (confidence floored at 0.3).
    * ``"oversight"``: the first narrative framed ``"concealed"``, else the
      last narrative.
    * Projection disabled for the agent: the first narrative.
    * Otherwise: a pick driven by ``_projection_unit`` from the non-public
      narratives (all narratives when the event is private or none
      qualifies).
    """
    narratives = event.narratives
    if not narratives:
        return LatentEventNarrative(
            framing="baseline",
            summary=event.topic,
            confidence=max(0.3, event.reliability),
            public=event.visibility != "private",
        )
    if agent_id == "oversight":
        concealed = next((item for item in narratives if item.framing == "concealed"), None)
        return concealed or narratives[-1]
    if not self._observation_projection_enabled(agent_id, episode):
        return narratives[0]
    private_leaning = [
        item
        for item in narratives
        if event.visibility == "private" or not item.public
    ]
    pool = private_leaning or narratives
    unit = self._projection_unit(world, episode, agent_id, f"latent-narrative:{event.event_id}")
    # Clamp so a unit that rounds up to 1.0 cannot index past the end.
    slot = min(int(unit * len(pool)), len(pool) - 1)
    return pool[slot]
def _perceived_tension(self, tension_level: float, agent_id: str, fog_of_war: bool) -> float:
    """Return the tension level as perceived by ``agent_id``.

    ``"oversight"`` and any agent without fog of war see the exact value.
    Everyone else gets the value shifted by a uniform draw in ``[-4, 4]``
    from ``self._rng``, clamped to ``[0, 100]`` and rounded to two decimals.
    """
    sees_ground_truth = agent_id == "oversight" or not fog_of_war
    if sees_ground_truth:
        return tension_level
    # The RNG is only consumed on the noisy path.
    noisy = tension_level + self._rng.uniform(-4.0, 4.0)
    bounded = min(100.0, noisy)
    return round(max(0.0, bounded), 2)
def _observation_projection_enabled(agent_id: str, episode: EpisodeMetadata) -> bool:
    """Tell whether observations for ``agent_id`` are projected through fog of war.

    Projection applies only when the episode has ``fog_of_war`` set and the
    agent is not ``"oversight"``. A falsy ``fog_of_war`` is returned as is.
    """
    fog_active = episode.fog_of_war
    if not fog_active:
        return fog_active
    return agent_id != "oversight"
def _project_strategic_state(
    self,
    *,
    world: WorldState,
    episode: EpisodeMetadata,
    agent_id: str,
) -> tuple[dict[str, float], int]:
    """Return the agent's strategic metrics as it observes them.

    With projection disabled, a copy of ``world.latent_state[agent_id]`` is
    returned with an obscured count of 0. Otherwise every metric gets a
    jitter derived from ``_projection_unit``, scaled by the agent's risk
    score (default 0.25). Metrics whose name contains "support",
    "confidence", "clarity" or "resilience" get 1.2x the jitter.

    Returns:
        ``(projected_metrics, obscured_count)``. ``obscured_count`` is the
        number of metrics whose observed value differs from the true one by
        at least 0.75.
    """
    ground_truth = world.latent_state.get(agent_id, {}).copy()
    if not self._observation_projection_enabled(agent_id, episode):
        return ground_truth, 0
    spread = 1.2 + world.risk_scores.get(agent_id, 0.25) * 3.5
    soft_tokens = ("support", "confidence", "clarity", "resilience")
    estimates: dict[str, float] = {}
    obscured = 0
    for name, true_value in ground_truth.items():
        unit = self._projection_unit(world, episode, agent_id, f"metric:{name}")
        # Map the unit from [0, 1) to a symmetric offset around zero.
        noise = (unit - 0.5) * 2.0 * spread
        if any(token in name for token in soft_tokens):
            noise *= 1.2
        estimate = round(self._clamp_percent(true_value + noise), 2)
        estimates[name] = estimate
        if abs(estimate - true_value) >= 0.75:
            obscured += 1
    return estimates, obscured
def _project_strategic_assets(
    self,
    *,
    strategic_assets: list[dict[str, object]],
    world: WorldState,
    episode: EpisodeMetadata,
    agent_id: str,
) -> list[dict[str, object]]:
    """Return shallow copies of the assets, coarsened when projection applies.

    With projection enabled, operational assets have ``health`` and
    ``operational_load`` rounded to the nearest multiple of 5. Any other
    status only has ``health`` rounded to one decimal. The input dicts are
    never modified.
    """
    snapshot = [entry.copy() for entry in strategic_assets]
    if self._observation_projection_enabled(agent_id, episode):
        for entry in snapshot:
            health = float(entry.get("health", 100.0))
            if str(entry.get("status", "operational")) != "operational":
                entry["health"] = round(health, 1)
                continue
            entry["health"] = round(health / 5.0) * 5.0
            load = float(entry.get("operational_load", 0.0))
            entry["operational_load"] = round(load / 5.0) * 5.0
    return snapshot
def _build_observation_projection(
    self,
    *,
    agent_id: str,
    projection_enabled: bool,
    obscured_metric_count: int,
    delivered_brief_count: int,
    delayed_source_count: int,
    contested_source_count: int,
    contradiction_packet_count: int,
    contradiction_topics: list[str],
    confidence_sum: float,
) -> ObservationProjection:
    """Summarise how degraded an agent's view is as an ``ObservationProjection``.

    With projection disabled the result is a ``"direct"`` projection with
    reliability 1.0. Otherwise the reliability is the mean brief confidence
    minus 0.015 per obscured metric (at most six counted), clamped to
    ``[0.32, 0.9]``. Explanatory notes are added for delayed, contested and
    contradicting sources. ``agent_id`` is accepted but not used here.
    """
    if not projection_enabled:
        return ObservationProjection(
            enabled=False,
            mode="direct",
            worldview_reliability=1.0,
        )
    # max(..., 1) avoids dividing by zero when nothing was delivered.
    mean_confidence = confidence_sum / max(delivered_brief_count, 1)
    reliability = max(
        0.32,
        min(0.9, mean_confidence - min(obscured_metric_count, 6) * 0.015),
    )
    conditional_notes = (
        (delayed_source_count, "Some fast-moving source packets are lagged before they reach you."),
        (
            contested_source_count,
            "At least part of the source picture is contested or fragmentary; cross-check before escalating.",
        ),
        (
            contradiction_packet_count,
            "Multiple sources may disagree on the same latent development; compare topic framing before acting.",
        ),
    )
    notes = ["Strategic metrics are estimates under fog-of-war, not privileged ground truth."]
    notes.extend(text for count, text in conditional_notes if count > 0)
    return ObservationProjection(
        enabled=True,
        mode="partial",
        worldview_reliability=round(reliability, 3),
        delayed_source_count=delayed_source_count,
        contested_source_count=contested_source_count,
        contradiction_packet_count=contradiction_packet_count,
        obscured_metric_count=obscured_metric_count,
        contradiction_topics=contradiction_topics,
        notes=notes,
    )
def _should_delay_source_packet(
    self,
    packet: SourcePacket,
    *,
    world: WorldState,
    episode: EpisodeMetadata,
    agent_id: str,
) -> bool:
    """Decide whether ``packet`` is withheld from ``agent_id`` this turn.

    Never delays when projection is disabled or the packet is delivered as
    ``"live_demo"``. Kinds in ``LOW_FIDELITY_SOURCE_KINDS`` carry a lag of
    one turn and every other kind a lag of zero. Once ``world.turn`` has
    reached the lag nothing is delayed. Before that, the packet is delayed
    when its ``_projection_unit`` draw is below 0.35.
    """
    projection_on = self._observation_projection_enabled(agent_id, episode)
    if not projection_on or packet.delivery == "live_demo":
        return False
    turn_lag = 1 if packet.kind in LOW_FIDELITY_SOURCE_KINDS else 0
    if world.turn >= turn_lag:
        return False
    draw = self._projection_unit(world, episode, agent_id, f"delay:{packet.source_id}")
    return draw < 0.35
def _projected_source_confidence(
    self,
    packet: SourcePacket,
    *,
    world: WorldState,
    episode: EpisodeMetadata,
    agent_id: str,
) -> float:
    """Estimate how much confidence ``agent_id`` places in ``packet``.

    With projection disabled the value is fixed: 0.65 for ``"training_core"``
    delivery and 0.55 otherwise. With projection enabled it is the
    per-kind base reliability (default 0.58) plus the per-delivery
    adjustment (default 0.0) plus a jitter of at most +/-0.09, clamped to
    ``[0.24, 0.92]`` and rounded to three decimals.
    """
    if not self._observation_projection_enabled(agent_id, episode):
        if packet.delivery == "training_core":
            return 0.65
        return 0.55
    kind_base = SOURCE_KIND_BASE_RELIABILITY.get(packet.kind, 0.58)
    delivery_adjustment = SOURCE_DELIVERY_RELIABILITY.get(packet.delivery, 0.0)
    base_reliability = kind_base + delivery_adjustment
    unit = self._projection_unit(world, episode, agent_id, f"confidence:{packet.source_id}")
    jitter = (unit - 0.5) * 0.18
    capped = min(0.92, base_reliability + jitter)
    return round(max(0.24, capped), 3)
def _project_source_summary(
    self,
    summary: str,
    *,
    confidence: float,
    contested: bool,
    event_context: LatentEvent | None = None,
    contradiction: dict[str, object] | None = None,
) -> str:
    """Decorate a packet summary with context, contradiction and caveat prefixes.

    Layers are applied in order, re-clipping after each:

    1. an unresolved ``event_context`` prepends a "broader picture" sentence
       and the event's public summary;
    2. an enabled ``contradiction`` prepends a "Conflicting <topic>
       reporting" sentence, using the supplied narrative summary or a
       framing-specific default;
    3. a caveat prefix: "Contested" when ``contested``, else "Unconfirmed"
       below 0.48 confidence, else "Preliminary" below 0.62, else none.
    """
    text = self._clip_summary(summary)
    if event_context is not None and event_context.status != "resolved":
        public_summary = self._latent_event_public_summary(event_context)
        topic_label = CONTRADICTION_TOPIC_LABELS.get(event_context.topic, event_context.topic)
        text = self._clip_summary(
            f"This reporting fits a broader {topic_label} picture. "
            f"{public_summary} {text}"
        )
    if contradiction and contradiction.get("enabled"):
        topic = str(contradiction["topic"])
        framing = str(contradiction["framing"])
        narrative_summary = contradiction.get("narrative_summary")
        # Only the default sentence depends on the framing.
        if framing == "stabilizing":
            default_line = "this source emphasizes partial stabilization around the same development."
        else:
            default_line = "this source emphasizes renewed deterioration around the same development."
        text = self._clip_summary(
            f"Conflicting {topic} reporting: {narrative_summary or default_line} {text}"
        )
    if contested:
        prefix = "Contested reporting: "
    elif confidence < 0.48:
        prefix = "Unconfirmed reporting: "
    elif confidence < 0.62:
        prefix = "Preliminary reporting: "
    else:
        return text
    return self._clip_summary(f"{prefix}{text}")
def _latent_source_contradiction(
    self,
    packet: SourcePacket,
    *,
    world: WorldState,
    episode: EpisodeMetadata,
    agent_id: str,
) -> dict[str, object]:
    """Describe how ``packet`` contradicts the strongest related latent event.

    Returns ``{"enabled": False}`` when projection is disabled, no latent
    event relates to the packet, or the contradiction strength (severity
    times reliability, the latter floored at 0.4, capped at 1.0) is below
    0.22. Otherwise the dict has ``enabled``, ``topic`` (display label),
    ``framing``, ``narrative_summary`` (the summary of the event narrative
    with that framing, or ``None``) and ``confidence_penalty`` (at most 0.18).

    The framing comes from keywords in the source id/name: "rate"/"index"
    give ``"stabilizing"`` and "status"/"watch"/"disruption" give
    ``"deteriorating"``. Without a keyword a ``_projection_unit`` draw decides.
    """
    if not self._observation_projection_enabled(agent_id, episode):
        return {"enabled": False}
    candidates = self._relevant_latent_events_for_packet(packet, world=world)
    if not candidates:
        return {"enabled": False}
    strongest = max(candidates, key=lambda item: item.severity * item.reliability)
    strength = min(1.0, strongest.severity * max(strongest.reliability, 0.4))
    if strength < 0.22:
        return {"enabled": False}

    hint = f"{packet.source_id} {packet.source_name}".lower()
    if any(keyword in hint for keyword in ("rate", "index")):
        framing = "stabilizing"
    elif any(keyword in hint for keyword in ("status", "watch", "disruption")):
        framing = "deteriorating"
    else:
        draw = self._projection_unit(
            world, episode, agent_id, f"contradiction:{packet.source_id}:{strongest.event_id}"
        )
        framing = "stabilizing" if draw < 0.5 else "deteriorating"

    matching = [item for item in strongest.narratives if item.framing == framing]
    narrative_summary = matching[0].summary if matching else None
    return {
        "enabled": True,
        "topic": CONTRADICTION_TOPIC_LABELS.get(strongest.topic, strongest.topic),
        "framing": framing,
        "narrative_summary": narrative_summary,
        "confidence_penalty": round(min(0.18, strength * 0.22), 3),
    }
def _relevant_latent_events_for_packet(
    self,
    packet: SourcePacket,
    *,
    world: WorldState,
) -> list[LatentEvent]:
    """List the unresolved latent events that ``packet`` appears to relate to.

    A search text is built from the packet summary, sample items, source
    name and, when the source id is known, the source's tags. An event is
    relevant when its topic is among the topics inferred from that text, or
    when one of the salient tokens (length >= 5, taken from the first ten
    words of the text) occurs as a substring of its narrative summaries.

    Args:
        packet: Source packet to match.
        world: World state whose ``latent_events`` are scanned.

    Returns:
        Matching events in ``world.latent_events`` order, resolved ones excluded.
    """
    try:
        source = get_source_by_id(packet.source_id)
    except KeyError:
        # Unknown source: match on the packet's own text only.
        source_text = ""
    else:
        source_text = " ".join(source.tags)
    text = f"{packet.summary} {' '.join(packet.sample_items)} {packet.source_name} {source_text}".lower()
    topics = set(self._infer_event_topics_from_text(text))
    # These depend only on the packet, so compute them once, not per event.
    salient_tokens = [token for token in text.split()[:10] if len(token) >= 5]
    relevant_events: list[LatentEvent] = []
    for event in world.latent_events:
        if event.status == "resolved":
            continue
        if event.topic in topics:
            relevant_events.append(event)
            continue
        if not salient_tokens:
            # Nothing to look for; skip building the narrative text.
            continue
        narrative_text = " ".join(narrative.summary for narrative in event.narratives).lower()
        if any(token in narrative_text for token in salient_tokens):
            relevant_events.append(event)
    return relevant_events
def _projection_unit(world: WorldState, episode: EpisodeMetadata, agent_id: str, label: str) -> float:
    """Derive a deterministic pseudo-random fraction for a projection decision.

    The scenario id, current turn, agent id and ``label`` are joined with
    ``|`` and hashed with SHA-256. The first eight digest bytes, read as a
    big-endian integer, are divided by 2**64. The same inputs therefore
    always give the same value.
    """
    seed = f"{episode.scenario_id}|{world.turn}|{agent_id}|{label}"
    hasher = hashlib.sha256()
    hasher.update(seed.encode("utf-8"))
    leading_bytes = hasher.digest()[:8]
    return int.from_bytes(leading_bytes, "big") / float(1 << 64)
def _build_model_bindings() -> dict[str, EntityModelBinding]:
    """Return the entity model bindings produced by ``build_entity_model_bindings``."""
    bindings = build_entity_model_bindings()
    return bindings
def _build_action_log_entries(
    session: SessionState,
    actions: dict[str, AgentAction],
) -> list[ActionLogEntry]:
    """Create one ``ActionLogEntry`` per submitted action, in ``actions`` order.

    Each entry records the action itself, the actor's reward total (that of
    a default ``RewardBreakdown`` when the actor has no reward yet) and the
    world's tension, market stress and oil pressure as they stand on
    ``session.world``. The action metadata is copied.
    """
    world = session.world
    return [
        ActionLogEntry(
            turn=world.turn,
            actor=actor,
            action_type=act.type,
            target=act.target,
            summary=act.summary,
            reward_total=session.rewards.get(actor, RewardBreakdown()).total,
            tension_after=world.tension_level,
            market_stress_after=world.market_stress,
            oil_pressure_after=world.oil_pressure,
            metadata=act.metadata.copy(),
        )
        for actor, act in actions.items()
    ]
def _build_reaction_log_entry(
    *,
    session: SessionState,
    signals: list[ExternalSignal],
    latent_event_ids: list[str],
    actions: dict[str, AgentAction],
    oversight: OversightIntervention,
    tension_before: float,
) -> ReactionLogEntry:
    """Bundle one turn's signals and actor responses into a ``ReactionLogEntry``.

    The decision mode of each actor is taken from the action metadata when
    it is ``"heuristic_fallback"`` or ``"provider_inference"``. Otherwise it
    comes from the actor's model binding, or ``"heuristic_fallback"`` when
    there is no binding. The entry's ``source`` is the signals' common
    source when they all share one, else ``"public_release"``. Signals are
    deep-copied into the entry and a fresh UUID is used as the event id.
    """
    recognised_modes = {"heuristic_fallback", "provider_inference"}

    def resolve_mode(actor: str, act: AgentAction):
        # Trust metadata only when it names a recognised mode.
        mode = act.metadata.get("mode")
        if mode in recognised_modes:
            return mode
        binding = session.model_bindings.get(actor)
        return binding.decision_mode if binding is not None else "heuristic_fallback"

    outcomes: list[ReactionActorOutcome] = [
        ReactionActorOutcome(
            agent_id=actor,
            action=act,
            reward_total=session.rewards.get(actor, RewardBreakdown()).total,
            decision_mode=resolve_mode(actor, act),
        )
        for actor, act in actions.items()
    ]
    distinct_sources = {signal.source for signal in signals}
    entry_source = signals[0].source if len(distinct_sources) == 1 else "public_release"
    world = session.world
    return ReactionLogEntry(
        event_id=str(uuid4()),
        turn=world.turn,
        source=entry_source,
        latent_event_ids=latent_event_ids,
        signals=[signal.model_copy(deep=True) for signal in signals],
        actor_outcomes=outcomes,
        oversight_triggered=oversight.triggered,
        tension_before=tension_before,
        tension_after=world.tension_level,
        market_stress_after=world.market_stress,
        oil_pressure_after=world.oil_pressure,
    )
def _actor_metric(self, world: WorldState, agent_id: str, metric: str, default: float = 50.0) -> float:
    """Read a latent metric for ``agent_id``, with layered fallbacks.

    Lookup order: ``world.latent_state``, then the agent's entry in
    ``AGENT_STATE_BASELINES``, then ``default``.
    """
    fallback = AGENT_STATE_BASELINES.get(agent_id, {}).get(metric, default)
    latent_metrics = world.latent_state.get(agent_id, {})
    return latent_metrics.get(metric, fallback)
def _bump_actor_metric(self, world: WorldState, agent_id: str, metric: str, delta: float) -> None:
    """Add ``delta`` to a latent metric of ``agent_id`` in place.

    A metric that is not stored yet starts from the agent's baseline in
    ``AGENT_STATE_BASELINES`` (50.0 when there is none). The result goes
    through ``_clamp_percent`` and is rounded to two decimals. The agent's
    entry in ``world.latent_state`` is created on demand.
    """
    metrics = world.latent_state.setdefault(agent_id, {})
    if metric in metrics:
        starting_value = metrics[metric]
    else:
        starting_value = AGENT_STATE_BASELINES.get(agent_id, {}).get(metric, 50.0)
    metrics[metric] = round(self._clamp_percent(starting_value + delta), 2)
def _resync_public_actor_state(self, world: WorldState) -> None:
    """Rebuild ``world.actor_state`` (public view) from the latent state.

    For every agent in ``AGENT_IDS`` and each of its latent metrics, the
    public target is the baseline plus the latent deviation scaled by
    ``_public_sync_factor(metric)``. The public value then moves 70% of the
    way from its previous value to that target, is clamped and rounded to
    two decimals. ``world.actor_state`` is replaced as a whole, so metrics
    missing from the latent state disappear from the public state.
    """
    refreshed: dict[str, dict[str, float]] = {}
    for agent_id in AGENT_IDS:
        baselines = AGENT_STATE_BASELINES.get(agent_id, {})
        prior_public = world.actor_state.get(agent_id, {})
        per_agent: dict[str, float] = {}
        for metric, latent_value in world.latent_state.get(agent_id, {}).items():
            baseline = baselines.get(metric, 50.0)
            previous_public = prior_public.get(metric, baseline)
            sync_factor = self._public_sync_factor(metric)
            target_public = baseline + (latent_value - baseline) * sync_factor
            # Lag: close 70% of the gap to the target per call.
            lagged_public = previous_public + (target_public - previous_public) * 0.7
            per_agent[metric] = round(self._clamp_percent(lagged_public), 2)
        refreshed[agent_id] = per_agent
    world.actor_state = refreshed
def _public_sync_factor(metric: str) -> float:
    """Return the public-sync factor for ``metric``.

    The first non-``"default"`` key of ``PUBLIC_STATE_SYNC_FACTORS`` (in
    dict order) that occurs in the lower-cased metric name wins. With no
    match the ``"default"`` entry is used.
    """
    needle = metric.lower()
    matching_factors = (
        factor
        for token, factor in PUBLIC_STATE_SYNC_FACTORS.items()
        if token != "default" and token in needle
    )
    for factor in matching_factors:
        return factor
    return PUBLIC_STATE_SYNC_FACTORS["default"]
def _apply_actor_state_effects(self, world: WorldState, agent_id: str, action: AgentAction) -> None:
    """Apply the latent-metric effects of one action to actor and target.

    The actor's own deltas come from ``AGENT_STATE_ACTION_EFFECTS``. If the
    action targets a known agent, the target is adjusted as well:
    ``negotiate`` raises one target metric by 1.5, ``defend`` raises one by
    1.8 and ``strike`` applies the per-target deltas listed below. Other
    action types leave the target untouched.
    """
    own_effects = AGENT_STATE_ACTION_EFFECTS.get(agent_id, {}).get(action.type, {})
    for metric, delta in own_effects.items():
        self._bump_actor_metric(world, agent_id, metric, delta)

    target = action.target
    if not target or target not in AGENT_IDS:
        return

    negotiate_metric = {
        "us": "regional_access",
        "israel": "us_resupply_confidence",
        "iran": "regime_stability",
        "hezbollah": "political_cover",
        "gulf": "diplomatic_flexibility",
        "oversight": "intervention_legitimacy",
    }
    defend_metric = {
        "us": "force_posture",
        "israel": "homeland_security",
        "iran": "regime_stability",
        "hezbollah": "launch_survivability",
        "gulf": "infrastructure_security",
        "oversight": "autonomy_balance",
    }
    strike_effects = {
        "us": {"force_posture": -4.2, "domestic_support": -1.2},
        "israel": {"homeland_security": -6.2, "northern_deterrence": -2.4},
        "iran": {"regime_stability": -4.6, "proxy_corridor": -2.2},
        "hezbollah": {"launch_survivability": -5.1, "logistics_depth": -2.6, "political_cover": -2.0},
        "gulf": {"shipping_continuity": -3.8, "infrastructure_security": -5.0, "investor_confidence": -3.5},
        "oversight": {"runaway_risk": 2.0},
    }

    kind = action.type
    if kind == "negotiate":
        metric = negotiate_metric.get(target)
        target_effects = {metric: 1.5} if metric else {}
    elif kind == "defend":
        metric = defend_metric.get(target)
        target_effects = {metric: 1.8} if metric else {}
    elif kind == "strike":
        target_effects = strike_effects.get(target, {})
    else:
        target_effects = {}
    for metric, delta in target_effects.items():
        self._bump_actor_metric(world, target, metric, delta)
def _apply_signal_pressure(self, world: WorldState, signal: ExternalSignal) -> None:
    """Translate an external signal into latent-metric pressure on the actors.

    The headline, tags and region are lower-cased into one search text.
    Each rule below lists trigger terms (substring matches) and the
    ``(agent, metric, weight)`` bumps applied when any term occurs. Every
    bump is scaled by the signal severity clamped to ``[0, 1]``. Rules are
    independent, so several may fire for one signal, and they run in the
    order listed.
    """
    haystack = f"{signal.headline} {' '.join(signal.tags)} {(signal.region or '')}".lower()
    scale = max(0.0, min(1.0, signal.severity))
    rules = (
        (
            ("oil", "shipping", "hormuz", "tanker", "bab el-mandeb", "red sea", "strait"),
            (
                ("us", "shipping_security", -6.0),
                ("us", "domestic_support", -2.2),
                ("gulf", "shipping_continuity", -6.8),
                ("gulf", "investor_confidence", -5.2),
                ("iran", "hormuz_leverage", 3.0),
            ),
        ),
        (
            (
                "gold",
                "silver",
                "copper",
                "lithium",
                "nickel",
                "uranium",
                "phosphate",
                "bauxite",
                "rare earth",
                "rare-earth",
                "commodity",
                "mineral",
                "metals",
                "natural gas",
                "lng",
            ),
            (
                ("us", "domestic_support", -1.4),
                ("gulf", "investor_confidence", -4.8),
                ("gulf", "diplomatic_flexibility", -1.8),
                ("iran", "hormuz_leverage", 1.2),
                ("oversight", "runaway_risk", 1.6),
            ),
        ),
        (
            ("israel", "idf", "blue line", "galilee", "rocket", "drone", "lebanon", "north"),
            (
                ("israel", "homeland_security", -7.0),
                ("israel", "northern_deterrence", -4.6),
                ("hezbollah", "resistance_credibility", 2.8),
                ("oversight", "runaway_risk", 2.4),
            ),
        ),
        (
            ("syria", "bekaa", "corridor", "transfer", "interdiction"),
            (
                ("iran", "proxy_corridor", -4.8),
                ("hezbollah", "logistics_depth", -4.2),
                ("israel", "northern_deterrence", 1.8),
            ),
        ),
        (
            ("sanction", "unrest", "protest", "inflation", "currency"),
            (
                ("iran", "regime_stability", -5.5),
                ("hezbollah", "political_cover", -2.5),
                ("oversight", "runaway_risk", 1.6),
            ),
        ),
        (
            ("oversight", "cyber", "internet outage", "blackout", "market shock"),
            (
                ("oversight", "runaway_risk", 3.6),
                ("oversight", "trace_clarity", -2.4),
            ),
        ),
    )
    for terms, effects in rules:
        if not self._signal_mentions(haystack, *terms):
            continue
        for agent, metric, weight in effects:
            self._bump_actor_metric(world, agent, metric, weight * scale)
def _reconcile_actor_state(self, world: WorldState, actions: dict[str, AgentAction]) -> None:
    """Apply one round of cross-actor adjustments to the latent metrics.

    Inputs are the action pressure of selected actor groups (via
    ``_action_pressure``), the non-negative alliance scores, the mean risk
    and behavioural-consistency scores, and the world's tension, market
    stress and oil pressure. The metrics of each actor are then bumped in a
    fixed order. ``oversight.runaway_risk`` is the exception: it is
    overwritten with a freshly computed weighted blend rather than bumped.
    The new value also feeds the ``autonomy_balance`` adjustment.
    """

    def pressure(actors, kinds):
        # Action pressure of ``actors`` over action types ``kinds`` this turn.
        return self._action_pressure(actions, actors, kinds)

    def bump(agent, metric, delta):
        # Shorthand for bumping one latent metric on ``world``.
        self._bump_actor_metric(world, agent, metric, delta)

    # --- Shared inputs -------------------------------------------------
    proxy_pressure = pressure(("iran", "hezbollah"), {"strike", "mobilize", "deceive"})
    israel_pressure = pressure(("israel",), {"strike", "mobilize", "defend"})
    us_pressure = pressure(("us",), {"strike", "mobilize", "sanction"})
    iran_pressure = pressure(("iran",), {"strike", "mobilize", "deceive"})
    hezbollah_pressure = pressure(("hezbollah",), {"strike", "mobilize", "deceive"})
    gulf_defense = pressure(("gulf",), {"defend", "mobilize", "intel_query", "negotiate"})

    # max(..., 1) guards the means against empty score tables.
    mean_risk = sum(world.risk_scores.values()) / max(len(world.risk_scores), 1)
    mean_consistency = sum(world.behavioral_consistency.values()) / max(len(world.behavioral_consistency), 1)

    # Negative alliance scores are floored at zero.
    us_alignment = max(0.0, self._alliance_score(world, "us"))
    israel_backstop = max(0.0, self._alliance_score(world, "israel"))
    iran_axis = max(0.0, self._alliance_score(world, "iran"))
    gulf_alignment = max(0.0, self._alliance_score(world, "gulf"))

    # --- United States -------------------------------------------------
    bump("us", "regional_access", 1.8 * us_alignment)
    bump("us", "shipping_security", 0.03 * (78.0 - world.oil_pressure) - 3.2 * proxy_pressure)
    us_support_delta = 0.025 * (66.0 - world.market_stress) - 0.018 * max(0.0, world.tension_level - 60.0)
    bump("us", "domestic_support", us_support_delta)
    us_posture_delta = (
        1.3 * self._clamp_unit(1.0 - 2.0 * world.risk_scores.get("us", 0.25)) + 0.8 * us_alignment
    )
    bump("us", "force_posture", us_posture_delta)

    # --- Israel --------------------------------------------------------
    israel_security_delta = (
        1.5 * israel_backstop - 3.4 * proxy_pressure - 0.018 * max(0.0, world.tension_level - 62.0)
    )
    bump("israel", "homeland_security", israel_security_delta)
    israel_deterrence_delta = (
        0.9 * self._behavior_score(world, "israel", actions) - 2.0 * proxy_pressure + 0.8 * israel_backstop
    )
    bump("israel", "northern_deterrence", israel_deterrence_delta)
    israel_endurance_delta = (
        0.018 * (62.0 - world.tension_level)
        - 2.2 * pressure(("israel",), {"strike", "mobilize"})
    )
    bump("israel", "reserve_endurance", israel_endurance_delta)
    israel_resupply_delta = (
        2.2 * israel_backstop + 1.4 * pressure(("us",), {"defend", "mobilize", "negotiate"})
    )
    bump("israel", "us_resupply_confidence", israel_resupply_delta)

    # --- Iran ----------------------------------------------------------
    iran_stability_delta = (
        1.6 * iran_axis - 0.022 * world.market_stress - 2.8 * (us_pressure + israel_pressure)
    )
    bump("iran", "regime_stability", iran_stability_delta)
    bump("iran", "proxy_corridor", 2.0 * iran_axis - 2.2 * israel_pressure)
    iran_hormuz_delta = (
        0.03 * (world.oil_pressure - 42.0)
        + 1.8 * pressure(("iran",), {"mobilize", "deceive"})
        - 1.2 * gulf_defense
    )
    bump("iran", "hormuz_leverage", iran_hormuz_delta)
    iran_deterrence_delta = (
        1.8 * pressure(("iran", "hezbollah"), {"strike", "mobilize", "deceive"})
        - 1.2 * pressure(("us", "israel"), {"strike", "defend"})
    )
    bump("iran", "deterrence_credibility", iran_deterrence_delta)

    # --- Hezbollah -----------------------------------------------------
    bump("hezbollah", "launch_survivability", 1.2 * iran_axis - 3.0 * israel_pressure)
    bump("hezbollah", "logistics_depth", 2.1 * iran_axis - 2.0 * israel_pressure)
    hezbollah_cover_delta = (
        -0.022 * world.tension_level
        - 0.016 * world.market_stress
        + 1.0 * pressure(("hezbollah",), {"hold", "negotiate", "deceive"})
    )
    bump("hezbollah", "political_cover", hezbollah_cover_delta)
    hezbollah_credibility_delta = (
        2.1 * hezbollah_pressure - 1.3 * pressure(("hezbollah",), {"hold", "negotiate"})
    )
    bump("hezbollah", "resistance_credibility", hezbollah_credibility_delta)

    # --- Gulf ----------------------------------------------------------
    bump("gulf", "shipping_continuity", 0.03 * (80.0 - world.oil_pressure) - 2.8 * iran_pressure)
    gulf_infrastructure_delta = (
        1.4 * self._behavior_score(world, "gulf", actions) - 2.2 * iran_pressure + 1.0 * us_pressure
    )
    bump("gulf", "infrastructure_security", gulf_infrastructure_delta)
    gulf_investor_delta = 0.03 * (76.0 - world.market_stress) - 0.02 * max(0.0, world.tension_level - 52.0)
    bump("gulf", "investor_confidence", gulf_investor_delta)
    gulf_flexibility_delta = (
        1.8 * gulf_alignment - 1.2 * pressure(("gulf",), {"strike", "mobilize", "sanction"})
    )
    bump("gulf", "diplomatic_flexibility", gulf_flexibility_delta)

    # --- Oversight -----------------------------------------------------
    escalatory_ratio = pressure(
        ("us", "israel", "iran", "hezbollah", "gulf"),
        {"strike", "mobilize", "deceive", "sanction"},
    )
    # Weighted blend; the weights sum to 1.0 before scaling to a percentage.
    runaway_risk = self._clamp_percent(
        100.0
        * (
            0.48 * (world.tension_level / 100.0)
            + 0.24 * escalatory_ratio
            + 0.16 * mean_risk
            + 0.12 * (1.0 - mean_consistency)
        )
    )
    # Overwritten outright, unlike the other metrics which are bumped.
    world.latent_state.setdefault("oversight", {})["runaway_risk"] = round(runaway_risk, 2)
    oversight_legitimacy_delta = (
        1.5 * pressure(("oversight",), {"intel_query", "oversight_review", "negotiate"})
        - 1.8 * pressure(("oversight",), {"sanction", "strike", "mobilize", "deceive"})
        + 0.8 * self._clamp_unit(mean_consistency * 2.0 - 1.0)
    )
    bump("oversight", "intervention_legitimacy", oversight_legitimacy_delta)
    oversight_autonomy_delta = (
        0.03 * (78.0 - runaway_risk)
        + 1.0 * pressure(("oversight",), {"oversight_review", "negotiate"})
        - 1.0 * pressure(("oversight",), {"sanction", "strike"})
    )
    bump("oversight", "autonomy_balance", oversight_autonomy_delta)
    oversight_clarity_delta = (
        1.2 * pressure(("oversight",), {"intel_query", "oversight_review"})
        + 0.9 * self._clamp_unit(mean_consistency * 2.0 - 1.0)
        - 0.8 * pressure(("us", "israel", "iran", "hezbollah", "gulf"), {"deceive"})
    )
    bump("oversight", "trace_clarity", oversight_clarity_delta)
def _signal_mentions(text: str, *terms: str) -> bool:
    """Return ``True`` when any of ``terms`` occurs as a substring of ``text``.

    Matching is plain substring containment and case-sensitive, so callers
    are expected to lower-case ``text`` themselves. No terms gives ``False``.
    """
    for term in terms:
        if term in text:
            return True
    return False
def _initial_asset_state(self) -> dict[str, dict[str, AssetCondition]]:
    """Build the starting ``AssetCondition`` table for every agent.

    For each agent in ``AGENT_IDS`` the entity pack is loaded and every
    record from ``_asset_inventory`` becomes an ``AssetCondition`` keyed by
    its ``asset_id``. Latitude, longitude and notes are optional.
    """
    state: dict[str, dict[str, AssetCondition]] = {}
    for agent_id in AGENT_IDS:
        pack = load_entity_pack(agent_id)
        conditions: dict[str, AssetCondition] = {}
        for record in self._asset_inventory(agent_id, pack):
            conditions[record["asset_id"]] = AssetCondition(
                asset_id=record["asset_id"],
                owner=agent_id,
                name=record["name"],
                category=record["category"],
                section=record["section"],
                latitude=record.get("latitude"),
                longitude=record.get("longitude"),
                # Criticality is sourced from the record's "status" field
                # ("tracked" when absent).
                criticality=str(record.get("status", "tracked")),
                notes=record.get("notes"),
            )
        state[agent_id] = conditions
    return state
def _build_data_source_context(self, agent_id: str) -> list[DataSourceContext]:
    """Describe every catalogued source of ``agent_id`` as a ``DataSourceContext``.

    Sources come from ``get_sources_for_agent`` and keep its order. Tags
    are copied into a fresh list and the source notes are exposed as
    ``access_notes``.
    """
    contexts: list[DataSourceContext] = []
    for entry in get_sources_for_agent(agent_id):
        contexts.append(
            DataSourceContext(
                source_id=entry.id,
                name=entry.name,
                delivery=entry.delivery,
                kind=entry.kind,
                rationale=entry.rationale,
                tags=list(entry.tags),
                access_notes=entry.notes,
            )
        )
    return contexts
def _build_decision_prompt(
    self,
    *,
    agent_id: str,
    entity_profile: dict[str, object],
    strategic_assets: list[dict[str, object]],
    available_data_sources: list[DataSourceContext],
    live_enabled: bool,
    projection: ObservationProjection,
    belief_state: AgentBeliefState,
    historical_replay: HistoricalReplayState | None,
) -> str:
    """Compose the newline-joined decision prompt for ``agent_id``.

    The prompt always contains the identity line, the action instruction,
    the live-mode line, the allowed actions, a data-source listing, an asset
    listing and the closing doctrine guidance. Projection, objective,
    interest, front, belief, historical-replay and damaged-asset sections
    are appended only when the matching input is enabled or non-empty.

    Source and asset listings are truncated to the pair of limits stored in
    ``ASSET_DECISION_SOURCE_LIMITS`` for the profile's model size. Objectives,
    interests, projection notes, contradiction topics, belief topics, beliefs
    and damaged assets are cut to three entries; priority fronts to two.

    Args:
        agent_id: Key into ``AGENT_PROFILES`` and ``AGENT_ALLOWED_ACTIONS``.
        entity_profile: Mapping read for ``strategic_objectives``,
            ``protected_interests`` and ``priority_fronts``; values that are
            not lists are ignored.
        strategic_assets: Asset dicts; each must carry ``name`` and
            ``category`` and may carry coordinates, ``status``, ``health``
            and ``last_change_reason``.
        available_data_sources: Sources rendered in the data-source section.
        live_enabled: Selects the "enabled"/"disabled" wording.
        projection: Observation projection whose reliability, notes and
            contradiction topics are summarised.
        belief_state: Supplies dominant topics and belief summaries.
        historical_replay: When not ``None`` and enabled, adds the replay
            instructions.

    Returns:
        The prompt sections joined with newlines.
    """
    profile = AGENT_PROFILES[agent_id]
    source_limit, asset_limit = ASSET_DECISION_SOURCE_LIMITS[profile.model_size]

    objectives = entity_profile.get("strategic_objectives", [])
    protected_interests = entity_profile.get("protected_interests", [])
    priority_fronts = entity_profile.get("priority_fronts", [])
    top_objectives = [str(item) for item in objectives[:3]] if isinstance(objectives, list) else []
    top_interests = [str(item) for item in protected_interests[:3]] if isinstance(protected_interests, list) else []

    source_lines = [
        f"- {source.name} [{source.delivery}/{source.kind}]: {self._clip_summary(source.rationale, 96)}"
        for source in available_data_sources[:source_limit]
    ]
    asset_lines = [
        f"- {asset['name']} [{asset['category']}] @ {asset.get('latitude', '?')}, {asset.get('longitude', '?')} status={asset.get('status')} health={asset.get('health')}"
        for asset in strategic_assets[:asset_limit]
    ]
    damaged_assets = [asset for asset in strategic_assets if str(asset.get("status")) != "operational"]
    # The key may be present with a None value, in which case a ``.get``
    # default would not apply and the prompt would read "(None)"; fall back
    # with ``or`` so the placeholder text is used instead.
    damaged_lines = [
        f"- {asset['name']} is {asset.get('status')} ({asset.get('last_change_reason') or 'needs attention'})"
        for asset in damaged_assets[:3]
    ]

    front_summary = []
    if isinstance(priority_fronts, list):
        for front in priority_fronts[:2]:
            if isinstance(front, dict) and isinstance(front.get("name"), str):
                front_summary.append(str(front["name"]))

    prompt_sections = [
        f"You are {profile.display_name}. Role: {profile.role}. Model size: {profile.model_size}.",
        "Choose exactly one allowed action each turn and ground it in current source packets, private/public briefs, strategic state, and asset condition.",
        f"Live mode is {'enabled' if live_enabled else 'disabled'}; prefer the freshest source packets when live mode is on.",
    ]

    # NOTE(review): the nesting of the notes/contradiction sections under
    # ``projection.enabled`` was reconstructed from a whitespace-stripped
    # copy of this method -- confirm against the canonical file.
    if projection.enabled:
        prompt_sections.append(
            f"Observation reliability is partial (estimated reliability {projection.worldview_reliability:.2f}); treat fast-moving or contested reporting cautiously."
        )
        if projection.notes:
            prompt_sections.append("Projection notes:\n" + "\n".join(f"- {note}" for note in projection.notes[:3]))
        if projection.contradiction_topics:
            prompt_sections.append(
                "Current contradiction topics: " + ", ".join(projection.contradiction_topics[:3]) + "."
            )

    if top_objectives:
        prompt_sections.append(f"Priority objectives: {', '.join(top_objectives)}.")
    if top_interests:
        prompt_sections.append(f"Protected interests: {', '.join(top_interests)}.")
    if front_summary:
        prompt_sections.append(f"Priority fronts: {', '.join(front_summary)}.")

    if belief_state.dominant_topics:
        prompt_sections.append("Dominant remembered belief topics: " + ", ".join(belief_state.dominant_topics[:3]) + ".")
    if belief_state.beliefs:
        prompt_sections.append(
            "Belief memory:\n" + "\n".join(f"- {belief.summary}" for belief in belief_state.beliefs[:3])
        )

    if historical_replay is not None and historical_replay.enabled:
        prompt_sections.append(
            f"Historical replay mode is active for {historical_replay.replay_name}. "
            "You only know the visible timeline shown in the historical brief. "
            "Choose one action and forecast the next event without using future information."
        )

    prompt_sections.append(f"Allowed actions: {', '.join(AGENT_ALLOWED_ACTIONS.get(agent_id, ()))}.")
    prompt_sections.append("Data sources available to you:\n" + ("\n".join(source_lines) if source_lines else "- None configured."))
    prompt_sections.append("Mapped assets under your control:\n" + ("\n".join(asset_lines) if asset_lines else "- No mapped assets available."))
    if damaged_lines:
        prompt_sections.append("Assets currently degraded or malfunctioning:\n" + "\n".join(damaged_lines))
    prompt_sections.append("Use defense or repair-minded choices when critical assets are damaged; use strike, sanction, or deception only when the reward tradeoff is justified by your doctrine and the observed threat.")
    return "\n".join(prompt_sections)
| def _build_asset_alerts(strategic_assets: list[dict[str, object]]) -> list[str]: | |
| alerts = [ | |
| f"{asset['name']} is {asset.get('status')} ({asset.get('last_change_reason', 'operational concern')})" | |
| for asset in strategic_assets | |
| if str(asset.get("status")) != "operational" | |
| ] | |
| return alerts[:6] | |
| def _flatten_strategic_assets( | |
| self, | |
| *, | |
| agent_id: str, | |
| entity_pack: dict[str, object], | |
| world: WorldState, | |
| ) -> list[dict[str, object]]: | |
| inventory = self._asset_inventory(agent_id, entity_pack) | |
| conditions = world.asset_state.get(agent_id, {}) | |
| flattened: list[dict[str, object]] = [] | |
| for asset in inventory: | |
| condition = conditions.get(asset["asset_id"]) | |
| flattened.append( | |
| { | |
| **asset, | |
| "status": condition.status if condition is not None else asset.get("status", "operational"), | |
| "health": round(condition.health, 1) if condition is not None else 100.0, | |
| "operational_load": round(condition.operational_load, 1) if condition is not None else 0.0, | |
| "last_change_reason": condition.last_change_reason if condition is not None else None, | |
| } | |
| ) | |
| return flattened | |
def _asset_inventory(self, agent_id: str, entity_pack: dict[str, object]) -> list[dict[str, object]]:
    """Build flat asset records from the ``assets`` part of an entity pack.

    Only sections named in ``PHYSICAL_ASSET_SECTIONS`` that are lists are
    read, and only dict items inside them. An item is skipped when no string
    name can be derived from ``name``, ``partner`` (alliance anchors get a
    "<partner> alliance anchor" label) or ``location``/``partner``.

    Each record carries ``asset_id``, ``name``, ``category``, ``section``
    and ``status``. ``latitude``/``longitude`` are added when the item has
    numeric coordinates, or when its location (or name) resolves through
    ``_asset_location_lookup``. ``notes`` comes from the first present key
    among ``notes``, ``desired_state``, ``role`` and ``function``.

    Returns:
        Records in section order, then item order; ``[]`` when ``assets``
        is not a dict.
    """
    raw_assets = entity_pack.get("assets", {})
    if not isinstance(raw_assets, dict):
        return []

    coordinates_by_name = self._asset_location_lookup(raw_assets)
    records: list[dict[str, object]] = []

    for section_name in PHYSICAL_ASSET_SECTIONS:
        members = raw_assets.get(section_name, [])
        if not isinstance(members, list):
            continue
        is_anchor_section = section_name == "alliance_anchors"

        for item in members:
            if not isinstance(item, dict):
                continue

            # Resolve a display name, falling back through partner/location.
            label = item.get("name")
            if not isinstance(label, str) and is_anchor_section and isinstance(item.get("partner"), str):
                label = f"{item['partner']} alliance anchor"
            if not isinstance(label, str):
                label = item.get("location") or item.get("partner")
            if not isinstance(label, str):
                continue

            default_category = "alliance-anchor" if is_anchor_section else section_name
            record: dict[str, object] = {
                "asset_id": self._asset_id(agent_id, section_name, label),
                "name": label,
                "category": item.get("category") or item.get("type") or default_category,
                "section": section_name,
                "status": item.get("priority") or item.get("importance") or item.get("criticality") or "tracked",
            }

            # Prefer the item's own coordinates; otherwise borrow them from
            # another asset that matches the item's location (or name).
            lat = item.get("lat", item.get("anchor_lat"))
            lon = item.get("lon", item.get("anchor_lon"))
            has_coordinates = isinstance(lat, (int, float)) and isinstance(lon, (int, float))
            if not has_coordinates:
                borrowed = coordinates_by_name.get(str(item.get("location", label)).strip().lower())
                if borrowed is not None:
                    lat, lon = borrowed
                    has_coordinates = isinstance(lat, (int, float)) and isinstance(lon, (int, float))
            if has_coordinates:
                record["latitude"] = lat
                record["longitude"] = lon

            for key in ("notes", "desired_state", "role"):
                if key in item:
                    record["notes"] = item[key]
                    break
            else:
                if "function" in item:
                    if is_anchor_section and isinstance(item.get("location"), str):
                        record["notes"] = f"{item['location']}: {item['function']}"
                    else:
                        record["notes"] = item["function"]

            records.append(record)

    return records
| def _asset_id(agent_id: str, section_name: str, name: str) -> str: | |
| slug = "".join(char.lower() if char.isalnum() else "-" for char in name).strip("-") | |
| return f"{agent_id}-{section_name}-{slug}" | |
@staticmethod
def _asset_location_lookup(assets: dict[str, object]) -> dict[str, tuple[float, float]]:
    """Index known asset coordinates by stripped, lower-cased name.

    Every list-valued section named in ``PHYSICAL_ASSET_SECTIONS`` is
    scanned. A dict item contributes an entry when it has a string ``name``
    (or ``location``) and numeric ``lat``/``lon`` (or
    ``anchor_lat``/``anchor_lon``). A later item with the same key replaces
    an earlier one.

    Declared as a static method: the function takes no ``self`` but is
    invoked through the instance (``self._asset_location_lookup(...)``).
    """
    lookup: dict[str, tuple[float, float]] = {}
    for section_name in PHYSICAL_ASSET_SECTIONS:
        section = assets.get(section_name, [])
        if not isinstance(section, list):
            continue
        for item in section:
            if not isinstance(item, dict):
                continue
            name = item.get("name") or item.get("location")
            latitude = item.get("lat", item.get("anchor_lat"))
            longitude = item.get("lon", item.get("anchor_lon"))
            if isinstance(name, str) and isinstance(latitude, (int, float)) and isinstance(longitude, (int, float)):
                lookup[name.strip().lower()] = (latitude, longitude)
    return lookup
def _apply_asset_action_effects(self, world: WorldState, actor_id: str, action: AgentAction) -> None:
    """Apply the asset consequences of one agent action to ``world``.

    ``strike``, ``deceive`` and ``sanction`` damage the target's assets, but
    only when the target is one of ``AGENT_IDS``. ``defend`` and
    ``oversight_review`` restore assets: an oversight review, or a defend
    action whose target is not a known agent, protects the actor itself;
    otherwise the defend target is protected. Other action types do nothing.

    Section bias always comes from the acting agent's entry in
    ``ASSET_CATEGORY_BIAS``; restoration uses the ``"defend"`` bias for both
    restoring action types.
    """
    kind = action.type
    actor_bias = ASSET_CATEGORY_BIAS.get(actor_id, {})

    if kind in ("strike", "deceive", "sanction"):
        if action.target not in AGENT_IDS:
            return
        if kind == "strike":
            # Strikes are the only hostile action without a status cap.
            effect = {"intensity": 42.0, "reason": f"{actor_id} strike pressure", "max_assets": 2}
        elif kind == "deceive":
            effect = {
                "intensity": 22.0,
                "reason": f"{actor_id} deception caused systems malfunction",
                "max_assets": 1,
                "max_status": "malfunctioning",
            }
        else:
            effect = {
                "intensity": 18.0,
                "reason": f"{actor_id} sanction pressure degraded asset availability",
                "max_assets": 1,
                "max_status": "malfunctioning",
            }
        self._damage_assets(
            world,
            owner=action.target,
            section_bias=actor_bias.get(kind, ()),
            **effect,
        )
        return

    if kind not in ("defend", "oversight_review"):
        return

    is_review = kind == "oversight_review"
    if is_review or action.target not in AGENT_IDS:
        protected_owner = actor_id
    else:
        protected_owner = action.target
    self._restore_assets(
        world,
        owner=protected_owner,
        intensity=12.0 if is_review else 18.0,
        reason=f"{actor_id} {kind} hardened critical assets",
        section_bias=actor_bias.get("defend", ()),
        max_assets=2,
    )
def _apply_signal_asset_effects(self, world: WorldState, signal: ExternalSignal) -> None:
    """Turn keyword matches in an external signal into asset damage.

    The signal's headline, tags and region are joined and lower-cased, and
    its severity is clamped to ``[0.0, 1.0]``. Three keyword groups are then
    checked independently through ``_signal_mentions``:

    * kinetic terms damage the agents from ``_infer_affected_agents``
      (base intensity 24);
    * shipping terms damage ``us``, ``gulf`` and ``iran`` (base intensity 16);
    * cyber/outage terms damage the inferred agents (base intensity 14).

    Every hit affects at most one asset per owner, scales the base intensity
    by severity and caps the resulting status at ``"malfunctioning"``.
    """
    haystack = f"{signal.headline} {' '.join(signal.tags)} {(signal.region or '')}".lower()
    severity = max(0.0, min(1.0, signal.severity))

    def disrupt(owners, base_intensity: float, reason: str, bias: tuple[str, ...]) -> None:
        # Shared call shape for all three keyword groups.
        for owner in owners:
            self._damage_assets(
                world,
                owner=owner,
                intensity=base_intensity * severity,
                reason=reason,
                section_bias=bias,
                max_assets=1,
                max_status="malfunctioning",
            )

    if self._signal_mentions(haystack, "strike", "rocket", "missile", "drone", "attack", "explosion", "raid"):
        disrupt(
            self._infer_affected_agents(signal),
            24.0,
            f"fresh reporting from {signal.source}",
            ("front", "airbase", "base", "launch-network", "launch-zone", "energy-port"),
        )

    if self._signal_mentions(haystack, "shipping", "tanker", "hormuz", "port", "terminal", "oil"):
        disrupt(
            ("us", "gulf", "iran"),
            16.0,
            f"{signal.source} reported shipping disruption",
            ("port", "energy", "energy-port", "chokepoint", "maritime-box"),
        )

    if self._signal_mentions(haystack, "cyber", "outage", "blackout", "internet"):
        disrupt(
            self._infer_affected_agents(signal),
            14.0,
            f"{signal.source} reported systems disruption",
            ("command", "command-system", "command-network", "civil-center"),
        )
def _damage_assets(
    self,
    world: WorldState,
    *,
    owner: str,
    intensity: float,
    reason: str,
    section_bias: tuple[str, ...],
    max_assets: int,
    max_status: str | None = None,
) -> None:
    """Damage up to ``max_assets`` of ``owner``'s assets in place.

    Does nothing when ``owner`` has no entry in ``world.asset_state``.
    For each asset chosen by ``_select_assets_for_effect``: health drops by
    ``intensity`` (clamped via ``_clamp_percent``), operational load rises by
    ``0.7 * intensity`` (capped at 100), status is re-derived with
    ``max_status`` forwarded, the change reason is recorded and metric
    impacts are applied with direction ``"damage"``.

    When the status actually changes, a public ``LatentEvent`` is registered.
    Its topic is ``"shipping"`` if the asset's section or category mentions
    port/maritime/chokepoint/energy, otherwise ``"border"``.
    """
    if owner not in world.asset_state:
        return

    targets = self._select_assets_for_effect(world, owner, section_bias, max_assets=max_assets, reverse=False)
    for target in targets:
        record = world.asset_state[owner][target.asset_id]
        status_before = record.status

        record.health = round(self._clamp_percent(record.health - intensity), 2)
        record.operational_load = round(min(100.0, record.operational_load + intensity * 0.7), 2)
        record.status = self._derive_asset_status(record.health, record.operational_load, max_status=max_status)
        record.last_change_reason = reason
        self._apply_asset_metric_impacts(world, owner, record, direction="damage")

        if record.status == status_before:
            continue

        event_id = f"asset-{owner}-{target.asset_id}-{world.turn}-{len(world.latent_events)}"
        is_maritime = any(
            token in record.section.lower() or token in record.category.lower()
            for token in ("port", "maritime", "chokepoint", "energy")
        )
        topic = "shipping" if is_maritime else "border"
        severity = min(1.0, max(0.35, (100.0 - record.health) / 100.0))
        narratives = self._default_latent_event_narratives(
            topic,
            f"{record.name} is now {record.status} after {reason}.",
        )
        event = LatentEvent(
            event_id=event_id,
            topic=topic,
            status="active",
            severity=severity,
            visibility="public",
            reliability=0.7,
            origin="asset-state",
            affected_agents=[owner],
            affected_assets=[record.asset_id],
            started_at_turn=world.turn,
            last_updated_turn=world.turn,
            decay_rate=0.06,
            narratives=narratives,
        )
        self._register_latent_event(world, event)
| def _restore_assets( | |
| self, | |
| world: WorldState, | |
| *, | |
| owner: str, | |
| intensity: float, | |
| reason: str, | |
| section_bias: tuple[str, ...], | |
| max_assets: int, | |
| ) -> None: | |
| if owner not in world.asset_state: | |
| return | |
| selected_assets = self._select_assets_for_effect(world, owner, section_bias, max_assets=max_assets, reverse=True) | |
| for asset in selected_assets: | |
| current = world.asset_state[owner][asset.asset_id] | |
| current.health = round(self._clamp_percent(current.health + intensity), 2) | |
| current.operational_load = round(max(0.0, current.operational_load - intensity * 0.8), 2) | |
| current.status = self._derive_asset_status(current.health, current.operational_load) | |
| current.last_change_reason = reason | |
| self._apply_asset_metric_impacts(world, owner, current, direction="repair") | |
def _select_assets_for_effect(
    self,
    world: WorldState,
    owner: str,
    section_bias: tuple[str, ...],
    *,
    max_assets: int,
    reverse: bool,
) -> list[AssetCondition]:
    """Pick the assets an effect should touch, best candidates first.

    Assets are ranked in descending order by a tuple of: criticality score
    from ``ASSET_PRIORITY_SCORES`` (default 1) plus 1.0 when the category or
    section contains any bias term; then damage (``100 - health``) when
    ``reverse`` is true or health otherwise; then name.

    With ``reverse=True`` operational assets are dropped, otherwise
    destroyed ones are; if that leaves nothing the unfiltered ranking is
    used. The first ``max_assets`` entries are returned.
    """
    candidates = list(world.asset_state.get(owner, {}).values())
    if not candidates:
        return []

    lowered_bias = tuple(term.lower() for term in section_bias)

    def rank(asset: AssetCondition) -> tuple[float, float, str]:
        base = float(ASSET_PRIORITY_SCORES.get(asset.criticality.lower(), 1))
        matches_bias = any(
            term in asset.category.lower() or term in asset.section.lower()
            for term in lowered_bias
        )
        bonus = 1.0 if matches_bias else 0.0
        secondary = 100.0 - asset.health if reverse else asset.health
        return (base + bonus, secondary, asset.name)

    candidates.sort(key=rank, reverse=True)

    # Repairs skip healthy assets; damage skips assets that are already gone.
    excluded_status = "operational" if reverse else "destroyed"
    eligible = [asset for asset in candidates if asset.status != excluded_status]
    return (eligible or candidates)[:max_assets]
@staticmethod
def _derive_asset_status(health: float, operational_load: float, max_status: str | None = None) -> str:
    """Map health and operational load to a status label.

    The first ``(threshold, status)`` pair in
    ``ASSET_STATUS_DAMAGE_THRESHOLDS`` whose threshold is at or above
    ``health`` decides the status; with no match the asset is
    ``"operational"``. An otherwise operational asset with a load of 72 or
    more is ``"malfunctioning"``. When ``max_status`` is
    ``"malfunctioning"``, a ``"destroyed"`` result is softened to
    ``"malfunctioning"``.

    Declared as a static method: the function takes no ``self`` but is
    invoked through the instance (``self._derive_asset_status(...)``).
    """
    derived = next(
        (status for threshold, status in ASSET_STATUS_DAMAGE_THRESHOLDS if health <= threshold),
        "operational",
    )
    if derived == "operational" and operational_load >= 72.0:
        derived = "malfunctioning"
    if max_status == "malfunctioning" and derived == "destroyed":
        return "malfunctioning"
    return derived
| def _apply_asset_metric_impacts( | |
| self, | |
| world: WorldState, | |
| owner: str, | |
| asset: AssetCondition, | |
| *, | |
| direction: str, | |
| ) -> None: | |
| scale = -1.0 if direction == "damage" else 0.6 | |
| category = f"{asset.section} {asset.category}".lower() | |
| owner_metric_map = { | |
| "us": { | |
| "regional_access": ("base", "airbase", "logistics", "command"), | |
| "shipping_security": ("naval", "port", "maritime", "chokepoint"), | |
| "force_posture": ("base", "airbase", "command"), | |
| "domestic_support": ("command", "capital"), | |
| }, | |
| "israel": { | |
| "homeland_security": ("front", "civil", "infrastructure", "air-defense"), | |
| "northern_deterrence": ("front", "launch", "command"), | |
| "reserve_endurance": ("depth", "logistics", "port"), | |
| "us_resupply_confidence": ("port", "offshore", "command"), | |
| }, | |
| "iran": { | |
| "regime_stability": ("command", "capital", "civil"), | |
| "proxy_corridor": ("corridor", "logistics", "front"), | |
| "hormuz_leverage": ("chokepoint", "maritime", "energy-port"), | |
| "deterrence_credibility": ("front", "command", "maritime"), | |
| }, | |
| "hezbollah": { | |
| "launch_survivability": ("launch", "front", "depth"), | |
| "logistics_depth": ("logistics", "corridor", "reserve"), | |
| "political_cover": ("political", "command", "civil"), | |
| "resistance_credibility": ("launch", "front", "command"), | |
| }, | |
| "gulf": { | |
| "shipping_continuity": ("port", "chokepoint", "maritime"), | |
| "infrastructure_security": ("energy", "capital", "port"), | |
| "investor_confidence": ("energy", "capital", "port"), | |
| "diplomatic_flexibility": ("capital", "port", "chokepoint"), | |
| }, | |
| "oversight": { | |
| "runaway_risk": ("chokepoint", "theater", "civil"), | |
| "intervention_legitimacy": ("civil", "theater"), | |
| "autonomy_balance": ("theater", "command"), | |
| "trace_clarity": ("command", "civil", "theater"), | |
| }, | |
| } | |
| for metric, keywords in owner_metric_map.get(owner, {}).items(): | |
| if any(keyword in category for keyword in keywords): | |
| magnitude = 2.8 if direction == "damage" else 1.4 | |
| self._bump_actor_metric(world, owner, metric, scale * magnitude) | |
def _asset_pressure(self, world: WorldState, agent_id: str) -> float:
    """Return the criticality-weighted share of lost asset health.

    Each asset contributes ``weight * max(0, 100 - health)`` to the damage
    total and ``weight * 100`` to the possible total, where the weight is
    the asset's ``ASSET_PRIORITY_SCORES`` entry (default 1). The ratio is
    capped at 1.0. An agent with no assets, or with zero total weight,
    yields 0.0.
    """
    conditions = list(world.asset_state.get(agent_id, {}).values())
    if not conditions:
        return 0.0

    # Plain running totals keep the float accumulation order explicit.
    damage_total = 0.0
    capacity_total = 0.0
    for condition in conditions:
        weight = float(ASSET_PRIORITY_SCORES.get(condition.criticality.lower(), 1))
        damage_total += weight * max(0.0, 100.0 - condition.health)
        capacity_total += weight * 100.0

    return 0.0 if capacity_total == 0.0 else min(1.0, damage_total / capacity_total)
def _compute_rewards(self, world: WorldState, episode: EpisodeMetadata) -> dict[str, RewardBreakdown]:
    """Refresh smoothed tension and compute every agent's reward breakdown.

    First, ``world.ema_tension`` is updated for each id in ``AGENT_IDS`` as
    ``0.08 * tension_level + 0.92 * previous`` (previous defaults to the
    current tension level), rounded to three decimals. Then the per-agent
    reward methods are called with the latest action per actor taken from
    ``world.last_actions``.

    Returns:
        Reward breakdowns keyed ``us``, ``israel``, ``iran``, ``hezbollah``,
        ``gulf`` and ``oversight``, in that order.
    """
    actions_by_actor: dict[str, AgentAction] = {}
    for entry in world.last_actions:
        actions_by_actor[entry.actor] = entry

    for agent_id in AGENT_IDS:
        previous = world.ema_tension.get(agent_id, world.tension_level)
        world.ema_tension[agent_id] = round(0.08 * world.tension_level + 0.92 * previous, 3)

    scorers = (
        ("us", self._reward_us),
        ("israel", self._reward_israel),
        ("iran", self._reward_iran),
        ("hezbollah", self._reward_hezbollah),
        ("gulf", self._reward_gulf),
        ("oversight", self._reward_oversight),
    )
    rewards: dict[str, RewardBreakdown] = {}
    for agent_id, scorer in scorers:
        rewards[agent_id] = scorer(world, episode, actions_by_actor)
    return rewards
| def _clamp_percent(value: float) -> float: | |
| return max(0.0, min(100.0, value)) | |
| def _clamp_unit(value: float) -> float: | |
| return max(-1.0, min(1.0, value)) | |
| def _target_score(self, value: float, target: float, tolerance: float) -> float: | |
| return self._clamp_unit(1.0 - abs(value - target) / max(tolerance, 1.0)) | |
| def _state_score(self, world: WorldState, agent_id: str, metric: str, target: float, tolerance: float) -> float: | |
| return self._target_score(self._actor_metric(world, agent_id, metric), target, tolerance) | |
def _alliance_score(self, world: WorldState, agent_id: str) -> float:
    """Score how well the agent's current allies match its preferred coalition.

    With preferred partners configured in ``AGENT_PREFERRED_COALITIONS``,
    the score rises linearly from -1.0 (no preferred partner allied) to 1.0
    (all of them allied). Without any preferred partners, the number of
    current allies is scored against a target of zero with tolerance 1.5.
    """
    wanted = set(AGENT_PREFERRED_COALITIONS.get(agent_id, ()))
    current = set(world.coalition_graph.get(agent_id, []))
    if wanted:
        matched = len(current & wanted)
        return self._clamp_unit((2.0 * matched / max(len(wanted), 1)) - 1.0)
    return self._target_score(float(len(current)), 0.0, 1.5)
| def _selective_alignment_score(self, world: WorldState, agent_id: str, desired_allies: float) -> float: | |
| return self._target_score(float(len(world.coalition_graph.get(agent_id, []))), desired_allies, 1.6) | |
def _behavior_score(self, world: WorldState, agent_id: str, recent_actions: dict[str, AgentAction]) -> float:
    """Combine stored behavioral consistency with the latest action's fit.

    The stored consistency (default 0.5) is rescaled as ``x * 2 - 1`` and
    clamped to form the baseline. If the agent has no recent action the
    baseline is returned. Otherwise it is mixed 60/40 with the action
    type's entry in ``AGENT_ACTION_ALIGNMENT`` (0.0 when the type is not
    listed) and clamped again.
    """
    stored = world.behavioral_consistency.get(agent_id, 0.5)
    baseline = self._clamp_unit(stored * 2.0 - 1.0)

    latest = recent_actions.get(agent_id)
    if latest is not None:
        doctrinal_fit = AGENT_ACTION_ALIGNMENT[agent_id].get(latest.type, 0.0)
        return self._clamp_unit(baseline * 0.6 + doctrinal_fit * 0.4)
    return baseline
def _reward_metric(self, world: WorldState, agent_id: str, metric: str) -> float:
    """Score ``metric`` using its configured target and tolerance.

    The configuration is looked up in ``AGENT_REWARD_METRIC_CONFIGS`` by
    agent and metric name; a missing entry raises ``KeyError``.
    """
    settings = AGENT_REWARD_METRIC_CONFIGS[agent_id][metric]
    target, tolerance = settings.target, settings.tolerance
    return self._state_score(world, agent_id, metric, target, tolerance)
def _action_response_score(self, world: WorldState, agent_id: str, recent_actions: dict[str, AgentAction]) -> float:
    """Score the agent's latest action against the metrics it influences.

    Returns 0.0 when the agent has no recent action and -0.25 when the
    action type has no entry in ``AGENT_STATE_ACTION_EFFECTS`` or none of
    its metrics has a reward configuration.

    Otherwise each affected metric's current reward score is signed by the
    direction of the effect (non-negative deltas count as positive) and
    weighted by ``config.weight * min(|delta| / 4, 1)``. The weighted mean
    is clamped with ``_clamp_unit``.
    """
    latest = recent_actions.get(agent_id)
    if latest is None:
        return 0.0

    effects = AGENT_STATE_ACTION_EFFECTS.get(agent_id, {}).get(latest.type, {})
    if not effects:
        return -0.25

    configs = AGENT_REWARD_METRIC_CONFIGS[agent_id]
    signed_sum = 0.0
    weight_sum = 0.0
    for metric, delta in effects.items():
        config = configs.get(metric)
        if config is None:
            # Effects on metrics without a reward config do not count.
            continue
        metric_score = self._reward_metric(world, agent_id, metric)
        direction = 1.0 if delta >= 0 else -1.0
        weight = config.weight * min(abs(delta) / 4.0, 1.0)
        signed_sum += direction * metric_score * weight
        weight_sum += weight

    if weight_sum == 0.0:
        return -0.25
    return self._clamp_unit(signed_sum / weight_sum)
| def _blend_reward_total( | |
| metric_weights: dict[str, float], | |
| metric_scores: dict[str, float], | |
| behavior: float, | |
| action_response: float, | |
| ) -> float: | |
| metric_total = sum(metric_weights[metric] * metric_scores[metric] for metric in metric_scores) | |
| total_weight = sum(metric_weights.values()) + 0.10 + 0.08 | |
| return max(-1.0, min(1.0, (metric_total + 0.10 * behavior + 0.08 * action_response) / total_weight)) | |
| def _action_pressure( | |
| self, | |
| recent_actions: dict[str, AgentAction], | |
| actors: tuple[str, ...], | |
| escalatory_types: set[str], | |
| ) -> float: | |
| hits = sum( | |
| 1 | |
| for actor in actors | |
| if actor in recent_actions and recent_actions[actor].type in escalatory_types | |
| ) | |
| return hits / max(len(actors), 1) | |
def _finalize_reward(
    self,
    *,
    episode: EpisodeMetadata,
    turn: int,
    coalition: float,
    escalation: float,
    market: float,
    behavior: float,
    total: float,
    goal_terms: dict[str, float],
) -> RewardBreakdown:
    """Clamp, scale and round reward components into a ``RewardBreakdown``.

    Every component (and every goal term) is clamped with ``_clamp_unit``,
    multiplied by the turn scale and rounded to three decimals. The scale is
    1.0 when the episode uses dense rewards, on turn 0 and on every tenth
    turn; otherwise it is 0.35.
    """
    at_full_weight = episode.dense_rewards or turn == 0 or turn % 10 == 0
    scale = 1.0 if at_full_weight else 0.35

    def scaled(raw: float) -> float:
        return round(self._clamp_unit(raw) * scale, 3)

    scaled_goal_terms = {}
    for name, value in goal_terms.items():
        scaled_goal_terms[name] = scaled(value)

    return RewardBreakdown(
        coalition_stability=scaled(coalition),
        escalation_penalty=scaled(escalation),
        market_gain=scaled(market),
        behavioral_consistency=scaled(behavior),
        goal_terms=scaled_goal_terms,
        total=scaled(total),
    )
def _reward_us(
    self,
    world: WorldState,
    episode: EpisodeMetadata,
    recent_actions: dict[str, AgentAction],
) -> RewardBreakdown:
    """Compute the reward breakdown for the ``us`` agent.

    The four state metrics are scored with ``_reward_metric`` and blended
    with the behavior and action-response scores into the total. In the
    breakdown, ``regional_access`` fills the coalition slot,
    ``domestic_support`` the escalation slot and ``shipping_security`` the
    market slot.
    """
    agent = "us"
    weights = {metric: config.weight for metric, config in AGENT_REWARD_METRIC_CONFIGS[agent].items()}

    scores = {
        metric: self._reward_metric(world, agent, metric)
        for metric in ("regional_access", "shipping_security", "domestic_support", "force_posture")
    }
    behavior = self._behavior_score(world, agent, recent_actions)
    action_response = self._action_response_score(world, agent, recent_actions)

    total = self._blend_reward_total(weights, dict(scores), behavior, action_response)

    return self._finalize_reward(
        episode=episode,
        turn=world.turn,
        coalition=scores["regional_access"],
        escalation=scores["domestic_support"],
        market=scores["shipping_security"],
        behavior=behavior,
        total=total,
        goal_terms={
            "regional_access": scores["regional_access"],
            "shipping_stability": scores["shipping_security"],
            "domestic_resilience": scores["domestic_support"],
            "force_posture": scores["force_posture"],
            "action_response": action_response,
        },
    )
def _reward_israel(
    self,
    world: WorldState,
    episode: EpisodeMetadata,
    recent_actions: dict[str, AgentAction],
) -> RewardBreakdown:
    """Compute the reward breakdown for the ``israel`` agent.

    The four state metrics are scored with ``_reward_metric`` and blended
    with the behavior and action-response scores into the total. In the
    breakdown, ``us_resupply_confidence`` fills the coalition slot,
    ``homeland_security`` the escalation slot and ``reserve_endurance`` the
    market slot.
    """
    agent = "israel"
    weights = {metric: config.weight for metric, config in AGENT_REWARD_METRIC_CONFIGS[agent].items()}

    scores = {
        metric: self._reward_metric(world, agent, metric)
        for metric in ("homeland_security", "northern_deterrence", "reserve_endurance", "us_resupply_confidence")
    }
    behavior = self._behavior_score(world, agent, recent_actions)
    action_response = self._action_response_score(world, agent, recent_actions)

    # The blend receives the metrics in this specific order.
    blend_order = ("homeland_security", "northern_deterrence", "us_resupply_confidence", "reserve_endurance")
    total = self._blend_reward_total(
        weights,
        {metric: scores[metric] for metric in blend_order},
        behavior,
        action_response,
    )

    return self._finalize_reward(
        episode=episode,
        turn=world.turn,
        coalition=scores["us_resupply_confidence"],
        escalation=scores["homeland_security"],
        market=scores["reserve_endurance"],
        behavior=behavior,
        total=total,
        goal_terms={
            "homeland_security": scores["homeland_security"],
            "northern_deterrence": scores["northern_deterrence"],
            "us_backstop": scores["us_resupply_confidence"],
            "reserve_endurance": scores["reserve_endurance"],
            "action_response": action_response,
        },
    )
def _reward_iran(
    self,
    world: WorldState,
    episode: EpisodeMetadata,
    recent_actions: dict[str, AgentAction],
) -> RewardBreakdown:
    """Compute the reward breakdown for the ``iran`` agent.

    The four state metrics are scored with ``_reward_metric`` and blended
    with the behavior and action-response scores into the total. In the
    breakdown, ``proxy_corridor`` fills the coalition slot,
    ``regime_stability`` the escalation slot and ``hormuz_leverage`` the
    market slot.
    """
    agent = "iran"
    weights = {metric: config.weight for metric, config in AGENT_REWARD_METRIC_CONFIGS[agent].items()}

    scores = {
        metric: self._reward_metric(world, agent, metric)
        for metric in ("regime_stability", "proxy_corridor", "hormuz_leverage", "deterrence_credibility")
    }
    behavior = self._behavior_score(world, agent, recent_actions)
    action_response = self._action_response_score(world, agent, recent_actions)

    total = self._blend_reward_total(weights, dict(scores), behavior, action_response)

    return self._finalize_reward(
        episode=episode,
        turn=world.turn,
        coalition=scores["proxy_corridor"],
        escalation=scores["regime_stability"],
        market=scores["hormuz_leverage"],
        behavior=behavior,
        total=total,
        goal_terms={
            "regime_survival": scores["regime_stability"],
            "proxy_axis_integrity": scores["proxy_corridor"],
            "chokepoint_leverage": scores["hormuz_leverage"],
            "deterrence_credibility": scores["deterrence_credibility"],
            "action_response": action_response,
        },
    )
def _reward_hezbollah(
    self,
    world: WorldState,
    episode: EpisodeMetadata,
    recent_actions: dict[str, AgentAction],
) -> RewardBreakdown:
    """Compute the reward breakdown for the ``hezbollah`` agent.

    The four state metrics are scored with ``_reward_metric`` and blended
    with the behavior and action-response scores into the total. An extra
    ``iran_backing`` term mixes the alliance score (60%) with
    ``logistics_depth`` (40%); it fills the coalition slot and is reported
    as a goal term but is not part of the blended total.
    ``launch_survivability`` fills the escalation slot and
    ``resistance_credibility`` the market slot.
    """
    agent = "hezbollah"
    weights = {metric: config.weight for metric, config in AGENT_REWARD_METRIC_CONFIGS[agent].items()}

    scores = {
        metric: self._reward_metric(world, agent, metric)
        for metric in ("launch_survivability", "logistics_depth", "political_cover", "resistance_credibility")
    }
    iran_backing = self._clamp_unit(0.6 * self._alliance_score(world, agent) + 0.4 * scores["logistics_depth"])
    behavior = self._behavior_score(world, agent, recent_actions)
    action_response = self._action_response_score(world, agent, recent_actions)

    # The blend receives the metrics in this specific order.
    blend_order = ("launch_survivability", "logistics_depth", "resistance_credibility", "political_cover")
    total = self._blend_reward_total(
        weights,
        {metric: scores[metric] for metric in blend_order},
        behavior,
        action_response,
    )

    return self._finalize_reward(
        episode=episode,
        turn=world.turn,
        coalition=iran_backing,
        escalation=scores["launch_survivability"],
        market=scores["resistance_credibility"],
        behavior=behavior,
        total=total,
        goal_terms={
            "iran_backing": iran_backing,
            "launch_survivability": scores["launch_survivability"],
            "logistics_depth": scores["logistics_depth"],
            "political_cover": scores["political_cover"],
            "resistance_credibility": scores["resistance_credibility"],
            "action_response": action_response,
        },
    )
def _reward_gulf(
    self,
    world: WorldState,
    episode: EpisodeMetadata,
    recent_actions: dict[str, AgentAction],
) -> RewardBreakdown:
    """Compute the reward breakdown for the ``gulf`` agent.

    The four state metrics are scored with ``_reward_metric`` and blended
    with the behavior and action-response scores into the total. In the
    breakdown, ``diplomatic_flexibility`` fills the coalition slot,
    ``shipping_continuity`` the escalation slot and ``investor_confidence``
    the market slot.
    """
    agent = "gulf"
    weights = {metric: config.weight for metric, config in AGENT_REWARD_METRIC_CONFIGS[agent].items()}

    scores = {
        metric: self._reward_metric(world, agent, metric)
        for metric in (
            "shipping_continuity",
            "infrastructure_security",
            "investor_confidence",
            "diplomatic_flexibility",
        )
    }
    behavior = self._behavior_score(world, agent, recent_actions)
    action_response = self._action_response_score(world, agent, recent_actions)

    # The blend receives the metrics in this specific order.
    blend_order = (
        "shipping_continuity",
        "investor_confidence",
        "infrastructure_security",
        "diplomatic_flexibility",
    )
    total = self._blend_reward_total(
        weights,
        {metric: scores[metric] for metric in blend_order},
        behavior,
        action_response,
    )

    return self._finalize_reward(
        episode=episode,
        turn=world.turn,
        coalition=scores["diplomatic_flexibility"],
        escalation=scores["shipping_continuity"],
        market=scores["investor_confidence"],
        behavior=behavior,
        total=total,
        goal_terms={
            "shipping_continuity": scores["shipping_continuity"],
            "infrastructure_security": scores["infrastructure_security"],
            "investor_confidence": scores["investor_confidence"],
            "diplomatic_flexibility": scores["diplomatic_flexibility"],
            "action_response": action_response,
        },
    )
def _reward_oversight(
    self,
    world: WorldState,
    episode: EpisodeMetadata,
    recent_actions: dict[str, AgentAction],
) -> RewardBreakdown:
    """Compute the reward breakdown for the ``oversight`` agent.

    The four state metrics are scored with ``_reward_metric`` and blended
    with the behavior and action-response scores into the total. In the
    breakdown, ``autonomy_balance`` fills the coalition slot,
    ``runaway_risk`` the escalation slot and ``trace_clarity`` the market
    slot.
    """
    agent = "oversight"
    weights = {metric: config.weight for metric, config in AGENT_REWARD_METRIC_CONFIGS[agent].items()}

    scores = {
        metric: self._reward_metric(world, agent, metric)
        for metric in ("runaway_risk", "intervention_legitimacy", "autonomy_balance", "trace_clarity")
    }
    behavior = self._behavior_score(world, agent, recent_actions)
    action_response = self._action_response_score(world, agent, recent_actions)

    # The blend receives the metrics in this specific order.
    blend_order = ("runaway_risk", "autonomy_balance", "intervention_legitimacy", "trace_clarity")
    total = self._blend_reward_total(
        weights,
        {metric: scores[metric] for metric in blend_order},
        behavior,
        action_response,
    )

    return self._finalize_reward(
        episode=episode,
        turn=world.turn,
        coalition=scores["autonomy_balance"],
        escalation=scores["runaway_risk"],
        market=scores["trace_clarity"],
        behavior=behavior,
        total=total,
        goal_terms={
            "runaway_risk_reduction": scores["runaway_risk"],
            "intervention_legitimacy": scores["intervention_legitimacy"],
            "autonomy_preservation": scores["autonomy_balance"],
            "trace_clarity": scores["trace_clarity"],
            "action_response": action_response,
        },
    )
def _update_behavioral_consistency(self, world: WorldState, agent_id: str, action: AgentAction) -> float:
    """Return the agent's new behavioral-consistency value after ``action``.

    Compares the intent score of ``action.summary`` with the stance score
    registered for ``action.type`` in ``ACTION_STANCE_SCORES`` and blends
    the resulting agreement with the agent's previous value (65% previous,
    35% new observation). The result is computed and returned only; this
    method does not write it back to ``world``.

    Args:
        world: World state holding ``behavioral_consistency`` per agent.
        agent_id: Agent whose previous value is looked up (0.7 if absent).
        action: Action whose ``summary`` and ``type`` are compared.

    Returns:
        The blended value clamped to [0.0, 1.0] and rounded to 3 decimals.

    Raises:
        KeyError: If ``action.type`` has no entry in ``ACTION_STANCE_SCORES``.
    """
    stated_intent = self._intent_score(action.summary)
    enacted_stance = ACTION_STANCE_SCORES[action.type]
    # The gap is capped at 2.0 so the agreement below stays within 0..1.
    gap = min(2.0, abs(stated_intent - enacted_stance))
    observable_consistency = 1.0 - gap / 2.0
    previous = world.behavioral_consistency.get(agent_id, 0.7)
    blended = previous * 0.65 + observable_consistency * 0.35
    clamped = max(0.0, min(1.0, blended))
    return round(clamped, 3)
@staticmethod
def _intent_score(summary: str) -> float:
    """Score the intent expressed in a free-text summary.

    Counts how many entries of ``COOPERATIVE_INTENT_MARKERS`` and
    ``ESCALATORY_INTENT_MARKERS`` occur as substrings of the lower-cased
    summary (each marker counts at most once, however often it appears) and
    returns the normalized difference.

    Declared as a static method because the signature takes no ``self``
    while the method is invoked through an instance
    (``self._intent_score(...)``); without the decorator that bound call
    would pass two positional arguments and raise ``TypeError``.

    Args:
        summary: Text to scan for intent markers.

    Returns:
        A value in [-1.0, 1.0]: positive when escalatory markers dominate,
        negative when cooperative markers dominate, and 0.0 when no marker
        matches at all.
    """
    lowered = summary.lower()
    cooperative_hits = sum(marker in lowered for marker in COOPERATIVE_INTENT_MARKERS)
    escalatory_hits = sum(marker in lowered for marker in ESCALATORY_INTENT_MARKERS)
    total_hits = cooperative_hits + escalatory_hits
    if total_hits == 0:
        # No marker matched: treat the summary as neutral and avoid 0/0.
        return 0.0
    return max(-1.0, min(1.0, (escalatory_hits - cooperative_hits) / total_hits))