rushagentrush / app /simulation.py
adityaverma977
Switch to dynamic model-generated chat with context
1bb7020
import asyncio
import math
from typing import Union
from .models import (
AgentModel,
DeathEvent,
MessageEvent,
LeadershipVoteEvent,
LeaderElectedEvent,
WaterCollectedEvent,
FireExtinguishedEvent,
FireSpreadEvent,
SimulationState,
TickResponse,
ChatEntry,
)
from . import groq_client
from . import movement
# --- Simulation tuning constants -------------------------------------------
# Amount added to fire.radius on every tick.
FIRE_GROWTH_RATE = 1.0 # radius growth per tick
# Amount added to fire.intensity on every tick (tick() caps intensity at 100.0).
FIRE_INTENSITY_GROWTH = 0.9 # intensity per tick
# Intensity removed per extinguishing agent before scaling by the number of
# living agents; the scaled value is then clamped to the MIN/MAX bounds below.
BASE_EXTINGUISH_RATE = 15.0 # baseline intensity reduction per agent
# Lower and upper clamp applied to the per-agent extinguish rate.
MIN_EXTINGUISH_RATE = 8.0
MAX_EXTINGUISH_RATE = 28.0
# NOTE(review): not referenced in the visible part of this module; presumably
# the pacing between ticks used by whoever schedules tick() -- confirm.
TICK_INTERVAL_SECONDS = 3
# Maximum distance to the nearest water source at which an agent picks up water.
WATER_PICKUP_RANGE = 40
# Added to fire.radius to get the maximum distance from the fire centre at
# which a water-carrying agent still counts as extinguishing.
EXTINGUISH_RANGE = 45
# Margin outside fire.radius: agents within radius + buffer are forced to
# escape, and extinguishing agents stop their approach at that distance.
FIRE_SAFE_BUFFER = 10
class SimulationEngine:
def __init__(self, state: SimulationState) -> None:
self.state = state
def _normalize_message(self, message: str | None) -> str:
if not message:
return "Staying focused and moving."
cleaned = " ".join(str(message).strip().split())
if not cleaned:
return "Staying focused and moving."
return cleaned[:220]
def _move_toward(self, agent: AgentModel, target_x: float, target_y: float, stop_distance: float = 0) -> None:
dx = target_x - agent.x
dy = target_y - agent.y
dist = math.sqrt(dx**2 + dy**2) or 1
if dist <= stop_distance:
return
step = min(movement.MAX_AGENT_SPEED, dist - stop_distance)
agent.x += int((dx / dist) * step)
agent.y += int((dy / dist) * step)
agent.x = max(0, min(agent.x, self.state.map_width))
agent.y = max(0, min(agent.y, self.state.map_height))
async def tick(self) -> TickResponse:
"""
Main simulation loop:
1. Get decisions from all living agents
2. Handle coalition leadership voting
3. Execute agent actions (search water, collect, extinguish, escape, etc.)
4. Grow fire
5. Extinguish fire if agents with water are present
6. Kill agents in fire (but protect coalition members)
7. Check win condition
"""
if self.state.status != "running":
raise ValueError(f"Cannot tick a simulation with status '{self.state.status}'.")
fire = self.state.fire
assert fire is not None, "Fire must be placed before ticking."
events = []
bounds = (self.state.map_width, self.state.map_height)
living_agents = [a for a in self.state.agents if a.alive]
recent_radio = [
f"{a.display_name}: {a.last_message}"
for a in living_agents
if a.last_message
][-8:]
# 1. Get decisions from all living agents
decisions = await asyncio.gather(
*[groq_client.generate_fire_decision(agent, fire, self.state.water_sources, living_agents, bounds, recent_radio)
for agent in living_agents],
return_exceptions=True
)
decision_map = {}
for agent, decision in zip(living_agents, decisions):
if isinstance(decision, Exception):
decision = groq_client._fallback_escape(agent, fire)
decision_map[agent.model_name] = decision
# 2. Leadership voting phase (if coalition leader not elected)
if not self.state.coalition_leader:
vote_events = await self._voting_phase(living_agents, decision_map)
events.extend(vote_events)
# 3. Execute actions
action_events = await self._execute_actions(living_agents, decision_map, fire)
events.extend(action_events)
# 4. Grow fire
fire.radius += FIRE_GROWTH_RATE
fire.intensity += FIRE_INTENSITY_GROWTH
if fire.intensity > 100.0:
fire.intensity = 100.0
if fire.intensity > 0:
events.append(FireSpreadEvent(new_radius=fire.radius, new_intensity=fire.intensity))
# 5. Extinguish fire if agents with water are present
extinguish_events = self._check_extinguish(living_agents, fire)
events.extend(extinguish_events)
# 6. Kill agents in fire
death_events = self._kill_agents_in_fire(living_agents, fire)
events.extend(death_events)
# 7. Check win condition
self.state.round += 1
living_count = len([a for a in self.state.agents if a.alive])
if fire.intensity <= 0:
# Fire extinguished!
self.state.status = "finished"
top_score = max((a.extinguish_score for a in self.state.agents), default=0)
top_agents = [a.model_name for a in self.state.agents if a.extinguish_score == top_score and top_score > 0]
if top_agents:
self.state.winner_model = f"Top extinguisher: {', '.join(top_agents)} ({top_score:.1f} impact)"
else:
self.state.winner_model = "Fire extinguished"
elif living_count <= 1:
# Only one agent left
self.state.status = "finished"
winner = next((a.model_name for a in self.state.agents if a.alive), None)
self.state.winner_model = winner or "No survivors"
return TickResponse(
simulation_id=self.state.simulation_id,
round=self.state.round,
events=events,
chat=[],
state=self.state
)
async def _voting_phase(self, agents, decision_map):
"""
Agents vote for a coalition leader.
Get votes from LLM based on current situation.
"""
events = []
# Gather votes
votes = {} # candidate -> vote count
for agent in agents:
decision = decision_map.get(agent.model_name, {})
vote_for = decision.get("vote_for")
if vote_for:
votes[vote_for] = votes.get(vote_for, 0) + 1
events.append(LeadershipVoteEvent(voter=agent.model_name, candidate=vote_for))
# Elect leader if there are votes
if votes:
leader_name = max(votes, key=votes.get)
leader_agent = next((a for a in agents if a.model_name == leader_name), None)
if leader_agent:
for agent in agents:
agent.mode = "coalition"
leader_agent.is_leader = True
self.state.coalition_leader = leader_name
coalition = [a.model_name for a in agents if a.mode == "coalition"]
self.state.coalition_members = coalition
events.append(LeaderElectedEvent(leader=leader_name, coalition_members=coalition))
return events
async def _execute_actions(self, agents, decision_map, fire):
"""
Execute agent actions: search, collect water, extinguish, escape, vote, etc.
"""
events = []
chat_entries = []
for agent in agents:
decision = decision_map.get(agent.model_name, {})
action = decision.get("action", "escape")
message = self._normalize_message(decision.get("message"))
nearest_water = self._find_nearest_water(agent, self.state.water_sources)
dist_to_fire = math.dist((agent.x, agent.y), (fire.x, fire.y))
dist_to_water = None
if nearest_water:
dist_to_water = math.dist((agent.x, agent.y), (nearest_water.x, nearest_water.y))
# Guardrails to keep behavior consistent with visuals and objectives.
if dist_to_fire <= fire.radius + FIRE_SAFE_BUFFER:
action = "escape"
elif agent.water_collected:
action = "extinguish_fire"
elif dist_to_water is not None and dist_to_water <= WATER_PICKUP_RANGE:
action = "collect_water"
else:
action = "search_water"
if action == "collect_water":
water_source = nearest_water
if water_source and dist_to_water is not None:
dist_to_water = math.dist((agent.x, agent.y), (water_source.x, water_source.y))
if dist_to_water <= WATER_PICKUP_RANGE:
agent.water_collected = True
agent.status = "collecting_water"
events.append(WaterCollectedEvent(model=agent.model_name, water_source_id=water_source.id))
else:
agent.status = "searching"
self._move_toward(agent, water_source.x, water_source.y)
elif action == "extinguish_fire":
if agent.water_collected:
agent.status = "extinguishing_fire"
dist_to_fire = math.dist((agent.x, agent.y), (fire.x, fire.y))
target_dist = max(fire.radius + FIRE_SAFE_BUFFER, 0)
self._move_toward(agent, fire.x, fire.y, stop_distance=target_dist)
else:
agent.status = "searching"
message = self._normalize_message(decision.get("message"))
elif action == "search_water":
agent.status = "searching"
water_source = nearest_water
if water_source:
self._move_toward(agent, water_source.x, water_source.y)
elif action == "escape":
agent.status = "escaping"
# Move away from fire
dx = agent.x - fire.x
dy = agent.y - fire.y
dist = math.sqrt(dx**2 + dy**2) or 1
agent.x += int((dx / dist) * movement.MAX_AGENT_SPEED)
agent.y += int((dy / dist) * movement.MAX_AGENT_SPEED)
agent.x = max(0, min(agent.x, self.state.map_width))
agent.y = max(0, min(agent.y, self.state.map_height))
agent.last_message = message
events.append(MessageEvent(model=agent.model_name, content=message))
chat_entries.append(ChatEntry(agent_id=agent.model_name, message=message, tick=self.state.round))
return events
def _find_nearest_water(self, agent, water_sources):
"""Find the closest water source to an agent."""
if not water_sources:
return None
return min(water_sources, key=lambda w: math.dist((agent.x, agent.y), (w.x, w.y)))
def _check_extinguish(self, agents, fire):
"""Check if agents with water are extinguishing the fire."""
events = []
agents_with_water = []
for agent in agents:
if not (agent.water_collected and agent.status == "extinguishing_fire"):
continue
dist_to_fire = math.dist((agent.x, agent.y), (fire.x, fire.y))
if dist_to_fire <= fire.radius + EXTINGUISH_RANGE:
agents_with_water.append(agent)
if agents_with_water:
living_count = len([a for a in agents if a.alive]) or 1
scale = max(0.5, min(2.0, 2.0 / living_count))
per_agent_rate = BASE_EXTINGUISH_RATE * scale
per_agent_rate = max(MIN_EXTINGUISH_RATE, min(MAX_EXTINGUISH_RATE, per_agent_rate))
reduction = len(agents_with_water) * per_agent_rate
fire.intensity -= reduction
if fire.intensity < 0:
fire.intensity = 0
extinguisher_names = [a.model_name for a in agents_with_water]
events.append(FireExtinguishedEvent(extinguished_by=extinguisher_names, fire_intensity=fire.intensity))
for agent in agents_with_water:
agent.extinguish_score += per_agent_rate
agent.water_collected = False
return events
def _kill_agents_in_fire(self, agents, fire):
"""Check if agents are consumed by fire."""
events = []
for agent in agents:
if not agent.alive:
continue
dist_to_fire = math.dist((agent.x, agent.y), (fire.x, fire.y))
# Agent dies if inside fire radius
if dist_to_fire < fire.radius:
agent.alive = False
events.append(DeathEvent(model=agent.model_name))
events.append(MessageEvent(model=agent.model_name, content="No!!! The fire got me..."))
return events