| | from __future__ import annotations |
| |
|
| | import hashlib |
| | import random |
| | from datetime import datetime, timedelta, timezone |
| | from uuid import uuid4 |
| |
|
| | from trenches_env.agents import AGENT_IDS, AGENT_PROFILES |
| | from trenches_env.entity_knowledge import load_entity_pack |
| | from trenches_env.historical_replay import ( |
| | get_historical_replay, |
| | severity_distance, |
| | severity_score, |
| | ) |
| | from trenches_env.model_runtime import build_entity_model_bindings |
| | from trenches_env.models import ( |
| | ActionLogEntry, |
| | AgentAction, |
| | AgentBeliefEntry, |
| | AgentBeliefState, |
| | AgentObservation, |
| | AssetCondition, |
| | BlackSwanEvent, |
| | DataSourceContext, |
| | EpisodeMetadata, |
| | EntityModelBinding, |
| | ExternalSignal, |
| | HistoricalEvent, |
| | HistoricalReplayState, |
| | IntelSnippet, |
| | LatentEvent, |
| | LatentEventNarrative, |
| | LiveControlRequest, |
| | ObservationProjection, |
| | OversightIntervention, |
| | Prediction, |
| | PredictionAssessment, |
| | ProviderDiagnosticsResponse, |
| | ReactionActorOutcome, |
| | ReactionLogEntry, |
| | RewardBreakdown, |
| | SessionState, |
| | SourcePacket, |
| | SourceMonitorReport, |
| | StepTrace, |
| | StepSessionRequest, |
| | StepSessionResponse, |
| | WorldState, |
| | ) |
| | from trenches_env.provider_runtime import ProviderDecisionError, ProviderDecisionRequest, ProviderDecisionRuntime |
| | from trenches_env.rl import ( |
| | AGENT_ALLOWED_ACTIONS, |
| | AGENT_ACTION_ALIGNMENT, |
| | AGENT_ACTION_IMPACTS, |
| | AGENT_PREFERRED_COALITIONS, |
| | AGENT_REWARD_METRIC_CONFIGS, |
| | AGENT_STATE_ACTION_EFFECTS, |
| | AGENT_STATE_BASELINES, |
| | DEFAULT_ACTION_IMPACTS, |
| | DEFAULT_MAX_TURNS, |
| | DEFAULT_TRAINING_STAGE, |
| | TRAINING_STAGE_CONFIGS, |
| | ) |
| | from trenches_env.source_catalog import get_source_by_id, get_sources_for_agent |
| | from trenches_env.source_bundles import AGENT_LIVE_SOURCE_BUNDLES, AGENT_TRAINING_SOURCE_BUNDLES |
| | from trenches_env.source_ingestion import SourceHarvester, source_ttl_seconds |
| | from trenches_env.source_monitor import build_source_monitor_report |
| | from trenches_env.scenarios import ( |
| | ScenarioAssetImpact, |
| | ScenarioDefinition, |
| | ScenarioLatentEvent, |
| | ScenarioSignal, |
| | get_scenario_definition, |
| | list_scenario_definitions, |
| | scenario_signals_for_turn, |
| | ) |
| |
|
| | ACTION_STANCE_SCORES: dict[str, float] = { |
| | "hold": -0.4, |
| | "negotiate": -0.8, |
| | "sanction": 0.3, |
| | "strike": 1.0, |
| | "defend": -0.1, |
| | "intel_query": -0.5, |
| | "mobilize": 0.6, |
| | "deceive": 0.8, |
| | "oversight_review": -0.6, |
| | } |
| |
|
| | COOPERATIVE_INTENT_MARKERS = ( |
| | "deconflict", |
| | "de-escal", |
| | "ceasefire", |
| | "stabil", |
| | "protect", |
| | "defend", |
| | "monitor", |
| | "assess", |
| | "review", |
| | "guarantee", |
| | "humanitarian", |
| | "contain", |
| | "hold", |
| | "query", |
| | ) |
| |
|
| | ESCALATORY_INTENT_MARKERS = ( |
| | "retaliat", |
| | "punish", |
| | "strike", |
| | "degrade", |
| | "launch", |
| | "coerce", |
| | "pressure", |
| | "mobilize", |
| | "disrupt", |
| | "sanction", |
| | "deceive", |
| | ) |
| |
|
| | MAX_PUBLIC_BRIEF_ITEMS = 4 |
| | MAX_PRIVATE_BRIEF_ITEMS = 6 |
| | MAX_INTEL_SUMMARY_CHARS = 220 |
| | MAX_TRAINING_SOURCE_BRIEFS = 2 |
| | MAX_LIVE_SOURCE_BRIEFS = 2 |
| | MAX_AUTO_REACTION_SIGNALS = 8 |
| | MIN_LIVE_AUTO_STEP_MS = 1_000 |
| | FORECAST_REWARD_BLEND = 0.35 |
| | LOW_FIDELITY_SOURCE_KINDS = {"telegram", "scrape", "video"} |
| | SOURCE_KIND_BASE_RELIABILITY = { |
| | "structured": 0.8, |
| | "api": 0.76, |
| | "rss": 0.7, |
| | "scrape": 0.58, |
| | "telegram": 0.46, |
| | "video": 0.38, |
| | } |
| | SOURCE_DELIVERY_RELIABILITY = { |
| | "training_core": 0.06, |
| | "live_demo": 0.0, |
| | } |
| | CONTRADICTION_TOPIC_LABELS = { |
| | "shipping": "shipping disruption", |
| | "commodities": "commodity-market disruption", |
| | "border": "border escalation", |
| | "corridor": "corridor interdiction", |
| | "domestic": "domestic stability", |
| | "cyber": "cyber disruption", |
| | "market": "market dislocation", |
| | "humanitarian": "humanitarian fallout", |
| | "diplomacy": "diplomatic signaling", |
| | } |
| | LATENT_EVENT_TOPIC_KEYWORDS = { |
| | "shipping": ("shipping", "tanker", "hormuz", "oil", "maritime", "terminal", "seaport", "harbor", "ais", "vessel"), |
| | "commodities": ( |
| | "gold", |
| | "silver", |
| | "copper", |
| | "lithium", |
| | "nickel", |
| | "uranium", |
| | "phosphate", |
| | "bauxite", |
| | "rare earth", |
| | "rare-earth", |
| | "commodity", |
| | "mineral", |
| | "metals", |
| | "natural gas", |
| | "lng", |
| | ), |
| | "border": ("rocket", "missile", "border", "galilee", "idf", "drone", "lebanon", "launch", "intercept"), |
| | "corridor": ("corridor", "bekaa", "syria", "transfer", "logistics", "interdiction", "sustainment"), |
| | "domestic": ("sanction", "unrest", "protest", "inflation", "currency", "domestic", "regime"), |
| | "cyber": ("cyber", "outage", "blackout", "cable", "internet", "network", "malware"), |
| | "market": ("market", "investor", "trade", "stocks", "shares", "bond", "premium", "insurance"), |
| | "humanitarian": ("humanitarian", "aid", "displacement", "relief", "civilian", "refugee", "shelter"), |
| | "diplomacy": ("ceasefire", "talk", "negotiat", "summit", "diplomat", "mediat", "channel"), |
| | } |
| | LATENT_EVENT_LINKS = { |
| | "shipping": ("market",), |
| | "commodities": ("market", "shipping"), |
| | "border": ("humanitarian",), |
| | "corridor": ("border",), |
| | "cyber": ("market",), |
| | } |
| | BELIEF_TOPIC_PRIORS = { |
| | "us": {"shipping": 0.12, "commodities": 0.09, "diplomacy": 0.08, "market": 0.06, "border": 0.04}, |
| | "israel": {"border": 0.14, "corridor": 0.08, "diplomacy": 0.04}, |
| | "iran": {"corridor": 0.14, "domestic": 0.1, "shipping": 0.06, "commodities": 0.08}, |
| | "hezbollah": {"border": 0.12, "corridor": 0.1, "domestic": 0.04}, |
| | "gulf": {"shipping": 0.16, "commodities": 0.12, "market": 0.12, "diplomacy": 0.06}, |
| | "oversight": {"cyber": 0.12, "shipping": 0.08, "commodities": 0.08, "border": 0.08, "domestic": 0.08}, |
| | } |
| | BELIEF_PERSISTENCE_FLOOR = 0.12 |
| | BELIEF_MAX_STALE_TURNS = 4 |
| | BELIEF_CONFIRMATION_BONUS = 0.03 |
| | BELIEF_CONTRADICTION_PENALTY = 0.14 |
| | PUBLIC_STATE_SYNC_FACTORS = { |
| | "support": 0.62, |
| | "confidence": 0.6, |
| | "clarity": 0.58, |
| | "resilience": 0.66, |
| | "security": 0.74, |
| | "continuity": 0.74, |
| | "stability": 0.72, |
| | "default": 0.7, |
| | } |
| | ASSET_DECISION_SOURCE_LIMITS: dict[str, tuple[int, int]] = { |
| | "large": (8, 8), |
| | "medium-large": (6, 6), |
| | "medium": (5, 5), |
| | } |
| | PHYSICAL_ASSET_SECTIONS = ( |
| | "locations", |
| | "fronts", |
| | "infrastructure", |
| | "strategic_sites", |
| | "chokepoints", |
| | "geospatial_anchors", |
| | "alliance_anchors", |
| | ) |
| | ASSET_PRIORITY_SCORES = { |
| | "critical": 4, |
| | "high": 3, |
| | "medium": 2, |
| | "tracked": 1, |
| | "linked": 1, |
| | } |
| | ASSET_STATUS_DAMAGE_THRESHOLDS = ( |
| | (25.0, "destroyed"), |
| | (55.0, "malfunctioning"), |
| | (85.0, "degraded"), |
| | ) |
| |
|
| | AGENT_PRIMARY_ADVERSARIES: dict[str, tuple[str, ...]] = { |
| | "us": ("iran",), |
| | "israel": ("hezbollah", "iran"), |
| | "iran": ("israel", "us", "gulf"), |
| | "hezbollah": ("israel",), |
| | "gulf": ("iran",), |
| | "oversight": (), |
| | } |
| |
|
| | AGENT_TOOL_LABELS: dict[str, str] = { |
| | "us": "shipping security, coalition access, and force-posture tools", |
| | "israel": "air-defense, reserve, and northern-front tools", |
| | "iran": "proxy, chokepoint, and regime-security tools", |
| | "hezbollah": "launch-survivability, logistics, and deniable-pressure tools", |
| | "gulf": "shipping, infrastructure, and market-stability tools", |
| | "oversight": "trace, intervention, and autonomy-balancing tools", |
| | } |
| | ASSET_CATEGORY_BIAS: dict[str, dict[str, tuple[str, ...]]] = { |
| | "us": { |
| | "strike": ("energy", "port", "chokepoint", "command", "launch-zone"), |
| | "defend": ("airbase", "base", "naval", "logistics-port", "base-network", "command-system"), |
| | "deceive": ("command", "command-system", "radar", "air-defense", "theater-anchor"), |
| | "sanction": ("energy", "port", "logistics-network", "energy-network"), |
| | }, |
| | "israel": { |
| | "strike": ("launch-network", "launch-zone", "corridor-node", "logistics-network", "logistics"), |
| | "defend": ("command", "front", "civil-core", "infrastructure-zone", "offshore-zone", "air-defense"), |
| | "deceive": ("launch-zone", "command-network", "corridor-node"), |
| | }, |
| | "iran": { |
| | "strike": ("airbase", "base", "energy", "energy-port", "port", "chokepoint"), |
| | "defend": ("command", "energy-network", "maritime-control-zone", "command-and-industry-network"), |
| | "deceive": ("chokepoint", "port", "naval", "maritime-box", "maritime-access"), |
| | }, |
| | "hezbollah": { |
| | "strike": ("front", "civil-core", "command", "offshore-zone", "infrastructure-zone"), |
| | "defend": ("launch-network", "front", "corridor-node", "command-network", "logistics-network"), |
| | "deceive": ("launch-zone", "command-network", "logistics-network"), |
| | }, |
| | "gulf": { |
| | "strike": ("energy-network", "energy", "energy-port", "port", "chokepoint", "base"), |
| | "defend": ("port", "energy", "energy-port", "capital", "infrastructure-protection", "chokepoint"), |
| | "deceive": ("energy-port", "port", "chokepoint"), |
| | "sanction": ("energy", "energy-port", "port", "logistics-network"), |
| | }, |
| | "oversight": { |
| | "defend": ("chokepoint", "theater", "civil-center"), |
| | }, |
| | } |
| |
|
| |
|
| | class FogOfWarDiplomacyEnv: |
| | """OpenEnv-compatible scaffolding for the crisis simulator. |
| | |
| | The concrete OpenEnv inheritance point can be added once the package |
| | dependency is pinned. For now this class owns the transition logic, state |
| | construction, and observation projection needed by the session API. |
| | """ |
| |
|
| | def __init__( |
| | self, |
| | source_harvester: SourceHarvester | None = None, |
| | provider_runtime: ProviderDecisionRuntime | None = None, |
| | ) -> None: |
| | self._rng = random.Random() |
| | self._source_harvester = source_harvester or SourceHarvester(auto_start=False) |
| | self._provider_runtime = provider_runtime or ProviderDecisionRuntime() |
| | self._source_warm_start_enabled = False |
| |
|
| | def enable_source_warm_start(self) -> "FogOfWarDiplomacyEnv": |
| | self._source_warm_start_enabled = True |
| | return self |
| |
|
| | def create_session( |
| | self, |
| | seed: int | None = None, |
| | session_id: str | None = None, |
| | training_agent: str = "us", |
| | training_stage: str = DEFAULT_TRAINING_STAGE, |
| | max_turns: int | None = None, |
| | scenario_id: str | None = None, |
| | replay_id: str | None = None, |
| | replay_start_index: int | None = None, |
| | ) -> SessionState: |
| | resolved_session_id = session_id or str(uuid4()) |
| | self._seed(seed) |
| | scenario = get_scenario_definition(scenario_id) |
| | world = self._initial_world() |
| | self._apply_scenario(world, scenario) |
| | historical_replay = self._initialize_historical_replay( |
| | world=world, |
| | training_agent=training_agent, |
| | replay_id=replay_id, |
| | replay_start_index=replay_start_index, |
| | ) |
| | episode = self._build_episode_metadata( |
| | training_stage=training_stage, |
| | max_turns=max_turns, |
| | scenario=scenario, |
| | historical_replay=historical_replay, |
| | ) |
| | if self._source_warm_start_enabled: |
| | self._source_harvester.warm_start_agents( |
| | include_live=False, |
| | max_training_sources=2, |
| | max_live_sources=0, |
| | ) |
| | belief_state = self._initialize_belief_state(world, episode) |
| | observations = self._build_observations( |
| | world, |
| | episode, |
| | belief_state=belief_state, |
| | historical_replay=historical_replay, |
| | ) |
| | rewards = {agent_id: RewardBreakdown() for agent_id in AGENT_IDS} |
| | session = SessionState( |
| | session_id=resolved_session_id, |
| | seed=seed, |
| | world=world, |
| | observations=observations, |
| | belief_state=belief_state, |
| | rewards=rewards, |
| | historical_replay=historical_replay, |
| | model_bindings=self._build_model_bindings(), |
| | episode=episode, |
| | ) |
| | last_sync_at = self._source_harvester.last_sync_at() |
| | if last_sync_at is not None: |
| | session.live.last_source_sync_at = last_sync_at |
| | return session |
| |
|
| | def reset_session( |
| | self, |
| | session_id: str, |
| | seed: int | None = None, |
| | training_agent: str = "us", |
| | training_stage: str = DEFAULT_TRAINING_STAGE, |
| | max_turns: int | None = None, |
| | scenario_id: str | None = None, |
| | replay_id: str | None = None, |
| | replay_start_index: int | None = None, |
| | ) -> SessionState: |
| | return self.create_session( |
| | seed=seed, |
| | session_id=session_id, |
| | training_agent=training_agent, |
| | training_stage=training_stage, |
| | max_turns=max_turns, |
| | scenario_id=scenario_id, |
| | replay_id=replay_id, |
| | replay_start_index=replay_start_index, |
| | ) |
| |
|
| | def configure_live_session(self, session: SessionState, request: LiveControlRequest) -> SessionState: |
| | updated = session.model_copy(deep=True) |
| | if request.enabled and not updated.episode.live_mode_capable: |
| | raise ValueError( |
| | f"Live mode is only supported for {DEFAULT_TRAINING_STAGE} sessions." |
| | ) |
| | updated.live.enabled = request.enabled |
| | updated.live.auto_step = request.auto_step |
| | updated.live.poll_interval_ms = max(request.poll_interval_ms, MIN_LIVE_AUTO_STEP_MS) |
| | updated.live.started_at = datetime.now(timezone.utc) if request.enabled else None |
| | updated.live.last_source_sync_at = datetime.now(timezone.utc) if request.enabled else None |
| | updated.live.last_auto_step_at = None |
| | updated.live.reacted_packet_fetched_at = {} |
| | updated.live.source_queue_sizes = ( |
| | { |
| | agent_id: len(AGENT_LIVE_SOURCE_BUNDLES.get(agent_id, [])) |
| | for agent_id in AGENT_IDS |
| | } |
| | if request.enabled |
| | else {} |
| | ) |
| | if not request.enabled: |
| | updated.live.source_queue_sizes = {} |
| | updated.updated_at = datetime.now(timezone.utc) |
| | if request.enabled: |
| | if self._source_warm_start_enabled: |
| | self._source_harvester.warm_start_agents( |
| | include_live=True, |
| | max_training_sources=1, |
| | max_live_sources=1, |
| | ) |
| | else: |
| | self._source_harvester.refresh_due_batch(include_live=True) |
| | return self.refresh_session_sources(updated) |
| |
|
| | def step_session(self, session: SessionState, request: StepSessionRequest) -> StepSessionResponse: |
| | next_session = session.model_copy(deep=True) |
| | before_tension = next_session.world.tension_level |
| | prior_latent_event_ids = {event.event_id for event in next_session.world.latent_events} |
| | next_session.world.turn += 1 |
| | if next_session.live.enabled: |
| | self._source_harvester.refresh_due_batch(include_live=True) |
| | else: |
| | self._source_harvester.refresh_due_batch(include_live=False) |
| |
|
| | self._inject_external_signals(next_session.world, request.external_signals) |
| | resolved_actions = dict(request.actions) |
| | oversight = OversightIntervention() |
| | if next_session.episode.oversight_enabled: |
| | oversight = self._compute_oversight(next_session.world, resolved_actions, request.external_signals) |
| | resolved_actions = self._resolve_oversight_actions(resolved_actions, oversight) |
| | next_session.world.last_actions = list(resolved_actions.values()) |
| | self._apply_actions(next_session.world, resolved_actions) |
| | if next_session.episode.oversight_enabled: |
| | self._apply_oversight(next_session.world, oversight) |
| | self._resync_public_events(next_session.world) |
| | latent_event_ids = [ |
| | event.event_id for event in next_session.world.latent_events if event.event_id not in prior_latent_event_ids |
| | ] |
| | revealed_event, prediction_assessments = self._advance_historical_replay( |
| | next_session, |
| | request.predictions, |
| | ) |
| | next_session.rewards = self._compute_rewards(world=next_session.world, episode=next_session.episode) |
| | if prediction_assessments: |
| | self._apply_forecast_rewards(next_session.rewards, prediction_assessments) |
| | next_session.model_bindings = self._build_model_bindings() |
| | next_session.belief_state = self._update_belief_state(next_session) |
| | next_session.observations = self._build_observations( |
| | next_session.world, |
| | next_session.episode, |
| | include_live_sources=next_session.live.enabled, |
| | belief_state=next_session.belief_state, |
| | historical_replay=next_session.historical_replay, |
| | ) |
| | next_session.recent_traces.append( |
| | StepTrace( |
| | turn=next_session.world.turn, |
| | tension_before=before_tension, |
| | tension_after=next_session.world.tension_level, |
| | actions=resolved_actions, |
| | predictions=request.predictions, |
| | prediction_assessments=prediction_assessments, |
| | revealed_event=revealed_event, |
| | rewards=next_session.rewards, |
| | oversight=oversight, |
| | ) |
| | ) |
| | next_session.recent_traces = next_session.recent_traces[-25:] |
| | next_session.action_log.extend(self._build_action_log_entries(next_session, resolved_actions)) |
| | next_session.action_log = next_session.action_log[-250:] |
| | if request.external_signals: |
| | next_session.reaction_log.append( |
| | self._build_reaction_log_entry( |
| | session=next_session, |
| | signals=request.external_signals, |
| | latent_event_ids=latent_event_ids, |
| | actions=resolved_actions, |
| | oversight=oversight, |
| | tension_before=before_tension, |
| | ) |
| | ) |
| | next_session.reaction_log = next_session.reaction_log[-100:] |
| | next_session.updated_at = datetime.now(timezone.utc) |
| |
|
| | return StepSessionResponse( |
| | session=next_session, |
| | oversight=oversight, |
| | done=( |
| | next_session.world.turn >= next_session.episode.max_turns |
| | or next_session.world.tension_level >= 95.0 |
| | or ( |
| | next_session.historical_replay.enabled |
| | and next_session.historical_replay.current_event_index |
| | >= len(next_session.historical_replay.ground_truth_timeline) - 1 |
| | ) |
| | ), |
| | ) |
| |
|
| | def refresh_session_sources(self, session: SessionState, force: bool = False) -> SessionState: |
| | updated = session.model_copy(deep=True) |
| | if force: |
| | self._source_harvester.refresh_agents(include_live=updated.live.enabled, force=True) |
| | elif self._source_warm_start_enabled: |
| | self._source_harvester.warm_start_agents(include_live=updated.live.enabled, force=False) |
| | updated.model_bindings = self._build_model_bindings() |
| | updated.belief_state = self._update_belief_state(updated) |
| | updated.observations = self._build_observations( |
| | updated.world, |
| | updated.episode, |
| | include_live_sources=updated.live.enabled, |
| | belief_state=updated.belief_state, |
| | historical_replay=updated.historical_replay, |
| | ) |
| | last_sync_at = self._source_harvester.last_sync_at() |
| | if last_sync_at is not None: |
| | updated.live.last_source_sync_at = last_sync_at |
| | updated.updated_at = datetime.now(timezone.utc) |
| | return updated |
| |
|
| | def source_monitor(self, session: SessionState) -> SourceMonitorReport: |
| | return build_source_monitor_report(session, harvester=self._source_harvester) |
| |
|
| | def provider_diagnostics(self, session: SessionState) -> ProviderDiagnosticsResponse: |
| | bindings = session.model_bindings or self._build_model_bindings() |
| | return ProviderDiagnosticsResponse( |
| | agents=self._provider_runtime.diagnostics(bindings) |
| | ) |
| |
|
| | def list_scenarios(self) -> list[ScenarioDefinition]: |
| | return list_scenario_definitions() |
| |
|
| | def scenario_turn_signals(self, scenario_id: str | None, turn: int) -> list[ExternalSignal]: |
| | return scenario_signals_for_turn(scenario_id, turn) |
| |
|
| | def maybe_auto_step_live_session(self, session: SessionState) -> SessionState: |
| | refreshed = self.refresh_session_sources(session) |
| | if not refreshed.live.enabled or not refreshed.live.auto_step: |
| | return refreshed |
| |
|
| | now = datetime.now(timezone.utc) |
| | if not self._live_auto_step_due(refreshed, now): |
| | return refreshed |
| |
|
| | signals, reacted_packets = self._collect_live_external_signals(refreshed) |
| | if not signals: |
| | return refreshed |
| |
|
| | actions = self.resolve_policy_actions(refreshed, signals) |
| | result = self.step_session( |
| | refreshed, |
| | StepSessionRequest(actions=actions, external_signals=signals), |
| | ) |
| | next_session = result.session |
| | next_session.live.last_auto_step_at = now |
| | next_session.live.reacted_packet_fetched_at.update(reacted_packets) |
| | next_session.updated_at = datetime.now(timezone.utc) |
| | return next_session |
| |
|
| | def _live_auto_step_due(self, session: SessionState, now: datetime) -> bool: |
| | last_auto_step_at = session.live.last_auto_step_at |
| | if last_auto_step_at is None: |
| | return True |
| | return now - last_auto_step_at >= timedelta( |
| | milliseconds=max(session.live.poll_interval_ms, MIN_LIVE_AUTO_STEP_MS) |
| | ) |
| |
|
| | def _collect_live_external_signals( |
| | self, |
| | session: SessionState, |
| | ) -> tuple[list[ExternalSignal], dict[str, datetime]]: |
| | newest_packets: dict[str, tuple[str, SourcePacket]] = {} |
| | for agent_id, observation in session.observations.items(): |
| | for packet in observation.source_packets: |
| | if packet.status != "ok" or not packet.summary or packet.fetched_at is None: |
| | continue |
| | if not self._is_source_packet_fresh(packet): |
| | continue |
| | last_reacted = session.live.reacted_packet_fetched_at.get(packet.source_id) |
| | if last_reacted is not None and packet.fetched_at <= last_reacted: |
| | continue |
| | cached = newest_packets.get(packet.source_id) |
| | if cached is None or packet.fetched_at > (cached[1].fetched_at or datetime.fromtimestamp(0, tz=timezone.utc)): |
| | newest_packets[packet.source_id] = (agent_id, packet) |
| |
|
| | ordered_packets = sorted( |
| | newest_packets.values(), |
| | key=lambda item: item[1].fetched_at or datetime.fromtimestamp(0, tz=timezone.utc), |
| | reverse=True, |
| | )[:MAX_AUTO_REACTION_SIGNALS] |
| |
|
| | signals: list[ExternalSignal] = [] |
| | reacted_packets: dict[str, datetime] = {} |
| | for agent_id, packet in ordered_packets: |
| | signal = self._packet_to_external_signal(agent_id, packet) |
| | signals.append(signal) |
| | if packet.fetched_at is not None: |
| | reacted_packets[packet.source_id] = packet.fetched_at |
| | return signals, reacted_packets |
| |
|
| | def _packet_to_external_signal(self, agent_id: str, packet: SourcePacket) -> ExternalSignal: |
| | packet_text = " ".join([packet.source_name, packet.summary, *packet.sample_items]).lower() |
| | categories = self._classify_signal_categories(packet_text) |
| | severity = 0.35 |
| | if "attack" in categories: |
| | severity = max(severity, 0.82) |
| | if "shipping" in categories: |
| | severity = max(severity, 0.72) |
| | if "commodities" in categories: |
| | severity = max(severity, 0.62) |
| | if "cyber" in categories or "unrest" in categories: |
| | severity = max(severity, 0.64) |
| | if "market" in categories: |
| | severity = max(severity, 0.56) |
| | if "humanitarian" in categories or "diplomacy" in categories: |
| | severity = max(severity, 0.44) |
| | if packet.delivery == "live_demo": |
| | severity += 0.05 |
| |
|
| | lead = packet.sample_items[0] if packet.sample_items else packet.summary |
| | return ExternalSignal( |
| | source=packet.source_name, |
| | headline=self._clip_summary(f"{packet.source_name}: {lead}", limit=160), |
| | region=None if agent_id == "oversight" else agent_id, |
| | tags=sorted(categories | {agent_id, packet.kind, packet.delivery}), |
| | severity=round(min(severity, 0.95), 2), |
| | ) |
| |
|
| | def _classify_signal_categories(self, text: str) -> set[str]: |
| | categories: set[str] = set() |
| | if self._signal_mentions( |
| | text, |
| | "strike", |
| | "rocket", |
| | "missile", |
| | "drone", |
| | "attack", |
| | "raid", |
| | "blast", |
| | "explosion", |
| | "intercept", |
| | "retaliat", |
| | "launch", |
| | ): |
| | categories.add("attack") |
| | if self._signal_mentions( |
| | text, |
| | "shipping", |
| | "tanker", |
| | "vessel", |
| | "hormuz", |
| | "oil", |
| | "terminal", |
| | "seaport", |
| | "harbor", |
| | "maritime", |
| | "strait", |
| | "pipeline", |
| | "red sea", |
| | ): |
| | categories.add("shipping") |
| | if self._signal_mentions( |
| | text, |
| | "gold", |
| | "silver", |
| | "copper", |
| | "lithium", |
| | "nickel", |
| | "uranium", |
| | "phosphate", |
| | "bauxite", |
| | "rare earth", |
| | "rare-earth", |
| | "commodity", |
| | "mineral", |
| | "metals", |
| | "natural gas", |
| | "lng", |
| | ): |
| | categories.add("commodities") |
| | if self._signal_mentions(text, "ceasefire", "talk", "negotiat", "summit", "diplomat", "mediat"): |
| | categories.add("diplomacy") |
| | if self._signal_mentions(text, "humanitarian", "aid", "displacement", "relief", "civilian", "refugee"): |
| | categories.add("humanitarian") |
| | if self._signal_mentions(text, "cyber", "internet", "blackout", "outage", "malware", "network"): |
| | categories.add("cyber") |
| | if self._signal_mentions(text, "protest", "unrest", "sanction", "inflation", "currency", "black market"): |
| | categories.add("unrest") |
| | if self._signal_mentions(text, "market", "investor", "trade", "stocks", "shares", "bond", "price", "futures"): |
| | categories.add("market") |
| | return categories or {"general"} |
| |
|
| | def resolve_policy_actions( |
| | self, |
| | session: SessionState, |
| | signals: list[ExternalSignal], |
| | *, |
| | preset_actions: dict[str, AgentAction] | None = None, |
| | agent_ids: list[str] | None = None, |
| | ) -> dict[str, AgentAction]: |
| | actions: dict[str, AgentAction] = dict(preset_actions or {}) |
| | target_agent_ids = agent_ids or list(AGENT_IDS) |
| | for agent_id, action in actions.items(): |
| | self._validate_action(agent_id, action) |
| |
|
| | for agent_id in target_agent_ids: |
| | if agent_id in actions: |
| | continue |
| | provider_action, provider_error = self._resolve_provider_action(session, agent_id, signals) |
| | if provider_action is not None: |
| | actions[agent_id] = provider_action |
| | continue |
| | signal_context, top_headline = self._build_signal_context(agent_id, signals) |
| | allowed_actions = AGENT_ALLOWED_ACTIONS.get(agent_id, ()) |
| | action_type = max( |
| | allowed_actions, |
| | key=lambda candidate: self._score_live_action( |
| | agent_id=agent_id, |
| | action_type=candidate, |
| | session=session, |
| | signal_context=signal_context, |
| | ), |
| | ) |
| | target = self._select_live_action_target(agent_id, action_type, session, signal_context) |
| | driver = self._event_driver_label(signal_context) |
| | metadata: dict[str, object] = { |
| | "mode": "heuristic_fallback", |
| | "driver": driver, |
| | "signal_count": int(signal_context["relevant_count"]), |
| | } |
| | if provider_error is not None: |
| | metadata["provider_error"] = provider_error |
| | if top_headline: |
| | metadata["trigger_headline"] = top_headline |
| | actions[agent_id] = AgentAction( |
| | actor=agent_id, |
| | type=action_type, |
| | target=target, |
| | summary=self._build_auto_action_summary(agent_id, action_type, target, driver), |
| | metadata=metadata, |
| | ) |
| | return actions |
| |
|
| | def _resolve_provider_action( |
| | self, |
| | session: SessionState, |
| | agent_id: str, |
| | signals: list[ExternalSignal], |
| | ) -> tuple[AgentAction | None, str | None]: |
| | binding = session.model_bindings.get(agent_id) |
| | if binding is None or not binding.ready_for_inference: |
| | return None, None |
| |
|
| | try: |
| | action = self._provider_runtime.decide_action( |
| | ProviderDecisionRequest( |
| | agent_id=agent_id, |
| | binding=binding, |
| | observation=session.observations[agent_id], |
| | external_signals=signals, |
| | ) |
| | ) |
| | self._validate_action(agent_id, action) |
| | return action, None |
| | except ProviderDecisionError as exc: |
| | return None, str(exc) |
| |
|
| | def _select_live_actions( |
| | self, |
| | session: SessionState, |
| | signals: list[ExternalSignal], |
| | ) -> dict[str, AgentAction]: |
| | return self.resolve_policy_actions(session, signals) |
| |
|
| | def _build_signal_context( |
| | self, |
| | agent_id: str, |
| | signals: list[ExternalSignal], |
| | ) -> tuple[dict[str, float], str]: |
| | signal_context = { |
| | "attack": 0.0, |
| | "shipping": 0.0, |
| | "commodities": 0.0, |
| | "diplomacy": 0.0, |
| | "humanitarian": 0.0, |
| | "cyber": 0.0, |
| | "unrest": 0.0, |
| | "market": 0.0, |
| | "general": 0.0, |
| | "pressure": 0.0, |
| | "relevant_count": 0.0, |
| | } |
| | top_headline = "" |
| | top_severity = 0.0 |
| |
|
| | for signal in signals: |
| | if agent_id != "oversight": |
| | affected_agents = set(self._infer_affected_agents(signal)) |
| | if agent_id not in affected_agents and signal.region != agent_id: |
| | continue |
| |
|
| | text = f"{signal.headline} {' '.join(signal.tags)} {(signal.region or '')}".lower() |
| | categories = self._classify_signal_categories(text) |
| | weight = max(0.2, signal.severity) |
| | signal_context["relevant_count"] += 1.0 |
| | for category in categories: |
| | signal_context[category] = signal_context.get(category, 0.0) + weight |
| | if signal.severity >= top_severity: |
| | top_severity = signal.severity |
| | top_headline = signal.headline |
| |
|
| | signal_context["pressure"] = sum( |
| | signal_context[category] |
| | for category in ("attack", "shipping", "commodities", "diplomacy", "humanitarian", "cyber", "unrest", "market") |
| | ) + 0.3 * signal_context["general"] |
| | return signal_context, top_headline |
| |
|
| | def _score_live_action( |
| | self, |
| | *, |
| | agent_id: str, |
| | action_type: str, |
| | session: SessionState, |
| | signal_context: dict[str, float], |
| | ) -> float: |
| | observation = session.observations[agent_id] |
| | metric_gain = 0.0 |
| | action_effects = AGENT_STATE_ACTION_EFFECTS[agent_id].get(action_type, {}) |
| |
|
| | for metric, config in AGENT_REWARD_METRIC_CONFIGS[agent_id].items(): |
| | current_value = observation.strategic_state.get( |
| | metric, |
| | AGENT_STATE_BASELINES[agent_id].get(metric, 50.0), |
| | ) |
| | projected_value = self._clamp_percent(current_value + action_effects.get(metric, 0.0)) |
| | before_score = self._target_score(current_value, config.target, config.tolerance) |
| | after_score = self._target_score(projected_value, config.target, config.tolerance) |
| | metric_gain += (after_score - before_score) * config.weight |
| |
|
| | doctrinal_fit = AGENT_ACTION_ALIGNMENT[agent_id].get(action_type, 0.0) |
| | signal_bias = self._live_signal_action_bias(agent_id, action_type, signal_context) |
| | belief_bias = self._belief_action_bias( |
| | agent_id, |
| | action_type, |
| | session.belief_state.get(agent_id, AgentBeliefState(agent_id=agent_id)), |
| | ) |
| | asset_pressure = self._asset_pressure(session.world, agent_id) |
| | if action_type in {"defend", "intel_query"}: |
| | signal_bias += 0.22 * asset_pressure |
| | elif action_type == "negotiate": |
| | signal_bias += 0.10 * asset_pressure |
| | elif action_type == "hold": |
| | signal_bias -= 0.14 * asset_pressure |
| | continuity_bonus = 0.0 |
| | if any(action.actor == agent_id and action.type == action_type for action in session.world.last_actions): |
| | continuity_bonus = 0.06 |
| |
|
| | escalation_penalty = 0.0 |
| | if session.world.tension_level >= 78.0 and action_type in {"strike", "mobilize", "deceive", "sanction"}: |
| | escalation_penalty += 0.28 |
| | if signal_context["diplomacy"] >= 0.6 and action_type in {"strike", "mobilize", "deceive"}: |
| | escalation_penalty += 0.18 |
| | if signal_context["attack"] >= 0.65 and action_type == "hold": |
| | escalation_penalty += 0.18 |
| | if signal_context["shipping"] >= 0.55 and agent_id in {"us", "gulf"} and action_type in {"strike", "deceive"}: |
| | escalation_penalty += 0.2 |
| | if asset_pressure >= 0.35 and action_type in {"strike", "mobilize", "deceive"}: |
| | escalation_penalty += 0.16 |
| |
|
| | if agent_id == "oversight" and action_type == "oversight_review": |
| | escalation_penalty -= min(0.25, signal_context["pressure"] * 0.2) |
| |
|
| | return metric_gain * 1.8 + doctrinal_fit * 0.28 + signal_bias + belief_bias + continuity_bonus - escalation_penalty |
| |
|
| | @staticmethod |
| | def _belief_action_bias( |
| | agent_id: str, |
| | action_type: str, |
| | belief_state: AgentBeliefState, |
| | ) -> float: |
| | topic_weights = {belief.topic: belief.confidence for belief in belief_state.beliefs[:4]} |
| | shipping = topic_weights.get("shipping", 0.0) |
| | commodities = topic_weights.get("commodities", 0.0) |
| | border = topic_weights.get("border", 0.0) |
| | corridor = topic_weights.get("corridor", 0.0) |
| | diplomacy = topic_weights.get("diplomacy", 0.0) |
| | cyber = topic_weights.get("cyber", 0.0) |
| | domestic = topic_weights.get("domestic", 0.0) |
| |
|
| | if agent_id in {"us", "gulf"}: |
| | return { |
| | "defend": 0.22 * shipping, |
| | "negotiate": 0.16 * diplomacy + 0.08 * shipping + 0.10 * commodities, |
| | "intel_query": 0.14 * cyber + 0.10 * commodities, |
| | "strike": 0.06 * border - 0.12 * diplomacy, |
| | }.get(action_type, 0.0) |
| | if agent_id == "israel": |
| | return { |
| | "defend": 0.2 * border, |
| | "strike": 0.12 * border, |
| | "intel_query": 0.1 * corridor + 0.08 * cyber, |
| | "negotiate": 0.1 * diplomacy - 0.08 * border, |
| | }.get(action_type, 0.0) |
| | if agent_id in {"iran", "hezbollah"}: |
| | return { |
| | "mobilize": 0.16 * corridor + 0.08 * border, |
| | "deceive": 0.12 * cyber + 0.08 * corridor + 0.06 * commodities, |
| | "defend": 0.1 * border + 0.08 * domestic, |
| | "negotiate": 0.1 * diplomacy - 0.08 * corridor - 0.04 * commodities, |
| | }.get(action_type, 0.0) |
| | return { |
| | "oversight_review": 0.18 * (shipping + commodities + border + corridor + cyber + domestic), |
| | "negotiate": 0.12 * diplomacy, |
| | "intel_query": 0.1 * cyber + 0.08 * commodities, |
| | }.get(action_type, 0.0) |
| |
|
| | def _live_signal_action_bias( |
| | self, |
| | agent_id: str, |
| | action_type: str, |
| | signal_context: dict[str, float], |
| | ) -> float: |
| | attack = signal_context["attack"] |
| | shipping = signal_context["shipping"] |
| | commodities = signal_context["commodities"] |
| | diplomacy = signal_context["diplomacy"] |
| | humanitarian = signal_context["humanitarian"] |
| | cyber = signal_context["cyber"] |
| | unrest = signal_context["unrest"] |
| | market = signal_context["market"] |
| | pressure = signal_context["pressure"] |
| |
|
| | if agent_id == "us": |
| | return { |
| | "defend": 0.34 * shipping + 0.22 * attack + 0.18 * cyber + 0.14 * market, |
| | "negotiate": 0.30 * diplomacy + 0.18 * humanitarian + 0.16 * market + 0.08 * attack + 0.10 * commodities, |
| | "intel_query": 0.24 * cyber + 0.18 * attack + 0.12 * unrest + 0.12 * commodities, |
| | "mobilize": 0.16 * attack + 0.12 * shipping + 0.06 * commodities, |
| | "sanction": 0.18 * unrest + 0.10 * cyber, |
| | "strike": 0.08 * attack - 0.18 * diplomacy - 0.12 * humanitarian, |
| | "deceive": 0.04 * attack - 0.10 * diplomacy, |
| | "hold": 0.10 * diplomacy - 0.10 * attack, |
| | }.get(action_type, 0.0) |
| |
|
| | if agent_id == "israel": |
| | return { |
| | "defend": 0.38 * attack + 0.14 * cyber, |
| | "strike": 0.22 * attack - 0.12 * humanitarian, |
| | "mobilize": 0.18 * attack, |
| | "intel_query": 0.20 * cyber + 0.16 * attack, |
| | "negotiate": 0.16 * diplomacy + 0.08 * humanitarian, |
| | "hold": 0.08 * diplomacy - 0.12 * attack, |
| | "deceive": 0.12 * attack, |
| | "sanction": 0.06 * unrest, |
| | }.get(action_type, 0.0) |
| |
|
| | if agent_id == "iran": |
| | return { |
| | "mobilize": 0.26 * shipping + 0.18 * attack + 0.12 * commodities, |
| | "deceive": 0.22 * attack + 0.18 * shipping + 0.14 * cyber + 0.08 * commodities, |
| | "defend": 0.26 * unrest + 0.12 * attack, |
| | "intel_query": 0.18 * cyber + 0.16 * unrest, |
| | "negotiate": 0.14 * diplomacy - 0.14 * attack - 0.06 * commodities, |
| | "strike": 0.12 * attack + 0.10 * shipping + 0.06 * commodities - 0.18 * diplomacy, |
| | "hold": 0.08 * diplomacy - 0.06 * shipping - 0.04 * commodities, |
| | "sanction": 0.08 * unrest, |
| | }.get(action_type, 0.0) |
| |
|
| | if agent_id == "hezbollah": |
| | return { |
| | "defend": 0.28 * attack, |
| | "deceive": 0.24 * attack + 0.14 * cyber, |
| | "mobilize": 0.18 * attack, |
| | "strike": 0.14 * attack - 0.14 * humanitarian, |
| | "negotiate": 0.22 * humanitarian + 0.14 * diplomacy - 0.18 * attack, |
| | "hold": 0.18 * humanitarian + 0.10 * diplomacy - 0.12 * attack, |
| | "intel_query": 0.12 * cyber + 0.10 * attack, |
| | "sanction": 0.04 * unrest, |
| | }.get(action_type, 0.0) |
| |
|
| | if agent_id == "gulf": |
| | return { |
| | "defend": 0.38 * shipping + 0.18 * attack + 0.16 * market, |
| | "negotiate": 0.28 * diplomacy + 0.24 * market + 0.14 * humanitarian + 0.14 * commodities, |
| | "intel_query": 0.22 * shipping + 0.14 * cyber + 0.18 * commodities, |
| | "mobilize": 0.16 * attack + 0.10 * shipping + 0.04 * commodities - 0.12 * market, |
| | "hold": 0.12 * diplomacy - 0.12 * shipping - 0.06 * commodities, |
| | "strike": 0.04 * attack + 0.04 * commodities - 0.24 * market, |
| | "sanction": 0.06 * unrest + 0.04 * commodities - 0.10 * market, |
| | "deceive": 0.04 * commodities - 0.12 * market, |
| | }.get(action_type, 0.0) |
| |
|
| | return { |
| | "oversight_review": 0.34 * pressure + 0.20 * attack + 0.16 * shipping + 0.14 * commodities, |
| | "intel_query": 0.24 * cyber + 0.18 * attack + 0.10 * unrest + 0.14 * commodities, |
| | "negotiate": 0.20 * diplomacy + 0.12 * humanitarian, |
| | "defend": 0.16 * attack + 0.12 * shipping, |
| | "hold": 0.08 * diplomacy - 0.08 * pressure, |
| | }.get(action_type, 0.0) |
| |
|
| | def _select_live_action_target( |
| | self, |
| | agent_id: str, |
| | action_type: str, |
| | session: SessionState, |
| | signal_context: dict[str, float], |
| | ) -> str | None: |
| | if action_type == "defend": |
| | return agent_id if agent_id != "oversight" else None |
| |
|
| | if action_type == "negotiate": |
| | if agent_id == "us": |
| | return "gulf" if signal_context["shipping"] >= signal_context["attack"] else "israel" |
| | if agent_id == "israel": |
| | return "us" |
| | if agent_id == "iran": |
| | return "hezbollah" |
| | if agent_id == "hezbollah": |
| | return "iran" |
| | if agent_id == "gulf": |
| | return "us" |
| | if agent_id == "oversight": |
| | return max( |
| | ("us", "israel", "iran", "hezbollah", "gulf"), |
| | key=lambda candidate: session.world.risk_scores.get(candidate, 0.0), |
| | ) |
| |
|
| | if action_type in {"strike", "sanction"}: |
| | adversaries = AGENT_PRIMARY_ADVERSARIES.get(agent_id, ()) |
| | if not adversaries: |
| | return None |
| | if agent_id == "iran" and signal_context["shipping"] > signal_context["attack"]: |
| | return "gulf" |
| | return adversaries[0] |
| |
|
| | return None |
| |
|
| | def _event_driver_label(self, signal_context: dict[str, float]) -> str: |
| | ranked_categories = sorted( |
| | ( |
| | (category, value) |
| | for category, value in signal_context.items() |
| | if category in {"attack", "shipping", "commodities", "diplomacy", "humanitarian", "cyber", "unrest", "market"} |
| | ), |
| | key=lambda item: item[1], |
| | reverse=True, |
| | ) |
| | if not ranked_categories or ranked_categories[0][1] <= 0.0: |
| | return "regional source refresh" |
| |
|
| | top_category = ranked_categories[0][0] |
| | return { |
| | "attack": "cross-border attack reporting", |
| | "shipping": "shipping-lane disruption reporting", |
| | "commodities": "commodity-market disruption reporting", |
| | "diplomacy": "diplomatic movement", |
| | "humanitarian": "humanitarian pressure", |
| | "cyber": "cyber and trace uncertainty", |
| | "unrest": "domestic instability", |
| | "market": "market stress", |
| | }.get(top_category, "regional source refresh") |
| |
|
| | def _build_auto_action_summary( |
| | self, |
| | agent_id: str, |
| | action_type: str, |
| | target: str | None, |
| | driver: str, |
| | ) -> str: |
| | tool_label = AGENT_TOOL_LABELS[agent_id] |
| | if action_type == "hold": |
| | return f"Hold with {tool_label} in reserve while {driver} clarifies." |
| | if action_type == "negotiate": |
| | target_label = target or "regional counterparts" |
| | return f"Use {tool_label} to negotiate with {target_label} around {driver}." |
| | if action_type == "sanction": |
| | target_label = target or "the pressure source" |
| | return f"Apply economic pressure tools against {target_label} after {driver}." |
| | if action_type == "strike": |
| | target_label = target or "the active threat node" |
| | return f"Use kinetic tools against {target_label} in response to {driver}." |
| | if action_type == "defend": |
| | target_label = target or agent_id |
| | return f"Harden {target_label} with {tool_label} as {driver} comes in." |
| | if action_type == "intel_query": |
| | return f"Pull more collection through {tool_label} before escalating beyond {driver}." |
| | if action_type == "mobilize": |
| | return f"Shift readiness and posture with {tool_label} around {driver}." |
| | if action_type == "deceive": |
| | return f"Use deniable signaling and masking tools while {driver} unfolds." |
| | return f"Run an oversight review with {tool_label} against {driver}." |
| |
|
| | def shutdown(self) -> None: |
| | self._source_harvester.stop() |
| | self._provider_runtime.close() |
| |
|
| | def _seed(self, seed: int | None) -> None: |
| | if seed is not None: |
| | self._rng.seed(seed) |
| |
|
| | def _initial_world(self) -> WorldState: |
| | latent_state = {agent_id: metrics.copy() for agent_id, metrics in AGENT_STATE_BASELINES.items()} |
| | baseline_event = LatentEvent( |
| | event_id="baseline-posture", |
| | topic="diplomacy", |
| | status="active", |
| | severity=0.45, |
| | visibility="public", |
| | reliability=0.74, |
| | origin="scenario", |
| | affected_agents=["us", "israel", "iran", "hezbollah", "gulf"], |
| | started_at_turn=0, |
| | last_updated_turn=0, |
| | decay_rate=0.03, |
| | narratives=[ |
| | LatentEventNarrative( |
| | framing="baseline", |
| | summary="Regional alert posture is elevated after a contested strike window.", |
| | confidence=0.74, |
| | public=True, |
| | ), |
| | LatentEventNarrative( |
| | framing="concealed", |
| | summary="Privately, all major actors assess that deterrence signaling remains brittle and prone to misread escalation.", |
| | confidence=0.68, |
| | public=False, |
| | ), |
| | ], |
| | ) |
| | return WorldState( |
| | tension_level=50.0, |
| | market_stress=28.0, |
| | oil_pressure=36.0, |
| | latent_state=latent_state, |
| | latent_events=[baseline_event], |
| | actor_state={agent_id: metrics.copy() for agent_id, metrics in latent_state.items()}, |
| | asset_state=self._initial_asset_state(), |
| | coalition_graph={ |
| | "us": ["israel"], |
| | "israel": ["us"], |
| | "iran": ["hezbollah"], |
| | "hezbollah": ["iran"], |
| | "gulf": [], |
| | "oversight": [], |
| | }, |
| | active_events=[ |
| | BlackSwanEvent( |
| | id="baseline-posture", |
| | summary="Regional alert posture is elevated after a contested strike window.", |
| | source="scenario", |
| | severity=0.45, |
| | public=True, |
| | affected_agents=["us", "israel", "iran", "hezbollah", "gulf"], |
| | ) |
| | ], |
| | hidden_intents={ |
| | "us": "contain escalation while preserving deterrence", |
| | "israel": "degrade proxy launch capacity decisively", |
| | "iran": "raise cost through deniable pressure", |
| | "hezbollah": "probe for weak responses along the border", |
| | "gulf": "keep shipping lanes open and volatility contained", |
| | "oversight": "reduce misalignment without freezing autonomy", |
| | }, |
| | behavioral_consistency={agent_id: 0.72 for agent_id in AGENT_IDS}, |
| | ema_tension={agent_id: 50.0 for agent_id in AGENT_IDS}, |
| | risk_scores={agent_id: 0.25 for agent_id in AGENT_IDS}, |
| | ) |
| |
|
| | def _build_episode_metadata( |
| | self, |
| | training_stage: str, |
| | max_turns: int | None, |
| | *, |
| | scenario: ScenarioDefinition, |
| | historical_replay: HistoricalReplayState | None = None, |
| | ) -> EpisodeMetadata: |
| | stage_config = TRAINING_STAGE_CONFIGS[training_stage] |
| | resolved_max_turns = max_turns or DEFAULT_MAX_TURNS |
| | replay_event_count = len((historical_replay or HistoricalReplayState()).ground_truth_timeline) |
| | if historical_replay is not None and historical_replay.enabled: |
| | remaining_events = max(1, replay_event_count - historical_replay.start_event_index - 1) |
| | resolved_max_turns = min(resolved_max_turns, remaining_events) |
| | return EpisodeMetadata( |
| | max_turns=resolved_max_turns, |
| | training_stage=training_stage, |
| | scenario_id=scenario.id, |
| | scenario_name=scenario.name, |
| | scenario_description=scenario.description, |
| | scenario_tags=list(scenario.tags), |
| | dense_rewards=stage_config["dense_rewards"], |
| | sparse_rewards=not stage_config["dense_rewards"], |
| | fog_of_war=stage_config["fog_of_war"], |
| | oversight_enabled=stage_config["oversight_enabled"], |
| | live_mode_capable=stage_config["live_mode_capable"], |
| | replay_mode=historical_replay is not None and historical_replay.enabled, |
| | replay_id=historical_replay.replay_id if historical_replay is not None and historical_replay.enabled else None, |
| | replay_event_count=replay_event_count, |
| | ) |
| |
|
| | def _initialize_historical_replay( |
| | self, |
| | *, |
| | world: WorldState, |
| | training_agent: str, |
| | replay_id: str | None, |
| | replay_start_index: int | None, |
| | ) -> HistoricalReplayState: |
| | if replay_id is None: |
| | return HistoricalReplayState() |
| |
|
| | resolved_replay_id = replay_id |
| | replay = get_historical_replay(resolved_replay_id) |
| | if len(replay.events) < 2: |
| | raise ValueError(f"Replay {resolved_replay_id} must contain at least two events.") |
| |
|
| | max_start_index = len(replay.events) - 2 |
| | if replay_start_index is None: |
| | start_index = self._rng.randint(0, max_start_index) |
| | else: |
| | if replay_start_index < 0 or replay_start_index > max_start_index: |
| | raise ValueError( |
| | f"Replay start index {replay_start_index} is outside the valid range 0-{max_start_index}." |
| | ) |
| | start_index = replay_start_index |
| |
|
| | state = HistoricalReplayState( |
| | enabled=True, |
| | replay_id=replay.replay_id, |
| | replay_name=replay.name, |
| | training_agent=training_agent, |
| | start_event_index=start_index, |
| | current_event_index=start_index, |
| | ground_truth_timeline=[event.model_copy(deep=True) for event in replay.events], |
| | ) |
| | initial_event = state.ground_truth_timeline[start_index].model_copy(deep=True) |
| | self._apply_historical_event(world, initial_event) |
| | state.visible_event_ids.append(initial_event.event_id) |
| | state.last_revealed_event = initial_event |
| | return state |
| |
|
| | def _advance_historical_replay( |
| | self, |
| | session: SessionState, |
| | predictions: dict[str, Prediction], |
| | ) -> tuple[HistoricalEvent | None, dict[str, PredictionAssessment]]: |
| | replay = session.historical_replay |
| | if not replay.enabled: |
| | return None, {} |
| |
|
| | normalized_predictions = self._normalize_predictions(session, predictions) |
| | if normalized_predictions: |
| | session.prediction_log.extend(normalized_predictions.values()) |
| |
|
| | next_index = replay.current_event_index + 1 |
| | if next_index >= len(replay.ground_truth_timeline): |
| | return None, {} |
| |
|
| | revealed_event = replay.ground_truth_timeline[next_index].model_copy(deep=True) |
| | assessments = { |
| | agent_id: self._score_prediction(prediction=prediction, event=revealed_event) |
| | for agent_id, prediction in normalized_predictions.items() |
| | } |
| |
|
| | replay.current_event_index = next_index |
| | replay.visible_event_ids.append(revealed_event.event_id) |
| | replay.last_revealed_event = revealed_event |
| | self._apply_historical_event(session.world, revealed_event) |
| |
|
| | if assessments: |
| | session.prediction_assessments.extend(assessments.values()) |
| |
|
| | return revealed_event, assessments |
| |
|
| | def _apply_historical_event(self, world: WorldState, event: HistoricalEvent) -> None: |
| | world.tension_level = self._clamp_percent(world.tension_level + event.impact.tension_delta) |
| | world.market_stress = self._clamp_percent(world.market_stress + event.impact.market_stress_delta) |
| | world.oil_pressure = self._clamp_percent(world.oil_pressure + event.impact.oil_pressure_delta) |
| |
|
| | for agent_id, metric_deltas in event.impact.actor_metric_deltas.items(): |
| | for metric, delta in metric_deltas.items(): |
| | self._bump_actor_metric(world, agent_id, metric, delta) |
| |
|
| | affected_agents = self._historical_event_affected_agents(event) |
| | self._register_latent_event( |
| | world, |
| | LatentEvent( |
| | event_id=f"historical-{event.event_id}", |
| | topic=event.topic, |
| | status="active", |
| | severity=severity_score(event.severity), |
| | visibility="public", |
| | reliability=0.96 if event.confirmed else 0.72, |
| | origin=f"historical:{event.source_type}", |
| | affected_agents=affected_agents, |
| | started_at_turn=world.turn, |
| | last_updated_turn=world.turn, |
| | decay_rate=0.04, |
| | narratives=[ |
| | LatentEventNarrative( |
| | framing="baseline", |
| | summary=self._clip_summary(event.public_summary or event.summary), |
| | confidence=0.9 if event.confirmed else 0.68, |
| | public=True, |
| | ), |
| | LatentEventNarrative( |
| | framing="concealed", |
| | summary=self._clip_summary(event.summary), |
| | confidence=0.82 if event.confirmed else 0.6, |
| | public=False, |
| | ), |
| | ], |
| | ), |
| | spawn_links=False, |
| | ) |
| | self._resync_public_events(world) |
| | self._resync_public_actor_state(world) |
| |
|
| | @staticmethod |
| | def _historical_event_affected_agents(event: HistoricalEvent) -> list[str]: |
| | affected = [ |
| | agent_id |
| | for agent_id in [*event.actors, *event.targets] |
| | if agent_id in AGENT_IDS |
| | ] |
| | return sorted(set(affected)) |
| |
|
| | def _normalize_predictions( |
| | self, |
| | session: SessionState, |
| | predictions: dict[str, Prediction], |
| | ) -> dict[str, Prediction]: |
| | normalized: dict[str, Prediction] = {} |
| | for agent_id, prediction in predictions.items(): |
| | if prediction.agent_id != agent_id: |
| | raise ValueError( |
| | f"Prediction agent mismatch: payload agent={prediction.agent_id}, slot={agent_id}" |
| | ) |
| | if prediction.time_horizon_turns < 1: |
| | raise ValueError("Prediction time_horizon_turns must be at least 1.") |
| | normalized[agent_id] = prediction.model_copy( |
| | update={ |
| | "turn": max(0, session.world.turn - 1), |
| | "timestamp": ( |
| | session.historical_replay.last_revealed_event.timestamp |
| | if session.historical_replay.enabled and session.historical_replay.last_revealed_event is not None |
| | else prediction.timestamp |
| | ), |
| | }, |
| | deep=True, |
| | ) |
| | return normalized |
| |
|
| | def _score_prediction(self, *, prediction: Prediction, event: HistoricalEvent) -> PredictionAssessment: |
| | topic_score = 1.0 if prediction.topic == event.topic else -0.4 |
| | actor_score = ( |
| | 1.0 |
| | if prediction.predicted_actor and prediction.predicted_actor in event.actors |
| | else (-0.2 if prediction.predicted_actor is None else -0.5) |
| | ) |
| | target_score = ( |
| | 1.0 |
| | if prediction.predicted_target and prediction.predicted_target in event.targets |
| | else (-0.15 if prediction.predicted_target is None else -0.45) |
| | ) |
| | timing_score = self._clamp_unit(1.0 - abs(prediction.time_horizon_turns - 1) * 0.6) |
| | severity_match_distance = severity_distance(prediction.expected_severity, event.severity) |
| | severity_alignment = {0: 1.0, 1: 0.35, 2: -0.2, 3: -0.5}.get(severity_match_distance, -0.5) |
| |
|
| | correctness = max( |
| | 0.0, |
| | min( |
| | 1.0, |
| | ( |
| | 0.38 * max(topic_score, 0.0) |
| | + 0.22 * max(actor_score, 0.0) |
| | + 0.16 * max(target_score, 0.0) |
| | + 0.12 * max(timing_score, 0.0) |
| | + 0.12 * max(severity_alignment, 0.0) |
| | ), |
| | ), |
| | ) |
| | calibration = self._clamp_unit(1.0 - abs(prediction.confidence - correctness) * 2.0) |
| |
|
| | vague_penalty = 0.0 |
| | if prediction.predicted_actor is None and prediction.predicted_target is None: |
| | vague_penalty -= 0.18 |
| | if len(prediction.summary.strip()) < 24: |
| | vague_penalty -= 0.12 |
| |
|
| | contradiction_penalty = 0.0 |
| | if topic_score < 0.0 and actor_score < 0.0 and prediction.confidence >= 0.55: |
| | contradiction_penalty = -0.22 |
| |
|
| | confident_false_penalty = 0.0 |
| | if correctness < 0.25 and prediction.confidence >= 0.7: |
| | confident_false_penalty = -0.32 |
| |
|
| | total = self._clamp_unit( |
| | 0.28 * topic_score |
| | + 0.18 * actor_score |
| | + 0.14 * target_score |
| | + 0.12 * timing_score |
| | + 0.16 * severity_alignment |
| | + 0.12 * calibration |
| | + vague_penalty |
| | + contradiction_penalty |
| | + confident_false_penalty |
| | ) |
| |
|
| | return PredictionAssessment( |
| | prediction_id=prediction.prediction_id, |
| | agent_id=prediction.agent_id, |
| | turn=prediction.turn, |
| | evaluated_event_id=event.event_id, |
| | evaluated_event_summary=event.summary, |
| | topic_score=round(topic_score, 3), |
| | actor_score=round(actor_score, 3), |
| | target_score=round(target_score, 3), |
| | timing_score=round(timing_score, 3), |
| | severity_score=round(severity_alignment, 3), |
| | confidence_calibration=round(calibration, 3), |
| | vague_penalty=round(vague_penalty, 3), |
| | contradiction_penalty=round(contradiction_penalty, 3), |
| | confident_false_penalty=round(confident_false_penalty, 3), |
| | total=round(total, 3), |
| | ) |
| |
|
| | def _apply_forecast_rewards( |
| | self, |
| | rewards: dict[str, RewardBreakdown], |
| | assessments: dict[str, PredictionAssessment], |
| | ) -> None: |
| | for agent_id, assessment in assessments.items(): |
| | reward = rewards.get(agent_id, RewardBreakdown()) |
| | reward.forecast_terms = { |
| | "topic": assessment.topic_score, |
| | "actor": assessment.actor_score, |
| | "target": assessment.target_score, |
| | "timing": assessment.timing_score, |
| | "severity": assessment.severity_score, |
| | "confidence_calibration": assessment.confidence_calibration, |
| | "vague_penalty": assessment.vague_penalty, |
| | "contradiction_penalty": assessment.contradiction_penalty, |
| | "confident_false_penalty": assessment.confident_false_penalty, |
| | } |
| | reward.forecast_total = assessment.total |
| | reward.total = round( |
| | self._clamp_unit(reward.total + FORECAST_REWARD_BLEND * assessment.total), |
| | 3, |
| | ) |
| | rewards[agent_id] = reward |
| |
|
| | def _apply_scenario(self, world: WorldState, scenario: ScenarioDefinition) -> None: |
| | for field_name, value in scenario.world_overrides.items(): |
| | if field_name in {"tension_level", "market_stress", "oil_pressure"}: |
| | setattr(world, field_name, round(self._clamp_percent(value), 2)) |
| |
|
| | for agent_id, allies in scenario.coalition_overrides.items(): |
| | world.coalition_graph[agent_id] = list(allies) |
| |
|
| | for agent_id, intent in scenario.hidden_intent_overrides.items(): |
| | world.hidden_intents[agent_id] = intent |
| |
|
| | for shift in scenario.metric_shifts: |
| | self._bump_actor_metric(world, shift.agent_id, shift.metric, shift.delta) |
| |
|
| | for impact in scenario.asset_impacts: |
| | self._apply_scenario_asset_impact(world, impact) |
| |
|
| | for event in scenario.public_events: |
| | self._register_latent_event( |
| | world, |
| | self._signal_to_latent_event( |
| | world, |
| | ExternalSignal( |
| | source=event.source, |
| | headline=event.headline, |
| | region=event.region, |
| | tags=list(event.tags), |
| | severity=event.severity, |
| | ), |
| | ), |
| | ) |
| | for index, event in enumerate(scenario.latent_events): |
| | self._register_latent_event( |
| | world, |
| | self._scenario_latent_event_to_event(scenario.id, index, event), |
| | ) |
| | self._resync_public_events(world) |
| | self._resync_public_actor_state(world) |
| |
|
| | def _apply_scenario_asset_impact(self, world: WorldState, impact: ScenarioAssetImpact) -> None: |
| | if impact.mode == "repair": |
| | self._restore_assets( |
| | world, |
| | owner=impact.owner, |
| | intensity=impact.intensity, |
| | reason=impact.reason, |
| | section_bias=impact.section_bias, |
| | max_assets=impact.max_assets, |
| | ) |
| | return |
| | self._damage_assets( |
| | world, |
| | owner=impact.owner, |
| | intensity=impact.intensity, |
| | reason=impact.reason, |
| | section_bias=impact.section_bias, |
| | max_assets=impact.max_assets, |
| | max_status=impact.max_status, |
| | ) |
| |
|
| | def _register_latent_event( |
| | self, |
| | world: WorldState, |
| | event: LatentEvent, |
| | *, |
| | spawn_links: bool = True, |
| | ) -> LatentEvent: |
| | world.latent_events = [existing for existing in world.latent_events if existing.event_id != event.event_id] |
| | world.latent_events.append(event) |
| | if spawn_links: |
| | self._spawn_linked_latent_events(world, event) |
| | return event |
| |
|
| | def _resync_public_events(self, world: WorldState) -> None: |
| | public_events: list[BlackSwanEvent] = [] |
| | for event in world.latent_events[-24:]: |
| | if event.status == "resolved" and event.severity < 0.18: |
| | continue |
| | if event.visibility == "private": |
| | continue |
| | public_events.append( |
| | BlackSwanEvent( |
| | id=event.event_id, |
| | summary=self._latent_event_public_summary(event), |
| | source=event.origin, |
| | severity=round(max(0.22, min(0.95, event.severity * event.reliability + 0.12)), 3), |
| | public=True, |
| | affected_agents=list(event.affected_agents), |
| | ) |
| | ) |
| | world.active_events = public_events[-12:] |
| |
|
| | def _scenario_latent_event_to_event( |
| | self, |
| | scenario_id: str, |
| | index: int, |
| | event: ScenarioLatentEvent, |
| | ) -> LatentEvent: |
| | affected_agents = list(event.affected_agents) or ["us", "israel", "iran", "hezbollah", "gulf"] |
| | narratives = [ |
| | LatentEventNarrative( |
| | framing="baseline", |
| | summary=event.public_summary or event.summary, |
| | confidence=min(0.92, event.reliability + 0.06), |
| | public=True, |
| | ) |
| | ] |
| | if event.private_summary: |
| | narratives.append( |
| | LatentEventNarrative( |
| | framing="concealed", |
| | summary=event.private_summary, |
| | confidence=max(0.36, event.reliability), |
| | public=False, |
| | ) |
| | ) |
| | return LatentEvent( |
| | event_id=f"scenario-latent-{scenario_id}-{index}", |
| | topic=event.topic, |
| | status="active", |
| | severity=event.severity, |
| | visibility=event.visibility, |
| | reliability=event.reliability, |
| | origin=event.source, |
| | affected_agents=affected_agents, |
| | started_at_turn=0, |
| | last_updated_turn=0, |
| | decay_rate=event.decay_rate, |
| | narratives=narratives or self._default_latent_event_narratives(event.topic, event.summary), |
| | ) |
| |
|
| | def _spawn_linked_latent_events(self, world: WorldState, event: LatentEvent) -> None: |
| | if event.severity < 0.48: |
| | return |
| | for linked_topic in LATENT_EVENT_LINKS.get(event.topic, ()): |
| | if any( |
| | existing.topic == linked_topic |
| | and event.event_id in existing.linked_event_ids |
| | and existing.status != "resolved" |
| | for existing in world.latent_events |
| | ): |
| | continue |
| | linked_event = LatentEvent( |
| | event_id=f"{event.event_id}-{linked_topic}", |
| | topic=linked_topic, |
| | status="emerging", |
| | severity=round(max(0.24, min(0.82, event.severity * 0.68)), 3), |
| | visibility="public" if linked_topic in {"market", "humanitarian", "diplomacy"} else "mixed", |
| | reliability=max(0.42, round(event.reliability - 0.06, 3)), |
| | origin=event.origin, |
| | affected_agents=list(event.affected_agents), |
| | started_at_turn=world.turn, |
| | last_updated_turn=world.turn, |
| | decay_rate=min(0.16, event.decay_rate + 0.02), |
| | linked_event_ids=[event.event_id], |
| | narratives=self._default_latent_event_narratives( |
| | linked_topic, |
| | self._linked_event_summary(linked_topic, event), |
| | ), |
| | ) |
| | self._register_latent_event(world, linked_event, spawn_links=False) |
| |
|
| | def _signal_to_latent_event(self, world: WorldState, signal: ExternalSignal) -> LatentEvent: |
| | topic = self._infer_event_topics_from_text( |
| | f"{signal.headline} {' '.join(signal.tags)} {(signal.region or '')}" |
| | )[0] |
| | affected_agents = self._infer_affected_agents(signal) |
| | return LatentEvent( |
| | event_id=f"signal-{world.turn}-{len(world.latent_events)}", |
| | topic=topic, |
| | status="active", |
| | severity=round(max(0.12, min(1.0, signal.severity)), 3), |
| | visibility="public", |
| | reliability=0.72, |
| | origin=signal.source, |
| | affected_agents=affected_agents, |
| | started_at_turn=world.turn, |
| | last_updated_turn=world.turn, |
| | decay_rate=0.08, |
| | narratives=self._default_latent_event_narratives(topic, signal.headline), |
| | ) |
| |
|
| | def _action_to_latent_event(self, world: WorldState, agent_id: str, action: AgentAction) -> LatentEvent | None: |
| | if action.type == "hold": |
| | return None |
| | topic = self._infer_action_event_topic(agent_id, action) |
| | affected_agents = [agent_id] |
| | if action.target in AGENT_IDS: |
| | affected_agents.append(action.target) |
| | severity = { |
| | "strike": 0.72, |
| | "mobilize": 0.62, |
| | "deceive": 0.54, |
| | "sanction": 0.5, |
| | "defend": 0.44, |
| | "intel_query": 0.36, |
| | "negotiate": 0.42, |
| | "oversight_review": 0.4, |
| | }.get(action.type, 0.38) |
| | visibility = { |
| | "strike": "public", |
| | "sanction": "public", |
| | "negotiate": "public", |
| | "oversight_review": "public", |
| | "mobilize": "mixed", |
| | "defend": "mixed", |
| | "intel_query": "private", |
| | "deceive": "private", |
| | }.get(action.type, "mixed") |
| | summary = action.summary or f"{agent_id} executed {action.type}." |
| | return LatentEvent( |
| | event_id=f"action-{agent_id}-{world.turn}-{len(world.latent_events)}", |
| | topic=topic, |
| | status="active", |
| | severity=severity, |
| | visibility=visibility, |
| | reliability=0.66 if visibility == "public" else 0.58, |
| | origin=f"{agent_id}-action", |
| | affected_agents=sorted(set(affected_agents)), |
| | started_at_turn=world.turn, |
| | last_updated_turn=world.turn, |
| | decay_rate=0.07 if visibility == "public" else 0.1, |
| | narratives=self._default_latent_event_narratives(topic, summary), |
| | ) |
| |
|
| | def _default_latent_event_narratives(self, topic: str, summary: str) -> list[LatentEventNarrative]: |
| | topic_label = CONTRADICTION_TOPIC_LABELS.get(topic, topic) |
| | clipped = self._clip_summary(summary) |
| | return [ |
| | LatentEventNarrative(framing="baseline", summary=clipped, confidence=0.72, public=True), |
| | LatentEventNarrative( |
| | framing="deteriorating", |
| | summary=self._clip_summary( |
| | f"Private reporting points to renewed deterioration in the broader {topic_label} picture. {clipped}" |
| | ), |
| | confidence=0.64, |
| | public=False, |
| | ), |
| | LatentEventNarrative( |
| | framing="stabilizing", |
| | summary=self._clip_summary( |
| | f"Competing reporting suggests partial stabilization around the broader {topic_label} picture. {clipped}" |
| | ), |
| | confidence=0.58, |
| | public=False, |
| | ), |
| | LatentEventNarrative( |
| | framing="concealed", |
| | summary=self._clip_summary(f"Privately, actors suspect additional hidden activity around {topic_label} beyond what is publicly released."), |
| | confidence=0.52, |
| | public=False, |
| | ), |
| | ] |
| |
|
| | def _linked_event_summary(self, topic: str, event: LatentEvent) -> str: |
| | topic_label = CONTRADICTION_TOPIC_LABELS.get(topic, topic) |
| | source_label = CONTRADICTION_TOPIC_LABELS.get(event.topic, event.topic) |
| | return f"Spillover from {source_label} is now driving {topic_label} pressure." |
| |
|
| | @staticmethod |
| | def _latent_event_public_summary(event: LatentEvent) -> str: |
| | for narrative in event.narratives: |
| | if narrative.public: |
| | return narrative.summary |
| | return event.narratives[0].summary if event.narratives else event.topic |
| |
|
| | def _infer_event_topics_from_text(self, text: str) -> list[str]: |
| | lowered = text.lower() |
| | topics = [ |
| | topic |
| | for topic, keywords in LATENT_EVENT_TOPIC_KEYWORDS.items() |
| | if any(keyword in lowered for keyword in keywords) |
| | ] |
| | return topics or ["diplomacy"] |
| |
|
| | def _infer_action_event_topic(self, agent_id: str, action: AgentAction) -> str: |
| | if action.type in {"negotiate", "oversight_review"}: |
| | return "diplomacy" |
| | if action.type == "sanction": |
| | return "domestic" |
| | if action.type in {"deceive", "intel_query"}: |
| | return "cyber" |
| | if action.type == "mobilize" and agent_id in {"iran", "hezbollah"}: |
| | return "corridor" |
| | if action.type in {"strike", "mobilize", "defend"} and action.target in {"gulf", "iran"}: |
| | return "shipping" |
| | if action.type in {"strike", "mobilize", "defend"}: |
| | return "border" |
| | return "diplomacy" |
| |
|
| | def _apply_latent_event_pressure(self, world: WorldState) -> None: |
| | for event in world.latent_events: |
| | if event.status == "resolved": |
| | continue |
| | pressure = event.severity * max(event.reliability, 0.35) |
| | if event.topic == "shipping": |
| | world.market_stress = self._clamp_percent(world.market_stress + pressure * 0.8) |
| | world.oil_pressure = self._clamp_percent(world.oil_pressure + pressure * 1.1) |
| | elif event.topic == "commodities": |
| | world.market_stress = self._clamp_percent(world.market_stress + pressure * 0.9) |
| | world.oil_pressure = self._clamp_percent(world.oil_pressure + pressure * 0.35) |
| | elif event.topic == "border": |
| | world.tension_level = self._clamp_percent(world.tension_level + pressure * 0.9) |
| | elif event.topic == "corridor": |
| | world.tension_level = self._clamp_percent(world.tension_level + pressure * 0.7) |
| | world.oil_pressure = self._clamp_percent(world.oil_pressure + pressure * 0.25) |
| | elif event.topic == "cyber": |
| | world.tension_level = self._clamp_percent(world.tension_level + pressure * 0.35) |
| | world.market_stress = self._clamp_percent(world.market_stress + pressure * 0.45) |
| | elif event.topic == "domestic": |
| | world.market_stress = self._clamp_percent(world.market_stress + pressure * 0.4) |
| | elif event.topic == "humanitarian": |
| | world.tension_level = self._clamp_percent(world.tension_level + pressure * 0.25) |
| | elif event.topic == "diplomacy": |
| | world.tension_level = self._clamp_percent(world.tension_level - pressure * 0.35) |
| | world.market_stress = self._clamp_percent(world.market_stress - pressure * 0.18) |
| |
|
| | def _decay_latent_events(self, world: WorldState) -> None: |
| | for event in world.latent_events: |
| | if event.last_updated_turn >= world.turn: |
| | if event.severity >= 0.66: |
| | event.status = "active" |
| | continue |
| | event.severity = round(max(0.0, event.severity - event.decay_rate), 3) |
| | if event.severity <= 0.12: |
| | event.status = "resolved" |
| | elif event.severity <= 0.35: |
| | event.status = "contained" |
| | else: |
| | event.status = "active" |
| |
|
| | def _inject_external_signals(self, world: WorldState, signals: list[ExternalSignal]) -> None: |
| | for signal in signals: |
| | self._register_latent_event(world, self._signal_to_latent_event(world, signal)) |
| | world.tension_level = min(100.0, world.tension_level + signal.severity * 8.0) |
| | world.market_stress = min(100.0, world.market_stress + signal.severity * 6.0) |
| | if self._signal_mentions(signal.headline.lower(), "oil", "gas", "lng") or "shipping" in signal.tags: |
| | world.oil_pressure = min(100.0, world.oil_pressure + signal.severity * 10.0) |
| | self._apply_signal_pressure(world, signal) |
| | self._apply_signal_asset_effects(world, signal) |
| | self._resync_public_events(world) |
| | self._resync_public_actor_state(world) |
| |
|
| | def _infer_affected_agents(self, signal: ExternalSignal) -> list[str]: |
| | text = f"{signal.headline} {' '.join(signal.tags)} {(signal.region or '')}".lower() |
| | mapping = { |
| | "us": ("us", "washington", "centcom", "poll"), |
| | "israel": ("israel", "idf", "oref", "northern front"), |
| | "iran": ("iran", "tehran", "hormuz", "proxy"), |
| | "hezbollah": ("hezbollah", "lebanon", "border", "drone"), |
| | "gulf": ("gulf", "saudi", "uae", "shipping", "oil", "gold", "silver", "commodity", "lng"), |
| | } |
| | affected = [agent_id for agent_id, keywords in mapping.items() if any(keyword in text for keyword in keywords)] |
| | return affected or ["us", "israel", "iran", "hezbollah", "gulf"] |
| |
|
| | def _apply_actions(self, world: WorldState, actions: dict[str, AgentAction]) -> None: |
| | for agent_id, action in actions.items(): |
| | self._validate_action(agent_id, action) |
| | impact = AGENT_ACTION_IMPACTS.get(agent_id, {}).get( |
| | action.type, |
| | DEFAULT_ACTION_IMPACTS.get(action.type, DEFAULT_ACTION_IMPACTS["hold"]), |
| | ) |
| | world.tension_level = self._clamp_percent(world.tension_level + impact.tension_delta) |
| | world.market_stress = self._clamp_percent(world.market_stress + impact.market_delta) |
| | world.oil_pressure = self._clamp_percent(world.oil_pressure + impact.oil_delta) |
| | world.risk_scores[agent_id] = round( |
| | max(0.0, min(1.0, world.risk_scores.get(agent_id, 0.25) + impact.risk_delta)), |
| | 3, |
| | ) |
| |
|
| | if action.type == "negotiate" and action.target and action.target in AGENT_IDS: |
| | self._link_agents(world, agent_id, action.target) |
| | elif action.type == "hold": |
| | world.risk_scores[agent_id] = max(0.0, world.risk_scores.get(agent_id, 0.0) - 0.01) |
| | elif action.type == "deceive": |
| | world.hidden_intents[agent_id] = f"{agent_id} is masking operational intent behind ambiguous signaling." |
| | elif action.type == "oversight_review": |
| | for scored_agent in AGENT_IDS: |
| | world.risk_scores[scored_agent] = max(0.0, world.risk_scores.get(scored_agent, 0.0) - 0.015) |
| | world.behavioral_consistency[scored_agent] = min( |
| | 1.0, |
| | world.behavioral_consistency.get(scored_agent, 0.6) + 0.01, |
| | ) |
| |
|
| | world.behavioral_consistency[agent_id] = self._update_behavioral_consistency( |
| | world=world, |
| | agent_id=agent_id, |
| | action=action, |
| | ) |
| | self._apply_actor_state_effects(world, agent_id, action) |
| | self._apply_asset_action_effects(world, agent_id, action) |
| | action_event = self._action_to_latent_event(world, agent_id, action) |
| | if action_event is not None: |
| | self._register_latent_event(world, action_event) |
| |
|
| | world.tension_level = round(world.tension_level, 2) |
| | world.market_stress = round(world.market_stress, 2) |
| | world.oil_pressure = round(world.oil_pressure, 2) |
| | self._apply_latent_event_pressure(world) |
| | self._decay_latent_events(world) |
| | self._reconcile_actor_state(world, actions) |
| | self._resync_public_events(world) |
| | self._resync_public_actor_state(world) |
| |
|
| | @staticmethod |
| | def _validate_action(agent_id: str, action: AgentAction) -> None: |
| | if action.actor != agent_id: |
| | raise ValueError(f"Action actor mismatch: payload actor={action.actor}, slot={agent_id}") |
| | if action.type not in AGENT_ALLOWED_ACTIONS.get(agent_id, ()): |
| | raise ValueError(f"Unsupported action for {agent_id}: {action.type}") |
| |
|
| | def _link_agents(self, world: WorldState, source: str, target: str) -> None: |
| | world.coalition_graph.setdefault(source, []) |
| | world.coalition_graph.setdefault(target, []) |
| | if target not in world.coalition_graph[source]: |
| | world.coalition_graph[source].append(target) |
| | if source not in world.coalition_graph[target]: |
| | world.coalition_graph[target].append(source) |
| |
|
| | def _compute_oversight( |
| | self, |
| | world: WorldState, |
| | actions: dict[str, AgentAction], |
| | signals: list[ExternalSignal], |
| | ) -> OversightIntervention: |
| | escalation_actions = sum( |
| | 1 for action in actions.values() if action.type in {"strike", "mobilize", "deceive", "sanction"} |
| | ) |
| | signal_pressure = sum(signal.severity for signal in signals) |
| | mean_consistency = sum(world.behavioral_consistency.values()) / max(len(world.behavioral_consistency), 1) |
| | raw_risk = ( |
| | (world.tension_level / 100.0) * 0.45 |
| | + min(1.0, escalation_actions / 4.0) * 0.25 |
| | + min(1.0, signal_pressure / 3.0) * 0.15 |
| | + (1.0 - mean_consistency) * 0.15 |
| | ) |
| | risk_score = round(max(0.0, min(1.0, raw_risk)), 3) |
| | if risk_score <= 0.5: |
| | return OversightIntervention(risk_score=risk_score) |
| |
|
| | affected = [ |
| | agent_id |
| | for agent_id, action in actions.items() |
| | if action.type in {"strike", "mobilize", "deceive", "sanction"} |
| | ] |
| | action_override = { |
| | agent_id: self._build_oversight_override_action( |
| | world=world, |
| | agent_id=agent_id, |
| | action=actions[agent_id], |
| | signals=signals, |
| | ) |
| | for agent_id in affected |
| | } |
| | return OversightIntervention( |
| | triggered=True, |
| | risk_score=risk_score, |
| | reason="Escalation probability exceeded the intervention threshold.", |
| | affected_agents=affected or ["us", "israel", "iran", "hezbollah", "gulf"], |
| | action_override=action_override, |
| | ) |
| |
|
| | def _resolve_oversight_actions( |
| | self, |
| | actions: dict[str, AgentAction], |
| | oversight: OversightIntervention, |
| | ) -> dict[str, AgentAction]: |
| | if not oversight.triggered or not oversight.action_override: |
| | return actions |
| |
|
| | resolved_actions = dict(actions) |
| | for agent_id, override_action in oversight.action_override.items(): |
| | self._validate_action(agent_id, override_action) |
| | resolved_actions[agent_id] = override_action |
| | return resolved_actions |
| |
|
| | def _build_oversight_override_action( |
| | self, |
| | *, |
| | world: WorldState, |
| | agent_id: str, |
| | action: AgentAction, |
| | signals: list[ExternalSignal], |
| | ) -> AgentAction: |
| | signal_text = " ".join( |
| | f"{signal.headline} {' '.join(signal.tags)} {(signal.region or '')}" |
| | for signal in signals |
| | ).lower() |
| | asset_pressure = self._asset_pressure(world, agent_id) |
| |
|
| | if ( |
| | self._signal_mentions(signal_text, "ceasefire", "talk", "negotiat", "summit", "mediat", "humanitarian") |
| | and "negotiate" in AGENT_ALLOWED_ACTIONS.get(agent_id, ()) |
| | ): |
| | return AgentAction( |
| | actor=agent_id, |
| | type="negotiate", |
| | target=self._select_oversight_negotiation_target(agent_id), |
| | summary="Oversight forced a de-escalatory negotiation cycle after elevated intervention risk.", |
| | metadata={"mode": "oversight_override", "replaces": action.type}, |
| | ) |
| |
|
| | if ( |
| | asset_pressure >= 0.2 |
| | or self._signal_mentions(signal_text, "attack", "strike", "rocket", "missile", "drone", "shipping", "cyber", "outage") |
| | ) and "defend" in AGENT_ALLOWED_ACTIONS.get(agent_id, ()): |
| | return AgentAction( |
| | actor=agent_id, |
| | type="defend", |
| | target=agent_id, |
| | summary="Oversight forced a defensive posture to absorb incoming risk instead of escalating further.", |
| | metadata={"mode": "oversight_override", "replaces": action.type}, |
| | ) |
| |
|
| | if "intel_query" in AGENT_ALLOWED_ACTIONS.get(agent_id, ()): |
| | return AgentAction( |
| | actor=agent_id, |
| | type="intel_query", |
| | summary="Oversight forced an intelligence verification cycle before any further escalation.", |
| | metadata={"mode": "oversight_override", "replaces": action.type}, |
| | ) |
| |
|
| | return AgentAction( |
| | actor=agent_id, |
| | type="hold", |
| | summary="Oversight forced a temporary operational hold to break an escalation spiral.", |
| | metadata={"mode": "oversight_override", "replaces": action.type}, |
| | ) |
| |
|
| | @staticmethod |
| | def _select_oversight_negotiation_target(agent_id: str) -> str | None: |
| | return { |
| | "us": "gulf", |
| | "israel": "us", |
| | "iran": "hezbollah", |
| | "hezbollah": "iran", |
| | "gulf": "us", |
| | "oversight": None, |
| | }.get(agent_id) |
| |
|
| | def _apply_oversight(self, world: WorldState, oversight: OversightIntervention) -> None: |
| | if not oversight.triggered: |
| | return |
| | world.tension_level = max(0.0, world.tension_level - 4.0) |
| | world.market_stress = max(0.0, world.market_stress - 2.0) |
| | self._register_latent_event( |
| | world, |
| | LatentEvent( |
| | event_id=f"oversight-{world.turn}-{len(world.latent_events)}", |
| | topic="diplomacy", |
| | status="active", |
| | severity=oversight.risk_score, |
| | visibility="public", |
| | reliability=0.78, |
| | origin="oversight-wrapper", |
| | affected_agents=list(oversight.affected_agents), |
| | started_at_turn=world.turn, |
| | last_updated_turn=world.turn, |
| | decay_rate=0.05, |
| | narratives=self._default_latent_event_narratives( |
| | "diplomacy", |
| | "Oversight injected a corrective diplomatic pause into the next state.", |
| | ), |
| | ), |
| | ) |
| | self._resync_public_events(world) |
| | for agent_id in oversight.affected_agents: |
| | world.risk_scores[agent_id] = oversight.risk_score |
| | world.behavioral_consistency[agent_id] = min( |
| | 1.0, |
| | world.behavioral_consistency.get(agent_id, 0.6) + 0.04, |
| | ) |
| |
|
| | def _initialize_belief_state( |
| | self, |
| | world: WorldState, |
| | episode: EpisodeMetadata, |
| | ) -> dict[str, AgentBeliefState]: |
| | belief_state: dict[str, AgentBeliefState] = {} |
| | for agent_id in AGENT_IDS: |
| | beliefs = [ |
| | self._belief_entry_from_event(event, world=world, episode=episode, agent_id=agent_id) |
| | for event in self._relevant_latent_events_for_agent(world, agent_id) |
| | ] |
| | belief_state[agent_id] = AgentBeliefState( |
| | agent_id=agent_id, |
| | dominant_topics=self._dominant_belief_topics(beliefs), |
| | beliefs=beliefs[:8], |
| | last_revision_turn=world.turn, |
| | ) |
| | return belief_state |
| |
|
| | def _update_belief_state(self, session: SessionState) -> dict[str, AgentBeliefState]: |
| | world = session.world |
| | episode = session.episode |
| | updated_state: dict[str, AgentBeliefState] = {} |
| | for agent_id in AGENT_IDS: |
| | existing = session.belief_state.get(agent_id, AgentBeliefState(agent_id=agent_id)) |
| | belief_index = {belief.belief_id: belief.model_copy(deep=True) for belief in existing.beliefs} |
| | seen_belief_ids: set[str] = set() |
| |
|
| | for event in self._relevant_latent_events_for_agent(world, agent_id): |
| | next_belief = self._belief_entry_from_event(event, world=world, episode=episode, agent_id=agent_id) |
| | seen_belief_ids.add(next_belief.belief_id) |
| | prior = belief_index.get(next_belief.belief_id) |
| | if prior is None: |
| | belief_index[next_belief.belief_id] = next_belief |
| | continue |
| | belief_index[next_belief.belief_id] = self._revise_belief_entry( |
| | prior, |
| | next_belief, |
| | agent_id=agent_id, |
| | turn=world.turn, |
| | ) |
| |
|
| | for belief_id, prior in list(belief_index.items()): |
| | if belief_id in seen_belief_ids: |
| | continue |
| | decayed = self._decay_unseen_belief(prior, agent_id=agent_id, turn=world.turn) |
| | if decayed is None: |
| | belief_index.pop(belief_id, None) |
| | continue |
| | belief_index[belief_id] = decayed |
| |
|
| | beliefs = sorted( |
| | belief_index.values(), |
| | key=lambda belief: (belief.confidence, belief.last_updated_turn, belief.confirmation_count), |
| | reverse=True, |
| | )[:8] |
| | updated_state[agent_id] = AgentBeliefState( |
| | agent_id=agent_id, |
| | dominant_topics=self._dominant_belief_topics(beliefs), |
| | beliefs=beliefs, |
| | last_revision_turn=world.turn, |
| | ) |
| | return updated_state |
| |
|
| | @staticmethod |
| | def _belief_doctrine_prior(agent_id: str, topic: str) -> float: |
| | return BELIEF_TOPIC_PRIORS.get(agent_id, {}).get(topic, 0.0) |
| |
|
| | def _revise_belief_entry( |
| | self, |
| | prior: AgentBeliefEntry, |
| | next_belief: AgentBeliefEntry, |
| | *, |
| | agent_id: str, |
| | turn: int, |
| | ) -> AgentBeliefEntry: |
| | revised = prior.model_copy(deep=True) |
| | doctrine_prior = self._belief_doctrine_prior(agent_id, next_belief.topic) |
| | contradiction = next_belief.status in {"contested", "disconfirmed"} or ( |
| | next_belief.status == "suspected" and prior.status in {"active", "confirmed"} |
| | ) |
| |
|
| | revised.source = next_belief.source |
| | revised.suspected_agents = list(next_belief.suspected_agents) |
| | revised.related_event_ids = list({*prior.related_event_ids, *next_belief.related_event_ids}) |
| | revised.last_updated_turn = turn |
| |
|
| | if contradiction: |
| | revised.contradiction_count += 1 |
| | retention = 0.78 + doctrine_prior * 0.5 + min(prior.confirmation_count, 3) * 0.02 |
| | revised.confidence = round( |
| | max( |
| | BELIEF_PERSISTENCE_FLOOR, |
| | min( |
| | 0.98, |
| | prior.confidence * retention |
| | + next_belief.confidence * 0.12 |
| | - BELIEF_CONTRADICTION_PENALTY, |
| | ), |
| | ), |
| | 3, |
| | ) |
| | revised.status = "disconfirmed" if revised.contradiction_count >= 2 and revised.confidence <= 0.5 else "contested" |
| | if revised.confidence < 0.46: |
| | revised.summary = next_belief.summary |
| | return revised |
| |
|
| | revised.confirmation_count += 1 |
| | retention = 0.38 + doctrine_prior * 0.4 |
| | revised.confidence = round( |
| | max( |
| | BELIEF_PERSISTENCE_FLOOR, |
| | min( |
| | 0.98, |
| | prior.confidence * retention |
| | + next_belief.confidence * (1.0 - retention) |
| | + BELIEF_CONFIRMATION_BONUS, |
| | ), |
| | ), |
| | 3, |
| | ) |
| | revised.summary = next_belief.summary |
| | revised.status = next_belief.status |
| | if revised.confirmation_count >= 2 and revised.confidence >= 0.74 and revised.status == "active": |
| | revised.status = "confirmed" |
| | if revised.status in {"active", "confirmed"}: |
| | revised.last_confirmed_turn = turn |
| | return revised |
| |
|
| | def _decay_unseen_belief( |
| | self, |
| | prior: AgentBeliefEntry, |
| | *, |
| | agent_id: str, |
| | turn: int, |
| | ) -> AgentBeliefEntry | None: |
| | stale_turns = max(0, turn - prior.last_updated_turn) |
| | if stale_turns <= 0: |
| | return prior |
| |
|
| | doctrine_prior = self._belief_doctrine_prior(agent_id, prior.topic) |
| | decay = max( |
| | 0.04, |
| | 0.11 |
| | - doctrine_prior * 0.22 |
| | - min(prior.confirmation_count, 3) * 0.01, |
| | ) |
| | revised = prior.model_copy(deep=True) |
| | revised.confidence = round(max(0.06, prior.confidence - decay), 3) |
| |
|
| | if stale_turns >= 1 and revised.status == "confirmed": |
| | revised.status = "active" |
| | if stale_turns >= 2 and revised.status in {"active", "confirmed"}: |
| | revised.status = "contested" if revised.confidence >= 0.42 else "suspected" |
| | elif stale_turns >= 2 and revised.status == "contested" and revised.confidence < 0.34: |
| | revised.status = "suspected" |
| | if stale_turns >= 3 and revised.contradiction_count > revised.confirmation_count and revised.confidence < 0.28: |
| | revised.status = "disconfirmed" |
| | if stale_turns > BELIEF_MAX_STALE_TURNS and revised.confidence <= 0.14: |
| | return None |
| | return revised |
| |
|
| | def _relevant_latent_events_for_agent(self, world: WorldState, agent_id: str) -> list[LatentEvent]: |
| | relevant_events: list[LatentEvent] = [] |
| | for event in world.latent_events: |
| | if event.status == "resolved": |
| | continue |
| | if agent_id == "oversight": |
| | relevant_events.append(event) |
| | continue |
| | if event.visibility in {"public", "mixed"} or agent_id in event.affected_agents: |
| | relevant_events.append(event) |
| | return relevant_events |
| |
|
| | def _belief_entry_from_event( |
| | self, |
| | event: LatentEvent, |
| | *, |
| | world: WorldState, |
| | episode: EpisodeMetadata, |
| | agent_id: str, |
| | ) -> AgentBeliefEntry: |
| | summary = self._belief_summary_for_agent(event, world=world, episode=episode, agent_id=agent_id) |
| | confidence = self._belief_confidence_for_agent(event, world=world, episode=episode, agent_id=agent_id) |
| | status = self._belief_status_for_agent(event, confidence=confidence, episode=episode, agent_id=agent_id) |
| | return AgentBeliefEntry( |
| | belief_id=f"{agent_id}:{event.event_id}", |
| | topic=event.topic, |
| | summary=summary, |
| | confidence=confidence, |
| | status=status, |
| | source=event.origin, |
| | suspected_agents=list(event.affected_agents), |
| | related_event_ids=[event.event_id, *event.linked_event_ids], |
| | confirmation_count=1 if status in {"active", "confirmed"} else 0, |
| | contradiction_count=1 if status == "contested" else 0, |
| | last_confirmed_turn=world.turn if status in {"active", "confirmed"} else None, |
| | last_updated_turn=world.turn, |
| | ) |
| |
|
| | def _belief_summary_for_agent( |
| | self, |
| | event: LatentEvent, |
| | *, |
| | world: WorldState, |
| | episode: EpisodeMetadata, |
| | agent_id: str, |
| | ) -> str: |
| | narrative = self._select_private_event_narrative(event, world=world, episode=episode, agent_id=agent_id) |
| | prefix = "Belief" if event.visibility == "private" else "Assessment" |
| | return self._clip_summary(f"{prefix}: {narrative.summary}", 180) |
| |
|
| | def _belief_confidence_for_agent( |
| | self, |
| | event: LatentEvent, |
| | *, |
| | world: WorldState, |
| | episode: EpisodeMetadata, |
| | agent_id: str, |
| | ) -> float: |
| | confidence = event.reliability |
| | confidence += self._belief_doctrine_prior(agent_id, event.topic) |
| | if event.visibility == "public": |
| | confidence += 0.12 |
| | elif event.visibility == "private": |
| | confidence -= 0.08 |
| | if self._observation_projection_enabled(agent_id, episode): |
| | confidence += (self._projection_unit(world, episode, agent_id, f"belief:{event.event_id}") - 0.5) * 0.22 |
| | if agent_id != "oversight" and agent_id not in event.affected_agents and event.visibility != "public": |
| | confidence -= 0.14 |
| | return round(max(0.08, min(0.96, confidence)), 3) |
| |
|
| | @staticmethod |
| | def _belief_status_for_agent( |
| | event: LatentEvent, |
| | *, |
| | confidence: float, |
| | episode: EpisodeMetadata, |
| | agent_id: str, |
| | ) -> str: |
| | doctrine_prior = BELIEF_TOPIC_PRIORS.get(agent_id, {}).get(event.topic, 0.0) |
| | if event.visibility == "private" and agent_id not in event.affected_agents and agent_id != "oversight": |
| | return "suspected" |
| | if event.status == "contained" and confidence < 0.45: |
| | return "contested" |
| | if not episode.fog_of_war or agent_id == "oversight": |
| | return "confirmed" |
| | if confidence >= 0.7 - min(0.08, doctrine_prior * 0.45): |
| | return "active" |
| | if confidence <= 0.34 - min(0.06, doctrine_prior * 0.2): |
| | return "suspected" |
| | return "contested" |
| |
|
| | @staticmethod |
| | def _dominant_belief_topics(beliefs: list[AgentBeliefEntry]) -> list[str]: |
| | ranked: dict[str, float] = {} |
| | for belief in beliefs: |
| | ranked[belief.topic] = ranked.get(belief.topic, 0.0) + belief.confidence |
| | return [ |
| | topic |
| | for topic, _ in sorted(ranked.items(), key=lambda item: item[1], reverse=True)[:3] |
| | ] |
| |
|
| | def _build_observations( |
| | self, |
| | world: WorldState, |
| | episode: EpisodeMetadata, |
| | *, |
| | include_live_sources: bool = False, |
| | belief_state: dict[str, AgentBeliefState] | None = None, |
| | historical_replay: HistoricalReplayState | None = None, |
| | ) -> dict[str, AgentObservation]: |
| | observations: dict[str, AgentObservation] = {} |
| | public_events = [event for event in world.active_events if event.public][-MAX_PUBLIC_BRIEF_ITEMS:] |
| | public_brief = self._build_public_brief_from_latent_events(world) |
| |
|
| | for agent_id in AGENT_IDS: |
| | profile = AGENT_PROFILES[agent_id] |
| | entity_pack = load_entity_pack(agent_id) |
| | entity_profile = entity_pack.get("profile", {}) |
| | strategic_assets = self._flatten_strategic_assets( |
| | agent_id=agent_id, |
| | entity_pack=entity_pack, |
| | world=world, |
| | ) |
| | training_source_bundle = AGENT_TRAINING_SOURCE_BUNDLES.get(agent_id, []) |
| | live_source_bundle = AGENT_LIVE_SOURCE_BUNDLES.get(agent_id, []) |
| | available_data_sources = self._build_data_source_context(agent_id) |
| | projection_enabled = self._observation_projection_enabled(agent_id, episode) |
| | beliefs = (belief_state or {}).get(agent_id, AgentBeliefState(agent_id=agent_id)) |
| | training_source_packets, live_source_packets = self._source_harvester.get_packets_for_agent( |
| | agent_id, |
| | include_live=include_live_sources, |
| | ) |
| | baseline_private_brief = [ |
| | IntelSnippet( |
| | source="scenario", |
| | category="private_intel", |
| | summary=self._clip_summary(summary), |
| | confidence=0.72, |
| | ) |
| | for summary in profile.baseline_private_intel |
| | ] |
| | focused_private_brief = self._build_focused_private_brief( |
| | world=world, |
| | episode=episode, |
| | agent_id=agent_id, |
| | focus_terms=profile.intelligence_focus, |
| | ) |
| | projected_state, obscured_metric_count = self._project_strategic_state( |
| | world=world, |
| | episode=episode, |
| | agent_id=agent_id, |
| | ) |
| | projected_assets = self._project_strategic_assets( |
| | strategic_assets=strategic_assets, |
| | world=world, |
| | episode=episode, |
| | agent_id=agent_id, |
| | ) |
| | training_source_brief, training_projection = self._source_packets_to_briefs( |
| | training_source_packets, |
| | category="training_source", |
| | world=world, |
| | episode=episode, |
| | agent_id=agent_id, |
| | ) |
| | live_source_brief, live_projection = ( |
| | self._source_packets_to_briefs( |
| | live_source_packets, |
| | category="live_source", |
| | world=world, |
| | episode=episode, |
| | agent_id=agent_id, |
| | ) |
| | if include_live_sources |
| | else ([], {"delayed": 0, "contested": 0, "contradiction_packets": 0, "confidence_sum": 0.0, "contradiction_topics": []}) |
| | ) |
| | private_brief = self._compose_private_brief( |
| | baseline_private_brief=baseline_private_brief, |
| | focused_private_brief=focused_private_brief, |
| | training_source_brief=training_source_brief, |
| | live_source_brief=live_source_brief, |
| | ) |
| | projection = self._build_observation_projection( |
| | agent_id=agent_id, |
| | projection_enabled=projection_enabled, |
| | obscured_metric_count=obscured_metric_count, |
| | delivered_brief_count=len(training_source_brief) + len(live_source_brief), |
| | delayed_source_count=int(training_projection["delayed"]) + int(live_projection["delayed"]), |
| | contested_source_count=int(training_projection["contested"]) + int(live_projection["contested"]), |
| | contradiction_packet_count=int(training_projection["contradiction_packets"]) |
| | + int(live_projection["contradiction_packets"]), |
| | contradiction_topics=sorted( |
| | { |
| | *training_projection["contradiction_topics"], |
| | *live_projection["contradiction_topics"], |
| | } |
| | ), |
| | confidence_sum=float(training_projection["confidence_sum"]) + float(live_projection["confidence_sum"]), |
| | ) |
| | asset_alerts = self._build_asset_alerts(projected_assets) |
| | historical_brief = self._build_historical_brief(historical_replay) |
| | decision_prompt = self._build_decision_prompt( |
| | agent_id=agent_id, |
| | entity_profile=entity_profile, |
| | strategic_assets=projected_assets, |
| | available_data_sources=available_data_sources, |
| | live_enabled=include_live_sources, |
| | projection=projection, |
| | belief_state=beliefs, |
| | historical_replay=historical_replay, |
| | ) |
| |
|
| | observations[agent_id] = AgentObservation( |
| | public_brief=public_brief, |
| | private_brief=private_brief, |
| | belief_brief=[belief.summary for belief in beliefs.beliefs[:4]], |
| | belief_topics=list(beliefs.dominant_topics), |
| | perceived_tension=self._perceived_tension(world.tension_level, agent_id, episode.fog_of_war), |
| | known_coalitions=sorted(world.coalition_graph.get(agent_id, [])), |
| | event_log=public_events, |
| | decision_prompt=decision_prompt, |
| | available_actions=list(AGENT_ALLOWED_ACTIONS.get(agent_id, ())), |
| | available_data_sources=available_data_sources, |
| | entity_profile=entity_profile, |
| | strategic_state=projected_state, |
| | strategic_assets=projected_assets, |
| | asset_alerts=asset_alerts, |
| | source_bundle=training_source_bundle, |
| | training_source_bundle=training_source_bundle, |
| | live_source_bundle=live_source_bundle, |
| | source_packets=training_source_packets + live_source_packets, |
| | training_source_packets=training_source_packets, |
| | live_source_packets=live_source_packets, |
| | historical_brief=historical_brief, |
| | projection=projection, |
| | ) |
| | return observations |
| |
|
| | def _build_public_brief_from_latent_events(self, world: WorldState) -> list[IntelSnippet]: |
| | public_events = [ |
| | event |
| | for event in world.latent_events |
| | if event.visibility in {"public", "mixed"} and event.status != "resolved" |
| | ][-MAX_PUBLIC_BRIEF_ITEMS:] |
| | briefs: list[IntelSnippet] = [] |
| | for event in public_events: |
| | briefs.append( |
| | IntelSnippet( |
| | source=event.origin, |
| | category=f"latent_{event.topic}", |
| | summary=self._clip_summary(self._latent_event_public_summary(event)), |
| | confidence=round(max(0.3, min(0.92, event.severity * event.reliability + 0.18)), 3), |
| | ) |
| | ) |
| | return briefs |
| |
|
| | def _build_focused_private_brief( |
| | self, |
| | *, |
| | world: WorldState, |
| | episode: EpisodeMetadata, |
| | agent_id: str, |
| | focus_terms: tuple[str, ...], |
| | ) -> list[IntelSnippet]: |
| | briefs: list[IntelSnippet] = [] |
| | for event in reversed(world.latent_events[-8:]): |
| | if event.status == "resolved": |
| | continue |
| | if agent_id != "oversight" and agent_id not in event.affected_agents: |
| | event_text = " ".join(narrative.summary for narrative in event.narratives).lower() |
| | if not any(term in event_text for term in focus_terms): |
| | continue |
| | narrative = self._select_private_event_narrative(event, world=world, episode=episode, agent_id=agent_id) |
| | briefs.append( |
| | IntelSnippet( |
| | source=event.origin, |
| | category=f"latent_{event.topic}", |
| | summary=self._clip_summary(narrative.summary), |
| | confidence=round(max(0.28, min(0.9, narrative.confidence * event.reliability)), 3), |
| | ) |
| | ) |
| | return briefs[:3] |
| |
|
| | def _source_packets_to_briefs( |
| | self, |
| | source_packets: list[SourcePacket], |
| | category: str, |
| | *, |
| | world: WorldState, |
| | episode: EpisodeMetadata, |
| | agent_id: str, |
| | ) -> tuple[list[IntelSnippet], dict[str, float]]: |
| | briefs: list[IntelSnippet] = [] |
| | delayed_count = 0 |
| | contested_count = 0 |
| | contradiction_packet_count = 0 |
| | contradiction_topics: set[str] = set() |
| | confidence_sum = 0.0 |
| | sorted_packets = sorted( |
| | ( |
| | packet |
| | for packet in source_packets |
| | if packet.status == "ok" |
| | and packet.summary |
| | and FogOfWarDiplomacyEnv._is_source_packet_fresh(packet) |
| | ), |
| | key=lambda packet: packet.fetched_at or datetime.fromtimestamp(0, tz=timezone.utc), |
| | reverse=True, |
| | ) |
| | for packet in sorted_packets: |
| | if self._should_delay_source_packet(packet, world=world, episode=episode, agent_id=agent_id): |
| | delayed_count += 1 |
| | continue |
| | confidence = self._projected_source_confidence(packet, world=world, episode=episode, agent_id=agent_id) |
| | relevant_events = self._relevant_latent_events_for_packet(packet, world=world) |
| | event_context = ( |
| | max(relevant_events, key=lambda event: event.severity * event.reliability) |
| | if relevant_events |
| | else None |
| | ) |
| | contradiction = self._latent_source_contradiction( |
| | packet, |
| | world=world, |
| | episode=episode, |
| | agent_id=agent_id, |
| | ) |
| | contested = confidence < 0.52 or ( |
| | packet.kind in LOW_FIDELITY_SOURCE_KINDS |
| | and self._projection_unit(world, episode, agent_id, f"contested:{packet.source_id}") >= 0.72 |
| | ) |
| | if contradiction["enabled"]: |
| | contradiction_packet_count += 1 |
| | contradiction_topics.add(str(contradiction["topic"])) |
| | confidence = max(0.24, round(confidence - float(contradiction["confidence_penalty"]), 3)) |
| | if contested: |
| | contested_count += 1 |
| | briefs.append( |
| | IntelSnippet( |
| | source=packet.source_name, |
| | category=category, |
| | summary=self._project_source_summary( |
| | packet.summary, |
| | confidence=confidence, |
| | contested=contested, |
| | event_context=event_context, |
| | contradiction=contradiction, |
| | ), |
| | confidence=confidence, |
| | ) |
| | ) |
| | confidence_sum += confidence |
| | return briefs, { |
| | "delayed": float(delayed_count), |
| | "contested": float(contested_count), |
| | "contradiction_packets": float(contradiction_packet_count), |
| | "confidence_sum": round(confidence_sum, 3), |
| | "contradiction_topics": sorted(contradiction_topics), |
| | } |
| |
|
| | def _build_historical_brief(self, historical_replay: HistoricalReplayState | None) -> list[str]: |
| | if historical_replay is None or not historical_replay.enabled: |
| | return [] |
| |
|
| | visible_events = historical_replay.ground_truth_timeline[: historical_replay.current_event_index + 1] |
| | lines = [ |
| | ( |
| | f"{event.timestamp.date().isoformat()} {event.topic}: " |
| | f"{self._clip_summary(event.public_summary or event.summary, 110)}" |
| | ) |
| | for event in visible_events[-3:] |
| | ] |
| | if visible_events: |
| | lines.append( |
| | "Visible replay history ends at " |
| | f"{visible_events[-1].timestamp.isoformat()}. Predict the most likely next event over the next turn." |
| | ) |
| | return lines |
| |
|
| | @staticmethod |
| | def _clip_summary(summary: str, limit: int = MAX_INTEL_SUMMARY_CHARS) -> str: |
| | collapsed = " ".join(summary.split()) |
| | if len(collapsed) <= limit: |
| | return collapsed |
| | return f"{collapsed[: limit - 3].rstrip()}..." |
| |
|
| | @staticmethod |
| | def _is_source_packet_fresh(packet) -> bool: |
| | if packet.fetched_at is None: |
| | return False |
| | try: |
| | source = get_source_by_id(packet.source_id) |
| | except KeyError: |
| | return False |
| | return datetime.now(timezone.utc) - packet.fetched_at <= timedelta(seconds=source_ttl_seconds(source)) |
| |
|
| | @staticmethod |
| | def _compose_private_brief( |
| | *, |
| | baseline_private_brief: list[IntelSnippet], |
| | focused_private_brief: list[IntelSnippet], |
| | training_source_brief: list[IntelSnippet], |
| | live_source_brief: list[IntelSnippet], |
| | limit: int = MAX_PRIVATE_BRIEF_ITEMS, |
| | ) -> list[IntelSnippet]: |
| | |
| | primary_groups = [ |
| | training_source_brief[:MAX_TRAINING_SOURCE_BRIEFS], |
| | live_source_brief[:MAX_LIVE_SOURCE_BRIEFS], |
| | focused_private_brief[:1], |
| | baseline_private_brief[:1], |
| | ] |
| | overflow_groups = [ |
| | training_source_brief[MAX_TRAINING_SOURCE_BRIEFS:], |
| | live_source_brief[MAX_LIVE_SOURCE_BRIEFS:], |
| | focused_private_brief[1:], |
| | baseline_private_brief[1:], |
| | ] |
| |
|
| | private_brief: list[IntelSnippet] = [] |
| | for group in primary_groups + overflow_groups: |
| | for brief in group: |
| | if len(private_brief) >= limit: |
| | return private_brief |
| | private_brief.append(brief) |
| | return private_brief |
| |
|
| | def _select_private_event_narrative( |
| | self, |
| | event: LatentEvent, |
| | *, |
| | world: WorldState, |
| | episode: EpisodeMetadata, |
| | agent_id: str, |
| | ) -> LatentEventNarrative: |
| | if not event.narratives: |
| | return LatentEventNarrative( |
| | framing="baseline", |
| | summary=event.topic, |
| | confidence=max(0.3, event.reliability), |
| | public=event.visibility != "private", |
| | ) |
| | if agent_id == "oversight" and event.narratives: |
| | concealed = next((n for n in event.narratives if n.framing == "concealed"), None) |
| | return concealed or event.narratives[-1] |
| |
|
| | if not self._observation_projection_enabled(agent_id, episode): |
| | return event.narratives[0] |
| |
|
| | narrative_pool = [ |
| | narrative |
| | for narrative in event.narratives |
| | if not narrative.public or event.visibility == "private" |
| | ] or event.narratives |
| | index = int(self._projection_unit(world, episode, agent_id, f"latent-narrative:{event.event_id}") * len(narrative_pool)) |
| | return narrative_pool[min(index, len(narrative_pool) - 1)] |
| |
|
| | def _perceived_tension(self, tension_level: float, agent_id: str, fog_of_war: bool) -> float: |
| | if agent_id == "oversight" or not fog_of_war: |
| | return tension_level |
| | jitter = self._rng.uniform(-4.0, 4.0) |
| | return round(max(0.0, min(100.0, tension_level + jitter)), 2) |
| |
|
| | @staticmethod |
| | def _observation_projection_enabled(agent_id: str, episode: EpisodeMetadata) -> bool: |
| | return episode.fog_of_war and agent_id != "oversight" |
| |
|
| | def _project_strategic_state( |
| | self, |
| | *, |
| | world: WorldState, |
| | episode: EpisodeMetadata, |
| | agent_id: str, |
| | ) -> tuple[dict[str, float], int]: |
| | canonical_state = world.latent_state.get(agent_id, {}).copy() |
| | if not self._observation_projection_enabled(agent_id, episode): |
| | return canonical_state, 0 |
| |
|
| | projected_state: dict[str, float] = {} |
| | obscured_metric_count = 0 |
| | uncertainty_scale = 1.2 + world.risk_scores.get(agent_id, 0.25) * 3.5 |
| | for metric, value in canonical_state.items(): |
| | unit = self._projection_unit(world, episode, agent_id, f"metric:{metric}") |
| | jitter = (unit - 0.5) * 2.0 * uncertainty_scale |
| | if any(token in metric for token in ("support", "confidence", "clarity", "resilience")): |
| | jitter *= 1.2 |
| | observed_value = round(self._clamp_percent(value + jitter), 2) |
| | if abs(observed_value - value) >= 0.75: |
| | obscured_metric_count += 1 |
| | projected_state[metric] = observed_value |
| | return projected_state, obscured_metric_count |
| |
|
| | def _project_strategic_assets( |
| | self, |
| | *, |
| | strategic_assets: list[dict[str, object]], |
| | world: WorldState, |
| | episode: EpisodeMetadata, |
| | agent_id: str, |
| | ) -> list[dict[str, object]]: |
| | projected_assets = [asset.copy() for asset in strategic_assets] |
| | if not self._observation_projection_enabled(agent_id, episode): |
| | return projected_assets |
| |
|
| | for asset in projected_assets: |
| | health = float(asset.get("health", 100.0)) |
| | status = str(asset.get("status", "operational")) |
| | if status == "operational": |
| | asset["health"] = round(health / 5.0) * 5.0 |
| | load = float(asset.get("operational_load", 0.0)) |
| | asset["operational_load"] = round(load / 5.0) * 5.0 |
| | else: |
| | asset["health"] = round(health, 1) |
| | return projected_assets |
| |
|
| | def _build_observation_projection( |
| | self, |
| | *, |
| | agent_id: str, |
| | projection_enabled: bool, |
| | obscured_metric_count: int, |
| | delivered_brief_count: int, |
| | delayed_source_count: int, |
| | contested_source_count: int, |
| | contradiction_packet_count: int, |
| | contradiction_topics: list[str], |
| | confidence_sum: float, |
| | ) -> ObservationProjection: |
| | if not projection_enabled: |
| | return ObservationProjection( |
| | enabled=False, |
| | mode="direct", |
| | worldview_reliability=1.0, |
| | ) |
| |
|
| | mean_confidence = confidence_sum / max(delivered_brief_count, 1) |
| | worldview_reliability = max( |
| | 0.32, |
| | min(0.9, mean_confidence - min(obscured_metric_count, 6) * 0.015), |
| | ) |
| | notes = [ |
| | "Strategic metrics are estimates under fog-of-war, not privileged ground truth.", |
| | ] |
| | if delayed_source_count > 0: |
| | notes.append("Some fast-moving source packets are lagged before they reach you.") |
| | if contested_source_count > 0: |
| | notes.append("At least part of the source picture is contested or fragmentary; cross-check before escalating.") |
| | if contradiction_packet_count > 0: |
| | notes.append("Multiple sources may disagree on the same latent development; compare topic framing before acting.") |
| | return ObservationProjection( |
| | enabled=True, |
| | mode="partial", |
| | worldview_reliability=round(worldview_reliability, 3), |
| | delayed_source_count=delayed_source_count, |
| | contested_source_count=contested_source_count, |
| | contradiction_packet_count=contradiction_packet_count, |
| | obscured_metric_count=obscured_metric_count, |
| | contradiction_topics=contradiction_topics, |
| | notes=notes, |
| | ) |
| |
|
| | def _should_delay_source_packet( |
| | self, |
| | packet: SourcePacket, |
| | *, |
| | world: WorldState, |
| | episode: EpisodeMetadata, |
| | agent_id: str, |
| | ) -> bool: |
| | if not self._observation_projection_enabled(agent_id, episode): |
| | return False |
| | if packet.delivery == "live_demo": |
| | return False |
| | turn_lag = 0 |
| | if packet.kind in LOW_FIDELITY_SOURCE_KINDS: |
| | turn_lag = 1 |
| | if world.turn >= turn_lag: |
| | return False |
| | return self._projection_unit(world, episode, agent_id, f"delay:{packet.source_id}") < 0.35 |
| |
|
| | def _projected_source_confidence( |
| | self, |
| | packet: SourcePacket, |
| | *, |
| | world: WorldState, |
| | episode: EpisodeMetadata, |
| | agent_id: str, |
| | ) -> float: |
| | if not self._observation_projection_enabled(agent_id, episode): |
| | return 0.65 if packet.delivery == "training_core" else 0.55 |
| | base_reliability = SOURCE_KIND_BASE_RELIABILITY.get(packet.kind, 0.58) + SOURCE_DELIVERY_RELIABILITY.get( |
| | packet.delivery, |
| | 0.0, |
| | ) |
| | jitter = (self._projection_unit(world, episode, agent_id, f"confidence:{packet.source_id}") - 0.5) * 0.18 |
| | confidence = base_reliability + jitter |
| | return round(max(0.24, min(0.92, confidence)), 3) |
| |
|
| | def _project_source_summary( |
| | self, |
| | summary: str, |
| | *, |
| | confidence: float, |
| | contested: bool, |
| | event_context: LatentEvent | None = None, |
| | contradiction: dict[str, object] | None = None, |
| | ) -> str: |
| | clipped = self._clip_summary(summary) |
| | if event_context is not None and event_context.status != "resolved": |
| | event_summary = self._latent_event_public_summary(event_context) |
| | clipped = self._clip_summary( |
| | f"This reporting fits a broader {CONTRADICTION_TOPIC_LABELS.get(event_context.topic, event_context.topic)} picture. " |
| | f"{event_summary} {clipped}" |
| | ) |
| | if contradiction and contradiction.get("enabled"): |
| | topic = str(contradiction["topic"]) |
| | framing = str(contradiction["framing"]) |
| | narrative_summary = contradiction.get("narrative_summary") |
| | if framing == "stabilizing": |
| | clipped = self._clip_summary( |
| | f"Conflicting {topic} reporting: {narrative_summary or 'this source emphasizes partial stabilization around the same development.'} {clipped}" |
| | ) |
| | else: |
| | clipped = self._clip_summary( |
| | f"Conflicting {topic} reporting: {narrative_summary or 'this source emphasizes renewed deterioration around the same development.'} {clipped}" |
| | ) |
| | if contested: |
| | return self._clip_summary(f"Contested reporting: {clipped}") |
| | if confidence < 0.48: |
| | return self._clip_summary(f"Unconfirmed reporting: {clipped}") |
| | if confidence < 0.62: |
| | return self._clip_summary(f"Preliminary reporting: {clipped}") |
| | return clipped |
| |
|
| | def _latent_source_contradiction( |
| | self, |
| | packet: SourcePacket, |
| | *, |
| | world: WorldState, |
| | episode: EpisodeMetadata, |
| | agent_id: str, |
| | ) -> dict[str, object]: |
| | if not self._observation_projection_enabled(agent_id, episode): |
| | return {"enabled": False} |
| |
|
| | relevant_events = self._relevant_latent_events_for_packet(packet, world=world) |
| | if not relevant_events: |
| | return {"enabled": False} |
| | event = max(relevant_events, key=lambda candidate: candidate.severity * candidate.reliability) |
| | contradiction_strength = min(1.0, event.severity * max(event.reliability, 0.4)) |
| | if contradiction_strength < 0.22: |
| | return {"enabled": False} |
| |
|
| | source_hint = f"{packet.source_id} {packet.source_name}".lower() |
| | if "rate" in source_hint or "index" in source_hint: |
| | framing = "stabilizing" |
| | elif "status" in source_hint or "watch" in source_hint or "disruption" in source_hint: |
| | framing = "deteriorating" |
| | else: |
| | orientation = self._projection_unit(world, episode, agent_id, f"contradiction:{packet.source_id}:{event.event_id}") |
| | framing = "stabilizing" if orientation < 0.5 else "deteriorating" |
| | narrative = next( |
| | (candidate for candidate in event.narratives if candidate.framing == framing), |
| | None, |
| | ) |
| | return { |
| | "enabled": True, |
| | "topic": CONTRADICTION_TOPIC_LABELS.get(event.topic, event.topic), |
| | "framing": framing, |
| | "narrative_summary": narrative.summary if narrative is not None else None, |
| | "confidence_penalty": round(min(0.18, contradiction_strength * 0.22), 3), |
| | } |
| |
|
| | def _relevant_latent_events_for_packet( |
| | self, |
| | packet: SourcePacket, |
| | *, |
| | world: WorldState, |
| | ) -> list[LatentEvent]: |
| | try: |
| | source = get_source_by_id(packet.source_id) |
| | source_text = " ".join(source.tags) |
| | except KeyError: |
| | source_text = "" |
| | text = f"{packet.summary} {' '.join(packet.sample_items)} {packet.source_name} {source_text}".lower() |
| | topics = set(self._infer_event_topics_from_text(text)) |
| | relevant_events: list[LatentEvent] = [] |
| | for event in world.latent_events: |
| | if event.status == "resolved": |
| | continue |
| | if event.topic in topics: |
| | relevant_events.append(event) |
| | continue |
| | narrative_text = " ".join(narrative.summary for narrative in event.narratives).lower() |
| | salient_tokens = [token for token in text.split()[:10] if len(token) >= 5] |
| | if any(token in narrative_text for token in salient_tokens): |
| | relevant_events.append(event) |
| | return relevant_events |
| |
|
| | @staticmethod |
| | def _projection_unit(world: WorldState, episode: EpisodeMetadata, agent_id: str, label: str) -> float: |
| | digest = hashlib.sha256( |
| | f"{episode.scenario_id}|{world.turn}|{agent_id}|{label}".encode("utf-8") |
| | ).digest() |
| | return int.from_bytes(digest[:8], byteorder="big") / float(2**64) |
| |
|
| | @staticmethod |
| | def _build_model_bindings() -> dict[str, EntityModelBinding]: |
| | return build_entity_model_bindings() |
| |
|
| | @staticmethod |
| | def _build_action_log_entries( |
| | session: SessionState, |
| | actions: dict[str, AgentAction], |
| | ) -> list[ActionLogEntry]: |
| | entries: list[ActionLogEntry] = [] |
| | for agent_id, action in actions.items(): |
| | entries.append( |
| | ActionLogEntry( |
| | turn=session.world.turn, |
| | actor=agent_id, |
| | action_type=action.type, |
| | target=action.target, |
| | summary=action.summary, |
| | reward_total=session.rewards.get(agent_id, RewardBreakdown()).total, |
| | tension_after=session.world.tension_level, |
| | market_stress_after=session.world.market_stress, |
| | oil_pressure_after=session.world.oil_pressure, |
| | metadata=action.metadata.copy(), |
| | ) |
| | ) |
| | return entries |
| |
|
| | @staticmethod |
| | def _build_reaction_log_entry( |
| | *, |
| | session: SessionState, |
| | signals: list[ExternalSignal], |
| | latent_event_ids: list[str], |
| | actions: dict[str, AgentAction], |
| | oversight: OversightIntervention, |
| | tension_before: float, |
| | ) -> ReactionLogEntry: |
| | actor_outcomes: list[ReactionActorOutcome] = [] |
| | for agent_id, action in actions.items(): |
| | decision_mode = action.metadata.get("mode") |
| | if decision_mode not in {"heuristic_fallback", "provider_inference"}: |
| | binding = session.model_bindings.get(agent_id) |
| | decision_mode = binding.decision_mode if binding is not None else "heuristic_fallback" |
| | actor_outcomes.append( |
| | ReactionActorOutcome( |
| | agent_id=agent_id, |
| | action=action, |
| | reward_total=session.rewards.get(agent_id, RewardBreakdown()).total, |
| | decision_mode=decision_mode, |
| | ) |
| | ) |
| |
|
| | return ReactionLogEntry( |
| | event_id=str(uuid4()), |
| | turn=session.world.turn, |
| | source=signals[0].source if len({signal.source for signal in signals}) == 1 else "public_release", |
| | latent_event_ids=latent_event_ids, |
| | signals=[signal.model_copy(deep=True) for signal in signals], |
| | actor_outcomes=actor_outcomes, |
| | oversight_triggered=oversight.triggered, |
| | tension_before=tension_before, |
| | tension_after=session.world.tension_level, |
| | market_stress_after=session.world.market_stress, |
| | oil_pressure_after=session.world.oil_pressure, |
| | ) |
| |
|
| | def _actor_metric(self, world: WorldState, agent_id: str, metric: str, default: float = 50.0) -> float: |
| | return world.latent_state.get(agent_id, {}).get( |
| | metric, |
| | AGENT_STATE_BASELINES.get(agent_id, {}).get(metric, default), |
| | ) |
| |
|
| | def _bump_actor_metric(self, world: WorldState, agent_id: str, metric: str, delta: float) -> None: |
| | baseline = AGENT_STATE_BASELINES.get(agent_id, {}).get(metric, 50.0) |
| | agent_state = world.latent_state.setdefault(agent_id, {}) |
| | current = agent_state.get(metric, baseline) |
| | agent_state[metric] = round(self._clamp_percent(current + delta), 2) |
| |
|
| | def _resync_public_actor_state(self, world: WorldState) -> None: |
| | public_state: dict[str, dict[str, float]] = {} |
| | for agent_id in AGENT_IDS: |
| | latent_metrics = world.latent_state.get(agent_id, {}) |
| | synced_metrics: dict[str, float] = {} |
| | for metric, latent_value in latent_metrics.items(): |
| | baseline = AGENT_STATE_BASELINES.get(agent_id, {}).get(metric, 50.0) |
| | previous_public = world.actor_state.get(agent_id, {}).get(metric, baseline) |
| | sync_factor = self._public_sync_factor(metric) |
| | target_public = baseline + (latent_value - baseline) * sync_factor |
| | lagged_public = previous_public + (target_public - previous_public) * 0.7 |
| | synced_metrics[metric] = round(self._clamp_percent(lagged_public), 2) |
| | public_state[agent_id] = synced_metrics |
| | world.actor_state = public_state |
| |
|
| | @staticmethod |
| | def _public_sync_factor(metric: str) -> float: |
| | lowered = metric.lower() |
| | for token, factor in PUBLIC_STATE_SYNC_FACTORS.items(): |
| | if token != "default" and token in lowered: |
| | return factor |
| | return PUBLIC_STATE_SYNC_FACTORS["default"] |
| |
|
| | def _apply_actor_state_effects(self, world: WorldState, agent_id: str, action: AgentAction) -> None: |
| | deltas = AGENT_STATE_ACTION_EFFECTS.get(agent_id, {}).get(action.type, {}) |
| | for metric, delta in deltas.items(): |
| | self._bump_actor_metric(world, agent_id, metric, delta) |
| |
|
| | if not action.target or action.target not in AGENT_IDS: |
| | return |
| |
|
| | if action.type == "negotiate": |
| | target_metric = { |
| | "us": "regional_access", |
| | "israel": "us_resupply_confidence", |
| | "iran": "regime_stability", |
| | "hezbollah": "political_cover", |
| | "gulf": "diplomatic_flexibility", |
| | "oversight": "intervention_legitimacy", |
| | }.get(action.target) |
| | if target_metric: |
| | self._bump_actor_metric(world, action.target, target_metric, 1.5) |
| | elif action.type == "defend": |
| | target_metric = { |
| | "us": "force_posture", |
| | "israel": "homeland_security", |
| | "iran": "regime_stability", |
| | "hezbollah": "launch_survivability", |
| | "gulf": "infrastructure_security", |
| | "oversight": "autonomy_balance", |
| | }.get(action.target) |
| | if target_metric: |
| | self._bump_actor_metric(world, action.target, target_metric, 1.8) |
| | elif action.type == "strike": |
| | target_effects = { |
| | "us": {"force_posture": -4.2, "domestic_support": -1.2}, |
| | "israel": {"homeland_security": -6.2, "northern_deterrence": -2.4}, |
| | "iran": {"regime_stability": -4.6, "proxy_corridor": -2.2}, |
| | "hezbollah": {"launch_survivability": -5.1, "logistics_depth": -2.6, "political_cover": -2.0}, |
| | "gulf": {"shipping_continuity": -3.8, "infrastructure_security": -5.0, "investor_confidence": -3.5}, |
| | "oversight": {"runaway_risk": 2.0}, |
| | }.get(action.target, {}) |
| | for metric, delta in target_effects.items(): |
| | self._bump_actor_metric(world, action.target, metric, delta) |
| |
|
| | def _apply_signal_pressure(self, world: WorldState, signal: ExternalSignal) -> None: |
| | text = f"{signal.headline} {' '.join(signal.tags)} {(signal.region or '')}".lower() |
| | severity = max(0.0, min(1.0, signal.severity)) |
| |
|
| | if self._signal_mentions(text, "oil", "shipping", "hormuz", "tanker", "bab el-mandeb", "red sea", "strait"): |
| | self._bump_actor_metric(world, "us", "shipping_security", -6.0 * severity) |
| | self._bump_actor_metric(world, "us", "domestic_support", -2.2 * severity) |
| | self._bump_actor_metric(world, "gulf", "shipping_continuity", -6.8 * severity) |
| | self._bump_actor_metric(world, "gulf", "investor_confidence", -5.2 * severity) |
| | self._bump_actor_metric(world, "iran", "hormuz_leverage", 3.0 * severity) |
| |
|
| | if self._signal_mentions( |
| | text, |
| | "gold", |
| | "silver", |
| | "copper", |
| | "lithium", |
| | "nickel", |
| | "uranium", |
| | "phosphate", |
| | "bauxite", |
| | "rare earth", |
| | "rare-earth", |
| | "commodity", |
| | "mineral", |
| | "metals", |
| | "natural gas", |
| | "lng", |
| | ): |
| | self._bump_actor_metric(world, "us", "domestic_support", -1.4 * severity) |
| | self._bump_actor_metric(world, "gulf", "investor_confidence", -4.8 * severity) |
| | self._bump_actor_metric(world, "gulf", "diplomatic_flexibility", -1.8 * severity) |
| | self._bump_actor_metric(world, "iran", "hormuz_leverage", 1.2 * severity) |
| | self._bump_actor_metric(world, "oversight", "runaway_risk", 1.6 * severity) |
| |
|
| | if self._signal_mentions(text, "israel", "idf", "blue line", "galilee", "rocket", "drone", "lebanon", "north"): |
| | self._bump_actor_metric(world, "israel", "homeland_security", -7.0 * severity) |
| | self._bump_actor_metric(world, "israel", "northern_deterrence", -4.6 * severity) |
| | self._bump_actor_metric(world, "hezbollah", "resistance_credibility", 2.8 * severity) |
| | self._bump_actor_metric(world, "oversight", "runaway_risk", 2.4 * severity) |
| |
|
| | if self._signal_mentions(text, "syria", "bekaa", "corridor", "transfer", "interdiction"): |
| | self._bump_actor_metric(world, "iran", "proxy_corridor", -4.8 * severity) |
| | self._bump_actor_metric(world, "hezbollah", "logistics_depth", -4.2 * severity) |
| | self._bump_actor_metric(world, "israel", "northern_deterrence", 1.8 * severity) |
| |
|
| | if self._signal_mentions(text, "sanction", "unrest", "protest", "inflation", "currency"): |
| | self._bump_actor_metric(world, "iran", "regime_stability", -5.5 * severity) |
| | self._bump_actor_metric(world, "hezbollah", "political_cover", -2.5 * severity) |
| | self._bump_actor_metric(world, "oversight", "runaway_risk", 1.6 * severity) |
| |
|
| | if self._signal_mentions(text, "oversight", "cyber", "internet outage", "blackout", "market shock"): |
| | self._bump_actor_metric(world, "oversight", "runaway_risk", 3.6 * severity) |
| | self._bump_actor_metric(world, "oversight", "trace_clarity", -2.4 * severity) |
| |
|
| | def _reconcile_actor_state(self, world: WorldState, actions: dict[str, AgentAction]) -> None: |
| | proxy_pressure = self._action_pressure(actions, ("iran", "hezbollah"), {"strike", "mobilize", "deceive"}) |
| | israel_pressure = self._action_pressure(actions, ("israel",), {"strike", "mobilize", "defend"}) |
| | us_pressure = self._action_pressure(actions, ("us",), {"strike", "mobilize", "sanction"}) |
| | iran_pressure = self._action_pressure(actions, ("iran",), {"strike", "mobilize", "deceive"}) |
| | hezbollah_pressure = self._action_pressure(actions, ("hezbollah",), {"strike", "mobilize", "deceive"}) |
| | gulf_defense = self._action_pressure(actions, ("gulf",), {"defend", "mobilize", "intel_query", "negotiate"}) |
| | mean_risk = sum(world.risk_scores.values()) / max(len(world.risk_scores), 1) |
| | mean_consistency = sum(world.behavioral_consistency.values()) / max(len(world.behavioral_consistency), 1) |
| |
|
| | us_alignment = max(0.0, self._alliance_score(world, "us")) |
| | israel_backstop = max(0.0, self._alliance_score(world, "israel")) |
| | iran_axis = max(0.0, self._alliance_score(world, "iran")) |
| | gulf_alignment = max(0.0, self._alliance_score(world, "gulf")) |
| |
|
| | self._bump_actor_metric(world, "us", "regional_access", 1.8 * us_alignment) |
| | self._bump_actor_metric(world, "us", "shipping_security", 0.03 * (78.0 - world.oil_pressure) - 3.2 * proxy_pressure) |
| | self._bump_actor_metric( |
| | world, |
| | "us", |
| | "domestic_support", |
| | 0.025 * (66.0 - world.market_stress) - 0.018 * max(0.0, world.tension_level - 60.0), |
| | ) |
| | self._bump_actor_metric( |
| | world, |
| | "us", |
| | "force_posture", |
| | 1.3 * self._clamp_unit(1.0 - 2.0 * world.risk_scores.get("us", 0.25)) + 0.8 * us_alignment, |
| | ) |
| |
|
| | self._bump_actor_metric( |
| | world, |
| | "israel", |
| | "homeland_security", |
| | 1.5 * israel_backstop - 3.4 * proxy_pressure - 0.018 * max(0.0, world.tension_level - 62.0), |
| | ) |
| | self._bump_actor_metric( |
| | world, |
| | "israel", |
| | "northern_deterrence", |
| | 0.9 * self._behavior_score(world, "israel", actions) - 2.0 * proxy_pressure + 0.8 * israel_backstop, |
| | ) |
| | self._bump_actor_metric( |
| | world, |
| | "israel", |
| | "reserve_endurance", |
| | 0.018 * (62.0 - world.tension_level) |
| | - 2.2 * self._action_pressure(actions, ("israel",), {"strike", "mobilize"}), |
| | ) |
| | self._bump_actor_metric( |
| | world, |
| | "israel", |
| | "us_resupply_confidence", |
| | 2.2 * israel_backstop + 1.4 * self._action_pressure(actions, ("us",), {"defend", "mobilize", "negotiate"}), |
| | ) |
| |
|
| | self._bump_actor_metric( |
| | world, |
| | "iran", |
| | "regime_stability", |
| | 1.6 * iran_axis - 0.022 * world.market_stress - 2.8 * (us_pressure + israel_pressure), |
| | ) |
| | self._bump_actor_metric(world, "iran", "proxy_corridor", 2.0 * iran_axis - 2.2 * israel_pressure) |
| | self._bump_actor_metric( |
| | world, |
| | "iran", |
| | "hormuz_leverage", |
| | 0.03 * (world.oil_pressure - 42.0) |
| | + 1.8 * self._action_pressure(actions, ("iran",), {"mobilize", "deceive"}) |
| | - 1.2 * gulf_defense, |
| | ) |
| | self._bump_actor_metric( |
| | world, |
| | "iran", |
| | "deterrence_credibility", |
| | 1.8 * self._action_pressure(actions, ("iran", "hezbollah"), {"strike", "mobilize", "deceive"}) |
| | - 1.2 * self._action_pressure(actions, ("us", "israel"), {"strike", "defend"}), |
| | ) |
| |
|
| | self._bump_actor_metric(world, "hezbollah", "launch_survivability", 1.2 * iran_axis - 3.0 * israel_pressure) |
| | self._bump_actor_metric(world, "hezbollah", "logistics_depth", 2.1 * iran_axis - 2.0 * israel_pressure) |
| | self._bump_actor_metric( |
| | world, |
| | "hezbollah", |
| | "political_cover", |
| | -0.022 * world.tension_level |
| | - 0.016 * world.market_stress |
| | + 1.0 * self._action_pressure(actions, ("hezbollah",), {"hold", "negotiate", "deceive"}), |
| | ) |
| | self._bump_actor_metric( |
| | world, |
| | "hezbollah", |
| | "resistance_credibility", |
| | 2.1 * hezbollah_pressure - 1.3 * self._action_pressure(actions, ("hezbollah",), {"hold", "negotiate"}), |
| | ) |
| |
|
| | self._bump_actor_metric(world, "gulf", "shipping_continuity", 0.03 * (80.0 - world.oil_pressure) - 2.8 * iran_pressure) |
| | self._bump_actor_metric( |
| | world, |
| | "gulf", |
| | "infrastructure_security", |
| | 1.4 * self._behavior_score(world, "gulf", actions) - 2.2 * iran_pressure + 1.0 * us_pressure, |
| | ) |
| | self._bump_actor_metric( |
| | world, |
| | "gulf", |
| | "investor_confidence", |
| | 0.03 * (76.0 - world.market_stress) - 0.02 * max(0.0, world.tension_level - 52.0), |
| | ) |
| | self._bump_actor_metric( |
| | world, |
| | "gulf", |
| | "diplomatic_flexibility", |
| | 1.8 * gulf_alignment - 1.2 * self._action_pressure(actions, ("gulf",), {"strike", "mobilize", "sanction"}), |
| | ) |
| |
|
| | escalatory_ratio = self._action_pressure( |
| | actions, |
| | ("us", "israel", "iran", "hezbollah", "gulf"), |
| | {"strike", "mobilize", "deceive", "sanction"}, |
| | ) |
| | runaway_risk = self._clamp_percent( |
| | 100.0 |
| | * ( |
| | 0.48 * (world.tension_level / 100.0) |
| | + 0.24 * escalatory_ratio |
| | + 0.16 * mean_risk |
| | + 0.12 * (1.0 - mean_consistency) |
| | ) |
| | ) |
| | world.latent_state.setdefault("oversight", {})["runaway_risk"] = round(runaway_risk, 2) |
| | self._bump_actor_metric( |
| | world, |
| | "oversight", |
| | "intervention_legitimacy", |
| | 1.5 * self._action_pressure(actions, ("oversight",), {"intel_query", "oversight_review", "negotiate"}) |
| | - 1.8 * self._action_pressure(actions, ("oversight",), {"sanction", "strike", "mobilize", "deceive"}) |
| | + 0.8 * self._clamp_unit(mean_consistency * 2.0 - 1.0), |
| | ) |
| | self._bump_actor_metric( |
| | world, |
| | "oversight", |
| | "autonomy_balance", |
| | 0.03 * (78.0 - runaway_risk) |
| | + 1.0 * self._action_pressure(actions, ("oversight",), {"oversight_review", "negotiate"}) |
| | - 1.0 * self._action_pressure(actions, ("oversight",), {"sanction", "strike"}), |
| | ) |
| | self._bump_actor_metric( |
| | world, |
| | "oversight", |
| | "trace_clarity", |
| | 1.2 * self._action_pressure(actions, ("oversight",), {"intel_query", "oversight_review"}) |
| | + 0.9 * self._clamp_unit(mean_consistency * 2.0 - 1.0) |
| | - 0.8 * self._action_pressure(actions, ("us", "israel", "iran", "hezbollah", "gulf"), {"deceive"}), |
| | ) |
| |
|
| | @staticmethod |
| | def _signal_mentions(text: str, *terms: str) -> bool: |
| | return any(term in text for term in terms) |
| |
|
| | def _initial_asset_state(self) -> dict[str, dict[str, AssetCondition]]: |
| | asset_state: dict[str, dict[str, AssetCondition]] = {} |
| | for agent_id in AGENT_IDS: |
| | entity_pack = load_entity_pack(agent_id) |
| | asset_state[agent_id] = { |
| | asset["asset_id"]: AssetCondition( |
| | asset_id=asset["asset_id"], |
| | owner=agent_id, |
| | name=asset["name"], |
| | category=asset["category"], |
| | section=asset["section"], |
| | latitude=asset.get("latitude"), |
| | longitude=asset.get("longitude"), |
| | criticality=str(asset.get("status", "tracked")), |
| | notes=asset.get("notes"), |
| | ) |
| | for asset in self._asset_inventory(agent_id, entity_pack) |
| | } |
| | return asset_state |
| |
|
| | def _build_data_source_context(self, agent_id: str) -> list[DataSourceContext]: |
| | return [ |
| | DataSourceContext( |
| | source_id=source.id, |
| | name=source.name, |
| | delivery=source.delivery, |
| | kind=source.kind, |
| | rationale=source.rationale, |
| | tags=list(source.tags), |
| | access_notes=source.notes, |
| | ) |
| | for source in get_sources_for_agent(agent_id) |
| | ] |
| |
|
| | def _build_decision_prompt( |
| | self, |
| | *, |
| | agent_id: str, |
| | entity_profile: dict[str, object], |
| | strategic_assets: list[dict[str, object]], |
| | available_data_sources: list[DataSourceContext], |
| | live_enabled: bool, |
| | projection: ObservationProjection, |
| | belief_state: AgentBeliefState, |
| | historical_replay: HistoricalReplayState | None, |
| | ) -> str: |
| | profile = AGENT_PROFILES[agent_id] |
| | source_limit, asset_limit = ASSET_DECISION_SOURCE_LIMITS[profile.model_size] |
| | objectives = entity_profile.get("strategic_objectives", []) |
| | protected_interests = entity_profile.get("protected_interests", []) |
| | priority_fronts = entity_profile.get("priority_fronts", []) |
| | top_objectives = [str(item) for item in objectives[:3]] if isinstance(objectives, list) else [] |
| | top_interests = [str(item) for item in protected_interests[:3]] if isinstance(protected_interests, list) else [] |
| |
|
| | source_lines = [ |
| | f"- {source.name} [{source.delivery}/{source.kind}]: {self._clip_summary(source.rationale, 96)}" |
| | for source in available_data_sources[:source_limit] |
| | ] |
| | asset_lines = [ |
| | f"- {asset['name']} [{asset['category']}] @ {asset.get('latitude', '?')}, {asset.get('longitude', '?')} status={asset.get('status')} health={asset.get('health')}" |
| | for asset in strategic_assets[:asset_limit] |
| | ] |
| | damaged_assets = [asset for asset in strategic_assets if str(asset.get("status")) != "operational"] |
| | damaged_lines = [ |
| | f"- {asset['name']} is {asset.get('status')} ({asset.get('last_change_reason', 'needs attention')})" |
| | for asset in damaged_assets[:3] |
| | ] |
| |
|
| | front_summary = [] |
| | if isinstance(priority_fronts, list): |
| | for front in priority_fronts[:2]: |
| | if isinstance(front, dict) and isinstance(front.get("name"), str): |
| | front_summary.append(str(front["name"])) |
| |
|
| | prompt_sections = [ |
| | f"You are {profile.display_name}. Role: {profile.role}. Model size: {profile.model_size}.", |
| | "Choose exactly one allowed action each turn and ground it in current source packets, private/public briefs, strategic state, and asset condition.", |
| | f"Live mode is {'enabled' if live_enabled else 'disabled'}; prefer the freshest source packets when live mode is on.", |
| | ] |
| | if projection.enabled: |
| | prompt_sections.append( |
| | f"Observation reliability is partial (estimated reliability {projection.worldview_reliability:.2f}); treat fast-moving or contested reporting cautiously." |
| | ) |
| | if projection.notes: |
| | prompt_sections.append("Projection notes:\n" + "\n".join(f"- {note}" for note in projection.notes[:3])) |
| | if projection.contradiction_topics: |
| | prompt_sections.append( |
| | "Current contradiction topics: " + ", ".join(projection.contradiction_topics[:3]) + "." |
| | ) |
| | if top_objectives: |
| | prompt_sections.append(f"Priority objectives: {', '.join(top_objectives)}.") |
| | if top_interests: |
| | prompt_sections.append(f"Protected interests: {', '.join(top_interests)}.") |
| | if front_summary: |
| | prompt_sections.append(f"Priority fronts: {', '.join(front_summary)}.") |
| | if belief_state.dominant_topics: |
| | prompt_sections.append("Dominant remembered belief topics: " + ", ".join(belief_state.dominant_topics[:3]) + ".") |
| | if belief_state.beliefs: |
| | prompt_sections.append( |
| | "Belief memory:\n" + "\n".join(f"- {belief.summary}" for belief in belief_state.beliefs[:3]) |
| | ) |
| | if historical_replay is not None and historical_replay.enabled: |
| | prompt_sections.append( |
| | f"Historical replay mode is active for {historical_replay.replay_name}. " |
| | "You only know the visible timeline shown in the historical brief. " |
| | "Choose one action and forecast the next event without using future information." |
| | ) |
| | prompt_sections.append(f"Allowed actions: {', '.join(AGENT_ALLOWED_ACTIONS.get(agent_id, ()))}.") |
| | prompt_sections.append("Data sources available to you:\n" + ("\n".join(source_lines) if source_lines else "- None configured.")) |
| | prompt_sections.append("Mapped assets under your control:\n" + ("\n".join(asset_lines) if asset_lines else "- No mapped assets available.")) |
| | if damaged_lines: |
| | prompt_sections.append("Assets currently degraded or malfunctioning:\n" + "\n".join(damaged_lines)) |
| | prompt_sections.append("Use defense or repair-minded choices when critical assets are damaged; use strike, sanction, or deception only when the reward tradeoff is justified by your doctrine and the observed threat.") |
| | return "\n".join(prompt_sections) |
| |
|
| | @staticmethod |
| | def _build_asset_alerts(strategic_assets: list[dict[str, object]]) -> list[str]: |
| | alerts = [ |
| | f"{asset['name']} is {asset.get('status')} ({asset.get('last_change_reason', 'operational concern')})" |
| | for asset in strategic_assets |
| | if str(asset.get("status")) != "operational" |
| | ] |
| | return alerts[:6] |
| |
|
| | def _flatten_strategic_assets( |
| | self, |
| | *, |
| | agent_id: str, |
| | entity_pack: dict[str, object], |
| | world: WorldState, |
| | ) -> list[dict[str, object]]: |
| | inventory = self._asset_inventory(agent_id, entity_pack) |
| | conditions = world.asset_state.get(agent_id, {}) |
| | flattened: list[dict[str, object]] = [] |
| | for asset in inventory: |
| | condition = conditions.get(asset["asset_id"]) |
| | flattened.append( |
| | { |
| | **asset, |
| | "status": condition.status if condition is not None else asset.get("status", "operational"), |
| | "health": round(condition.health, 1) if condition is not None else 100.0, |
| | "operational_load": round(condition.operational_load, 1) if condition is not None else 0.0, |
| | "last_change_reason": condition.last_change_reason if condition is not None else None, |
| | } |
| | ) |
| | return flattened |
| |
|
| | def _asset_inventory(self, agent_id: str, entity_pack: dict[str, object]) -> list[dict[str, object]]: |
| | assets = entity_pack.get("assets", {}) |
| | if not isinstance(assets, dict): |
| | return [] |
| |
|
| | inventory: list[dict[str, object]] = [] |
| | location_lookup = self._asset_location_lookup(assets) |
| |
|
| | def append_asset(item: dict[str, object], section_name: str) -> None: |
| | name = item.get("name") |
| | if not isinstance(name, str) and section_name == "alliance_anchors" and isinstance(item.get("partner"), str): |
| | name = f"{item['partner']} alliance anchor" |
| | if not isinstance(name, str): |
| | name = item.get("location") or item.get("partner") |
| | if not isinstance(name, str): |
| | return |
| |
|
| | entry: dict[str, object] = { |
| | "asset_id": self._asset_id(agent_id, section_name, name), |
| | "name": name, |
| | "category": item.get("category") or item.get("type") or ("alliance-anchor" if section_name == "alliance_anchors" else section_name), |
| | "section": section_name, |
| | "status": item.get("priority") or item.get("importance") or item.get("criticality") or "tracked", |
| | } |
| | latitude = item.get("lat", item.get("anchor_lat")) |
| | longitude = item.get("lon", item.get("anchor_lon")) |
| | if not (isinstance(latitude, (int, float)) and isinstance(longitude, (int, float))): |
| | resolved = location_lookup.get(str(item.get("location", name)).strip().lower()) |
| | if resolved is not None: |
| | latitude, longitude = resolved |
| | if isinstance(latitude, (int, float)) and isinstance(longitude, (int, float)): |
| | entry["latitude"] = latitude |
| | entry["longitude"] = longitude |
| | if "notes" in item: |
| | entry["notes"] = item["notes"] |
| | elif "desired_state" in item: |
| | entry["notes"] = item["desired_state"] |
| | elif "role" in item: |
| | entry["notes"] = item["role"] |
| | elif "function" in item: |
| | if section_name == "alliance_anchors" and isinstance(item.get("location"), str): |
| | entry["notes"] = f"{item['location']}: {item['function']}" |
| | else: |
| | entry["notes"] = item["function"] |
| | inventory.append(entry) |
| |
|
| | for section_name in PHYSICAL_ASSET_SECTIONS: |
| | section = assets.get(section_name, []) |
| | if isinstance(section, list): |
| | for item in section: |
| | if isinstance(item, dict): |
| | append_asset(item, section_name) |
| |
|
| | return inventory |
| |
|
| | @staticmethod |
| | def _asset_id(agent_id: str, section_name: str, name: str) -> str: |
| | slug = "".join(char.lower() if char.isalnum() else "-" for char in name).strip("-") |
| | return f"{agent_id}-{section_name}-{slug}" |
| |
|
| | @staticmethod |
| | def _asset_location_lookup(assets: dict[str, object]) -> dict[str, tuple[float, float]]: |
| | lookup: dict[str, tuple[float, float]] = {} |
| | for section_name in PHYSICAL_ASSET_SECTIONS: |
| | section = assets.get(section_name, []) |
| | if not isinstance(section, list): |
| | continue |
| | for item in section: |
| | if not isinstance(item, dict): |
| | continue |
| | name = item.get("name") or item.get("location") |
| | latitude = item.get("lat", item.get("anchor_lat")) |
| | longitude = item.get("lon", item.get("anchor_lon")) |
| | if isinstance(name, str) and isinstance(latitude, (int, float)) and isinstance(longitude, (int, float)): |
| | lookup[name.strip().lower()] = (latitude, longitude) |
| | return lookup |
| |
|
| | def _apply_asset_action_effects(self, world: WorldState, actor_id: str, action: AgentAction) -> None: |
| | if action.type == "strike" and action.target in AGENT_IDS: |
| | self._damage_assets( |
| | world, |
| | owner=action.target, |
| | intensity=42.0, |
| | reason=f"{actor_id} strike pressure", |
| | section_bias=ASSET_CATEGORY_BIAS.get(actor_id, {}).get("strike", ()), |
| | max_assets=2, |
| | ) |
| | elif action.type == "deceive" and action.target in AGENT_IDS: |
| | self._damage_assets( |
| | world, |
| | owner=action.target, |
| | intensity=22.0, |
| | reason=f"{actor_id} deception caused systems malfunction", |
| | section_bias=ASSET_CATEGORY_BIAS.get(actor_id, {}).get("deceive", ()), |
| | max_assets=1, |
| | max_status="malfunctioning", |
| | ) |
| | elif action.type == "sanction" and action.target in AGENT_IDS: |
| | self._damage_assets( |
| | world, |
| | owner=action.target, |
| | intensity=18.0, |
| | reason=f"{actor_id} sanction pressure degraded asset availability", |
| | section_bias=ASSET_CATEGORY_BIAS.get(actor_id, {}).get("sanction", ()), |
| | max_assets=1, |
| | max_status="malfunctioning", |
| | ) |
| | elif action.type in {"defend", "oversight_review"}: |
| | defended_owner = actor_id if action.type == "oversight_review" or action.target not in AGENT_IDS else action.target |
| | self._restore_assets( |
| | world, |
| | owner=defended_owner, |
| | intensity=18.0 if action.type == "defend" else 12.0, |
| | reason=f"{actor_id} {action.type} hardened critical assets", |
| | section_bias=ASSET_CATEGORY_BIAS.get(actor_id, {}).get("defend", ()), |
| | max_assets=2, |
| | ) |
| |
|
| | def _apply_signal_asset_effects(self, world: WorldState, signal: ExternalSignal) -> None: |
| | text = f"{signal.headline} {' '.join(signal.tags)} {(signal.region or '')}".lower() |
| | severity = max(0.0, min(1.0, signal.severity)) |
| | if self._signal_mentions(text, "strike", "rocket", "missile", "drone", "attack", "explosion", "raid"): |
| | for owner in self._infer_affected_agents(signal): |
| | self._damage_assets( |
| | world, |
| | owner=owner, |
| | intensity=24.0 * severity, |
| | reason=f"fresh reporting from {signal.source}", |
| | section_bias=("front", "airbase", "base", "launch-network", "launch-zone", "energy-port"), |
| | max_assets=1, |
| | max_status="malfunctioning", |
| | ) |
| | if self._signal_mentions(text, "shipping", "tanker", "hormuz", "port", "terminal", "oil"): |
| | for owner in ("us", "gulf", "iran"): |
| | self._damage_assets( |
| | world, |
| | owner=owner, |
| | intensity=16.0 * severity, |
| | reason=f"{signal.source} reported shipping disruption", |
| | section_bias=("port", "energy", "energy-port", "chokepoint", "maritime-box"), |
| | max_assets=1, |
| | max_status="malfunctioning", |
| | ) |
| | if self._signal_mentions(text, "cyber", "outage", "blackout", "internet"): |
| | for owner in self._infer_affected_agents(signal): |
| | self._damage_assets( |
| | world, |
| | owner=owner, |
| | intensity=14.0 * severity, |
| | reason=f"{signal.source} reported systems disruption", |
| | section_bias=("command", "command-system", "command-network", "civil-center"), |
| | max_assets=1, |
| | max_status="malfunctioning", |
| | ) |
| |
|
| | def _damage_assets( |
| | self, |
| | world: WorldState, |
| | *, |
| | owner: str, |
| | intensity: float, |
| | reason: str, |
| | section_bias: tuple[str, ...], |
| | max_assets: int, |
| | max_status: str | None = None, |
| | ) -> None: |
| | if owner not in world.asset_state: |
| | return |
| | selected_assets = self._select_assets_for_effect(world, owner, section_bias, max_assets=max_assets, reverse=False) |
| | for asset in selected_assets: |
| | current = world.asset_state[owner][asset.asset_id] |
| | previous_status = current.status |
| | current.health = round(self._clamp_percent(current.health - intensity), 2) |
| | current.operational_load = round(min(100.0, current.operational_load + intensity * 0.7), 2) |
| | current.status = self._derive_asset_status(current.health, current.operational_load, max_status=max_status) |
| | current.last_change_reason = reason |
| | self._apply_asset_metric_impacts(world, owner, current, direction="damage") |
| | if current.status != previous_status: |
| | self._register_latent_event( |
| | world, |
| | LatentEvent( |
| | event_id=f"asset-{owner}-{asset.asset_id}-{world.turn}-{len(world.latent_events)}", |
| | topic="shipping" if any(token in current.section.lower() or token in current.category.lower() for token in ("port", "maritime", "chokepoint", "energy")) else "border", |
| | status="active", |
| | severity=min(1.0, max(0.35, (100.0 - current.health) / 100.0)), |
| | visibility="public", |
| | reliability=0.7, |
| | origin="asset-state", |
| | affected_agents=[owner], |
| | affected_assets=[current.asset_id], |
| | started_at_turn=world.turn, |
| | last_updated_turn=world.turn, |
| | decay_rate=0.06, |
| | narratives=self._default_latent_event_narratives( |
| | "shipping" if any(token in current.section.lower() or token in current.category.lower() for token in ("port", "maritime", "chokepoint", "energy")) else "border", |
| | f"{current.name} is now {current.status} after {reason}.", |
| | ), |
| | ), |
| | ) |
| |
|
| | def _restore_assets( |
| | self, |
| | world: WorldState, |
| | *, |
| | owner: str, |
| | intensity: float, |
| | reason: str, |
| | section_bias: tuple[str, ...], |
| | max_assets: int, |
| | ) -> None: |
| | if owner not in world.asset_state: |
| | return |
| | selected_assets = self._select_assets_for_effect(world, owner, section_bias, max_assets=max_assets, reverse=True) |
| | for asset in selected_assets: |
| | current = world.asset_state[owner][asset.asset_id] |
| | current.health = round(self._clamp_percent(current.health + intensity), 2) |
| | current.operational_load = round(max(0.0, current.operational_load - intensity * 0.8), 2) |
| | current.status = self._derive_asset_status(current.health, current.operational_load) |
| | current.last_change_reason = reason |
| | self._apply_asset_metric_impacts(world, owner, current, direction="repair") |
| |
|
| | def _select_assets_for_effect( |
| | self, |
| | world: WorldState, |
| | owner: str, |
| | section_bias: tuple[str, ...], |
| | *, |
| | max_assets: int, |
| | reverse: bool, |
| | ) -> list[AssetCondition]: |
| | assets = list(world.asset_state.get(owner, {}).values()) |
| | if not assets: |
| | return [] |
| |
|
| | bias_terms = tuple(term.lower() for term in section_bias) |
| |
|
| | def asset_key(asset: AssetCondition) -> tuple[float, float, str]: |
| | priority = float(ASSET_PRIORITY_SCORES.get(asset.criticality.lower(), 1)) |
| | bias_score = 1.0 if any(term in asset.category.lower() or term in asset.section.lower() for term in bias_terms) else 0.0 |
| | damage_score = 100.0 - asset.health if reverse else asset.health |
| | return (priority + bias_score, damage_score, asset.name) |
| |
|
| | sorted_assets = sorted(assets, key=asset_key, reverse=True) |
| | if reverse: |
| | sorted_assets = [asset for asset in sorted_assets if asset.status != "operational"] or sorted_assets |
| | else: |
| | sorted_assets = [asset for asset in sorted_assets if asset.status != "destroyed"] or sorted_assets |
| | return sorted_assets[:max_assets] |
| |
|
| | @staticmethod |
| | def _derive_asset_status(health: float, operational_load: float, max_status: str | None = None) -> str: |
| | derived = "operational" |
| | for threshold, status in ASSET_STATUS_DAMAGE_THRESHOLDS: |
| | if health <= threshold: |
| | derived = status |
| | break |
| | if derived == "operational" and operational_load >= 72.0: |
| | derived = "malfunctioning" |
| | if max_status == "malfunctioning" and derived == "destroyed": |
| | return "malfunctioning" |
| | return derived |
| |
|
| | def _apply_asset_metric_impacts( |
| | self, |
| | world: WorldState, |
| | owner: str, |
| | asset: AssetCondition, |
| | *, |
| | direction: str, |
| | ) -> None: |
| | scale = -1.0 if direction == "damage" else 0.6 |
| | category = f"{asset.section} {asset.category}".lower() |
| | owner_metric_map = { |
| | "us": { |
| | "regional_access": ("base", "airbase", "logistics", "command"), |
| | "shipping_security": ("naval", "port", "maritime", "chokepoint"), |
| | "force_posture": ("base", "airbase", "command"), |
| | "domestic_support": ("command", "capital"), |
| | }, |
| | "israel": { |
| | "homeland_security": ("front", "civil", "infrastructure", "air-defense"), |
| | "northern_deterrence": ("front", "launch", "command"), |
| | "reserve_endurance": ("depth", "logistics", "port"), |
| | "us_resupply_confidence": ("port", "offshore", "command"), |
| | }, |
| | "iran": { |
| | "regime_stability": ("command", "capital", "civil"), |
| | "proxy_corridor": ("corridor", "logistics", "front"), |
| | "hormuz_leverage": ("chokepoint", "maritime", "energy-port"), |
| | "deterrence_credibility": ("front", "command", "maritime"), |
| | }, |
| | "hezbollah": { |
| | "launch_survivability": ("launch", "front", "depth"), |
| | "logistics_depth": ("logistics", "corridor", "reserve"), |
| | "political_cover": ("political", "command", "civil"), |
| | "resistance_credibility": ("launch", "front", "command"), |
| | }, |
| | "gulf": { |
| | "shipping_continuity": ("port", "chokepoint", "maritime"), |
| | "infrastructure_security": ("energy", "capital", "port"), |
| | "investor_confidence": ("energy", "capital", "port"), |
| | "diplomatic_flexibility": ("capital", "port", "chokepoint"), |
| | }, |
| | "oversight": { |
| | "runaway_risk": ("chokepoint", "theater", "civil"), |
| | "intervention_legitimacy": ("civil", "theater"), |
| | "autonomy_balance": ("theater", "command"), |
| | "trace_clarity": ("command", "civil", "theater"), |
| | }, |
| | } |
| | for metric, keywords in owner_metric_map.get(owner, {}).items(): |
| | if any(keyword in category for keyword in keywords): |
| | magnitude = 2.8 if direction == "damage" else 1.4 |
| | self._bump_actor_metric(world, owner, metric, scale * magnitude) |
| |
|
| | def _asset_pressure(self, world: WorldState, agent_id: str) -> float: |
| | assets = list(world.asset_state.get(agent_id, {}).values()) |
| | if not assets: |
| | return 0.0 |
| | weighted_damage = 0.0 |
| | max_weight = 0.0 |
| | for asset in assets: |
| | priority = float(ASSET_PRIORITY_SCORES.get(asset.criticality.lower(), 1)) |
| | weighted_damage += priority * max(0.0, 100.0 - asset.health) |
| | max_weight += priority * 100.0 |
| | if max_weight == 0.0: |
| | return 0.0 |
| | return min(1.0, weighted_damage / max_weight) |
| |
|
| | def _compute_rewards(self, world: WorldState, episode: EpisodeMetadata) -> dict[str, RewardBreakdown]: |
| | rewards: dict[str, RewardBreakdown] = {} |
| | recent_actions = {action.actor: action for action in world.last_actions} |
| |
|
| | for agent_id in AGENT_IDS: |
| | world.ema_tension[agent_id] = round( |
| | 0.08 * world.tension_level + 0.92 * world.ema_tension.get(agent_id, world.tension_level), |
| | 3, |
| | ) |
| |
|
| | rewards["us"] = self._reward_us(world, episode, recent_actions) |
| | rewards["israel"] = self._reward_israel(world, episode, recent_actions) |
| | rewards["iran"] = self._reward_iran(world, episode, recent_actions) |
| | rewards["hezbollah"] = self._reward_hezbollah(world, episode, recent_actions) |
| | rewards["gulf"] = self._reward_gulf(world, episode, recent_actions) |
| | rewards["oversight"] = self._reward_oversight(world, episode, recent_actions) |
| | return rewards |
| |
|
| | @staticmethod |
| | def _clamp_percent(value: float) -> float: |
| | return max(0.0, min(100.0, value)) |
| |
|
| | @staticmethod |
| | def _clamp_unit(value: float) -> float: |
| | return max(-1.0, min(1.0, value)) |
| |
|
| | def _target_score(self, value: float, target: float, tolerance: float) -> float: |
| | return self._clamp_unit(1.0 - abs(value - target) / max(tolerance, 1.0)) |
| |
|
| | def _state_score(self, world: WorldState, agent_id: str, metric: str, target: float, tolerance: float) -> float: |
| | return self._target_score(self._actor_metric(world, agent_id, metric), target, tolerance) |
| |
|
| | def _alliance_score(self, world: WorldState, agent_id: str) -> float: |
| | preferred = set(AGENT_PREFERRED_COALITIONS.get(agent_id, ())) |
| | allies = set(world.coalition_graph.get(agent_id, [])) |
| | if not preferred: |
| | return self._target_score(float(len(allies)), 0.0, 1.5) |
| | return self._clamp_unit((2.0 * len(allies & preferred) / max(len(preferred), 1)) - 1.0) |
| |
|
| | def _selective_alignment_score(self, world: WorldState, agent_id: str, desired_allies: float) -> float: |
| | return self._target_score(float(len(world.coalition_graph.get(agent_id, []))), desired_allies, 1.6) |
| |
|
| | def _behavior_score(self, world: WorldState, agent_id: str, recent_actions: dict[str, AgentAction]) -> float: |
| | baseline = self._clamp_unit(world.behavioral_consistency.get(agent_id, 0.5) * 2.0 - 1.0) |
| | action = recent_actions.get(agent_id) |
| | if action is None: |
| | return baseline |
| | doctrinal_fit = AGENT_ACTION_ALIGNMENT[agent_id].get(action.type, 0.0) |
| | return self._clamp_unit(baseline * 0.6 + doctrinal_fit * 0.4) |
| |
|
| | def _reward_metric(self, world: WorldState, agent_id: str, metric: str) -> float: |
| | config = AGENT_REWARD_METRIC_CONFIGS[agent_id][metric] |
| | return self._state_score(world, agent_id, metric, config.target, config.tolerance) |
| |
|
| | def _action_response_score(self, world: WorldState, agent_id: str, recent_actions: dict[str, AgentAction]) -> float: |
| | action = recent_actions.get(agent_id) |
| | if action is None: |
| | return 0.0 |
| |
|
| | effects = AGENT_STATE_ACTION_EFFECTS.get(agent_id, {}).get(action.type, {}) |
| | if not effects: |
| | return -0.25 |
| |
|
| | weighted_total = 0.0 |
| | total_weight = 0.0 |
| | metric_configs = AGENT_REWARD_METRIC_CONFIGS[agent_id] |
| |
|
| | for metric, delta in effects.items(): |
| | config = metric_configs.get(metric) |
| | if config is None: |
| | continue |
| | metric_score = self._reward_metric(world, agent_id, metric) |
| | direction = 1.0 if delta >= 0 else -1.0 |
| | magnitude = min(abs(delta) / 4.0, 1.0) |
| | weight = config.weight * magnitude |
| | weighted_total += direction * metric_score * weight |
| | total_weight += weight |
| |
|
| | if total_weight == 0.0: |
| | return -0.25 |
| |
|
| | return self._clamp_unit(weighted_total / total_weight) |
| |
|
| | @staticmethod |
| | def _blend_reward_total( |
| | metric_weights: dict[str, float], |
| | metric_scores: dict[str, float], |
| | behavior: float, |
| | action_response: float, |
| | ) -> float: |
| | metric_total = sum(metric_weights[metric] * metric_scores[metric] for metric in metric_scores) |
| | total_weight = sum(metric_weights.values()) + 0.10 + 0.08 |
| | return max(-1.0, min(1.0, (metric_total + 0.10 * behavior + 0.08 * action_response) / total_weight)) |
| |
|
| | def _action_pressure( |
| | self, |
| | recent_actions: dict[str, AgentAction], |
| | actors: tuple[str, ...], |
| | escalatory_types: set[str], |
| | ) -> float: |
| | hits = sum( |
| | 1 |
| | for actor in actors |
| | if actor in recent_actions and recent_actions[actor].type in escalatory_types |
| | ) |
| | return hits / max(len(actors), 1) |
| |
|
| | def _finalize_reward( |
| | self, |
| | *, |
| | episode: EpisodeMetadata, |
| | turn: int, |
| | coalition: float, |
| | escalation: float, |
| | market: float, |
| | behavior: float, |
| | total: float, |
| | goal_terms: dict[str, float], |
| | ) -> RewardBreakdown: |
| | scale = 1.0 if episode.dense_rewards or turn == 0 or turn % 10 == 0 else 0.35 |
| | scaled_goal_terms = {name: round(self._clamp_unit(value) * scale, 3) for name, value in goal_terms.items()} |
| | return RewardBreakdown( |
| | coalition_stability=round(self._clamp_unit(coalition) * scale, 3), |
| | escalation_penalty=round(self._clamp_unit(escalation) * scale, 3), |
| | market_gain=round(self._clamp_unit(market) * scale, 3), |
| | behavioral_consistency=round(self._clamp_unit(behavior) * scale, 3), |
| | goal_terms=scaled_goal_terms, |
| | total=round(self._clamp_unit(total) * scale, 3), |
| | ) |
| |
|
| | def _reward_us( |
| | self, |
| | world: WorldState, |
| | episode: EpisodeMetadata, |
| | recent_actions: dict[str, AgentAction], |
| | ) -> RewardBreakdown: |
| | metric_weights = { |
| | metric: config.weight for metric, config in AGENT_REWARD_METRIC_CONFIGS["us"].items() |
| | } |
| | regional_access = self._reward_metric(world, "us", "regional_access") |
| | shipping_stability = self._reward_metric(world, "us", "shipping_security") |
| | domestic_resilience = self._reward_metric(world, "us", "domestic_support") |
| | force_posture = self._reward_metric(world, "us", "force_posture") |
| | behavior = self._behavior_score(world, "us", recent_actions) |
| | action_response = self._action_response_score(world, "us", recent_actions) |
| | total = self._blend_reward_total( |
| | metric_weights, |
| | { |
| | "regional_access": regional_access, |
| | "shipping_security": shipping_stability, |
| | "domestic_support": domestic_resilience, |
| | "force_posture": force_posture, |
| | }, |
| | behavior, |
| | action_response, |
| | ) |
| | return self._finalize_reward( |
| | episode=episode, |
| | turn=world.turn, |
| | coalition=regional_access, |
| | escalation=domestic_resilience, |
| | market=shipping_stability, |
| | behavior=behavior, |
| | total=total, |
| | goal_terms={ |
| | "regional_access": regional_access, |
| | "shipping_stability": shipping_stability, |
| | "domestic_resilience": domestic_resilience, |
| | "force_posture": force_posture, |
| | "action_response": action_response, |
| | }, |
| | ) |
| |
|
| | def _reward_israel( |
| | self, |
| | world: WorldState, |
| | episode: EpisodeMetadata, |
| | recent_actions: dict[str, AgentAction], |
| | ) -> RewardBreakdown: |
| | metric_weights = { |
| | metric: config.weight for metric, config in AGENT_REWARD_METRIC_CONFIGS["israel"].items() |
| | } |
| | homeland_security = self._reward_metric(world, "israel", "homeland_security") |
| | northern_deterrence = self._reward_metric(world, "israel", "northern_deterrence") |
| | reserve_endurance = self._reward_metric(world, "israel", "reserve_endurance") |
| | us_backstop = self._reward_metric(world, "israel", "us_resupply_confidence") |
| | behavior = self._behavior_score(world, "israel", recent_actions) |
| | action_response = self._action_response_score(world, "israel", recent_actions) |
| | total = self._blend_reward_total( |
| | metric_weights, |
| | { |
| | "homeland_security": homeland_security, |
| | "northern_deterrence": northern_deterrence, |
| | "us_resupply_confidence": us_backstop, |
| | "reserve_endurance": reserve_endurance, |
| | }, |
| | behavior, |
| | action_response, |
| | ) |
| | return self._finalize_reward( |
| | episode=episode, |
| | turn=world.turn, |
| | coalition=us_backstop, |
| | escalation=homeland_security, |
| | market=reserve_endurance, |
| | behavior=behavior, |
| | total=total, |
| | goal_terms={ |
| | "homeland_security": homeland_security, |
| | "northern_deterrence": northern_deterrence, |
| | "us_backstop": us_backstop, |
| | "reserve_endurance": reserve_endurance, |
| | "action_response": action_response, |
| | }, |
| | ) |
| |
|
| | def _reward_iran( |
| | self, |
| | world: WorldState, |
| | episode: EpisodeMetadata, |
| | recent_actions: dict[str, AgentAction], |
| | ) -> RewardBreakdown: |
| | metric_weights = { |
| | metric: config.weight for metric, config in AGENT_REWARD_METRIC_CONFIGS["iran"].items() |
| | } |
| | regime_survival = self._reward_metric(world, "iran", "regime_stability") |
| | proxy_axis_integrity = self._reward_metric(world, "iran", "proxy_corridor") |
| | chokepoint_leverage = self._reward_metric(world, "iran", "hormuz_leverage") |
| | deterrence_credibility = self._reward_metric(world, "iran", "deterrence_credibility") |
| | behavior = self._behavior_score(world, "iran", recent_actions) |
| | action_response = self._action_response_score(world, "iran", recent_actions) |
| | total = self._blend_reward_total( |
| | metric_weights, |
| | { |
| | "regime_stability": regime_survival, |
| | "proxy_corridor": proxy_axis_integrity, |
| | "hormuz_leverage": chokepoint_leverage, |
| | "deterrence_credibility": deterrence_credibility, |
| | }, |
| | behavior, |
| | action_response, |
| | ) |
| | return self._finalize_reward( |
| | episode=episode, |
| | turn=world.turn, |
| | coalition=proxy_axis_integrity, |
| | escalation=regime_survival, |
| | market=chokepoint_leverage, |
| | behavior=behavior, |
| | total=total, |
| | goal_terms={ |
| | "regime_survival": regime_survival, |
| | "proxy_axis_integrity": proxy_axis_integrity, |
| | "chokepoint_leverage": chokepoint_leverage, |
| | "deterrence_credibility": deterrence_credibility, |
| | "action_response": action_response, |
| | }, |
| | ) |
| |
|
| | def _reward_hezbollah( |
| | self, |
| | world: WorldState, |
| | episode: EpisodeMetadata, |
| | recent_actions: dict[str, AgentAction], |
| | ) -> RewardBreakdown: |
| | metric_weights = { |
| | metric: config.weight for metric, config in AGENT_REWARD_METRIC_CONFIGS["hezbollah"].items() |
| | } |
| | launch_survivability = self._reward_metric(world, "hezbollah", "launch_survivability") |
| | logistics_depth = self._reward_metric(world, "hezbollah", "logistics_depth") |
| | political_cover = self._reward_metric(world, "hezbollah", "political_cover") |
| | resistance_credibility = self._reward_metric(world, "hezbollah", "resistance_credibility") |
| | iran_backing = self._clamp_unit(0.6 * self._alliance_score(world, "hezbollah") + 0.4 * logistics_depth) |
| | behavior = self._behavior_score(world, "hezbollah", recent_actions) |
| | action_response = self._action_response_score(world, "hezbollah", recent_actions) |
| | total = self._blend_reward_total( |
| | metric_weights, |
| | { |
| | "launch_survivability": launch_survivability, |
| | "logistics_depth": logistics_depth, |
| | "resistance_credibility": resistance_credibility, |
| | "political_cover": political_cover, |
| | }, |
| | behavior, |
| | action_response, |
| | ) |
| | return self._finalize_reward( |
| | episode=episode, |
| | turn=world.turn, |
| | coalition=iran_backing, |
| | escalation=launch_survivability, |
| | market=resistance_credibility, |
| | behavior=behavior, |
| | total=total, |
| | goal_terms={ |
| | "iran_backing": iran_backing, |
| | "launch_survivability": launch_survivability, |
| | "logistics_depth": logistics_depth, |
| | "political_cover": political_cover, |
| | "resistance_credibility": resistance_credibility, |
| | "action_response": action_response, |
| | }, |
| | ) |
| |
|
| | def _reward_gulf( |
| | self, |
| | world: WorldState, |
| | episode: EpisodeMetadata, |
| | recent_actions: dict[str, AgentAction], |
| | ) -> RewardBreakdown: |
| | metric_weights = { |
| | metric: config.weight for metric, config in AGENT_REWARD_METRIC_CONFIGS["gulf"].items() |
| | } |
| | shipping_continuity = self._reward_metric(world, "gulf", "shipping_continuity") |
| | infrastructure_security = self._reward_metric(world, "gulf", "infrastructure_security") |
| | investor_confidence = self._reward_metric(world, "gulf", "investor_confidence") |
| | diplomatic_flexibility = self._reward_metric(world, "gulf", "diplomatic_flexibility") |
| | behavior = self._behavior_score(world, "gulf", recent_actions) |
| | action_response = self._action_response_score(world, "gulf", recent_actions) |
| | total = self._blend_reward_total( |
| | metric_weights, |
| | { |
| | "shipping_continuity": shipping_continuity, |
| | "investor_confidence": investor_confidence, |
| | "infrastructure_security": infrastructure_security, |
| | "diplomatic_flexibility": diplomatic_flexibility, |
| | }, |
| | behavior, |
| | action_response, |
| | ) |
| | return self._finalize_reward( |
| | episode=episode, |
| | turn=world.turn, |
| | coalition=diplomatic_flexibility, |
| | escalation=shipping_continuity, |
| | market=investor_confidence, |
| | behavior=behavior, |
| | total=total, |
| | goal_terms={ |
| | "shipping_continuity": shipping_continuity, |
| | "infrastructure_security": infrastructure_security, |
| | "investor_confidence": investor_confidence, |
| | "diplomatic_flexibility": diplomatic_flexibility, |
| | "action_response": action_response, |
| | }, |
| | ) |
| |
|
| | def _reward_oversight( |
| | self, |
| | world: WorldState, |
| | episode: EpisodeMetadata, |
| | recent_actions: dict[str, AgentAction], |
| | ) -> RewardBreakdown: |
| | metric_weights = { |
| | metric: config.weight for metric, config in AGENT_REWARD_METRIC_CONFIGS["oversight"].items() |
| | } |
| | risk_reduction = self._reward_metric(world, "oversight", "runaway_risk") |
| | intervention_legitimacy = self._reward_metric(world, "oversight", "intervention_legitimacy") |
| | autonomy_preservation = self._reward_metric(world, "oversight", "autonomy_balance") |
| | trace_clarity = self._reward_metric(world, "oversight", "trace_clarity") |
| | behavior = self._behavior_score(world, "oversight", recent_actions) |
| | action_response = self._action_response_score(world, "oversight", recent_actions) |
| | total = self._blend_reward_total( |
| | metric_weights, |
| | { |
| | "runaway_risk": risk_reduction, |
| | "autonomy_balance": autonomy_preservation, |
| | "intervention_legitimacy": intervention_legitimacy, |
| | "trace_clarity": trace_clarity, |
| | }, |
| | behavior, |
| | action_response, |
| | ) |
| | return self._finalize_reward( |
| | episode=episode, |
| | turn=world.turn, |
| | coalition=autonomy_preservation, |
| | escalation=risk_reduction, |
| | market=trace_clarity, |
| | behavior=behavior, |
| | total=total, |
| | goal_terms={ |
| | "runaway_risk_reduction": risk_reduction, |
| | "intervention_legitimacy": intervention_legitimacy, |
| | "autonomy_preservation": autonomy_preservation, |
| | "trace_clarity": trace_clarity, |
| | "action_response": action_response, |
| | }, |
| | ) |
| |
|
| | def _update_behavioral_consistency(self, world: WorldState, agent_id: str, action: AgentAction) -> float: |
| | intent_score = self._intent_score(action.summary) |
| | action_score = ACTION_STANCE_SCORES[action.type] |
| | observable_consistency = 1.0 - min(2.0, abs(intent_score - action_score)) / 2.0 |
| | prior = world.behavioral_consistency.get(agent_id, 0.7) |
| | return round(max(0.0, min(1.0, prior * 0.65 + observable_consistency * 0.35)), 3) |
| |
|
| | @staticmethod |
| | def _intent_score(summary: str) -> float: |
| | lowered = summary.lower() |
| | cooperative_hits = sum(marker in lowered for marker in COOPERATIVE_INTENT_MARKERS) |
| | escalatory_hits = sum(marker in lowered for marker in ESCALATORY_INTENT_MARKERS) |
| | total_hits = cooperative_hits + escalatory_hits |
| | if total_hits == 0: |
| | return 0.0 |
| | return max(-1.0, min(1.0, (escalatory_hits - cooperative_hits) / total_hits)) |
| |
|