rushagentrush / backend /app /simulation.py
adityaverma977
Add fire status ETA event, probabilistic lethality by intensity, display ETA in UI
30e238f
import asyncio
import math
import random
from typing import Union, Optional
from .models import (
AgentModel,
DeathEvent,
MessageEvent,
LeadershipVoteEvent,
LeaderElectedEvent,
WaterCollectedEvent,
FireExtinguishedEvent,
FireSpreadEvent,
FireStatusEvent,
SimulationState,
TickResponse,
ChatEntry,
)
from . import groq_client
from . import movement
FIRE_GROWTH_RATE = 1.0 # radius growth per tick
FIRE_INTENSITY_GROWTH = 1.8 # intensity per tick (faster fire spread for urgency)
BASE_EXTINGUISH_RATE = 28.0 # baseline intensity reduction per agent
MIN_EXTINGUISH_RATE = 15.0
MAX_EXTINGUISH_RATE = 40.0
TICK_INTERVAL_SECONDS = 3
WATER_PICKUP_RANGE = 40
EXTINGUISH_RANGE = 45
FIRE_SAFE_BUFFER = 10
# Target game duration: 90-110 seconds (30-37 ticks)
# Scaling: fewer agents = faster extinguish (shorter game)
# more agents = slower extinguish (longer game)
# scale = (max_agents + 1) / (num_agents + 2) ensures inverse relationship
class SimulationEngine:
def __init__(self, state: SimulationState) -> None:
self.state = state
def _normalize_message(self, message: str | None) -> str:
if not message:
return "Staying focused and moving."
cleaned = " ".join(str(message).strip().split())
if not cleaned:
return "Staying focused and moving."
return cleaned[:220]
def _move_toward(self, agent: AgentModel, target_x: float, target_y: float, stop_distance: float = 0) -> None:
dx = target_x - agent.x
dy = target_y - agent.y
dist = math.sqrt(dx**2 + dy**2) or 1
if dist <= stop_distance:
return
step = min(movement.MAX_AGENT_SPEED, dist - stop_distance)
agent.x += int((dx / dist) * step)
agent.y += int((dy / dist) * step)
agent.x = max(0, min(agent.x, self.state.map_width))
agent.y = max(0, min(agent.y, self.state.map_height))
async def tick(self) -> TickResponse:
"""
Main simulation loop:
1. Get decisions from all living agents
2. Handle coalition leadership voting
3. Execute agent actions (search water, collect, extinguish, escape, etc.)
4. Grow fire
5. Extinguish fire if agents with water are present
6. Kill agents in fire (but protect coalition members)
7. Check win condition
"""
if self.state.status != "running":
raise ValueError(f"Cannot tick a simulation with status '{self.state.status}'.")
fire = self.state.fire
assert fire is not None, "Fire must be placed before ticking."
events = []
bounds = (self.state.map_width, self.state.map_height)
living_agents = [a for a in self.state.agents if a.alive]
recent_radio = [
f"{a.display_name}: {a.last_message}"
for a in living_agents
if a.last_message
][-8:]
# 1. Get decisions from all living agents
decisions = await asyncio.gather(
*[groq_client.generate_fire_decision(agent, fire, self.state.water_sources, living_agents, bounds, recent_radio)
for agent in living_agents],
return_exceptions=True
)
decision_map = {}
for agent, decision in zip(living_agents, decisions):
if isinstance(decision, Exception):
decision = groq_client._fallback_escape(agent, fire)
decision_map[agent.model_name] = decision
# 2. Leadership voting phase (if coalition leader not elected)
if not self.state.coalition_leader:
vote_events = await self._voting_phase(living_agents, decision_map)
events.extend(vote_events)
# 3. Execute actions
action_events = await self._execute_actions(living_agents, decision_map, fire)
events.extend(action_events)
# 4. Grow fire
fire.radius += FIRE_GROWTH_RATE
fire.intensity += FIRE_INTENSITY_GROWTH
if fire.intensity > 100.0:
fire.intensity = 100.0
if fire.intensity > 0:
events.append(FireSpreadEvent(new_radius=fire.radius, new_intensity=fire.intensity))
# 5. Extinguish fire if agents with water are present
extinguish_events = self._check_extinguish(living_agents, fire)
events.extend(extinguish_events)
# 6. Kill agents in fire
death_events = self._kill_agents_in_fire(living_agents, fire)
events.extend(death_events)
# Add fire status event (ETA/progress) for frontend visualization
try:
status = self._compute_fire_status(living_agents, fire)
if status is not None:
events.append(FireStatusEvent(
radius=status['radius'],
intensity=status['intensity'],
ticks_to_extinguish=status['ticks_to_extinguish'],
secs_to_extinguish=status['secs_to_extinguish'],
))
except Exception:
pass
# 7. Check win condition
self.state.round += 1
living_count = len([a for a in self.state.agents if a.alive])
if fire.intensity <= 0:
# Fire extinguished!
self.state.status = "finished"
top_score = max((a.extinguish_score for a in self.state.agents), default=0)
top_agents = [a.model_name for a in self.state.agents if a.extinguish_score == top_score and top_score > 0]
if top_agents:
self.state.winner_model = f"Top extinguisher: {', '.join(top_agents)} ({top_score:.1f} impact)"
else:
self.state.winner_model = "Fire extinguished"
elif living_count <= 1:
# Only one agent left
self.state.status = "finished"
winner = next((a.model_name for a in self.state.agents if a.alive), None)
self.state.winner_model = winner or "No survivors"
return TickResponse(
simulation_id=self.state.simulation_id,
round=self.state.round,
events=events,
chat=[],
state=self.state
)
async def _voting_phase(self, agents, decision_map):
"""
Agents vote for a coalition leader.
Get votes from LLM based on current situation.
"""
events = []
# Gather votes
votes = {} # candidate -> vote count
for agent in agents:
decision = decision_map.get(agent.model_name, {})
vote_for = decision.get("vote_for")
if vote_for:
votes[vote_for] = votes.get(vote_for, 0) + 1
events.append(LeadershipVoteEvent(voter=agent.model_name, candidate=vote_for))
# Elect leader if there are votes
if votes:
leader_name = max(votes, key=votes.get)
leader_agent = next((a for a in agents if a.model_name == leader_name), None)
if leader_agent:
for agent in agents:
agent.mode = "coalition"
leader_agent.is_leader = True
self.state.coalition_leader = leader_name
coalition = [a.model_name for a in agents if a.mode == "coalition"]
self.state.coalition_members = coalition
events.append(LeaderElectedEvent(leader=leader_name, coalition_members=coalition))
return events
async def _execute_actions(self, agents, decision_map, fire):
"""
Execute agent actions: search, collect water, extinguish, escape, vote, etc.
"""
events = []
chat_entries = []
for agent in agents:
decision = decision_map.get(agent.model_name, {})
action = decision.get("action", "escape")
message = self._normalize_message(decision.get("message"))
nearest_water = self._find_nearest_water(agent, self.state.water_sources)
dist_to_fire = math.dist((agent.x, agent.y), (fire.x, fire.y))
dist_to_water = None
if nearest_water:
dist_to_water = math.dist((agent.x, agent.y), (nearest_water.x, nearest_water.y))
# Guardrails to keep behavior consistent with visuals and objectives.
if dist_to_fire <= fire.radius + FIRE_SAFE_BUFFER:
action = "escape"
elif agent.water_collected:
action = "extinguish_fire"
elif dist_to_water is not None and dist_to_water <= WATER_PICKUP_RANGE:
action = "collect_water"
else:
action = "search_water"
if action == "collect_water":
water_source = nearest_water
if water_source and dist_to_water is not None:
dist_to_water = math.dist((agent.x, agent.y), (water_source.x, water_source.y))
if dist_to_water <= WATER_PICKUP_RANGE:
agent.water_collected = True
agent.status = "collecting_water"
events.append(WaterCollectedEvent(model=agent.model_name, water_source_id=water_source.id))
else:
agent.status = "searching"
self._move_toward(agent, water_source.x, water_source.y)
elif action == "extinguish_fire":
if agent.water_collected:
agent.status = "extinguishing_fire"
dist_to_fire = math.dist((agent.x, agent.y), (fire.x, fire.y))
target_dist = max(fire.radius + FIRE_SAFE_BUFFER, 0)
self._move_toward(agent, fire.x, fire.y, stop_distance=target_dist)
else:
agent.status = "searching"
message = self._normalize_message(decision.get("message"))
elif action == "search_water":
agent.status = "searching"
water_source = nearest_water
if water_source:
self._move_toward(agent, water_source.x, water_source.y)
elif action == "escape":
agent.status = "escaping"
# Move away from fire
dx = agent.x - fire.x
dy = agent.y - fire.y
dist = math.sqrt(dx**2 + dy**2) or 1
agent.x += int((dx / dist) * movement.MAX_AGENT_SPEED)
agent.y += int((dy / dist) * movement.MAX_AGENT_SPEED)
agent.x = max(0, min(agent.x, self.state.map_width))
agent.y = max(0, min(agent.y, self.state.map_height))
agent.last_message = message
events.append(MessageEvent(model=agent.model_name, content=message))
chat_entries.append(ChatEntry(agent_id=agent.model_name, message=message, tick=self.state.round))
return events
def _find_nearest_water(self, agent, water_sources):
"""Find the closest water source to an agent."""
if not water_sources:
return None
return min(water_sources, key=lambda w: math.dist((agent.x, agent.y), (w.x, w.y)))
def _check_extinguish(self, agents, fire):
"""Check if agents with water are extinguishing the fire."""
events = []
agents_with_water = []
for agent in agents:
if not (agent.water_collected and agent.status == "extinguishing_fire"):
continue
dist_to_fire = math.dist((agent.x, agent.y), (fire.x, fire.y))
if dist_to_fire <= fire.radius + EXTINGUISH_RANGE:
agents_with_water.append(agent)
if agents_with_water:
living_count = len([a for a in agents if a.alive]) or 1
# Inverse scaling: fewer agents = higher drop rate (shorter game)
# more agents = lower drop rate (longer game)
# scale = (6 + 1) / (living_count + 2) ensures fewer agents get faster extinguish
scale = (7.0) / (living_count + 2.0)
scale = max(0.5, min(2.5, scale))
per_agent_rate = BASE_EXTINGUISH_RATE * scale
per_agent_rate = max(MIN_EXTINGUISH_RATE, min(MAX_EXTINGUISH_RATE, per_agent_rate))
reduction = len(agents_with_water) * per_agent_rate
fire.intensity -= reduction
if fire.intensity < 0:
fire.intensity = 0
extinguisher_names = [a.model_name for a in agents_with_water]
events.append(FireExtinguishedEvent(extinguished_by=extinguisher_names, fire_intensity=fire.intensity))
for agent in agents_with_water:
agent.extinguish_score += per_agent_rate
agent.water_collected = False
return events
def _kill_agents_in_fire(self, agents, fire):
"""Check if agents are consumed by fire."""
events = []
for agent in agents:
if not agent.alive:
continue
dist_to_fire = math.dist((agent.x, agent.y), (fire.x, fire.y))
# Agent dies if inside fire radius.
# Use intensity-based lethality: high intensity => near-certain death; lower intensity => probabilistic.
if dist_to_fire < fire.radius:
# Immediate fatality at very high intensity
if fire.intensity >= 90.0:
lethal = True
else:
lethal = (random.random() < (fire.intensity / 100.0))
if lethal:
agent.alive = False
events.append(DeathEvent(model=agent.model_name))
events.append(MessageEvent(model=agent.model_name, content="No!!! The fire got me..."))
return events
def _compute_fire_status(self, agents, fire) -> Optional[dict]:
"""Estimate ticks and seconds to extinguish based on current agents with water.
Returns dict with radius, intensity, ticks_to_extinguish, secs_to_extinguish or None if not applicable."""
if not fire:
return None
# Count agents actively extinguishing (carrying water and near enough)
extinguishers = 0
for a in agents:
if a.water_collected and a.status == 'extinguishing_fire':
dist = math.dist((a.x, a.y), (fire.x, fire.y))
if dist <= fire.radius + EXTINGUISH_RANGE:
extinguishers += 1
if extinguishers == 0:
return {
'radius': fire.radius,
'intensity': fire.intensity,
'ticks_to_extinguish': None,
'secs_to_extinguish': None,
}
living_count = len([a for a in agents if a.alive]) or 1
scale = (7.0) / (living_count + 2.0)
scale = max(0.5, min(2.5, scale))
per_agent_rate = BASE_EXTINGUISH_RATE * scale
per_agent_rate = max(MIN_EXTINGUISH_RATE, min(MAX_EXTINGUISH_RATE, per_agent_rate))
per_tick_reduction = extinguishers * per_agent_rate
if per_tick_reduction <= 0:
return {
'radius': fire.radius,
'intensity': fire.intensity,
'ticks_to_extinguish': None,
'secs_to_extinguish': None,
}
ticks = math.ceil(fire.intensity / per_tick_reduction)
secs = ticks * TICK_INTERVAL_SECONDS
return {
'radius': fire.radius,
'intensity': fire.intensity,
'ticks_to_extinguish': int(ticks),
'secs_to_extinguish': float(secs),
}