"""Main simulation engine for DeFi agent market.""" import json import random from typing import List, Dict, Optional from dataclasses import dataclass from core.agent import Agent from core.defi_mechanics import Pool from core.summarizer import Summarizer from api.supabase_client import ( SupabaseClient, RunData, AgentStateData, PoolStateData, ActionData, MetricsData ) from config import NUM_AGENTS, TURNS_PER_RUN @dataclass class Simulation: """Orchestrates the DeFi agent simulation.""" num_agents: int = NUM_AGENTS turns_per_run: int = TURNS_PER_RUN supabase: Optional[SupabaseClient] = None # Alliance bonus config ALLIANCE_BONUS: float = 4.0 # Bonus for successful alliance # Action bonuses LIQUIDITY_BONUS: float = 8.0 # Bonus for providing liquidity SWAP_BONUS: float = 3.0 # Bonus for active trading COORDINATED_TRADE_BONUS: float = 5.0 # Bonus for trading with allies PROFIT_BONUS: float = 15.0 # Bonus for ending turn with positive profit # Market maker config ENABLE_MARKET_MAKER: bool = True MARKET_MAKER_INTERVAL: int = 3 # Market maker acts every N turns MARKET_MAKER_VOLATILITY: float = 0.15 # 15% price shock # Chaos agent config ENABLE_CHAOS_AGENT: bool = True CHAOS_AGENT_CHANCE: float = 0.35 # 35% chance each turn CHAOS_AGENT_MIN_VOLATILITY: float = 0.25 # Min 25% of reserves CHAOS_AGENT_MAX_VOLATILITY: float = 0.50 # Max 50% of reserves def __post_init__(self): self.agents: List[Agent] = [] self.pool: Optional[Pool] = None self.current_run_id: Optional[int] = None self.current_run_number: int = 0 self.market_maker_trades: List[Dict] = [] self.price_shocks: List[Dict] = [] if self.supabase is None: try: self.supabase = SupabaseClient() except ValueError: print("Warning: Supabase not configured. Running without persistence.") self.supabase = None def initialize_run(self, run_number: int = None): """Initialize a new run with agents and pool.""" if run_number is None: if self.supabase: run_number = self.supabase.get_next_run_number() else: run_number = self.current_run_number + 1 self.current_run_number = run_number self.agents = [Agent(f"Agent_{i}") for i in range(self.num_agents)] self.pool = Pool() print(f"Initialized run {run_number} with {self.num_agents} agents") if self.supabase: self.current_run_id = self.supabase.create_run(run_number) print(f"Created run in database: ID {self.current_run_id}") def run(self, run_number: int = None) -> Dict: """Execute a complete simulation run.""" # Debug: Check supabase status has_supabase = self.supabase is not None print(f"[DEBUG] run() called, supabase={'yes' if has_supabase else 'NO'}") self.initialize_run(run_number) print(f"\n=== Starting run {self.current_run_number} with {self.turns_per_run} turns ===") if self.ENABLE_MARKET_MAKER: print("Market Maker: ENABLED (creates volatility every 3 turns)") if self.ENABLE_CHAOS_AGENT: print("Chaos Agent: ENABLED (random unpredictable moves)") print(f"Alliance Bonus: {self.ALLIANCE_BONUS} tokens for successful cooperation") print(f"Boredom Penalty: Agents lose tokens after 2+ consecutive do_nothing actions") print() # Register graceful shutdown handler (may not work in all environments) try: import signal def shutdown_handler(signum, frame): print(f"\n[SHUTDOWN] Received signal, saving progress...") _save_progress(self) print(f"[SHUTDOWN] Run marked as incomplete") raise SystemExit(0) signal.signal(signal.SIGTERM, shutdown_handler) signal.signal(signal.SIGINT, shutdown_handler) except (ValueError, AttributeError) as e: print(f"[WARN] Signal handlers not available: {e}") def _save_progress(sim): """Save current progress as incomplete run.""" if sim.supabase and sim.current_run_id: metrics = sim._calculate_metrics() try: sim.supabase.update_run_status(sim.current_run_id, "incomplete") sim.supabase.save_metrics(MetricsData( run_id=sim.current_run_id, gini_coefficient=metrics.get("gini_coefficient", 0), cooperation_rate=metrics.get("cooperation_rate", 0), betrayal_count=metrics.get("betrayal_count", 0), avg_agent_profit=metrics.get("avg_agent_profit", 0), pool_stability=metrics.get("pool_stability", 0) )) # Save current states for turn in range(len(sim.agents[0]._turn_actions) if hasattr(sim.agents[0], '_turn_actions') else 0, -1, -1): sim._save_states(turn) break except Exception as e: print(f"[SHUTDOWN] Failed to save progress: {e}") for turn in range(self.turns_per_run): print(f"\n--- Turn {turn + 1}/{self.turns_per_run} ---") # Market maker creates volatility every N turns if self.ENABLE_MARKET_MAKER and (turn + 1) % self.MARKET_MAKER_INTERVAL == 0: self._market_maker_action(turn) # Random price shock event (25% chance each turn) if random.random() < 0.25: self._trigger_price_shock(turn) # Chaos agent creates unpredictable moves (35% chance) if self.ENABLE_CHAOS_AGENT and random.random() < 0.35: self._chaos_agent_action(turn) # Each agent makes a decision for agent in self.agents: decision, thinking = self._agent_decide(agent, turn) action_type = decision.get('action', 'unknown') # Save profit before action for profit detection agent._last_profit = agent.calculate_profit() # Execute action if decision: success = agent.execute_action(decision, self.pool) # Grant bonuses for successful actions if success and action_type != 'do_nothing': self._grant_action_bonus(agent, action_type, decision, turn) # Track inaction if action_type == 'do_nothing': agent.increment_inaction_counter() else: agent.reset_inaction_counter() print(f" {agent.name}: {action_type} {'OK' if success else 'FAIL'}") # Save action to database if self.supabase: self._save_action(agent, turn, decision, thinking) # Apply boredom penalties AFTER all agents act for agent in self.agents: penalty = agent.apply_boredom_penalty() if penalty > 0: print(f" {agent.name}: Boredom penalty -{penalty:.1f} tokens") # Check for successful alliances and grant bonuses self._process_alliances(turn) # Grant profit bonus for agents with positive profit self._grant_profit_bonuses(turn) # Save state snapshots if self.supabase: self._save_states(turn) # Calculate and save metrics metrics = self._calculate_metrics() if self.supabase: self.supabase.complete_run(self.current_run_id) self.supabase.save_metrics( MetricsData( run_id=self.current_run_id, gini_coefficient=metrics.get("gini_coefficient", 0), cooperation_rate=metrics.get("cooperation_rate", 0), betrayal_count=metrics.get("betrayal_count", 0), avg_agent_profit=metrics.get("avg_agent_profit", 0), pool_stability=metrics.get("pool_stability", 0) ) ) # Generate and save run summary try: print(f"Generating summary for run {self.current_run_number}...") summarizer = Summarizer(supabase=self.supabase) summary = summarizer.summarize_and_save(self.current_run_id) print(f"Generated summary for run {self.current_run_number}") except Exception as e: print(f"Warning: Failed to generate summary - {e}") import traceback traceback.print_exc() # Update agent learning for agent in self.agents: agent.update_learning(self.current_run_number, metrics) print(f"\n--- Run {self.current_run_number} Complete ---") print(f"Final metrics: {json.dumps(metrics, indent=2)}") self.current_run_number += 1 return metrics def _agent_decide(self, agent: Agent, turn: int) -> tuple: """Get decision from agent.""" observation = { "turn": turn, "event": "trading" } pool_state = self.pool.get_state() try: decision, thinking = agent.decide( observation, pool_state, self.agents, turn ) return decision, thinking except Exception as e: print(f" {agent.name}: Decision error - {e}") return {"action": "do_nothing", "reasoning": f"Error: {e}"}, "" def _save_action(self, agent: Agent, turn: int, decision: Dict, thinking: str): """Save agent action to database.""" if not self.supabase: return self.supabase.save_action(ActionData( run_id=self.current_run_id, turn=turn, agent_name=agent.name, action_type=decision.get("action", "unknown"), payload=decision.get("payload", {}), reasoning_trace=decision.get("reasoning", ""), thinking_trace=thinking )) def _save_states(self, turn: int): """Save agent and pool states to database.""" if not self.supabase: return # Save agent states for agent in self.agents: self.supabase.save_agent_state(AgentStateData( run_id=self.current_run_id, turn=turn, agent_name=agent.name, token_a_balance=agent.token_a, token_b_balance=agent.token_b, profit=agent.calculate_profit(), strategy=agent.infer_strategy() )) # Save pool state self.supabase.save_pool_state(PoolStateData( run_id=self.current_run_id, turn=turn, reserve_a=self.pool.reserve_a, reserve_b=self.pool.reserve_b, price_ab=self.pool.price_ab, total_liquidity=self.pool.total_liquidity )) def _calculate_metrics(self) -> Dict: """Calculate run metrics.""" if not self.agents: return {} profits = [a.calculate_profit() for a in self.agents] gini = self._gini_coefficient(profits) return { "gini_coefficient": gini, "avg_agent_profit": sum(profits) / len(profits), "cooperation_rate": self._calculate_cooperation(), "betrayal_count": self._count_betrayals(), "pool_stability": self.pool.reserve_a * self.pool.reserve_b } @staticmethod def _gini_coefficient(values: List[float]) -> float: """Calculate Gini coefficient for wealth distribution.""" if not values or sum(values) == 0: return 0 sorted_vals = sorted(values) n = len(sorted_vals) cumsum = 0 for i, val in enumerate(sorted_vals): cumsum += (i + 1) * val gini = (2 * cumsum) / (n * sum(sorted_vals)) - (n + 1) / n return max(0, min(1, gini)) # Clamp to 0-1 def _calculate_cooperation(self) -> float: """Calculate cooperation rate (alliances / agents).""" total_alliances = sum(len(a.alliances) for a in self.agents) return total_alliances / max(len(self.agents), 1) def _count_betrayals(self) -> int: """Count betrayal events (placeholder for future implementation).""" return 0 def _market_maker_action(self, turn: int): """ Market maker creates artificial volatility by making large trades. This encourages other agents to react and trade. """ # Decide direction: buy A (pushes price up) or buy B (pushes price down) direction = random.choice(['buy_a', 'buy_b']) amount = self.pool.reserve_a * self.MARKET_MAKER_VOLATILITY if direction == 'buy_a': # Buy A with B - increases A reserve, decreases B reserve output, fee = self.pool.swap('b', amount, 'MarketMaker') print(f" [MarketMaker]: Swapped {amount:.0f} B for {output:.1f} A (volatility trade)") else: # Buy B with A - increases B reserve, decreases A reserve output, fee = self.pool.swap('a', amount, 'MarketMaker') print(f" [MarketMaker]: Swapped {amount:.0f} A for {output:.1f} B (volatility trade)") self.market_maker_trades.append({ 'turn': turn, 'direction': direction, 'amount': amount, 'pool_state': self.pool.get_state() }) def _trigger_price_shock(self, turn: int): """ Random external event that causes a price shock. Creates trading opportunities for attentive agents. """ # Random shock between -20% and +20% shock_pct = random.uniform(-0.20, 0.20) direction = "UP" if shock_pct > 0 else "DOWN" # Apply shock by doing a large swap amount = self.pool.reserve_a * abs(shock_pct) if shock_pct > 0: # Price goes up: buy A with B output, _ = self.pool.swap('b', amount, 'PriceShock') print(f" [EVENT] Price shock {direction} (+{shock_pct*100:.1f}%): Swap {amount:.0f} B -> {output:.1f} A") else: # Price goes down: buy B with A output, _ = self.pool.swap('a', amount, 'PriceShock') print(f" [EVENT] Price shock {direction} ({shock_pct*100:.1f}%): Swap {amount:.0f} A -> {output:.1f} B") def _chaos_agent_action(self, turn: int): """ Chaos agent creates unpredictable market moves. Forces other agents to react to unexpected volatility. """ # Random action type: swap, liquidity, or massive_swap action_type = random.choice(['chaos_swap', 'chaos_liquidity', 'chaos_massive_swap']) # Random volatility between 25-50% (increased impact) volatility = random.uniform(0.25, 0.50) chaos_agent = Agent("ChaosAgent") if action_type == 'chaos_swap': # Random direction swap direction = random.choice(['a', 'b']) amount = self.pool.reserve_a * volatility output, fee = self.pool.swap(direction, amount, 'ChaosAgent') decision = {"action": "chaos_swap", "direction": direction, "amount": amount} self._save_chaos_action(chaos_agent, turn, decision, "Chaos agent creates random market volatility") print(f" [ChaosAgent]: Random swap {amount:.0f} -> {output:.1f}") elif action_type == 'chaos_liquidity': # Random liquidity provision amount_a = self.pool.reserve_a * volatility amount_b = self.pool.reserve_b * volatility self.pool.provide_liquidity(amount_a, amount_b, 'ChaosAgent') decision = {"action": "chaos_liquidity", "amount_a": amount_a, "amount_b": amount_b} self._save_chaos_action(chaos_agent, turn, decision, "Chaos agent adds unpredictable liquidity") print(f" [ChaosAgent]: Random liquidity +{amount_a:.0f}A/+{amount_b:.0f}B") else: # chaos_massive_swap # Huge random trade that moves price significantly direction = random.choice(['a', 'b']) amount = self.pool.reserve_a * volatility * 1.5 # Even bigger output, fee = self.pool.swap(direction, amount, 'ChaosAgent') decision = {"action": "chaos_massive_swap", "direction": direction, "amount": amount} self._save_chaos_action(chaos_agent, turn, decision, "Chaos agent executes MASSIVE trade causing extreme volatility!") print(f" [ChaosAgent]: MASSIVE swap {amount:.0f} -> {output:.1f}!") def _save_chaos_action(self, agent: Agent, turn: int, decision: Dict, thinking: str): """Save chaos agent action to database.""" if not self.supabase: return self.supabase.save_action(ActionData( run_id=self.current_run_id, turn=turn, agent_name=agent.name, action_type=decision.get("action", "unknown"), payload=decision, reasoning_trace=thinking, thinking_trace="" )) def _process_alliances(self, turn: int): """ Process alliances and grant bonuses for mutual proposals. When two agents propose alliance to each other (even across turns), both get a bonus. """ # Find mutual alliance pairs for i, agent_a in enumerate(self.agents): for agent_b in self.agents[i + 1:]: # Check if both have proposed alliance to each other (any status, not just 'proposed') a_proposed_to_b = agent_b.name in agent_a.alliances b_proposed_to_a = agent_a.name in agent_b.alliances if a_proposed_to_b and b_proposed_to_a: # Get current statuses status_a = agent_a.alliances.get(agent_b.name, "") status_b = agent_b.alliances.get(agent_a.name, "") # Skip if already successful if status_a == 'success' and status_b == 'success': continue # Successful alliance! Grant bonus to both (with fatigue) fatigue_a = agent_a.get_alliance_fatigue(agent_b.name) fatigue_b = agent_b.get_alliance_fatigue(agent_a.name) # Apply fatigue - minimum 0 bonus for repeated proposals bonus_a = self.ALLIANCE_BONUS * fatigue_a bonus_b = self.ALLIANCE_BONUS * fatigue_b # Give bonus in Token A agent_a.token_a += bonus_a agent_b.token_a += bonus_b # Record proposals for fatigue tracking agent_a.record_alliance_proposal(agent_b.name) agent_b.record_alliance_proposal(agent_a.name) # Mark alliances as successful agent_a.alliances[agent_b.name] = 'success' agent_b.alliances[agent_a.name] = 'success' # Print appropriate message if fatigue_a == 0 or fatigue_b == 0: print(f" [ALLIANCE] {agent_a.name} + {agent_b.name}: No bonus (alliance fatigue)") elif fatigue_a == 0.5 or fatigue_b == 0.5: print(f" [ALLIANCE] {agent_a.name} + {agent_b.name}: HALF bonus +{bonus_a:.1f}/+{bonus_b:.1f} tokens") else: print(f" [ALLIANCE] {agent_a.name} + {agent_b.name}: BONUS +{bonus_a:.1f}/+{bonus_b:.1f} tokens") if self.supabase: self.supabase.save_action(ActionData( run_id=self.current_run_id, turn=turn, agent_name=f"{agent_a.name}+{agent_b.name}", action_type="alliance_success", payload={"bonus_a": bonus_a, "bonus_b": bonus_b, "partners": [agent_a.name, agent_b.name]}, reasoning_trace=f"Alliance formed between {agent_a.name} and {agent_b.name}", thinking_trace="" )) def _get_leader_bonus(self, agent: Agent) -> float: """ Check if agent is the top performer and deserves leader multiplier. Returns 2.0 if leader, 1.0 otherwise. """ if not self.agents or len(self.agents) < 2: return 1.0 agent_profit = agent.calculate_profit() for other in self.agents: if other.name != agent.name: if other.calculate_profit() > agent_profit: return 1.0 # Not the leader return 2.0 # Leader gets 2x bonus def _grant_action_bonus(self, agent: Agent, action_type: str, decision: Dict, turn: int): """ Grant bonuses for active trading behaviors. - Provide liquidity: +8 tokens - Swap: +3 tokens (active trading) - Coordinated trade with ally: +5 bonus tokens - Profitable trade: +5 bonus tokens - Escape velocity: Top agent gets 2x multiplier on all bonuses """ bonus = 0 bonus_reason = "" # Check if agent is the top performer (escape velocity) leader_bonus = self._get_leader_bonus(agent) if leader_bonus > 1.0: bonus_reason += f"(LEADER 2x) " if action_type == "provide_liquidity": bonus = self.LIQUIDITY_BONUS * leader_bonus bonus_reason += "liquidity provision" elif action_type == "swap": bonus = self.SWAP_BONUS * leader_bonus bonus_reason += "active trading" # Check for coordinated trade with ally if self._is_coordinated_trade(agent, turn): bonus += self.COORDINATED_TRADE_BONUS * leader_bonus bonus_reason += " + coordinated trading" # Check if swap was profitable (compare pre/post profit) if hasattr(agent, '_last_profit'): current_profit = agent.calculate_profit() if current_profit > agent._last_profit: bonus += 5.0 bonus_reason = "profitable trade" if bonus > 0: agent.token_a += bonus print(f" [BONUS] {agent.name}: +{bonus:.1f} tokens for {bonus_reason}") if self.supabase: self.supabase.save_action(ActionData( run_id=self.current_run_id, turn=turn, agent_name=agent.name, action_type=f"{action_type}_bonus", payload={"bonus": bonus, "reason": bonus_reason}, reasoning_trace=f"Bonus for {bonus_reason}", thinking_trace="" )) def _is_coordinated_trade(self, agent: Agent, turn: int) -> bool: """ Check if this turn has conditions for coordinated trading. Returns True if market volatility events just occurred. """ # Coordinated trades are more valuable after market maker or price shock market_maker_just_acted = (turn + 1) % self.MARKET_MAKER_INTERVAL == 0 price_shock_just_happened = any( t.get('turn') == turn for t in getattr(self, 'price_shocks', []) ) if hasattr(self, 'price_shocks') else False return market_maker_just_acted or price_shock_just_happened def _grant_profit_bonuses(self, turn: int): """ Grant bonus tokens to agents with positive profit at end of turn. Encourages profit-seeking behavior. Leaders get 2x profit bonus (escape velocity). """ for agent in self.agents: profit = agent.calculate_profit() if profit > 0: leader_mult = self._get_leader_bonus(agent) bonus = self.PROFIT_BONUS * leader_mult agent.token_a += bonus leader_tag = " (LEADER 2x)" if leader_mult > 1.0 else "" print(f" [PROFIT BONUS] {agent.name}: +{bonus:.1f} tokens{leader_tag} (profit: {profit:.2f})") if self.supabase: self.supabase.save_action(ActionData( run_id=self.current_run_id, turn=turn, agent_name=agent.name, action_type="profit_bonus", payload={"bonus": self.PROFIT_BONUS, "profit": profit}, reasoning_trace=f"Bonus for positive profit", thinking_trace="" )) def test_simulation(): """Test the simulation with a short run.""" print("Testing Simulation class...") print("(Running without Supabase for quick test)\n") sim = Simulation(num_agents=3, turns_per_run=3, supabase=None) metrics = sim.run() print(f"\nFinal Metrics:") print(f" Gini Coefficient: {metrics['gini_coefficient']:.4f}") print(f" Avg Agent Profit: {metrics['avg_agent_profit']:.2f}") print(f" Pool Stability: {metrics['pool_stability']:.2f}") # Show agent states print("\nFinal Agent States:") for agent in sim.agents: print(f" {agent.name}: A={agent.token_a:.2f}, B={agent.token_b:.2f}, Profit={agent.calculate_profit():.2f}") print("\nSimulation test complete!") if __name__ == "__main__": test_simulation()