"""Crowd-evacuation toy simulation with a Gradio front end.

People are simulated as points inside a rectangular room. Each step they
accumulate stress (from an optional incident zone and from panicking
neighbours), steer towards the nearest exit while avoiding each other,
obstacles and walls, and are marked evacuated once they stand inside an exit.
"""
import math
import random
import time
from dataclasses import dataclass, field
from typing import Dict, Iterator, List, Optional, Tuple

import gradio as gr
import numpy as np
from PIL import Image, ImageDraw

# =============================
# Config
# =============================
W, H = 900, 540  # canvas size (px)
ROOM = (60, 40, W - 60, H - 40)  # inner room rect (left, top, right, bottom)
DT = 0.05  # seconds per simulation step (~20 FPS)
MAX_SPEED_BASE = 65.0  # px/s baseline
PERSON_RADIUS = 6  # draw radius in px
NEIGHBOR_RANGE = 55.0  # px (perception)
EXIT_W, EXIT_H = 28, 68

# Colors
C_BG = (245, 246, 248, 255)
C_ROOM = (250, 250, 250, 255)
C_BORDER = (28, 28, 28, 255)
C_OBS = (175, 178, 185, 255)
C_EXIT = (52, 132, 255, 255)
C_CALM = (44, 184, 78, 255)
C_ALERT = (250, 191, 35, 255)
C_PANIC = (233, 68, 68, 255)
C_INCIDENT = (230, 60, 60, 96)

# Global running flag (simple & reliable for Spaces)
RUNNING = False


@dataclass
class Person:
    """A single simulated agent: position, velocity and stress level."""

    x: float
    y: float
    vx: float
    vy: float
    stress: float = 0.0  # 0..1
    state: int = 0  # 0 calm, 1 alert, 2 panic
    evacuated: bool = False

    def color(self):
        """Return the RGBA draw colour that matches the current ``state``."""
        return [C_CALM, C_ALERT, C_PANIC][self.state]


@dataclass
class ExitZone:
    """Axis-aligned exit rectangle anchored at its top-left corner."""

    x: int
    y: int
    w: int = EXIT_W
    h: int = EXIT_H

    def rect(self):
        """Return the zone as ``(left, top, right, bottom)``."""
        return (self.x, self.y, self.x + self.w, self.y + self.h)


