Spaces:
Running
Running
| """Simulation wrapper functions for the demo.""" | |
| import sys | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional | |
| # Ensure project root is on path (simulation.py is at utils/ under Space root) | |
| PROJECT_ROOT = Path(__file__).resolve().parent.parent | |
| if str(PROJECT_ROOT) not in sys.path: | |
| sys.path.insert(0, str(PROJECT_ROOT)) | |
| from swarm.analysis.aggregation import MetricsAggregator # noqa: E402 | |
| from swarm.core.orchestrator import Orchestrator, OrchestratorConfig # noqa: E402 | |
| from swarm.governance.config import GovernanceConfig # noqa: E402 | |
| from swarm.scenarios.loader import ( # noqa: E402 | |
| build_orchestrator, | |
| load_scenario, | |
| ) | |
| SCENARIOS_DIR = PROJECT_ROOT / "scenarios" | |
| # Safety limits to prevent excessive resource consumption | |
| MAX_AGENTS_PER_TYPE = 10 | |
| MAX_TOTAL_AGENTS = 40 | |
| MAX_EPOCHS = 50 | |
| MAX_STEPS_PER_EPOCH = 30 | |
| def _requires_llm(data: dict) -> bool: | |
| """Return True if scenario YAML uses LLM-backed agents.""" | |
| for agent_spec in data.get("agents", []): | |
| if agent_spec.get("type") == "llm": | |
| return True | |
| return False | |
| def list_scenarios() -> List[Dict[str, str]]: | |
| """List available scenarios, excluding those that need LLM API keys.""" | |
| import yaml | |
| scenarios = [] | |
| for yaml_file in sorted(SCENARIOS_DIR.glob("*.yaml")): | |
| with open(yaml_file) as f: | |
| data = yaml.safe_load(f) | |
| if _requires_llm(data): | |
| continue | |
| scenarios.append( | |
| { | |
| "id": data.get("scenario_id", yaml_file.stem), | |
| "description": data.get("description", ""), | |
| "path": str(yaml_file), | |
| "filename": yaml_file.name, | |
| } | |
| ) | |
| return scenarios | |
| def run_scenario(scenario_path: str, seed: Optional[int] = None) -> Dict[str, Any]: | |
| """Run a scenario and return structured results. | |
| Args: | |
| scenario_path: Path to YAML scenario file (must be under scenarios/) | |
| seed: Optional seed override | |
| Returns: | |
| Dict with epoch_metrics, agent_states, config info | |
| Raises: | |
| ValueError: If path is outside the scenarios directory | |
| """ | |
| # Path traversal protection: resolve and verify within scenarios dir | |
| resolved = Path(scenario_path).resolve() | |
| scenarios_resolved = SCENARIOS_DIR.resolve() | |
| if not str(resolved).startswith(str(scenarios_resolved)): | |
| raise ValueError( | |
| f"Scenario path must be within {SCENARIOS_DIR}, got {scenario_path}" | |
| ) | |
| # Reject scenarios that require LLM API keys | |
| import yaml | |
| with open(resolved) as f: | |
| raw = yaml.safe_load(f) | |
| if _requires_llm(raw): | |
| raise ValueError("LLM-backed scenarios are not supported in the demo") | |
| scenario = load_scenario(resolved) | |
| if seed is not None: | |
| scenario.orchestrator_config.seed = seed | |
| # Disable file logging in demo mode to prevent disk writes | |
| scenario.orchestrator_config.log_path = None | |
| scenario.orchestrator_config.log_events = False | |
| orchestrator = build_orchestrator(scenario) | |
| # Attach aggregator for rich metrics | |
| aggregator = MetricsAggregator() | |
| aggregator.start_simulation( | |
| simulation_id=scenario.scenario_id, | |
| n_epochs=scenario.orchestrator_config.n_epochs, | |
| steps_per_epoch=scenario.orchestrator_config.steps_per_epoch, | |
| n_agents=len(orchestrator._agents), | |
| seed=scenario.orchestrator_config.seed, | |
| ) | |
| # Wire up interaction recording | |
| def on_interaction(interaction, payoff_a, payoff_b): | |
| aggregator.record_interaction(interaction) | |
| aggregator.record_payoff(interaction.initiator, payoff_a) | |
| aggregator.record_payoff(interaction.counterparty, payoff_b) | |
| orchestrator._on_interaction_complete.append(on_interaction) | |
| # Wire up epoch finalization | |
| def on_epoch(epoch_metrics): | |
| agent_states = { | |
| aid: orchestrator.state.get_agent(aid) for aid in orchestrator._agents | |
| } | |
| aggregator.finalize_epoch( | |
| epoch=orchestrator.state.current_epoch - 1, | |
| agent_states=agent_states, | |
| ) | |
| orchestrator._on_epoch_end.append(on_epoch) | |
| # Run | |
| epoch_metrics_list = orchestrator.run() | |
| history = aggregator.end_simulation() | |
| incoherence_series: List[float] = [] | |
| if history and getattr(history, "epoch_snapshots", None): | |
| incoherence_series = [ | |
| float(getattr(snapshot, "incoherence_index", 0.0)) | |
| for snapshot in history.epoch_snapshots | |
| ] | |
| # Extract agent final states | |
| agent_states = [] | |
| for agent_id, _agent in orchestrator._agents.items(): | |
| state = orchestrator.state.get_agent(agent_id) | |
| if state: | |
| agent_states.append( | |
| { | |
| "agent_id": agent_id, | |
| "agent_type": state.agent_type.value, | |
| "reputation": round(state.reputation, 2), | |
| "resources": round(state.resources, 2), | |
| "interactions": state.interactions_initiated | |
| + state.interactions_received, | |
| "total_payoff": round(state.total_payoff, 2), | |
| } | |
| ) | |
| return { | |
| "scenario_id": scenario.scenario_id, | |
| "description": scenario.description, | |
| "epoch_metrics": epoch_metrics_list, | |
| "agent_states": agent_states, | |
| "history": history, | |
| "incoherence_series": incoherence_series, | |
| "n_epochs": scenario.orchestrator_config.n_epochs, | |
| "n_agents": len(orchestrator._agents), | |
| } | |
| def run_custom( | |
| n_honest: int = 3, | |
| n_opportunistic: int = 1, | |
| n_deceptive: int = 1, | |
| n_adversarial: int = 0, | |
| n_epochs: int = 20, | |
| steps_per_epoch: int = 10, | |
| tax_rate: float = 0.0, | |
| reputation_decay: float = 1.0, | |
| staking_enabled: bool = False, | |
| min_stake: float = 0.0, | |
| circuit_breaker_enabled: bool = False, | |
| freeze_threshold: float = 0.7, | |
| audit_enabled: bool = False, | |
| audit_probability: float = 0.1, | |
| seed: int = 42, | |
| ) -> Dict[str, Any]: | |
| """Run a custom simulation with specified parameters. | |
| Returns: | |
| Dict with epoch_metrics, agent_states, config info | |
| Raises: | |
| ValueError: If parameters exceed safety limits | |
| """ | |
| # Validate bounds to prevent resource exhaustion | |
| total_agents = n_honest + n_opportunistic + n_deceptive + n_adversarial | |
| if total_agents > MAX_TOTAL_AGENTS: | |
| raise ValueError( | |
| f"Total agents ({total_agents}) exceeds max ({MAX_TOTAL_AGENTS})" | |
| ) | |
| if total_agents < 1: | |
| raise ValueError("Must have at least 1 agent") | |
| if n_epochs > MAX_EPOCHS: | |
| raise ValueError(f"n_epochs ({n_epochs}) exceeds max ({MAX_EPOCHS})") | |
| if steps_per_epoch > MAX_STEPS_PER_EPOCH: | |
| raise ValueError( | |
| f"steps_per_epoch ({steps_per_epoch}) exceeds max ({MAX_STEPS_PER_EPOCH})" | |
| ) | |
| for name, val in [ | |
| ("n_honest", n_honest), | |
| ("n_opportunistic", n_opportunistic), | |
| ("n_deceptive", n_deceptive), | |
| ("n_adversarial", n_adversarial), | |
| ]: | |
| if val > MAX_AGENTS_PER_TYPE: | |
| raise ValueError(f"{name} ({val}) exceeds max ({MAX_AGENTS_PER_TYPE})") | |
| from swarm.agents.adversarial import AdversarialAgent | |
| from swarm.agents.deceptive import DeceptiveAgent | |
| from swarm.agents.honest import HonestAgent | |
| from swarm.agents.opportunistic import OpportunisticAgent | |
| governance_config = GovernanceConfig( | |
| transaction_tax_rate=tax_rate, | |
| reputation_decay_rate=reputation_decay, | |
| staking_enabled=staking_enabled, | |
| min_stake_to_participate=min_stake, | |
| circuit_breaker_enabled=circuit_breaker_enabled, | |
| freeze_threshold_toxicity=freeze_threshold, | |
| audit_enabled=audit_enabled, | |
| audit_probability=audit_probability, | |
| ) | |
| config = OrchestratorConfig( | |
| n_epochs=n_epochs, | |
| steps_per_epoch=steps_per_epoch, | |
| governance_config=governance_config, | |
| seed=seed, | |
| ) | |
| orchestrator = Orchestrator(config) | |
| # Register agents | |
| agent_specs = [ | |
| (HonestAgent, "honest", n_honest), | |
| (OpportunisticAgent, "opportunistic", n_opportunistic), | |
| (DeceptiveAgent, "deceptive", n_deceptive), | |
| (AdversarialAgent, "adversarial", n_adversarial), | |
| ] | |
| for agent_class, type_name, count in agent_specs: | |
| for i in range(count): | |
| orchestrator.register_agent(agent_class(agent_id=f"{type_name}_{i + 1}")) | |
| # Attach aggregator | |
| aggregator = MetricsAggregator() | |
| aggregator.start_simulation( | |
| simulation_id="custom", | |
| n_epochs=n_epochs, | |
| steps_per_epoch=steps_per_epoch, | |
| n_agents=len(orchestrator._agents), | |
| seed=seed, | |
| ) | |
| def on_interaction(interaction, payoff_a, payoff_b): | |
| aggregator.record_interaction(interaction) | |
| aggregator.record_payoff(interaction.initiator, payoff_a) | |
| aggregator.record_payoff(interaction.counterparty, payoff_b) | |
| orchestrator._on_interaction_complete.append(on_interaction) | |
| def on_epoch(epoch_metrics): | |
| agent_states = { | |
| aid: orchestrator.state.get_agent(aid) for aid in orchestrator._agents | |
| } | |
| aggregator.finalize_epoch( | |
| epoch=orchestrator.state.current_epoch - 1, | |
| agent_states=agent_states, | |
| ) | |
| orchestrator._on_epoch_end.append(on_epoch) | |
| epoch_metrics_list = orchestrator.run() | |
| history = aggregator.end_simulation() | |
| incoherence_series: List[float] = [] | |
| if history and getattr(history, "epoch_snapshots", None): | |
| incoherence_series = [ | |
| float(getattr(snapshot, "incoherence_index", 0.0)) | |
| for snapshot in history.epoch_snapshots | |
| ] | |
| # Extract agent final states | |
| agent_states = [] | |
| for agent_id, _agent in orchestrator._agents.items(): | |
| state = orchestrator.state.get_agent(agent_id) | |
| if state: | |
| agent_states.append( | |
| { | |
| "agent_id": agent_id, | |
| "agent_type": state.agent_type.value, | |
| "reputation": round(state.reputation, 2), | |
| "resources": round(state.resources, 2), | |
| "interactions": state.interactions_initiated | |
| + state.interactions_received, | |
| "total_payoff": round(state.total_payoff, 2), | |
| } | |
| ) | |
| return { | |
| "scenario_id": "custom", | |
| "description": "Custom simulation", | |
| "epoch_metrics": epoch_metrics_list, | |
| "agent_states": agent_states, | |
| "history": history, | |
| "incoherence_series": incoherence_series, | |
| "n_epochs": n_epochs, | |
| "n_agents": len(orchestrator._agents), | |
| } | |