Spaces:
Running
Running
| """ | |
| engine/sim/fluid.py | |
| =================== | |
| Hydrodynamika — Level 1 (Cellular Fluid) + Level 2 (Pressure Fluid). | |
| Blueprint §4.4: "Chybějící hlavní modul." | |
| Blueprint §15: Detailní návrh první verze — implementujeme zde přesně. | |
| Level 1: voda se přesouvá mezi buňkami, gravitace, hladina, | |
| jednoduché šíření, zaplavování, odtok, tlak jako aproximace. | |
| Level 2: tlakové pole, tok podle gradientu tlaku, objemová konzervace. | |
| Algoritmus (blueprint §15): | |
| 1. aplikuj gravitaci | |
| 2. spočítej možné přesuny dolů | |
| 3. rozlij přebytek do stran | |
| 4. vyrovnej hladinu v sousedních buňkách | |
| 5. aplikuj tlak pro uzavřené prostory | |
| 6. aktualizuj interakce s materiály | |
| 7. emituj události | |
| Události (blueprint §15): | |
| FLUID_CELL_FILLED, FLUID_CELL_DRAINED, FLUID_PRESSURE_HIGH, | |
| FLUID_CONTACT_MATERIAL, FLUID_EXTINGUISH_FIRE, FLUID_DAMAGE_STRUCTURE, | |
| FLUID_ACTIVATE_SENSOR | |
| """ | |
| from __future__ import annotations | |
| from dataclasses import dataclass | |
| from typing import List, Optional, Tuple | |
| import numpy as np | |
| # ── Event types pro fluid (blueprint §15) ──────────────────────────────────── | |
class FluidEventType:
    """String identifiers of the events emitted by the fluid module (blueprint §15)."""

    # Cell fill-state transitions.
    CELL_FILLED = "fluid_cell_filled"
    CELL_DRAINED = "fluid_cell_drained"

    # Pressure field.
    PRESSURE_HIGH = "fluid_pressure_high"

    # Interactions between the fluid and the rest of the world.
    CONTACT_MATERIAL = "fluid_contact_material"
    EXTINGUISH_FIRE = "fluid_extinguish_fire"
    DAMAGE_STRUCTURE = "fluid_damage_structure"
    ACTIVATE_SENSOR = "fluid_activate_sensor"