@dataclass
class World:
    """Simulation state: room geometry, agents, tunable parameters, metrics."""

    width: int = W
    height: int = H
    room: Tuple[int, int, int, int] = ROOM
    people: List[Person] = field(default_factory=list)
    exits: List[ExitZone] = field(default_factory=list)
    obstacles: List[Tuple[int, int, int, int]] = field(default_factory=list)
    incident: Optional[Tuple[int, int, int]] = None  # (cx, cy, radius)

    # Params (controlled by UI)
    speed_scale: float = 1.0
    cohesion: float = 0.3
    separation: float = 0.6
    panic_spread: float = 2.0
    personal_space: float = 22.0

    # cached metrics
    evac_count: int = 0
    avg_stress: float = 0.0
    collisions_proxy: int = 0

    def reset_metrics(self) -> None:
        """Recompute ``evac_count``, ``avg_stress`` and ``collisions_proxy``.

        ``avg_stress`` is the mean stress of non-evacuated people (0.0 when
        there are none). ``collisions_proxy`` counts unordered pairs of
        non-evacuated people closer than ``0.8 * personal_space``.
        """
        active = [p for p in self.people if not p.evacuated]
        self.evac_count = len(self.people) - len(active)
        self.avg_stress = (
            float(np.mean([p.stress for p in active])) if active else 0.0
        )

        if len(active) < 2:
            self.collisions_proxy = 0
            return

        # Pairwise squared distances via broadcasting; this runs every frame,
        # so the pair loop is pushed into NumPy instead of Python.
        xs = np.array([p.x for p in active], dtype=float)
        ys = np.array([p.y for p in active], dtype=float)
        dx = xs[:, None] - xs[None, :]
        dy = ys[:, None] - ys[None, :]
        too_close = dx * dx + dy * dy < (0.8 * self.personal_space) ** 2
        # Strict upper triangle: each pair once, no self-pairs.
        self.collisions_proxy = int(np.count_nonzero(np.triu(too_close, k=1)))

    def step(self, dt: float) -> None:
        """Advance the simulation by ``dt`` seconds and refresh the metrics."""
        self._update_stress(dt)
        self._move_people(dt)
        self.reset_metrics()

    def _nearest_exit_vec(self, px: float, py: float) -> Tuple[float, float]:
        """Return the offset from (px, py) to the nearest exit centre.

        Returns ``(0.0, 0.0)`` when the world has no exits.
        """
        if not self.exits:
            return (0.0, 0.0)
        offsets = [
            (ex.x + ex.w / 2 - px, ex.y + ex.h / 2 - py) for ex in self.exits
        ]
        return min(offsets, key=lambda v: v[0] * v[0] + v[1] * v[1])

    def _in_any_exit(self, x: float, y: float) -> bool:
        """Return True when (x, y) lies inside (or on the edge of) an exit."""
        for ex in self.exits:
            x0, y0, x1, y1 = ex.rect()
            if x0 <= x <= x1 and y0 <= y <= y1:
                return True
        return False

    def _update_stress(self, dt: float) -> None:
        """Apply incident and neighbour panic influence, then derive states."""
        neighbor_range_sq = NEIGHBOR_RANGE ** 2
        for p in self.people:
            if p.evacuated:
                continue

            # incident influence: stronger effect closer to the centre
            if self.incident is not None:
                cx, cy, rad = self.incident
                d = math.hypot(p.x - cx, p.y - cy)
                if d < rad:
                    p.stress += (1 - d / rad) * self.panic_spread * 0.18 * dt

            # neighbor panic influence (cheap/approx):
            # random subset of 6 draws to keep the whole pass O(n)
            for _ in range(6):
                q = random.choice(self.people)
                if q.evacuated or q is p:
                    continue
                dx = q.x - p.x
                dy = q.y - p.y
                if dx * dx + dy * dy < neighbor_range_sq and q.state == 2:
                    p.stress += self.panic_spread * 0.04 * dt

            # clamp and assign state
            p.stress = max(0.0, min(1.0, p.stress))
            if p.stress > 0.65:
                p.state = 2
            elif p.stress > 0.35:
                p.state = 1
            else:
                p.state = 0

    def _steering_force(
        self, p: Person, neighbors: List[Person]
    ) -> Tuple[float, float]:
        """Return the velocity change for ``p`` for the current step.

        Sums separation, cohesion, alignment, exit attraction, obstacle
        push-out and wall repulsion.
        """
        fx, fy = 0.0, 0.0
        personal_space_sq = self.personal_space ** 2
        neighbor_range_sq = NEIGHBOR_RANGE ** 2

        sum_x, sum_y, sum_vx, sum_vy, cnt = 0.0, 0.0, 0.0, 0.0, 0
        for q in neighbors:
            if q is p or q.evacuated:
                continue
            dx = p.x - q.x
            dy = p.y - q.y
            d2 = dx * dx + dy * dy
            # separation: skipped for coincident points (no direction)
            if 0 < d2 < personal_space_sq:
                inv = 1.0 / max(1.0, d2)
                fx += dx * inv
                fy += dy * inv
            # cohesion & alignment accumulators
            if d2 < neighbor_range_sq:
                sum_x += q.x
                sum_y += q.y
                sum_vx += q.vx
                sum_vy += q.vy
                cnt += 1

        if cnt > 0:
            # cohesion: pull towards the local centre of mass
            fx += (sum_x / cnt - p.x) * self.cohesion * 0.15
            fy += (sum_y / cnt - p.y) * self.cohesion * 0.15
            # alignment: match the local mean velocity
            fx += (sum_vx / cnt - p.vx) * 0.05
            fy += (sum_vy / cnt - p.vy) * 0.05

        # goal (nearest exit), scaled stronger when panicking
        gx, gy = self._nearest_exit_vec(p.x, p.y)
        goal_k = 0.6 if p.state == 0 else (0.8 if p.state == 1 else 1.1)
        fx += gx * goal_k / 200.0
        fy += gy * goal_k / 200.0

        # obstacle avoidance (simple push): when inside the obstacle box
        # expanded by `margin`, push outwards through the nearest edge
        margin = self.personal_space * 0.7
        for (ox0, oy0, ox1, oy1) in self.obstacles:
            inside_x = (ox0 - margin) < p.x < (ox1 + margin)
            inside_y = (oy0 - margin) < p.y < (oy1 + margin)
            if not (inside_x and inside_y):
                continue
            dx_left = p.x - ox0
            dx_right = ox1 - p.x
            dy_top = p.y - oy0
            dy_bot = oy1 - p.y
            m = min(dx_left, dx_right, dy_top, dy_bot)
            if m == dx_left:
                fx -= 0.8
            elif m == dx_right:
                fx += 0.8
            elif m == dy_top:
                fy -= 0.8
            else:
                fy += 0.8

        # walls (room bounds)
        x0, y0, x1, y1 = self.room
        wall_margin = 8
        if p.x < x0 + wall_margin:
            fx += 1.2
        if p.x > x1 - wall_margin:
            fx -= 1.2
        if p.y < y0 + wall_margin:
            fy += 1.2
        if p.y > y1 - wall_margin:
            fy -= 1.2

        return fx, fy

    def _move_people(self, dt: float) -> None:
        """Evacuate people standing in an exit and move everyone else."""
        x0, y0, x1, y1 = self.room
        stress_boosts = (1.0, 1.2, 1.6)  # speed multiplier per state
        for p in self.people:
            if p.evacuated:
                continue
            if self._in_any_exit(p.x, p.y):
                p.evacuated = True
                continue

            # neighborhood (very light):
            # sample 20 random neighbors instead of full n^2
            neighbors = random.sample(
                self.people, k=min(20, len(self.people))
            )
            fx, fy = self._steering_force(p, neighbors)

            # integrate
            p.vx += fx
            p.vy += fy

            # max speed based on stress
            vmax = MAX_SPEED_BASE * self.speed_scale * stress_boosts[p.state]
            v = math.hypot(p.vx, p.vy)
            if v > vmax:
                p.vx = p.vx / v * vmax
                p.vy = p.vy / v * vmax

            p.x += p.vx * dt
            p.y += p.vy * dt

            # clamp to room
            p.x = min(max(p.x, x0 + 2), x1 - 2)
            p.y = min(max(p.y, y0 + 2), y1 - 2)

    def render(self) -> Image.Image:
        """Draw the room, incident, obstacles, exits and active people."""
        img = Image.new("RGBA", (self.width, self.height), C_BG)
        d = ImageDraw.Draw(img, "RGBA")

        # room
        d.rectangle(self.room, fill=C_ROOM, outline=C_BORDER, width=3)

        # incident overlay
        if self.incident is not None:
            cx, cy, rad = self.incident
            d.ellipse(
                (cx - rad, cy - rad, cx + rad, cy + rad), fill=C_INCIDENT
            )

        # obstacles
        for (x0, y0, x1, y1) in self.obstacles:
            d.rectangle((x0, y0, x1, y1), fill=C_OBS)

        # exits
        for ex in self.exits:
            d.rectangle(ex.rect(), fill=C_EXIT)
            label = "Exit"
            d.text(
                (ex.x + 6, ex.y + ex.h / 2 - 7),
                label,
                fill=(255, 255, 255, 255),
            )

        # people (evacuated ones are not drawn)
        for p in self.people:
            if p.evacuated:
                continue
            d.ellipse(
                (
                    p.x - PERSON_RADIUS,
                    p.y - PERSON_RADIUS,
                    p.x + PERSON_RADIUS,
                    p.y + PERSON_RADIUS,
                ),
                fill=p.color(),
                outline=None,
            )

        return img


