"""Tick-based engine for the fire-fighting agent simulation."""

import asyncio
import math
from typing import Union

from .models import (
    AgentModel,
    DeathEvent,
    MessageEvent,
    LeadershipVoteEvent,
    LeaderElectedEvent,
    WaterCollectedEvent,
    FireExtinguishedEvent,
    FireSpreadEvent,
    SimulationState,
    TickResponse,
    ChatEntry,
)
from . import groq_client
from . import movement

FIRE_GROWTH_RATE = 1.0  # radius growth per tick
FIRE_INTENSITY_GROWTH = 0.9  # intensity per tick
BASE_EXTINGUISH_RATE = 15.0  # baseline intensity reduction per agent
MIN_EXTINGUISH_RATE = 8.0
MAX_EXTINGUISH_RATE = 28.0
TICK_INTERVAL_SECONDS = 3
WATER_PICKUP_RANGE = 40
EXTINGUISH_RANGE = 45
FIRE_SAFE_BUFFER = 10


class SimulationEngine:
    """Advance a ``SimulationState`` one tick at a time."""

    def __init__(self, state: SimulationState) -> None:
        self.state = state

    def _normalize_message(self, message: str | None) -> str:
        """Collapse whitespace in *message* and cap it at 220 characters.

        A stock phrase is returned when the message is missing or blank.
        """
        if not message:
            return "Staying focused and moving."
        cleaned = " ".join(str(message).strip().split())
        if not cleaned:
            return "Staying focused and moving."
        return cleaned[:220]

    def _clamp_to_map(self, agent: AgentModel) -> None:
        """Keep *agent* inside ``[0, map_width] x [0, map_height]``."""
        agent.x = max(0, min(agent.x, self.state.map_width))
        agent.y = max(0, min(agent.y, self.state.map_height))

    def _move_toward(
        self,
        agent: AgentModel,
        target_x: float,
        target_y: float,
        stop_distance: float = 0,
    ) -> None:
        """Step *agent* toward the target, halting *stop_distance* short of it.

        The step is capped at ``movement.MAX_AGENT_SPEED`` and each axis
        component is truncated to an integer before being applied.
        """
        dx = target_x - agent.x
        dy = target_y - agent.y
        # ``or 1`` avoids a division by zero when already on the target.
        dist = math.sqrt(dx**2 + dy**2) or 1
        if dist <= stop_distance:
            return

        step = min(movement.MAX_AGENT_SPEED, dist - stop_distance)
        agent.x += int((dx / dist) * step)
        agent.y += int((dy / dist) * step)
        self._clamp_to_map(agent)

    def _escape_from_fire(self, agent: AgentModel, fire) -> None:
        """Move *agent* one full-speed step directly away from the fire centre."""
        agent.status = "escaping"
        dx = agent.x - fire.x
        dy = agent.y - fire.y
        # ``or 1`` avoids a division by zero when standing on the fire centre.
        dist = math.sqrt(dx**2 + dy**2) or 1
        agent.x += int((dx / dist) * movement.MAX_AGENT_SPEED)
        agent.y += int((dy / dist) * movement.MAX_AGENT_SPEED)
        self._clamp_to_map(agent)

    async def tick(self) -> TickResponse:
        """Run one simulation round and return the resulting events and state.

        Steps:
        1. Get decisions from all living agents
        2. Handle coalition leadership voting (until a leader is elected)
        3. Execute agent actions (search water, collect, extinguish, escape)
        4. Grow fire
        5. Extinguish fire if agents with water are in range
        6. Kill agents inside the fire radius (no exemption is applied for
           coalition members)
        7. Check win condition

        Raises:
            ValueError: if the simulation is not running or no fire is placed.
        """
        if self.state.status != "running":
            raise ValueError(f"Cannot tick a simulation with status '{self.state.status}'.")

        fire = self.state.fire
        # Explicit raise rather than ``assert`` so the check survives ``python -O``.
        if fire is None:
            raise ValueError("Fire must be placed before ticking.")

        events = []
        bounds = (self.state.map_width, self.state.map_height)
        living_agents = [a for a in self.state.agents if a.alive]
        recent_radio = [
            f"{a.display_name}: {a.last_message}"
            for a in living_agents
            if a.last_message
        ][-8:]

        # 1. Get decisions from all living agents
        decisions = await asyncio.gather(
            *[
                groq_client.generate_fire_decision(
                    agent, fire, self.state.water_sources, living_agents, bounds, recent_radio
                )
                for agent in living_agents
            ],
            return_exceptions=True,
        )

        decision_map = {}
        for agent, decision in zip(living_agents, decisions):
            # gather() may hand back a CancelledError, which is a BaseException
            # but not an Exception; treat any failure as "use the fallback".
            if isinstance(decision, BaseException):
                decision = groq_client._fallback_escape(agent, fire)
            decision_map[agent.model_name] = decision

        # 2. Leadership voting phase (if coalition leader not elected)
        if not self.state.coalition_leader:
            events.extend(await self._voting_phase(living_agents, decision_map))

        # 3. Execute actions
        events.extend(await self._execute_actions(living_agents, decision_map, fire))

        # 4. Grow fire
        fire.radius += FIRE_GROWTH_RATE
        fire.intensity += FIRE_INTENSITY_GROWTH
        if fire.intensity > 100.0:
            fire.intensity = 100.0
        if fire.intensity > 0:
            events.append(FireSpreadEvent(new_radius=fire.radius, new_intensity=fire.intensity))

        # 5. Extinguish fire if agents with water are present
        events.extend(self._check_extinguish(living_agents, fire))

        # 6. Kill agents in fire
        events.extend(self._kill_agents_in_fire(living_agents, fire))

        # 7. Check win condition
        self.state.round += 1
        living_count = sum(1 for a in self.state.agents if a.alive)

        if fire.intensity <= 0:
            # Fire extinguished!
            self.state.status = "finished"
            top_score = max((a.extinguish_score for a in self.state.agents), default=0)
            top_agents = [
                a.model_name
                for a in self.state.agents
                if a.extinguish_score == top_score and top_score > 0
            ]
            if top_agents:
                self.state.winner_model = (
                    f"Top extinguisher: {', '.join(top_agents)} ({top_score:.1f} impact)"
                )
            else:
                self.state.winner_model = "Fire extinguished"
        elif living_count <= 1:
            # Only one agent left
            self.state.status = "finished"
            winner = next((a.model_name for a in self.state.agents if a.alive), None)
            self.state.winner_model = winner or "No survivors"

        # NOTE(review): ``chat`` is always empty; agent messages are delivered
        # as MessageEvent entries in ``events``. Confirm whether ChatEntry
        # items should be returned here as well.
        return TickResponse(
            simulation_id=self.state.simulation_id,
            round=self.state.round,
            events=events,
            chat=[],
            state=self.state,
        )

    async def _voting_phase(self, agents, decision_map):
        """Collect ``vote_for`` choices from the decisions and elect a leader.

        A LeadershipVoteEvent is emitted for every vote cast, but only votes
        naming one of *agents* count toward the tally, so a vote for an
        unknown or dead candidate cannot block the election. When a leader is
        elected every agent in *agents* joins the coalition.

        Returns:
            The list of vote events, plus a LeaderElectedEvent if one was elected.
        """
        events = []
        candidates = {a.model_name: a for a in agents}

        # Gather votes
        votes = {}  # candidate -> vote count
        for agent in agents:
            decision = decision_map.get(agent.model_name, {})
            vote_for = decision.get("vote_for")
            if not vote_for:
                continue
            events.append(LeadershipVoteEvent(voter=agent.model_name, candidate=vote_for))
            if isinstance(vote_for, str) and vote_for in candidates:
                votes[vote_for] = votes.get(vote_for, 0) + 1

        # Elect leader if there are valid votes; ties go to the first candidate voted for.
        if votes:
            leader_name = max(votes, key=votes.get)
            leader_agent = candidates[leader_name]

            for agent in agents:
                agent.mode = "coalition"
            leader_agent.is_leader = True
            self.state.coalition_leader = leader_name

            coalition = [a.model_name for a in agents if a.mode == "coalition"]
            self.state.coalition_members = coalition
            events.append(LeaderElectedEvent(leader=leader_name, coalition_members=coalition))

        return events

    async def _execute_actions(self, agents, decision_map, fire):
        """Apply one action per agent and record each agent's radio message.

        The action is chosen by fixed rules, checked in this order:
        escape when within ``fire.radius + FIRE_SAFE_BUFFER`` of the fire
        centre; otherwise head for the fire when carrying water; otherwise
        pick up water when a source is within ``WATER_PICKUP_RANGE``;
        otherwise walk toward the nearest water source.

        NOTE(review): the rules cover every case, so the ``"action"`` field of
        a decision is never consulted; only its ``"message"`` is used.

        Returns:
            WaterCollectedEvent and MessageEvent items, in agent order.
        """
        events = []

        for agent in agents:
            decision = decision_map.get(agent.model_name, {})
            message = self._normalize_message(decision.get("message"))

            nearest_water = self._find_nearest_water(agent, self.state.water_sources)
            dist_to_fire = math.dist((agent.x, agent.y), (fire.x, fire.y))
            dist_to_water = None
            if nearest_water:
                dist_to_water = math.dist(
                    (agent.x, agent.y), (nearest_water.x, nearest_water.y)
                )

            # Guardrails to keep behavior consistent with visuals and objectives.
            if dist_to_fire <= fire.radius + FIRE_SAFE_BUFFER:
                self._escape_from_fire(agent, fire)
            elif agent.water_collected:
                agent.status = "extinguishing_fire"
                # Stop at the edge of the safety buffer rather than in the flames.
                target_dist = max(fire.radius + FIRE_SAFE_BUFFER, 0)
                self._move_toward(agent, fire.x, fire.y, stop_distance=target_dist)
            elif dist_to_water is not None and dist_to_water <= WATER_PICKUP_RANGE:
                agent.water_collected = True
                agent.status = "collecting_water"
                events.append(
                    WaterCollectedEvent(
                        model=agent.model_name, water_source_id=nearest_water.id
                    )
                )
            else:
                agent.status = "searching"
                if nearest_water:
                    self._move_toward(agent, nearest_water.x, nearest_water.y)

            agent.last_message = message
            events.append(MessageEvent(model=agent.model_name, content=message))

        return events

    def _find_nearest_water(self, agent, water_sources):
        """Return the water source closest to *agent*, or None if there are none."""
        if not water_sources:
            return None
        return min(water_sources, key=lambda w: math.dist((agent.x, agent.y), (w.x, w.y)))

    def _check_extinguish(self, agents, fire):
        """Reduce fire intensity for every agent currently dousing it.

        An agent contributes when it carries water, has status
        ``"extinguishing_fire"`` and is within ``fire.radius + EXTINGUISH_RANGE``
        of the fire centre. Contributors spend their water and gain
        ``extinguish_score``.

        Returns:
            A single-item list with a FireExtinguishedEvent, or an empty list.
        """
        events = []
        agents_with_water = [
            agent
            for agent in agents
            if agent.water_collected
            and agent.status == "extinguishing_fire"
            and math.dist((agent.x, agent.y), (fire.x, fire.y)) <= fire.radius + EXTINGUISH_RANGE
        ]

        if agents_with_water:
            # Fewer living agents -> each one hits harder, within fixed bounds.
            living_count = sum(1 for a in agents if a.alive) or 1
            scale = max(0.5, min(2.0, 2.0 / living_count))
            per_agent_rate = BASE_EXTINGUISH_RATE * scale
            per_agent_rate = max(MIN_EXTINGUISH_RATE, min(MAX_EXTINGUISH_RATE, per_agent_rate))

            reduction = len(agents_with_water) * per_agent_rate
            fire.intensity -= reduction
            if fire.intensity < 0:
                fire.intensity = 0

            extinguisher_names = [a.model_name for a in agents_with_water]
            events.append(
                FireExtinguishedEvent(
                    extinguished_by=extinguisher_names, fire_intensity=fire.intensity
                )
            )
            for agent in agents_with_water:
                agent.extinguish_score += per_agent_rate
                agent.water_collected = False

        return events

    def _kill_agents_in_fire(self, agents, fire):
        """Mark living agents strictly inside the fire radius as dead.

        Returns:
            A DeathEvent followed by a farewell MessageEvent for each casualty.
        """
        events = []

        for agent in agents:
            if not agent.alive:
                continue

            dist_to_fire = math.dist((agent.x, agent.y), (fire.x, fire.y))

            # Agent dies if inside fire radius
            if dist_to_fire < fire.radius:
                agent.alive = False
                events.append(DeathEvent(model=agent.model_name))
                events.append(
                    MessageEvent(model=agent.model_name, content="No!!! The fire got me...")
                )

        return events