| # ── FluidGrid — datová vrstva ────────────────────────────────────────────────── | |
class FluidGrid:
    """
    Voxel fluid field (Level 1 cellular flow + Level 2 pressure approximation).

    All per-cell arrays have shape ``(sx, sy, sz)``. The y axis is height and
    y = 0 is the bottom layer.

    Attributes:
        volume:   float32, amount of liquid per cell (0.0 .. MAX_VOLUME,
                  where MAX_VOLUME means a full cell).
        pressure: float32, pressure field recomputed from ``volume`` at the
                  end of every step.
        solid:    bool, True where solid matter blocks the fluid.
        ignited:  bool, True where a cell is burning. This class only reads
                  the flag; it is never written here.
    """

    MAX_VOLUME = 1.0   # maximum volume of a single cell
    MIN_FLOW = 0.005   # flows / volumes below this threshold are ignored
    MAX_FLOW = 0.95    # upper bound of a single transfer in one step
    FLOW_SPEED = 0.5   # flow rate factor (0-1)

    def __init__(self, sx: int, sy: int, sz: int):
        self.sx = sx
        self.sy = sy
        self.sz = sz
        self.volume = np.zeros((sx, sy, sz), dtype=np.float32)
        self.pressure = np.zeros((sx, sy, sz), dtype=np.float32)
        # Scratch buffer the flow passes work on during step().
        self._new_vol = np.zeros((sx, sy, sz), dtype=np.float32)
        self.solid = np.zeros((sx, sy, sz), dtype=bool)
        self.ignited = np.zeros((sx, sy, sz), dtype=bool)
        self._tick = 0

    def set_solid(self, x: int, y: int, z: int, is_solid: bool) -> None:
        """Mark a cell as solid / non-solid; a solid cell loses its fluid."""
        self.solid[x, y, z] = is_solid
        if is_solid:
            self.volume[x, y, z] = 0.0

    def add_fluid(self, x: int, y: int, z: int, amount: float) -> float:
        """
        Add liquid to a cell.

        Returns the surplus that did not fit (the whole amount for a solid
        cell). Non-positive amounts add nothing and return 0.0, so a negative
        amount can no longer silently drain the cell.
        """
        if amount <= 0.0:
            return 0.0
        amount = float(amount)
        if self.solid[x, y, z]:
            return amount
        space = max(self.MAX_VOLUME - float(self.volume[x, y, z]), 0.0)
        added = min(space, amount)
        self.volume[x, y, z] += added
        return amount - added

    def drain_fluid(self, x: int, y: int, z: int, amount: float) -> float:
        """
        Remove liquid from a cell.

        Returns the amount actually removed. Non-positive amounts remove
        nothing and return 0.0, so a negative amount can no longer add fluid.
        """
        if amount <= 0.0:
            return 0.0
        available = max(float(self.volume[x, y, z]), 0.0)
        drained = min(available, float(amount))
        self.volume[x, y, z] -= drained
        return drained

    def total_volume(self) -> float:
        """Return the sum of the fluid volume over the whole grid."""
        # Accumulate in float64 so large grids do not lose precision.
        return float(self.volume.sum(dtype=np.float64))

    # ── Main simulation step ─────────────────────────────────────────────────

    def step(self, tick: int) -> List[dict]:
        """
        Run one tick of the fluid simulation.

        Returns the list of event dicts produced during the tick (intended
        for the event bus).
        """
        self._tick = tick
        events: List[dict] = []
        old_vol = self.volume.copy()
        self._new_vol[:] = self.volume

        # 1. Gravity: flow downwards.
        self._flow_down(events)
        # 2. Sideways spreading along x and z.
        self._flow_horizontal(events)
        # 3. Pressure flow (Level 2 approximation).
        self._equalize_pressure(events)

        # 4. Commit the new volume. Everything is done in place so external
        #    references to ``self.volume`` stay valid.
        self.volume[:] = self._new_vol
        np.clip(self.volume, 0.0, self.MAX_VOLUME, out=self.volume)
        self.volume[self.solid] = 0.0

        # 5. Material interactions (fill / drain / fire events).
        self._material_interactions(old_vol, events)
        # 6. Pressure field derived from the committed volume.
        self._update_pressure()
        return events

    def _flow_down(self, events: List[dict]) -> None:
        """Gravity pass: liquid moves from layer y to layer y-1."""
        for y in range(1, self.sy):
            upper = self._new_vol[:, y, :]
            lower = self._new_vol[:, y - 1, :]
            # Source holds a relevant amount and neither cell is solid.
            movable = (
                (upper > self.MIN_FLOW)
                & ~self.solid[:, y, :]
                & ~self.solid[:, y - 1, :]
            )
            if not np.any(movable):
                continue
            # Free capacity of the cell below limits the transfer.
            room = np.maximum(self.MAX_VOLUME - lower, 0.0)
            flow = np.minimum(np.minimum(upper, room), self.MAX_FLOW) * self.FLOW_SPEED
            flow[~movable] = 0.0
            # ``upper`` / ``lower`` are views, so this updates _new_vol.
            upper -= flow
            lower += flow

    def _flow_horizontal(self, events: List[dict]) -> None:
        """Horizontal pass: liquid spreads to the four x/z neighbours."""
        directions = ((1, 0), (-1, 0), (0, 1), (0, -1))  # (dx, dz)
        for dx, dz in directions:
            # Source / destination windows shifted by one cell against each other.
            x_src = slice(max(0, -dx), self.sx - max(0, dx))
            z_src = slice(max(0, -dz), self.sz - max(0, dz))
            x_dst = slice(max(0, dx), self.sx - max(0, -dx))
            z_dst = slice(max(0, dz), self.sz - max(0, -dz))

            src_vol = self._new_vol[x_src, :, z_src]
            dst_vol = self._new_vol[x_dst, :, z_dst]
            blocked = self.solid[x_src, :, z_src] | self.solid[x_dst, :, z_dst]

            # Flow only from the fuller cell to the emptier one; ``flow`` is a
            # fresh array, so the overlapping views can be updated safely.
            diff = (src_vol - dst_vol) * 0.25
            flow = np.maximum(diff, 0.0) * self.FLOW_SPEED * 0.5
            flow[blocked] = 0.0
            flow = np.minimum(flow, self.MAX_FLOW * 0.25)

            self._new_vol[x_src, :, z_src] -= flow
            self._new_vol[x_dst, :, z_dst] += flow

    def _equalize_pressure(self, events: List[dict]) -> None:
        """
        Level 2 pressure pass.

        A (nearly) full cell with liquid above it is pressed on by that
        liquid. The pressed-in amount is taken from the cell above, so the
        total volume is conserved, and at most one PRESSURE_HIGH event is
        emitted per call (for the highest layer that would exceed
        MAX_VOLUME). All layers are always processed.
        """
        reported = False
        for y in range(self.sy - 2, -1, -1):
            here = self._new_vol[:, y, :]
            above = self._new_vol[:, y + 1, :]

            full = here >= self.MAX_VOLUME - self.MIN_FLOW
            can_push = full & ~self.solid[:, y, :] & ~self.solid[:, y + 1, :]

            # 0.15 is the pressure coefficient, 0.1 the per-step share of it.
            push = np.where(can_push, np.maximum(above, 0.0) * 0.15, 0.0) * 0.1

            # Virtual fill level including overpressure (capped); only used
            # for event reporting, it is never stored as volume.
            level = np.minimum(here + push, self.MAX_VOLUME + 0.15)
            if not reported and np.any(level > self.MAX_VOLUME):
                events.append({
                    "type": FluidEventType.PRESSURE_HIGH,
                    "y": y,
                    "max_pressure": float(level.max()),
                })
                reported = True

            # Move only what physically fits, and take it from the cell above.
            room = np.maximum(self.MAX_VOLUME - here, 0.0)
            moved = np.minimum(push, room)
            here += moved
            above -= moved

    def _material_interactions(self, old_vol: np.ndarray, events: List[dict]) -> None:
        """
        Emit events for cells that were flooded or drained during this step.

        - CELL_FILLED: one event per newly flooded cell, at most five per step.
        - EXTINGUISH_FIRE: a single aggregate event when any newly flooded
          cell is flagged in ``ignited``.
        - CELL_DRAINED: a single aggregate event when any cell drained.
        """
        newly_filled = (self.volume > 0.3) & (old_vol <= 0.05)
        newly_drained = (self.volume < 0.05) & (old_vol > 0.3)

        if np.any(newly_filled):
            # Only the first five coordinates are needed; avoid building a
            # Python tuple for every flooded cell.
            for x, y, z in np.argwhere(newly_filled)[:5]:
                events.append({
                    "type": FluidEventType.CELL_FILLED,
                    "pos": (int(x), int(y), int(z)),
                    "volume": float(self.volume[x, y, z]),
                })
            if np.any(newly_filled & self.ignited):
                events.append({"type": FluidEventType.EXTINGUISH_FIRE})

        if np.any(newly_drained):
            events.append({"type": FluidEventType.CELL_DRAINED})

    def _update_pressure(self) -> None:
        """Recompute the pressure field from the current volume."""
        # Approximation: own volume plus 30 % of the pressure of the cell above,
        # accumulated from the top layer downwards.
        self.pressure[:] = self.volume
        for y in range(self.sy - 2, -1, -1):
            self.pressure[:, y, :] += self.pressure[:, y + 1, :] * 0.3
        np.clip(self.pressure, 0, None, out=self.pressure)

    # ── Debug ────────────────────────────────────────────────────────────────

    @staticmethod
    def _shade(value: float, palette: str) -> str:
        """Map a fill level to a palette glyph, clamping the index to the palette."""
        last = len(palette) - 1
        return palette[max(0, min(int(value * last), last))]

    def cross_section_y(self, y: int) -> str:
        """ASCII rendering of the horizontal slice at height y (one row per z)."""
        chars = " .░▒▓█"
        lines = []
        for z in range(self.sz):
            lines.append("".join(
                "█" if self.solid[x, y, z] else self._shade(self.volume[x, y, z], chars)
                for x in range(self.sx)
            ))
        return "\n".join(lines)

    def column_y(self, x: int, z: int) -> str:
        """ASCII rendering of one vertical column, top layer first."""
        chars = " ░▒▓█"
        return "".join(
            "█" if self.solid[x, y, z] else self._shade(self.volume[x, y, z], chars)
            for y in range(self.sy - 1, -1, -1)
        )