# =============================
# Helpers to build scenarios
# =============================
def make_world(n_people=200, speed=1.2, cohesion=0.3, separation=0.6,
               panic_spread=2.4):
    """Build a seeded world with two exits, three obstacles and a crowd.

    Both ``random`` and ``np.random`` are seeded with 7, so the same
    arguments always produce the same initial layout.
    """
    random.seed(7)
    np.random.seed(7)

    world = World()
    world.speed_scale = float(speed)
    world.cohesion = float(cohesion)
    world.separation = float(separation)
    world.panic_spread = float(panic_spread)
    world.personal_space = 22 + 10 * separation

    # exits on right wall
    ex1 = ExitZone(x=ROOM[2] - 8 - EXIT_W, y=int(ROOM[1] + 70))
    ex2 = ExitZone(x=ROOM[2] - 8 - EXIT_W, y=int(ROOM[3] - 70 - EXIT_H))
    world.exits = [ex1, ex2]

    # obstacles (columns)
    world.obstacles = [
        (int(ROOM[0] + 220), int(ROOM[1] + 110),
         int(ROOM[0] + 360), int(ROOM[1] + 150)),
        (int(ROOM[0] + 280), int(ROOM[3] - 150),
         int(ROOM[0] + 420), int(ROOM[3] - 110)),
        (int(ROOM[2] - 140), int(ROOM[1] + 160),
         int(ROOM[2] - 110), int(ROOM[1] + 260)),
    ]

    # crowd initial placement (gaussian blob)
    left, top, right, bottom = ROOM
    for _ in range(n_people):
        x = np.clip(
            np.random.normal((left + right) / 2 - 120, 120),
            left + 20,
            right - 40,
        )
        y = np.clip(
            np.random.normal((top + bottom) / 2, 90), top + 20, bottom - 20
        )
        ang = np.random.rand() * 2 * np.pi
        speed0 = np.random.uniform(5, 25)
        # Store plain floats so Person fields match their annotations.
        world.people.append(
            Person(
                x=float(x),
                y=float(y),
                vx=float(np.cos(ang) * speed0),
                vy=float(np.sin(ang) * speed0),
            )
        )

    world.reset_metrics()
    return world


