Spaces:
Runtime error
Runtime error
| import math | |
| import random | |
| import time | |
| from dataclasses import dataclass, field | |
| from typing import List, Optional, Tuple | |
| import gradio as gr | |
| import numpy as np | |
| from PIL import Image, ImageDraw | |
| # ============================= | |
| # Config | |
| # ============================= | |
| W, H = 900, 540 # canvas size (px) | |
| ROOM = (60, 40, W - 60, H - 40) # inner room rect (left, top, right, bottom) | |
| DT = 0.05 # seconds per simulation step (~20 FPS) | |
| MAX_SPEED_BASE = 65.0 # px/s baseline | |
| PERSON_RADIUS = 6 # draw radius in px | |
| NEIGHBOR_RANGE = 55.0 # px (perception) | |
| EXIT_W, EXIT_H = 28, 68 | |
| # Colors | |
| C_BG = (245, 246, 248, 255) | |
| C_ROOM = (250, 250, 250, 255) | |
| C_BORDER = (28, 28, 28, 255) | |
| C_OBS = (175, 178, 185, 255) | |
| C_EXIT = (52, 132, 255, 255) | |
| C_CALM = (44, 184, 78, 255) | |
| C_ALERT = (250, 191, 35, 255) | |
| C_PANIC = (233, 68, 68, 255) | |
| C_INCIDENT = (230, 60, 60, 96) | |
| # Global running flag (simple & reliable for Spaces) | |
| RUNNING = False | |
| class Person: | |
| x: float | |
| y: float | |
| vx: float | |
| vy: float | |
| stress: float = 0.0 # 0..1 | |
| state: int = 0 # 0 calm, 1 alert, 2 panic | |
| evacuated: bool = False | |
| def color(self): | |
| return [C_CALM, C_ALERT, C_PANIC][self.state] | |
| class ExitZone: | |
| x: int | |
| y: int | |
| w: int = EXIT_W | |
| h: int = EXIT_H | |
| def rect(self): | |
| return (self.x, self.y, self.x + self.w, self.y + self.h) | |
| class World: | |
| width: int = W | |
| height: int = H | |
| room: Tuple[int, int, int, int] = ROOM | |
| people: List[Person] = field(default_factory=list) | |
| exits: List[ExitZone] = field(default_factory=list) | |
| obstacles: List[Tuple[int, int, int, int]] = field(default_factory=list) | |
| incident: Optional[Tuple[int, int, int]] = None # (cx, cy, radius) | |
| # Params (controlled by UI) | |
| speed_scale: float = 1.0 | |
| cohesion: float = 0.3 | |
| separation: float = 0.6 | |
| panic_spread: float = 2.0 | |
| personal_space: float = 22.0 | |
| # cached metrics | |
| evac_count: int = 0 | |
| avg_stress: float = 0.0 | |
| collisions_proxy: int = 0 | |
| def reset_metrics(self): | |
| self.evac_count = sum(1 for p in self.people if p.evacuated) | |
| if self.people: | |
| self.avg_stress = float(np.mean([p.stress for p in self.people if not p.evacuated])) if any( | |
| not p.evacuated for p in self.people) else 0.0 | |
| else: | |
| self.avg_stress = 0.0 | |
| # simple collisions proxy: count of pairs closer than 0.8 * personal_space (approx) | |
| close = 0 | |
| active = [p for p in self.people if not p.evacuated] | |
| n = len(active) | |
| for i in range(n): | |
| pi = active[i] | |
| for j in range(i + 1, n): | |
| pj = active[j] | |
| dx = pi.x - pj.x | |
| dy = pi.y - pj.y | |
| if dx * dx + dy * dy < (0.8 * self.personal_space) ** 2: | |
| close += 1 | |
| self.collisions_proxy = close | |
| def step(self, dt: float): | |
| l, t, r, b = self.room | |
| def nearest_exit_vec(px, py): | |
| best = None | |
| best_d2 = 1e12 | |
| for ex in self.exits: | |
| cx = ex.x + ex.w / 2 | |
| cy = ex.y + ex.h / 2 | |
| dx, dy = (cx - px), (cy - py) | |
| d2 = dx * dx + dy * dy | |
| if d2 < best_d2: | |
| best_d2 = d2 | |
| best = (dx, dy) | |
| return best if best is not None else (0.0, 0.0) | |
| # update states (stress diffusion + incident influence) | |
| for p in self.people: | |
| if p.evacuated: | |
| continue | |
| # incident influence | |
| if self.incident is not None: | |
| cx, cy, rad = self.incident | |
| dx = p.x - cx | |
| dy = p.y - cy | |
| d = math.hypot(dx, dy) | |
| if d < rad: | |
| # stronger effect closer to center | |
| p.stress += (1 - d / rad) * self.panic_spread * 0.18 * dt | |
| # neighbor panic influence (cheap/approx) | |
| # random subset to keep O(n) | |
| for _ in range(6): | |
| q = random.choice(self.people) | |
| if q.evacuated: | |
| continue | |
| if q is p: | |
| continue | |
| dx = q.x - p.x | |
| dy = q.y - p.y | |
| d2 = dx * dx + dy * dy | |
| if d2 < (NEIGHBOR_RANGE ** 2) and q.state == 2: | |
| p.stress += self.panic_spread * 0.04 * dt | |
| # clamp and assign state | |
| p.stress = max(0.0, min(1.0, p.stress)) | |
| if p.stress > 0.65: | |
| p.state = 2 | |
| elif p.stress > 0.35: | |
| p.state = 1 | |
| else: | |
| p.state = 0 | |
| # forces & motion | |
| for i, p in enumerate(self.people): | |
| if p.evacuated: | |
| continue | |
| # skip if already in exit | |
| for ex in self.exits: | |
| x0, y0, x1, y1 = ex.rect() | |
| if x0 <= p.x <= x1 and y0 <= p.y <= y1: | |
| p.evacuated = True | |
| continue | |
| if p.evacuated: | |
| continue | |
| # neighborhood (very light) | |
| # sample 20 random neighbors instead of full n^2 | |
| neighbors = random.sample(self.people, k=min(20, len(self.people))) | |
| # separation | |
| fx, fy = 0.0, 0.0 | |
| for q in neighbors: | |
| if q is p or q.evacuated: | |
| continue | |
| dx = p.x - q.x | |
| dy = p.y - q.y | |
| d2 = dx * dx + dy * dy | |
| if d2 == 0: | |
| continue | |
| if d2 < (self.personal_space ** 2): | |
| inv = 1.0 / max(1.0, d2) | |
| fx += dx * inv | |
| fy += dy * inv | |
| # cohesion & alignment | |
| cx, cy, cvx, cvy, cnt = 0.0, 0.0, 0.0, 0.0, 0 | |
| for q in neighbors: | |
| if q is p or q.evacuated: | |
| continue | |
| dx = q.x - p.x | |
| dy = q.y - p.y | |
| if dx * dx + dy * dy < NEIGHBOR_RANGE ** 2: | |
| cx += q.x | |
| cy += q.y | |
| cvx += q.vx | |
| cvy += q.vy | |
| cnt += 1 | |
| if cnt > 0: | |
| cx /= cnt | |
| cy /= cnt | |
| cvx /= cnt | |
| cvy /= cnt | |
| # cohesion | |
| fx += (cx - p.x) * self.cohesion * 0.15 | |
| fy += (cy - p.y) * self.cohesion * 0.15 | |
| # alignment | |
| fx += (cvx - p.vx) * 0.05 | |
| fy += (cvy - p.vy) * 0.05 | |
| # goal (nearest exit) | |
| gx, gy = nearest_exit_vec(p.x, p.y) | |
| # scale stronger when panicking | |
| goal_k = 0.6 if p.state == 0 else (0.8 if p.state == 1 else 1.1) | |
| fx += gx * goal_k / 200.0 | |
| fy += gy * goal_k / 200.0 | |
| # obstacle avoidance (simple push) | |
| for (ox0, oy0, ox1, oy1) in self.obstacles: | |
| # compute penetration vector if inside expanded box | |
| margin = self.personal_space * 0.7 | |
| if (ox0 - margin) < p.x < (ox1 + margin) and (oy0 - margin) < p.y < (oy1 + margin): | |
| # push outwards from nearest edge | |
| dx_left = p.x - ox0 | |
| dx_right = ox1 - p.x | |
| dy_top = p.y - oy0 | |
| dy_bot = oy1 - p.y | |
| m = min(dx_left, dx_right, dy_top, dy_bot) | |
| if m == dx_left: | |
| fx -= 0.8 | |
| elif m == dx_right: | |
| fx += 0.8 | |
| elif m == dy_top: | |
| fy -= 0.8 | |
| else: | |
| fy += 0.8 | |
| # walls (room bounds) | |
| x0, y0, x1, y1 = self.room | |
| wall_margin = 8 | |
| if p.x < x0 + wall_margin: | |
| fx += 1.2 | |
| if p.x > x1 - wall_margin: | |
| fx -= 1.2 | |
| if p.y < y0 + wall_margin: | |
| fy += 1.2 | |
| if p.y > y1 - wall_margin: | |
| fy -= 1.2 | |
| # integrate | |
| p.vx += fx | |
| p.vy += fy | |
| # max speed based on stress | |
| stress_boost = [1.0, 1.2, 1.6][p.state] | |
| vmax = MAX_SPEED_BASE * self.speed_scale * stress_boost | |
| v = math.hypot(p.vx, p.vy) | |
| if v > vmax: | |
| p.vx = p.vx / v * vmax | |
| p.vy = p.vy / v * vmax | |
| p.x += p.vx * dt | |
| p.y += p.vy * dt | |
| # clamp to room | |
| p.x = min(max(p.x, x0 + 2), x1 - 2) | |
| p.y = min(max(p.y, y0 + 2), y1 - 2) | |
| # metrics | |
| self.reset_metrics() | |
| def render(self) -> Image.Image: | |
| img = Image.new("RGBA", (self.width, self.height), C_BG) | |
| d = ImageDraw.Draw(img, "RGBA") | |
| # room | |
| d.rectangle(self.room, fill=C_ROOM, outline=C_BORDER, width=3) | |
| # incident overlay | |
| if self.incident is not None: | |
| cx, cy, rad = self.incident | |
| d.ellipse((cx - rad, cy - rad, cx + rad, cy + rad), fill=C_INCIDENT) | |
| # obstacles | |
| for (x0, y0, x1, y1) in self.obstacles: | |
| d.rectangle((x0, y0, x1, y1), fill=C_OBS) | |
| # exits | |
| for ex in self.exits: | |
| d.rectangle(ex.rect(), fill=C_EXIT) | |
| label = "Exit" | |
| tw, th = d.textlength(label), 12 | |
| d.text((ex.x + 6, ex.y + ex.h / 2 - 7), label, fill=(255, 255, 255, 255)) | |
| # people | |
| for p in self.people: | |
| if p.evacuated: | |
| continue | |
| c = p.color() | |
| d.ellipse((p.x - PERSON_RADIUS, p.y - PERSON_RADIUS, | |
| p.x + PERSON_RADIUS, p.y + PERSON_RADIUS), fill=c, outline=None) | |
| return img | |
| # ============================= | |
| # Helpers to build scenarios | |
| # ============================= | |
| def make_world(n_people=200, speed=1.2, cohesion=0.3, separation=0.6, panic_spread=2.4): | |
| random.seed(7) | |
| np.random.seed(7) | |
| world = World() | |
| world.speed_scale = float(speed) | |
| world.cohesion = float(cohesion) | |
| world.separation = float(separation) | |
| world.panic_spread = float(panic_spread) | |
| world.personal_space = 22 + 10 * separation | |
| # exits on right wall | |
| ex1 = ExitZone(x=ROOM[2] - 8 - EXIT_W, y=int(ROOM[1] + 70)) | |
| ex2 = ExitZone(x=ROOM[2] - 8 - EXIT_W, y=int(ROOM[3] - 70 - EXIT_H)) | |
| world.exits = [ex1, ex2] | |
| # obstacles (columns) | |
| world.obstacles = [ | |
| (int(ROOM[0] + 220), int(ROOM[1] + 110), int(ROOM[0] + 360), int(ROOM[1] + 150)), | |
| (int(ROOM[0] + 280), int(ROOM[3] - 150), int(ROOM[0] + 420), int(ROOM[3] - 110)), | |
| (int(ROOM[2] - 140), int(ROOM[1] + 160), int(ROOM[2] - 110), int(ROOM[1] + 260)), | |
| ] | |
| # crowd initial placement (gaussian blob) | |
| l, t, r, b = ROOM | |
| for _ in range(n_people): | |
| x = np.clip(np.random.normal((l + r) / 2 - 120, 120), l + 20, r - 40) | |
| y = np.clip(np.random.normal((t + b) / 2, 90), t + 20, b - 20) | |
| ang = np.random.rand() * 2 * np.pi | |
| speed0 = np.random.uniform(5, 25) | |
| world.people.append(Person(x=x, y=y, vx=np.cos(ang) * speed0, vy=np.sin(ang) * speed0)) | |
| world.reset_metrics() | |
| return world | |
| def trigger_incident(world: World): | |
| # incident: top-right of the room | |
| cx = int(ROOM[2] - 220) | |
| cy = int(ROOM[1] + 110) | |
| rad = 150 | |
| world.incident = (cx, cy, rad) | |
| # give initial stress kick to agents inside | |
| for p in world.people: | |
| if p.evacuated: | |
| continue | |
| dx, dy = p.x - cx, p.y - cy | |
| d = math.hypot(dx, dy) | |
| if d < rad * 0.85: | |
| p.stress = max(p.stress, 0.7) | |
| def clear_incident(world: World): | |
| world.incident = None | |
| # ============================= | |
| # Gradio bindings | |
| # ============================= | |
| def reset_fn(n_people, speed, cohesion, separation, panic_spread): | |
| global RUNNING | |
| RUNNING = False | |
| world = make_world(int(n_people), float(speed), float(cohesion), float(separation), float(panic_spread)) | |
| frame = world.render() | |
| metrics = { | |
| "% Evacuated": f"{(world.evac_count / max(1, len(world.people)))*100:.0f}", | |
| "Avg. Stress": f"{world.avg_stress:.2f}", | |
| "Collisions": f"{world.collisions_proxy}" | |
| } | |
| return frame, metrics, world | |
| def start_incident_fn(world): | |
| if world is None: | |
| world = make_world() | |
| trigger_incident(world) | |
| return world.render(), { | |
| "% Evacuated": f"{(world.evac_count / max(1, len(world.people)))*100:.0f}", | |
| "Avg. Stress": f"{world.avg_stress:.2f}", | |
| "Collisions": f"{world.collisions_proxy}" | |
| }, world | |
| def clear_incident_fn(world): | |
| if world is None: | |
| world = make_world() | |
| clear_incident(world) | |
| return world.render(), { | |
| "% Evacuated": f"{(world.evac_count / max(1, len(world.people)))*100:.0f}", | |
| "Avg. Stress": f"{world.avg_stress:.2f}", | |
| "Collisions": f"{world.collisions_proxy}" | |
| }, world | |
| def start_fn(world, n_steps=999999): | |
| """Generator: runs until paused.""" | |
| global RUNNING | |
| RUNNING = True | |
| if world is None: | |
| world = make_world() | |
| # stream frames | |
| steps = 0 | |
| while RUNNING and steps < n_steps: | |
| world.step(DT) | |
| frame = world.render() | |
| metrics = { | |
| "% Evacuated": f"{(world.evac_count / max(1, len(world.people)))*100:.0f}", | |
| "Avg. Stress": f"{world.avg_stress:.2f}", | |
| "Collisions": f"{world.collisions_proxy}" | |
| } | |
| yield frame, metrics, world | |
| steps += 1 | |
| time.sleep(DT) # throttle to ~20 FPS | |
| def pause_fn(): | |
| global RUNNING | |
| RUNNING = False | |
| return gr.update(), gr.update(), gr.update() | |
| with gr.Blocks(title="Behavioral ML in Virtual Crowds") as demo: | |
| gr.Markdown( | |
| "# Behavioral ML in Virtual Crowds · Evacuatie-simulatie\n" | |
| "Mensen (stippen) reageren op een incident, ontwijken obstakels en zoeken de **Exit**. " | |
| "Gebruik de knoppen en sliders hieronder." | |
| ) | |
| with gr.Row(): | |
| canvas = gr.Image(label="Simulatie", interactive=False, type="pil") | |
| metrics = gr.Label(label="Live metrics") | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| n_people = gr.Slider(20, 600, value=200, step=10, label="Aantal mensen") | |
| speed = gr.Slider(0.5, 2.0, value=1.2, step=0.05, label="Snelheid (schaal)") | |
| cohesion = gr.Slider(0.0, 1.0, value=0.3, step=0.05, label="Samenhang") | |
| separation = gr.Slider(0.0, 1.0, value=0.6, step=0.05, label="Persoonlijke ruimte (schaal)") | |
| panic_spread = gr.Slider(0.0, 4.0, value=2.4, step=0.1, label="Paniekverspreiding") | |
| with gr.Column(scale=1): | |
| start_btn = gr.Button("▶️ Start") | |
| pause_btn = gr.Button("⏸️ Pauze") | |
| reset_btn = gr.Button("🔁 Reset") | |
| inc_btn = gr.Button("🚨 Start Incident") | |
| clear_inc_btn = gr.Button("🧯 Clear Incident") | |
| state = gr.State() # holds World | |
| # Bindings | |
| reset_btn.click( | |
| reset_fn, | |
| inputs=[n_people, speed, cohesion, separation, panic_spread], | |
| outputs=[canvas, metrics, state], | |
| api_name="reset", | |
| ) | |
| # initialize once at app load | |
| init_frame, init_metrics, init_world = reset_fn(200, 1.2, 0.3, 0.6, 2.4) | |
| canvas.value = init_frame | |
| metrics.value = init_metrics | |
| state.value = init_world | |
| start_btn.click( | |
| start_fn, | |
| inputs=[state], | |
| outputs=[canvas, metrics, state], | |
| show_progress=False, # smoother streaming | |
| api_name="start", | |
| ) | |
| pause_btn.click(pause_fn, outputs=[canvas, metrics, state], api_name="pause") | |
| inc_btn.click(start_incident_fn, inputs=[state], outputs=[canvas, metrics, state], api_name="incident") | |
| clear_inc_btn.click(clear_incident_fn, inputs=[state], outputs=[canvas, metrics, state], api_name="clear_incident") | |
| if __name__ == "__main__": | |
| demo.launch() | |