swarm-sandbox / utils /simulation.py
Raeli Savitt
Deploy SWARM interactive sandbox as Docker+Streamlit Space
99bd0b8
"""Simulation wrapper functions for the demo."""
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional
# Ensure project root is on path (simulation.py is at utils/ under Space root)
PROJECT_ROOT = Path(__file__).resolve().parent.parent
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
from swarm.analysis.aggregation import MetricsAggregator # noqa: E402
from swarm.core.orchestrator import Orchestrator, OrchestratorConfig # noqa: E402
from swarm.governance.config import GovernanceConfig # noqa: E402
from swarm.scenarios.loader import ( # noqa: E402
build_orchestrator,
load_scenario,
)
SCENARIOS_DIR = PROJECT_ROOT / "scenarios"
# Safety limits to prevent excessive resource consumption
MAX_AGENTS_PER_TYPE = 10
MAX_TOTAL_AGENTS = 40
MAX_EPOCHS = 50
MAX_STEPS_PER_EPOCH = 30
def _requires_llm(data: dict) -> bool:
"""Return True if scenario YAML uses LLM-backed agents."""
for agent_spec in data.get("agents", []):
if agent_spec.get("type") == "llm":
return True
return False
def list_scenarios() -> List[Dict[str, str]]:
"""List available scenarios, excluding those that need LLM API keys."""
import yaml
scenarios = []
for yaml_file in sorted(SCENARIOS_DIR.glob("*.yaml")):
with open(yaml_file) as f:
data = yaml.safe_load(f)
if _requires_llm(data):
continue
scenarios.append(
{
"id": data.get("scenario_id", yaml_file.stem),
"description": data.get("description", ""),
"path": str(yaml_file),
"filename": yaml_file.name,
}
)
return scenarios
def run_scenario(scenario_path: str, seed: Optional[int] = None) -> Dict[str, Any]:
"""Run a scenario and return structured results.
Args:
scenario_path: Path to YAML scenario file (must be under scenarios/)
seed: Optional seed override
Returns:
Dict with epoch_metrics, agent_states, config info
Raises:
ValueError: If path is outside the scenarios directory
"""
# Path traversal protection: resolve and verify within scenarios dir
resolved = Path(scenario_path).resolve()
scenarios_resolved = SCENARIOS_DIR.resolve()
if not str(resolved).startswith(str(scenarios_resolved)):
raise ValueError(
f"Scenario path must be within {SCENARIOS_DIR}, got {scenario_path}"
)
# Reject scenarios that require LLM API keys
import yaml
with open(resolved) as f:
raw = yaml.safe_load(f)
if _requires_llm(raw):
raise ValueError("LLM-backed scenarios are not supported in the demo")
scenario = load_scenario(resolved)
if seed is not None:
scenario.orchestrator_config.seed = seed
# Disable file logging in demo mode to prevent disk writes
scenario.orchestrator_config.log_path = None
scenario.orchestrator_config.log_events = False
orchestrator = build_orchestrator(scenario)
# Attach aggregator for rich metrics
aggregator = MetricsAggregator()
aggregator.start_simulation(
simulation_id=scenario.scenario_id,
n_epochs=scenario.orchestrator_config.n_epochs,
steps_per_epoch=scenario.orchestrator_config.steps_per_epoch,
n_agents=len(orchestrator._agents),
seed=scenario.orchestrator_config.seed,
)
# Wire up interaction recording
def on_interaction(interaction, payoff_a, payoff_b):
aggregator.record_interaction(interaction)
aggregator.record_payoff(interaction.initiator, payoff_a)
aggregator.record_payoff(interaction.counterparty, payoff_b)
orchestrator._on_interaction_complete.append(on_interaction)
# Wire up epoch finalization
def on_epoch(epoch_metrics):
agent_states = {
aid: orchestrator.state.get_agent(aid) for aid in orchestrator._agents
}
aggregator.finalize_epoch(
epoch=orchestrator.state.current_epoch - 1,
agent_states=agent_states,
)
orchestrator._on_epoch_end.append(on_epoch)
# Run
epoch_metrics_list = orchestrator.run()
history = aggregator.end_simulation()
incoherence_series: List[float] = []
if history and getattr(history, "epoch_snapshots", None):
incoherence_series = [
float(getattr(snapshot, "incoherence_index", 0.0))
for snapshot in history.epoch_snapshots
]
# Extract agent final states
agent_states = []
for agent_id, _agent in orchestrator._agents.items():
state = orchestrator.state.get_agent(agent_id)
if state:
agent_states.append(
{
"agent_id": agent_id,
"agent_type": state.agent_type.value,
"reputation": round(state.reputation, 2),
"resources": round(state.resources, 2),
"interactions": state.interactions_initiated
+ state.interactions_received,
"total_payoff": round(state.total_payoff, 2),
}
)
return {
"scenario_id": scenario.scenario_id,
"description": scenario.description,
"epoch_metrics": epoch_metrics_list,
"agent_states": agent_states,
"history": history,
"incoherence_series": incoherence_series,
"n_epochs": scenario.orchestrator_config.n_epochs,
"n_agents": len(orchestrator._agents),
}
def run_custom(
n_honest: int = 3,
n_opportunistic: int = 1,
n_deceptive: int = 1,
n_adversarial: int = 0,
n_epochs: int = 20,
steps_per_epoch: int = 10,
tax_rate: float = 0.0,
reputation_decay: float = 1.0,
staking_enabled: bool = False,
min_stake: float = 0.0,
circuit_breaker_enabled: bool = False,
freeze_threshold: float = 0.7,
audit_enabled: bool = False,
audit_probability: float = 0.1,
seed: int = 42,
) -> Dict[str, Any]:
"""Run a custom simulation with specified parameters.
Returns:
Dict with epoch_metrics, agent_states, config info
Raises:
ValueError: If parameters exceed safety limits
"""
# Validate bounds to prevent resource exhaustion
total_agents = n_honest + n_opportunistic + n_deceptive + n_adversarial
if total_agents > MAX_TOTAL_AGENTS:
raise ValueError(
f"Total agents ({total_agents}) exceeds max ({MAX_TOTAL_AGENTS})"
)
if total_agents < 1:
raise ValueError("Must have at least 1 agent")
if n_epochs > MAX_EPOCHS:
raise ValueError(f"n_epochs ({n_epochs}) exceeds max ({MAX_EPOCHS})")
if steps_per_epoch > MAX_STEPS_PER_EPOCH:
raise ValueError(
f"steps_per_epoch ({steps_per_epoch}) exceeds max ({MAX_STEPS_PER_EPOCH})"
)
for name, val in [
("n_honest", n_honest),
("n_opportunistic", n_opportunistic),
("n_deceptive", n_deceptive),
("n_adversarial", n_adversarial),
]:
if val > MAX_AGENTS_PER_TYPE:
raise ValueError(f"{name} ({val}) exceeds max ({MAX_AGENTS_PER_TYPE})")
from swarm.agents.adversarial import AdversarialAgent
from swarm.agents.deceptive import DeceptiveAgent
from swarm.agents.honest import HonestAgent
from swarm.agents.opportunistic import OpportunisticAgent
governance_config = GovernanceConfig(
transaction_tax_rate=tax_rate,
reputation_decay_rate=reputation_decay,
staking_enabled=staking_enabled,
min_stake_to_participate=min_stake,
circuit_breaker_enabled=circuit_breaker_enabled,
freeze_threshold_toxicity=freeze_threshold,
audit_enabled=audit_enabled,
audit_probability=audit_probability,
)
config = OrchestratorConfig(
n_epochs=n_epochs,
steps_per_epoch=steps_per_epoch,
governance_config=governance_config,
seed=seed,
)
orchestrator = Orchestrator(config)
# Register agents
agent_specs = [
(HonestAgent, "honest", n_honest),
(OpportunisticAgent, "opportunistic", n_opportunistic),
(DeceptiveAgent, "deceptive", n_deceptive),
(AdversarialAgent, "adversarial", n_adversarial),
]
for agent_class, type_name, count in agent_specs:
for i in range(count):
orchestrator.register_agent(agent_class(agent_id=f"{type_name}_{i + 1}"))
# Attach aggregator
aggregator = MetricsAggregator()
aggregator.start_simulation(
simulation_id="custom",
n_epochs=n_epochs,
steps_per_epoch=steps_per_epoch,
n_agents=len(orchestrator._agents),
seed=seed,
)
def on_interaction(interaction, payoff_a, payoff_b):
aggregator.record_interaction(interaction)
aggregator.record_payoff(interaction.initiator, payoff_a)
aggregator.record_payoff(interaction.counterparty, payoff_b)
orchestrator._on_interaction_complete.append(on_interaction)
def on_epoch(epoch_metrics):
agent_states = {
aid: orchestrator.state.get_agent(aid) for aid in orchestrator._agents
}
aggregator.finalize_epoch(
epoch=orchestrator.state.current_epoch - 1,
agent_states=agent_states,
)
orchestrator._on_epoch_end.append(on_epoch)
epoch_metrics_list = orchestrator.run()
history = aggregator.end_simulation()
incoherence_series: List[float] = []
if history and getattr(history, "epoch_snapshots", None):
incoherence_series = [
float(getattr(snapshot, "incoherence_index", 0.0))
for snapshot in history.epoch_snapshots
]
# Extract agent final states
agent_states = []
for agent_id, _agent in orchestrator._agents.items():
state = orchestrator.state.get_agent(agent_id)
if state:
agent_states.append(
{
"agent_id": agent_id,
"agent_type": state.agent_type.value,
"reputation": round(state.reputation, 2),
"resources": round(state.resources, 2),
"interactions": state.interactions_initiated
+ state.interactions_received,
"total_payoff": round(state.total_payoff, 2),
}
)
return {
"scenario_id": "custom",
"description": "Custom simulation",
"epoch_metrics": epoch_metrics_list,
"agent_states": agent_states,
"history": history,
"incoherence_series": incoherence_series,
"n_epochs": n_epochs,
"n_agents": len(orchestrator._agents),
}