def trigger_incident(world: World):
    """Place the incident zone and raise stress of people well inside it."""
    # incident: top-right of the room
    cx = int(ROOM[2] - 220)
    cy = int(ROOM[1] + 110)
    rad = 150
    world.incident = (cx, cy, rad)

    # give initial stress kick to agents inside 85% of the radius
    for p in world.people:
        if p.evacuated:
            continue
        if math.hypot(p.x - cx, p.y - cy) < rad * 0.85:
            p.stress = max(p.stress, 0.7)


def clear_incident(world: World):
    """Remove the incident zone; existing stress values are left untouched."""
    world.incident = None


# =============================
# Gradio bindings
# =============================
def _metrics_payload(world: World) -> Dict[str, str]:
    """Format the world's cached metrics for the ``Live metrics`` label."""
    evacuated_pct = (world.evac_count / max(1, len(world.people))) * 100
    return {
        "% Evacuated": f"{evacuated_pct:.0f}",
        "Avg. Stress": f"{world.avg_stress:.2f}",
        "Collisions": f"{world.collisions_proxy}",
    }


def reset_fn(n_people, speed, cohesion, separation, panic_spread):
    """Stop any running loop and build a fresh world from the slider values.

    Returns ``(frame, metrics, world)``.
    """
    global RUNNING
    RUNNING = False
    world = make_world(
        int(n_people),
        float(speed),
        float(cohesion),
        float(separation),
        float(panic_spread),
    )
    return world.render(), _metrics_payload(world), world


