""" engine/sim/fluid.py =================== Hydrodynamika — Level 1 (Cellular Fluid) + Level 2 (Pressure Fluid). Blueprint §4.4: "Chybějící hlavní modul." Blueprint §15: Detailní návrh první verze — implementujeme zde přesně. Level 1: voda se přesouvá mezi buňkami, gravitace, hladina, jednoduché šíření, zaplavování, odtok, tlak jako aproximace. Level 2: tlakové pole, tok podle gradientu tlaku, objemová konzervace. Algoritmus (blueprint §15): 1. aplikuj gravitaci 2. spočítej možné přesuny dolů 3. rozlij přebytek do stran 4. vyrovnej hladinu v sousedních buňkách 5. aplikuj tlak pro uzavřené prostory 6. aktualizuj interakce s materiály 7. emituj události Události (blueprint §15): FLUID_CELL_FILLED, FLUID_CELL_DRAINED, FLUID_PRESSURE_HIGH, FLUID_CONTACT_MATERIAL, FLUID_EXTINGUISH_FIRE, FLUID_DAMAGE_STRUCTURE, FLUID_ACTIVATE_SENSOR """ from __future__ import annotations from dataclasses import dataclass from typing import List, Optional, Tuple import numpy as np # ── Event types pro fluid (blueprint §15) ──────────────────────────────────── class FluidEventType: CELL_FILLED = "fluid_cell_filled" CELL_DRAINED = "fluid_cell_drained" PRESSURE_HIGH = "fluid_pressure_high" CONTACT_MATERIAL = "fluid_contact_material" EXTINGUISH_FIRE = "fluid_extinguish_fire" DAMAGE_STRUCTURE = "fluid_damage_structure" ACTIVATE_SENSOR = "fluid_activate_sensor" # ── FluidGrid — datová vrstva ────────────────────────────────────────────────── class FluidGrid: """ Voxelový fluid field (Level 1 + Level 2). Každá buňka má: - volume : množství kapaliny (0.0 – 1.0, kde 1.0 = plná buňka) - pressure : tlakové pole (pro Level 2) - settled : buňka v klidu (optimalizace — přeskočit settled buňky) Integruje se přes World: materials grid určuje kde je pevná hmota, fluid_vol grid nese množství kapaliny. """ MAX_VOLUME = 1.0 # maximální objem buňky MIN_FLOW = 0.005 # minimální tok (pod tímto ignoruj) MAX_FLOW = 0.95 # maximální tok v jednom kroku FLOW_SPEED = 0.5 # rychlost toku (0-1) def __init__(self, sx: int, sy: int, sz: int): self.sx = sx self.sy = sy self.sz = sz self.volume = np.zeros((sx, sy, sz), dtype=np.float32) self.pressure = np.zeros((sx, sy, sz), dtype=np.float32) self._new_vol = np.zeros((sx, sy, sz), dtype=np.float32) self.solid = np.zeros((sx, sy, sz), dtype=bool) # true = pevná hmota self.ignited = np.zeros((sx, sy, sz), dtype=bool) # hoří zde? self._tick = 0 def set_solid(self, x: int, y: int, z: int, is_solid: bool) -> None: self.solid[x, y, z] = is_solid if is_solid: self.volume[x, y, z] = 0.0 def add_fluid(self, x: int, y: int, z: int, amount: float) -> float: """Přidej kapalinu. Vrátí přebytek.""" if self.solid[x, y, z]: return amount space = self.MAX_VOLUME - self.volume[x, y, z] added = min(space, amount) self.volume[x, y, z] += added return amount - added def drain_fluid(self, x: int, y: int, z: int, amount: float) -> float: """Odeber kapalinu. Vrátí odebrané množství.""" drained = min(self.volume[x, y, z], amount) self.volume[x, y, z] -= drained return drained def total_volume(self) -> float: return float(self.volume.sum()) # ── Hlavní simulační krok ───────────────────────────────────────────────── def step(self, tick: int) -> List[dict]: """ Jeden tick fluidní simulace. Vrátí seznam událostí pro EventBus. """ self._tick = tick events: List[dict] = [] old_vol = self.volume.copy() self._new_vol[:] = self.volume sx, sy, sz = self.sx, self.sy, self.sz # 1. Gravitace — tok dolů (y-osa = výška, y=0 je dno) self._flow_down(events) # 2. Rozlití do stran (x, z) self._flow_horizontal(events) # 3. Pressure flow — vyrovnání hladiny (Level 2 approximace) self._equalize_pressure(events) # 4. Aplikuj nový objem self.volume[:] = self._new_vol # Clamping: zajisti 0 ≤ volume ≤ MAX_VOLUME a ne v solid self.volume = np.clip(self.volume, 0.0, self.MAX_VOLUME) self.volume[self.solid] = 0.0 # 5. Materiálové interakce self._material_interactions(old_vol, events) # 6. Tlakové pole (pro Level 2, aproximace) self._update_pressure() return events def _flow_down(self, events: List[dict]) -> None: """Gravitační tok — kapalina teče dolů (y-1).""" for y in range(1, self.sy): # Buňky kde je kapalina has_fluid = (self._new_vol[:, y, :] > self.MIN_FLOW) & ~self.solid[:, y, :] # Spodní buňka musí být volná below_free = ~self.solid[:, y-1, :] if not np.any(has_fluid & below_free): continue # Kolik může odtéct dolů can_take = self.MAX_VOLUME - self._new_vol[:, y-1, :] can_take = np.maximum(can_take, 0.0) flow = np.minimum(self._new_vol[:, y, :], can_take) flow = np.minimum(flow, self.MAX_FLOW) * self.FLOW_SPEED flow[~(has_fluid & below_free)] = 0.0 self._new_vol[:, y, :] -= flow self._new_vol[:, y-1, :] += flow def _flow_horizontal(self, events: List[dict]) -> None: """Horizontální tok — kapalina se šíří do stran.""" DIRS = [(1, 0), (-1, 0), (0, 1), (0, -1)] # (dx, dz) for dx, dz in DIRS: x_src = slice(max(0, -dx), self.sx - max(0, dx)) z_src = slice(max(0, -dz), self.sz - max(0, dz)) x_dst = slice(max(0, dx), self.sx - max(0, -dx)) z_dst = slice(max(0, dz), self.sz - max(0, -dz)) src_vol = self._new_vol[x_src, :, z_src] dst_vol = self._new_vol[x_dst, :, z_dst] src_solid = self.solid[x_src, :, z_src] dst_solid = self.solid[x_dst, :, z_dst] # Tok jen pokud src > dst a dst není solid diff = (src_vol - dst_vol) * 0.25 flow = np.maximum(diff, 0.0) * self.FLOW_SPEED * 0.5 flow[src_solid | dst_solid] = 0.0 flow = np.minimum(flow, self.MAX_FLOW * 0.25) self._new_vol[x_src, :, z_src] -= flow self._new_vol[x_dst, :, z_dst] += flow def _equalize_pressure(self, events: List[dict]) -> None: """ Level 2: Pressure equalization — tok podle gradientu tlaku. Umožňuje kapalině stoupat pod tlakem (communicating vessels). """ # Spočítej "hydrostatic pressure" = součet kapaliny ve sloupci nad buňkou for y in range(self.sy - 2, -1, -1): above = self._new_vol[:, y+1, :] here = self._new_vol[:, y, :] # Pokud je buňka plná a nad ní je kapalina → tlak full = here >= self.MAX_VOLUME - self.MIN_FLOW pressure = above * 0.15 # tlakový koeficient can_push = ~self.solid[:, y, :] & full flow = np.where(can_push, pressure, 0.0) flow = np.minimum(flow, self._new_vol[:, y+1, :]) self._new_vol[:, y, :] = np.minimum(self._new_vol[:, y, :] + flow * 0.1, self.MAX_VOLUME + 0.15) # slight overpressure # Emituj high pressure event (sampling) if np.any(self._new_vol[:, y, :] > self.MAX_VOLUME): events.append({ "type": FluidEventType.PRESSURE_HIGH, "y": y, "max_pressure": float(self._new_vol[:, y, :].max()), }) break def _material_interactions(self, old_vol: np.ndarray, events: List[dict]) -> None: """ Blueprint §15: interakce kapaliny s materiály světa. - FLUID_EXTINGUISH_FIRE: voda + oheň = zhasnutí - FLUID_DAMAGE_STRUCTURE: voda (nebo lava) oslabí materiál """ # Nově zaplavené buňky newly_filled = (self.volume > 0.3) & (old_vol <= 0.05) newly_drained = (self.volume < 0.05) & (old_vol > 0.3) if np.any(newly_filled): coords = list(zip(*np.where(newly_filled))) for (x, y, z) in coords[:5]: # limituj počet eventů events.append({ "type": FluidEventType.CELL_FILLED, "pos": (int(x), int(y), int(z)), "volume": float(self.volume[x, y, z]), }) # Zhasnutí ohně vodou if np.any(newly_filled & self.ignited): events.append({"type": FluidEventType.EXTINGUISH_FIRE}) if np.any(newly_drained): events.append({"type": FluidEventType.CELL_DRAINED}) def _update_pressure(self) -> None: """Aktualizuj pressure field z aktuálního volume.""" # Jednoduchá aproximace: pressure = volume + kolik je nad tím self.pressure[:] = self.volume for y in range(self.sy - 2, -1, -1): self.pressure[:, y, :] += self.pressure[:, y+1, :] * 0.3 np.clip(self.pressure, 0, None, out=self.pressure) # ── Debug ───────────────────────────────────────────────────────────────── def cross_section_y(self, y: int) -> str: """ASCII vizualizace horizontálního řezu na výšce y.""" chars = " .░▒▓█" lines = [] for z in range(self.sz): row = "" for x in range(self.sx): if self.solid[x, y, z]: row += "█" else: v = self.volume[x, y, z] idx = min(int(v * (len(chars) - 1)), len(chars) - 1) row += chars[idx] lines.append(row) return "\n".join(lines) def column_y(self, x: int, z: int) -> str: """ASCII vizualizace vertikálního sloupce.""" chars = " ░▒▓█" parts = [] for y in range(self.sy - 1, -1, -1): if self.solid[x, y, z]: parts.append("█") else: v = self.volume[x, y, z] idx = min(int(v * (len(chars) - 1)), len(chars) - 1) parts.append(chars[idx]) return "".join(parts) # ── FluidSimulation — modul pro Scheduler ──────────────────────────────────── class FluidSimulation: """ Obal FluidGrid pro použití jako SimModule v Scheduleru. Použití: fluid_sim = FluidSimulation(32, 16, 32, bus=event_bus) scheduler.register(SimModule("fluid", fluid_sim.step, every_n_ticks=1, priority=2)) """ def __init__(self, sx: int, sy: int, sz: int, bus=None): self.grid = FluidGrid(sx, sy, sz) self.bus = bus self._total_events = 0 def step(self, tick: int) -> None: events = self.grid.step(tick) self._total_events += len(events) if self.bus and events: for evt_data in events: try: from engine.core.events import Event self.bus.publish(Event( type=evt_data["type"], tick=tick, payload=evt_data, )) except Exception: pass # EventBus nemusí být dostupný def add_source(self, x: int, y: int, z: int, rate: float = 0.2) -> None: """Přidej zdroj kapaliny (pramen).""" self.grid.add_fluid(x, y, z, rate) def set_solid_from_materials(self, materials_array: np.ndarray, solid_ids: set) -> None: """ Načti solid mapu z material gridu světa. solid_ids = sada material_id které jsou pevné (neprostupné pro vodu). """ for mat_id in solid_ids: self.grid.solid |= (materials_array == mat_id) # Vzduch (0) a voda (10) nejsou solid self.grid.solid[materials_array == 0] = False @property def total_volume(self) -> float: return self.grid.total_volume() @property def stats(self) -> dict: v = self.grid.volume return { "total_volume": round(float(v.sum()), 4), "max_volume": round(float(v.max()), 4), "wet_cells": int(np.sum(v > 0.01)), "total_events": self._total_events, }