Marcel0123's picture
Update app.py
d16e5f9 verified
import math
import random
import time
from dataclasses import dataclass, field
from typing import List, Optional, Tuple
import gradio as gr
import numpy as np
from PIL import Image, ImageDraw
# =============================
# Config
# =============================
W, H = 900, 540 # canvas size (px)
ROOM = (60, 40, W - 60, H - 40) # inner room rect (left, top, right, bottom)
DT = 0.05 # seconds per simulation step (~20 FPS)
MAX_SPEED_BASE = 65.0 # px/s baseline
PERSON_RADIUS = 6 # draw radius in px
NEIGHBOR_RANGE = 55.0 # px (perception)
EXIT_W, EXIT_H = 28, 68
# Colors
C_BG = (245, 246, 248, 255)
C_ROOM = (250, 250, 250, 255)
C_BORDER = (28, 28, 28, 255)
C_OBS = (175, 178, 185, 255)
C_EXIT = (52, 132, 255, 255)
C_CALM = (44, 184, 78, 255)
C_ALERT = (250, 191, 35, 255)
C_PANIC = (233, 68, 68, 255)
C_INCIDENT = (230, 60, 60, 96)
# Global running flag (simple & reliable for Spaces)
RUNNING = False
@dataclass
class Person:
x: float
y: float
vx: float
vy: float
stress: float = 0.0 # 0..1
state: int = 0 # 0 calm, 1 alert, 2 panic
evacuated: bool = False
def color(self):
return [C_CALM, C_ALERT, C_PANIC][self.state]
@dataclass
class ExitZone:
x: int
y: int
w: int = EXIT_W
h: int = EXIT_H
def rect(self):
return (self.x, self.y, self.x + self.w, self.y + self.h)
@dataclass
class World:
width: int = W
height: int = H
room: Tuple[int, int, int, int] = ROOM
people: List[Person] = field(default_factory=list)
exits: List[ExitZone] = field(default_factory=list)
obstacles: List[Tuple[int, int, int, int]] = field(default_factory=list)
incident: Optional[Tuple[int, int, int]] = None # (cx, cy, radius)
# Params (controlled by UI)
speed_scale: float = 1.0
cohesion: float = 0.3
separation: float = 0.6
panic_spread: float = 2.0
personal_space: float = 22.0
# cached metrics
evac_count: int = 0
avg_stress: float = 0.0
collisions_proxy: int = 0
def reset_metrics(self):
self.evac_count = sum(1 for p in self.people if p.evacuated)
if self.people:
self.avg_stress = float(np.mean([p.stress for p in self.people if not p.evacuated])) if any(
not p.evacuated for p in self.people) else 0.0
else:
self.avg_stress = 0.0
# simple collisions proxy: count of pairs closer than 0.8 * personal_space (approx)
close = 0
active = [p for p in self.people if not p.evacuated]
n = len(active)
for i in range(n):
pi = active[i]
for j in range(i + 1, n):
pj = active[j]
dx = pi.x - pj.x
dy = pi.y - pj.y
if dx * dx + dy * dy < (0.8 * self.personal_space) ** 2:
close += 1
self.collisions_proxy = close
def step(self, dt: float):
l, t, r, b = self.room
def nearest_exit_vec(px, py):
best = None
best_d2 = 1e12
for ex in self.exits:
cx = ex.x + ex.w / 2
cy = ex.y + ex.h / 2
dx, dy = (cx - px), (cy - py)
d2 = dx * dx + dy * dy
if d2 < best_d2:
best_d2 = d2
best = (dx, dy)
return best if best is not None else (0.0, 0.0)
# update states (stress diffusion + incident influence)
for p in self.people:
if p.evacuated:
continue
# incident influence
if self.incident is not None:
cx, cy, rad = self.incident
dx = p.x - cx
dy = p.y - cy
d = math.hypot(dx, dy)
if d < rad:
# stronger effect closer to center
p.stress += (1 - d / rad) * self.panic_spread * 0.18 * dt
# neighbor panic influence (cheap/approx)
# random subset to keep O(n)
for _ in range(6):
q = random.choice(self.people)
if q.evacuated:
continue
if q is p:
continue
dx = q.x - p.x
dy = q.y - p.y
d2 = dx * dx + dy * dy
if d2 < (NEIGHBOR_RANGE ** 2) and q.state == 2:
p.stress += self.panic_spread * 0.04 * dt
# clamp and assign state
p.stress = max(0.0, min(1.0, p.stress))
if p.stress > 0.65:
p.state = 2
elif p.stress > 0.35:
p.state = 1
else:
p.state = 0
# forces & motion
for i, p in enumerate(self.people):
if p.evacuated:
continue
# skip if already in exit
for ex in self.exits:
x0, y0, x1, y1 = ex.rect()
if x0 <= p.x <= x1 and y0 <= p.y <= y1:
p.evacuated = True
continue
if p.evacuated:
continue
# neighborhood (very light)
# sample 20 random neighbors instead of full n^2
neighbors = random.sample(self.people, k=min(20, len(self.people)))
# separation
fx, fy = 0.0, 0.0
for q in neighbors:
if q is p or q.evacuated:
continue
dx = p.x - q.x
dy = p.y - q.y
d2 = dx * dx + dy * dy
if d2 == 0:
continue
if d2 < (self.personal_space ** 2):
inv = 1.0 / max(1.0, d2)
fx += dx * inv
fy += dy * inv
# cohesion & alignment
cx, cy, cvx, cvy, cnt = 0.0, 0.0, 0.0, 0.0, 0
for q in neighbors:
if q is p or q.evacuated:
continue
dx = q.x - p.x
dy = q.y - p.y
if dx * dx + dy * dy < NEIGHBOR_RANGE ** 2:
cx += q.x
cy += q.y
cvx += q.vx
cvy += q.vy
cnt += 1
if cnt > 0:
cx /= cnt
cy /= cnt
cvx /= cnt
cvy /= cnt
# cohesion
fx += (cx - p.x) * self.cohesion * 0.15
fy += (cy - p.y) * self.cohesion * 0.15
# alignment
fx += (cvx - p.vx) * 0.05
fy += (cvy - p.vy) * 0.05
# goal (nearest exit)
gx, gy = nearest_exit_vec(p.x, p.y)
# scale stronger when panicking
goal_k = 0.6 if p.state == 0 else (0.8 if p.state == 1 else 1.1)
fx += gx * goal_k / 200.0
fy += gy * goal_k / 200.0
# obstacle avoidance (simple push)
for (ox0, oy0, ox1, oy1) in self.obstacles:
# compute penetration vector if inside expanded box
margin = self.personal_space * 0.7
if (ox0 - margin) < p.x < (ox1 + margin) and (oy0 - margin) < p.y < (oy1 + margin):
# push outwards from nearest edge
dx_left = p.x - ox0
dx_right = ox1 - p.x
dy_top = p.y - oy0
dy_bot = oy1 - p.y
m = min(dx_left, dx_right, dy_top, dy_bot)
if m == dx_left:
fx -= 0.8
elif m == dx_right:
fx += 0.8
elif m == dy_top:
fy -= 0.8
else:
fy += 0.8
# walls (room bounds)
x0, y0, x1, y1 = self.room
wall_margin = 8
if p.x < x0 + wall_margin:
fx += 1.2
if p.x > x1 - wall_margin:
fx -= 1.2
if p.y < y0 + wall_margin:
fy += 1.2
if p.y > y1 - wall_margin:
fy -= 1.2
# integrate
p.vx += fx
p.vy += fy
# max speed based on stress
stress_boost = [1.0, 1.2, 1.6][p.state]
vmax = MAX_SPEED_BASE * self.speed_scale * stress_boost
v = math.hypot(p.vx, p.vy)
if v > vmax:
p.vx = p.vx / v * vmax
p.vy = p.vy / v * vmax
p.x += p.vx * dt
p.y += p.vy * dt
# clamp to room
p.x = min(max(p.x, x0 + 2), x1 - 2)
p.y = min(max(p.y, y0 + 2), y1 - 2)
# metrics
self.reset_metrics()
def render(self) -> Image.Image:
img = Image.new("RGBA", (self.width, self.height), C_BG)
d = ImageDraw.Draw(img, "RGBA")
# room
d.rectangle(self.room, fill=C_ROOM, outline=C_BORDER, width=3)
# incident overlay
if self.incident is not None:
cx, cy, rad = self.incident
d.ellipse((cx - rad, cy - rad, cx + rad, cy + rad), fill=C_INCIDENT)
# obstacles
for (x0, y0, x1, y1) in self.obstacles:
d.rectangle((x0, y0, x1, y1), fill=C_OBS)
# exits
for ex in self.exits:
d.rectangle(ex.rect(), fill=C_EXIT)
label = "Exit"
tw, th = d.textlength(label), 12
d.text((ex.x + 6, ex.y + ex.h / 2 - 7), label, fill=(255, 255, 255, 255))
# people
for p in self.people:
if p.evacuated:
continue
c = p.color()
d.ellipse((p.x - PERSON_RADIUS, p.y - PERSON_RADIUS,
p.x + PERSON_RADIUS, p.y + PERSON_RADIUS), fill=c, outline=None)
return img
# =============================
# Helpers to build scenarios
# =============================
def make_world(n_people=200, speed=1.2, cohesion=0.3, separation=0.6, panic_spread=2.4):
random.seed(7)
np.random.seed(7)
world = World()
world.speed_scale = float(speed)
world.cohesion = float(cohesion)
world.separation = float(separation)
world.panic_spread = float(panic_spread)
world.personal_space = 22 + 10 * separation
# exits on right wall
ex1 = ExitZone(x=ROOM[2] - 8 - EXIT_W, y=int(ROOM[1] + 70))
ex2 = ExitZone(x=ROOM[2] - 8 - EXIT_W, y=int(ROOM[3] - 70 - EXIT_H))
world.exits = [ex1, ex2]
# obstacles (columns)
world.obstacles = [
(int(ROOM[0] + 220), int(ROOM[1] + 110), int(ROOM[0] + 360), int(ROOM[1] + 150)),
(int(ROOM[0] + 280), int(ROOM[3] - 150), int(ROOM[0] + 420), int(ROOM[3] - 110)),
(int(ROOM[2] - 140), int(ROOM[1] + 160), int(ROOM[2] - 110), int(ROOM[1] + 260)),
]
# crowd initial placement (gaussian blob)
l, t, r, b = ROOM
for _ in range(n_people):
x = np.clip(np.random.normal((l + r) / 2 - 120, 120), l + 20, r - 40)
y = np.clip(np.random.normal((t + b) / 2, 90), t + 20, b - 20)
ang = np.random.rand() * 2 * np.pi
speed0 = np.random.uniform(5, 25)
world.people.append(Person(x=x, y=y, vx=np.cos(ang) * speed0, vy=np.sin(ang) * speed0))
world.reset_metrics()
return world
def trigger_incident(world: World):
# incident: top-right of the room
cx = int(ROOM[2] - 220)
cy = int(ROOM[1] + 110)
rad = 150
world.incident = (cx, cy, rad)
# give initial stress kick to agents inside
for p in world.people:
if p.evacuated:
continue
dx, dy = p.x - cx, p.y - cy
d = math.hypot(dx, dy)
if d < rad * 0.85:
p.stress = max(p.stress, 0.7)
def clear_incident(world: World):
world.incident = None
# =============================
# Gradio bindings
# =============================
def reset_fn(n_people, speed, cohesion, separation, panic_spread):
global RUNNING
RUNNING = False
world = make_world(int(n_people), float(speed), float(cohesion), float(separation), float(panic_spread))
frame = world.render()
metrics = {
"% Evacuated": f"{(world.evac_count / max(1, len(world.people)))*100:.0f}",
"Avg. Stress": f"{world.avg_stress:.2f}",
"Collisions": f"{world.collisions_proxy}"
}
return frame, metrics, world
def start_incident_fn(world):
if world is None:
world = make_world()
trigger_incident(world)
return world.render(), {
"% Evacuated": f"{(world.evac_count / max(1, len(world.people)))*100:.0f}",
"Avg. Stress": f"{world.avg_stress:.2f}",
"Collisions": f"{world.collisions_proxy}"
}, world
def clear_incident_fn(world):
if world is None:
world = make_world()
clear_incident(world)
return world.render(), {
"% Evacuated": f"{(world.evac_count / max(1, len(world.people)))*100:.0f}",
"Avg. Stress": f"{world.avg_stress:.2f}",
"Collisions": f"{world.collisions_proxy}"
}, world
def start_fn(world, n_steps=999999):
"""Generator: runs until paused."""
global RUNNING
RUNNING = True
if world is None:
world = make_world()
# stream frames
steps = 0
while RUNNING and steps < n_steps:
world.step(DT)
frame = world.render()
metrics = {
"% Evacuated": f"{(world.evac_count / max(1, len(world.people)))*100:.0f}",
"Avg. Stress": f"{world.avg_stress:.2f}",
"Collisions": f"{world.collisions_proxy}"
}
yield frame, metrics, world
steps += 1
time.sleep(DT) # throttle to ~20 FPS
def pause_fn():
global RUNNING
RUNNING = False
return gr.update(), gr.update(), gr.update()
with gr.Blocks(title="Behavioral ML in Virtual Crowds") as demo:
gr.Markdown(
"# Behavioral ML in Virtual Crowds · Evacuatie-simulatie\n"
"Mensen (stippen) reageren op een incident, ontwijken obstakels en zoeken de **Exit**. "
"Gebruik de knoppen en sliders hieronder."
)
with gr.Row():
canvas = gr.Image(label="Simulatie", interactive=False, type="pil")
metrics = gr.Label(label="Live metrics")
with gr.Row():
with gr.Column(scale=2):
n_people = gr.Slider(20, 600, value=200, step=10, label="Aantal mensen")
speed = gr.Slider(0.5, 2.0, value=1.2, step=0.05, label="Snelheid (schaal)")
cohesion = gr.Slider(0.0, 1.0, value=0.3, step=0.05, label="Samenhang")
separation = gr.Slider(0.0, 1.0, value=0.6, step=0.05, label="Persoonlijke ruimte (schaal)")
panic_spread = gr.Slider(0.0, 4.0, value=2.4, step=0.1, label="Paniekverspreiding")
with gr.Column(scale=1):
start_btn = gr.Button("▶️ Start")
pause_btn = gr.Button("⏸️ Pauze")
reset_btn = gr.Button("🔁 Reset")
inc_btn = gr.Button("🚨 Start Incident")
clear_inc_btn = gr.Button("🧯 Clear Incident")
state = gr.State() # holds World
# Bindings
reset_btn.click(
reset_fn,
inputs=[n_people, speed, cohesion, separation, panic_spread],
outputs=[canvas, metrics, state],
api_name="reset",
)
# initialize once at app load
init_frame, init_metrics, init_world = reset_fn(200, 1.2, 0.3, 0.6, 2.4)
canvas.value = init_frame
metrics.value = init_metrics
state.value = init_world
start_btn.click(
start_fn,
inputs=[state],
outputs=[canvas, metrics, state],
show_progress=False, # smoother streaming
api_name="start",
)
pause_btn.click(pause_fn, outputs=[canvas, metrics, state], api_name="pause")
inc_btn.click(start_incident_fn, inputs=[state], outputs=[canvas, metrics, state], api_name="incident")
clear_inc_btn.click(clear_incident_fn, inputs=[state], outputs=[canvas, metrics, state], api_name="clear_incident")
if __name__ == "__main__":
demo.launch()