def start_incident_fn(world):
    """Trigger the incident (creating a default world if none exists)."""
    if world is None:
        world = make_world()
    trigger_incident(world)
    return world.render(), _metrics_payload(world), world


def clear_incident_fn(world):
    """Clear the incident (creating a default world if none exists)."""
    if world is None:
        world = make_world()
    clear_incident(world)
    return world.render(), _metrics_payload(world), world


def start_fn(world, n_steps=999999):
    """Generator: runs until paused.

    Yields ``(frame, metrics, world)`` after every simulation step while the
    global ``RUNNING`` flag stays set and fewer than ``n_steps`` steps ran.
    """
    global RUNNING
    RUNNING = True
    if world is None:
        world = make_world()

    # stream frames
    steps = 0
    while RUNNING and steps < n_steps:
        world.step(DT)
        yield world.render(), _metrics_payload(world), world
        steps += 1
        time.sleep(DT)  # throttle to ~20 FPS


def pause_fn():
    """Clear the running flag; outputs are left unchanged."""
    global RUNNING
    RUNNING = False
    return gr.update(), gr.update(), gr.update()


# Build the initial world once at import so the components can receive it
# through their `value=` constructor arguments.
init_frame, init_metrics, init_world = reset_fn(200, 1.2, 0.3, 0.6, 2.4)

with gr.Blocks(title="Behavioral ML in Virtual Crowds") as demo:
    gr.Markdown(
        "# Behavioral ML in Virtual Crowds · Evacuatie-simulatie\n"
        "Mensen (stippen) reageren op een incident, ontwijken obstakels en zoeken de **Exit**. "
        "Gebruik de knoppen en sliders hieronder."
    )

    with gr.Row():
        canvas = gr.Image(
            value=init_frame, label="Simulatie", interactive=False, type="pil"
        )
        metrics = gr.Label(value=init_metrics, label="Live metrics")

    with gr.Row():
        with gr.Column(scale=2):
            n_people = gr.Slider(20, 600, value=200, step=10, label="Aantal mensen")
            speed = gr.Slider(0.5, 2.0, value=1.2, step=0.05, label="Snelheid (schaal)")
            cohesion = gr.Slider(0.0, 1.0, value=0.3, step=0.05, label="Samenhang")
            separation = gr.Slider(0.0, 1.0, value=0.6, step=0.05, label="Persoonlijke ruimte (schaal)")
            panic_spread = gr.Slider(0.0, 4.0, value=2.4, step=0.1, label="Paniekverspreiding")
        with gr.Column(scale=1):
            start_btn = gr.Button("▶️ Start")
            pause_btn = gr.Button("⏸️ Pauze")
            reset_btn = gr.Button("🔁 Reset")
            inc_btn = gr.Button("🚨 Start Incident")
            clear_inc_btn = gr.Button("🧯 Clear Incident")

    state = gr.State(value=init_world)  # holds World

    # Bindings
    reset_btn.click(
        reset_fn,
        inputs=[n_people, speed, cohesion, separation, panic_spread],
        outputs=[canvas, metrics, state],
        api_name="reset",
    )

    start_btn.click(
        start_fn,
        inputs=[state],
        outputs=[canvas, metrics, state],
        show_progress=False,  # smoother streaming
        api_name="start",
    )

    pause_btn.click(
        pause_fn, outputs=[canvas, metrics, state], api_name="pause"
    )

    inc_btn.click(
        start_incident_fn,
        inputs=[state],
        outputs=[canvas, metrics, state],
        api_name="incident",
    )

    clear_inc_btn.click(
        clear_incident_fn,
        inputs=[state],
        outputs=[canvas, metrics, state],
        api_name="clear_incident",
    )

if __name__ == "__main__":
    demo.launch()