voxforge-world / engine /sim /fluid.py
peiti's picture
Upload folder using huggingface_hub
b154e4c verified
"""
engine/sim/fluid.py
===================
Hydrodynamika — Level 1 (Cellular Fluid) + Level 2 (Pressure Fluid).
Blueprint §4.4: "Chybějící hlavní modul."
Blueprint §15: Detailní návrh první verze — implementujeme zde přesně.
Level 1: voda se přesouvá mezi buňkami, gravitace, hladina,
jednoduché šíření, zaplavování, odtok, tlak jako aproximace.
Level 2: tlakové pole, tok podle gradientu tlaku, objemová konzervace.
Algoritmus (blueprint §15):
1. aplikuj gravitaci
2. spočítej možné přesuny dolů
3. rozlij přebytek do stran
4. vyrovnej hladinu v sousedních buňkách
5. aplikuj tlak pro uzavřené prostory
6. aktualizuj interakce s materiály
7. emituj události
Události (blueprint §15):
FLUID_CELL_FILLED, FLUID_CELL_DRAINED, FLUID_PRESSURE_HIGH,
FLUID_CONTACT_MATERIAL, FLUID_EXTINGUISH_FIRE, FLUID_DAMAGE_STRUCTURE,
FLUID_ACTIVATE_SENSOR
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import List, Optional, Tuple
import numpy as np
# ── Event types pro fluid (blueprint §15) ────────────────────────────────────
class FluidEventType:
CELL_FILLED = "fluid_cell_filled"
CELL_DRAINED = "fluid_cell_drained"
PRESSURE_HIGH = "fluid_pressure_high"
CONTACT_MATERIAL = "fluid_contact_material"
EXTINGUISH_FIRE = "fluid_extinguish_fire"
DAMAGE_STRUCTURE = "fluid_damage_structure"
ACTIVATE_SENSOR = "fluid_activate_sensor"
# ── FluidGrid — datová vrstva ──────────────────────────────────────────────────
class FluidGrid:
"""
Voxelový fluid field (Level 1 + Level 2).
Každá buňka má:
- volume : množství kapaliny (0.0 – 1.0, kde 1.0 = plná buňka)
- pressure : tlakové pole (pro Level 2)
- settled : buňka v klidu (optimalizace — přeskočit settled buňky)
Integruje se přes World: materials grid určuje kde je pevná hmota,
fluid_vol grid nese množství kapaliny.
"""
MAX_VOLUME = 1.0 # maximální objem buňky
MIN_FLOW = 0.005 # minimální tok (pod tímto ignoruj)
MAX_FLOW = 0.95 # maximální tok v jednom kroku
FLOW_SPEED = 0.5 # rychlost toku (0-1)
def __init__(self, sx: int, sy: int, sz: int):
self.sx = sx
self.sy = sy
self.sz = sz
self.volume = np.zeros((sx, sy, sz), dtype=np.float32)
self.pressure = np.zeros((sx, sy, sz), dtype=np.float32)
self._new_vol = np.zeros((sx, sy, sz), dtype=np.float32)
self.solid = np.zeros((sx, sy, sz), dtype=bool) # true = pevná hmota
self.ignited = np.zeros((sx, sy, sz), dtype=bool) # hoří zde?
self._tick = 0
def set_solid(self, x: int, y: int, z: int, is_solid: bool) -> None:
self.solid[x, y, z] = is_solid
if is_solid:
self.volume[x, y, z] = 0.0
def add_fluid(self, x: int, y: int, z: int, amount: float) -> float:
"""Přidej kapalinu. Vrátí přebytek."""
if self.solid[x, y, z]:
return amount
space = self.MAX_VOLUME - self.volume[x, y, z]
added = min(space, amount)
self.volume[x, y, z] += added
return amount - added
def drain_fluid(self, x: int, y: int, z: int, amount: float) -> float:
"""Odeber kapalinu. Vrátí odebrané množství."""
drained = min(self.volume[x, y, z], amount)
self.volume[x, y, z] -= drained
return drained
def total_volume(self) -> float:
return float(self.volume.sum())
# ── Hlavní simulační krok ─────────────────────────────────────────────────
def step(self, tick: int) -> List[dict]:
"""
Jeden tick fluidní simulace.
Vrátí seznam událostí pro EventBus.
"""
self._tick = tick
events: List[dict] = []
old_vol = self.volume.copy()
self._new_vol[:] = self.volume
sx, sy, sz = self.sx, self.sy, self.sz
# 1. Gravitace — tok dolů (y-osa = výška, y=0 je dno)
self._flow_down(events)
# 2. Rozlití do stran (x, z)
self._flow_horizontal(events)
# 3. Pressure flow — vyrovnání hladiny (Level 2 approximace)
self._equalize_pressure(events)
# 4. Aplikuj nový objem
self.volume[:] = self._new_vol
# Clamping: zajisti 0 ≤ volume ≤ MAX_VOLUME a ne v solid
self.volume = np.clip(self.volume, 0.0, self.MAX_VOLUME)
self.volume[self.solid] = 0.0
# 5. Materiálové interakce
self._material_interactions(old_vol, events)
# 6. Tlakové pole (pro Level 2, aproximace)
self._update_pressure()
return events
def _flow_down(self, events: List[dict]) -> None:
"""Gravitační tok — kapalina teče dolů (y-1)."""
for y in range(1, self.sy):
# Buňky kde je kapalina
has_fluid = (self._new_vol[:, y, :] > self.MIN_FLOW) & ~self.solid[:, y, :]
# Spodní buňka musí být volná
below_free = ~self.solid[:, y-1, :]
if not np.any(has_fluid & below_free):
continue
# Kolik může odtéct dolů
can_take = self.MAX_VOLUME - self._new_vol[:, y-1, :]
can_take = np.maximum(can_take, 0.0)
flow = np.minimum(self._new_vol[:, y, :], can_take)
flow = np.minimum(flow, self.MAX_FLOW) * self.FLOW_SPEED
flow[~(has_fluid & below_free)] = 0.0
self._new_vol[:, y, :] -= flow
self._new_vol[:, y-1, :] += flow
def _flow_horizontal(self, events: List[dict]) -> None:
"""Horizontální tok — kapalina se šíří do stran."""
DIRS = [(1, 0), (-1, 0), (0, 1), (0, -1)] # (dx, dz)
for dx, dz in DIRS:
x_src = slice(max(0, -dx), self.sx - max(0, dx))
z_src = slice(max(0, -dz), self.sz - max(0, dz))
x_dst = slice(max(0, dx), self.sx - max(0, -dx))
z_dst = slice(max(0, dz), self.sz - max(0, -dz))
src_vol = self._new_vol[x_src, :, z_src]
dst_vol = self._new_vol[x_dst, :, z_dst]
src_solid = self.solid[x_src, :, z_src]
dst_solid = self.solid[x_dst, :, z_dst]
# Tok jen pokud src > dst a dst není solid
diff = (src_vol - dst_vol) * 0.25
flow = np.maximum(diff, 0.0) * self.FLOW_SPEED * 0.5
flow[src_solid | dst_solid] = 0.0
flow = np.minimum(flow, self.MAX_FLOW * 0.25)
self._new_vol[x_src, :, z_src] -= flow
self._new_vol[x_dst, :, z_dst] += flow
def _equalize_pressure(self, events: List[dict]) -> None:
"""
Level 2: Pressure equalization — tok podle gradientu tlaku.
Umožňuje kapalině stoupat pod tlakem (communicating vessels).
"""
# Spočítej "hydrostatic pressure" = součet kapaliny ve sloupci nad buňkou
for y in range(self.sy - 2, -1, -1):
above = self._new_vol[:, y+1, :]
here = self._new_vol[:, y, :]
# Pokud je buňka plná a nad ní je kapalina → tlak
full = here >= self.MAX_VOLUME - self.MIN_FLOW
pressure = above * 0.15 # tlakový koeficient
can_push = ~self.solid[:, y, :] & full
flow = np.where(can_push, pressure, 0.0)
flow = np.minimum(flow, self._new_vol[:, y+1, :])
self._new_vol[:, y, :] = np.minimum(self._new_vol[:, y, :] + flow * 0.1,
self.MAX_VOLUME + 0.15) # slight overpressure
# Emituj high pressure event (sampling)
if np.any(self._new_vol[:, y, :] > self.MAX_VOLUME):
events.append({
"type": FluidEventType.PRESSURE_HIGH,
"y": y,
"max_pressure": float(self._new_vol[:, y, :].max()),
})
break
def _material_interactions(self, old_vol: np.ndarray, events: List[dict]) -> None:
"""
Blueprint §15: interakce kapaliny s materiály světa.
- FLUID_EXTINGUISH_FIRE: voda + oheň = zhasnutí
- FLUID_DAMAGE_STRUCTURE: voda (nebo lava) oslabí materiál
"""
# Nově zaplavené buňky
newly_filled = (self.volume > 0.3) & (old_vol <= 0.05)
newly_drained = (self.volume < 0.05) & (old_vol > 0.3)
if np.any(newly_filled):
coords = list(zip(*np.where(newly_filled)))
for (x, y, z) in coords[:5]: # limituj počet eventů
events.append({
"type": FluidEventType.CELL_FILLED,
"pos": (int(x), int(y), int(z)),
"volume": float(self.volume[x, y, z]),
})
# Zhasnutí ohně vodou
if np.any(newly_filled & self.ignited):
events.append({"type": FluidEventType.EXTINGUISH_FIRE})
if np.any(newly_drained):
events.append({"type": FluidEventType.CELL_DRAINED})
def _update_pressure(self) -> None:
"""Aktualizuj pressure field z aktuálního volume."""
# Jednoduchá aproximace: pressure = volume + kolik je nad tím
self.pressure[:] = self.volume
for y in range(self.sy - 2, -1, -1):
self.pressure[:, y, :] += self.pressure[:, y+1, :] * 0.3
np.clip(self.pressure, 0, None, out=self.pressure)
# ── Debug ─────────────────────────────────────────────────────────────────
def cross_section_y(self, y: int) -> str:
"""ASCII vizualizace horizontálního řezu na výšce y."""
chars = " .░▒▓█"
lines = []
for z in range(self.sz):
row = ""
for x in range(self.sx):
if self.solid[x, y, z]:
row += "█"
else:
v = self.volume[x, y, z]
idx = min(int(v * (len(chars) - 1)), len(chars) - 1)
row += chars[idx]
lines.append(row)
return "\n".join(lines)
def column_y(self, x: int, z: int) -> str:
"""ASCII vizualizace vertikálního sloupce."""
chars = " ░▒▓█"
parts = []
for y in range(self.sy - 1, -1, -1):
if self.solid[x, y, z]:
parts.append("█")
else:
v = self.volume[x, y, z]
idx = min(int(v * (len(chars) - 1)), len(chars) - 1)
parts.append(chars[idx])
return "".join(parts)
# ── FluidSimulation — modul pro Scheduler ────────────────────────────────────
class FluidSimulation:
"""
Obal FluidGrid pro použití jako SimModule v Scheduleru.
Použití:
fluid_sim = FluidSimulation(32, 16, 32, bus=event_bus)
scheduler.register(SimModule("fluid", fluid_sim.step, every_n_ticks=1, priority=2))
"""
def __init__(self, sx: int, sy: int, sz: int, bus=None):
self.grid = FluidGrid(sx, sy, sz)
self.bus = bus
self._total_events = 0
def step(self, tick: int) -> None:
events = self.grid.step(tick)
self._total_events += len(events)
if self.bus and events:
for evt_data in events:
try:
from engine.core.events import Event
self.bus.publish(Event(
type=evt_data["type"],
tick=tick,
payload=evt_data,
))
except Exception:
pass # EventBus nemusí být dostupný
def add_source(self, x: int, y: int, z: int, rate: float = 0.2) -> None:
"""Přidej zdroj kapaliny (pramen)."""
self.grid.add_fluid(x, y, z, rate)
def set_solid_from_materials(self, materials_array: np.ndarray,
solid_ids: set) -> None:
"""
Načti solid mapu z material gridu světa.
solid_ids = sada material_id které jsou pevné (neprostupné pro vodu).
"""
for mat_id in solid_ids:
self.grid.solid |= (materials_array == mat_id)
# Vzduch (0) a voda (10) nejsou solid
self.grid.solid[materials_array == 0] = False
@property
def total_volume(self) -> float:
return self.grid.total_volume()
@property
def stats(self) -> dict:
v = self.grid.volume
return {
"total_volume": round(float(v.sum()), 4),
"max_volume": round(float(v.max()), 4),
"wet_cells": int(np.sum(v > 0.01)),
"total_events": self._total_events,
}