| # ── FluidSimulation — modul pro Scheduler ──────────────────────────────────── | |
class FluidSimulation:
    """
    Wrapper around FluidGrid for use as a SimModule in the Scheduler.

    Usage:
        fluid_sim = FluidSimulation(32, 16, 32, bus=event_bus)
        scheduler.register(SimModule("fluid", fluid_sim.step, every_n_ticks=1, priority=2))
    """

    def __init__(self, sx: int, sy: int, sz: int, bus=None):
        self.grid = FluidGrid(sx, sy, sz)
        self.bus = bus              # optional event bus; None disables publishing
        self._total_events = 0      # running count of events produced by the grid

    def step(self, tick: int) -> None:
        """
        Advance the grid by one tick and publish its events on the bus.

        Publishing is best-effort: when the event module cannot be imported
        the events are only counted, and a failure to publish one event is
        logged and does not stop the remaining events.
        """
        events = self.grid.step(tick)
        self._total_events += len(events)
        if self.bus is None or not events:
            return

        # Import once per step rather than once per event.
        try:
            from engine.core.events import Event
        except ImportError:
            return  # the event system may not be available

        for evt_data in events:
            try:
                self.bus.publish(Event(
                    type=evt_data["type"],
                    tick=tick,
                    payload=evt_data,
                ))
            except Exception:
                # Keep the simulation running, but do not hide the failure.
                import logging
                logging.getLogger(__name__).warning(
                    "Failed to publish fluid event %r", evt_data.get("type"),
                    exc_info=True,
                )

    def add_source(self, x: int, y: int, z: int, rate: float = 0.2) -> None:
        """Add ``rate`` units of liquid to a cell (a spring); any surplus is dropped."""
        self.grid.add_fluid(x, y, z, rate)

    def set_solid_from_materials(self, materials_array: np.ndarray,
                                 solid_ids: set) -> None:
        """
        Load the solid map from the world's material grid.

        Cells whose material id is in ``solid_ids`` are marked solid (existing
        solid flags are kept), cells with material id 0 are forced non-solid,
        and fluid is removed from every solid cell, matching
        ``FluidGrid.set_solid``.
        """
        self.grid.solid |= np.isin(materials_array, list(solid_ids))
        # Material id 0 is never solid.
        # NOTE(review): the original comment also named water (10) as
        # non-solid, but only id 0 has ever been cleared here — confirm.
        self.grid.solid[materials_array == 0] = False
        # A cell that became solid cannot keep holding liquid.
        self.grid.volume[self.grid.solid] = 0.0

    def total_volume(self) -> float:
        """Return the total fluid volume in the grid."""
        return self.grid.total_volume()

    def stats(self) -> dict:
        """Return summary statistics about the current fluid state."""
        v = self.grid.volume
        return {
            "total_volume": round(float(v.sum()), 4),
            # max() of an empty array raises; report 0.0 for an empty grid.
            "max_volume": round(float(v.max()), 4) if v.size else 0.0,
            "wet_cells": int(np.sum(v > 0.01)),
            "total_events": self._total_events,
        }