porous-cfd / app /server.py
Gallinator
Server: use forkserver as worker context
2e7ff4d unverified
import asyncio
import os.path
import shutil
import traceback
from concurrent.futures import ProcessPoolExecutor
from contextlib import asynccontextmanager
from functools import partial
from multiprocessing import get_context
import numpy as np
import torch
from fastapi import FastAPI, HTTPException
from lightning import Trainer
from lightning.pytorch.callbacks import RichProgressBar
from scipy.interpolate import griddata
from starlette.staticfiles import StaticFiles
from torch.utils.data import DataLoader
from app.config import AppSettings
from porous_cfd.dataset.foam_data import FoamData
from porous_cfd.dataset.foam_dataset import FoamDataset, collate_fn
from porous_cfd.models.pi_gano.pi_gano import PiGano
from porous_cfd.models.pi_gano.pi_gano_pp import PiGanoPp
from porous_cfd.models.pipn.pipn_foam import PipnFoam, PipnFoamPp, PipnFoamPpMrg
from app.api_models import Predict2dInput, Response2d
def get_interpolation_grid(points, grid_res) -> list[np.ndarray]:
points_x, points_y = points[:, 0].flatten(), points[:, 1].flatten()
xx = np.linspace(points_x.min(), points_x.max(), grid_res)
yy = np.linspace(points_y.min(), points_y.max(), grid_res)
return np.meshgrid(xx, yy)
def interpolate_on_grid(grid, points, *data) -> list:
return [griddata(points, d, tuple(grid), method='cubic', fill_value=0).flatten() for d in data]
def ndarrays_to_list(data: dict[str:np.ndarray]):
for k, v in data.items():
data[k] = v.flatten().tolist()
return data
def inverse_transform_output(dataset: FoamDataset, data: FoamData, *fields) -> list[np.ndarray]:
return [dataset.normalizers[f].inverse_transform(data[f].numpy(force=True)) for f in fields]
def generate_data(input_data: Predict2dInput, session_root: str):
# Only import blender in a subprocess, as the path is passed from the main process (see https://projects.blender.org/blender/blender/issues/98534)
# This has to be done here otherwise the context is incorrect
from porous_cfd.examples.duct_variable_boundary.generator_2d_variable import Generator2DVariable
from app.preprocessing import path_to_obj, create_session_folders
create_session_folders("assets", session_root, input_data)
path_to_obj(input_data.points["x"], input_data.points["y"], f"{session_root}/assets/meshes/split")
datagen = Generator2DVariable(f"{session_root}/assets", openfoam_cmd, settings.n_procs, 0, False)
datagen.write_momentum = False
datagen.save_plots = False
datagen.generate(f"{session_root}/data")
# Override meta and min_points from training set
model_dir = "pipn" if "pipn" in input_data.model else "pi-gano"
shutil.copy(f"assets/{model_dir}/min_points.json", f"{session_root}/data")
shutil.copy(f"assets/{model_dir}/meta.json", f"{session_root}/data/split")
def postprocess(dataset: FoamDataset, predicted: FoamData, residuals: FoamData):
c, tgt_u, tgt_p = inverse_transform_output(dataset, dataset[0], "C", "U", "p")
points = {"x": c[..., 0],
"y": c[..., 1]}
target = {"Ux": tgt_u[..., 0],
"Uy": tgt_u[..., 1],
"U": np.linalg.norm(tgt_u, axis=1),
"p": tgt_p}
pred_u, pred_p = inverse_transform_output(dataset, predicted, "U", "p")
pred = {"Ux": pred_u[0, ..., 0],
"Uy": pred_u[0, ..., 1],
"U": np.linalg.norm(pred_u[0], axis=1),
"p": pred_p[0]}
error_u, error_p = np.abs(pred_u - tgt_u), np.abs(pred_p - tgt_p)
error = {"Ux": error_u[0, ..., 0],
"Uy": error_u[0, ..., 1],
"U": np.linalg.norm(error_u[0], axis=1),
"p": error_p[0]}
residuals = {"Momentumx": residuals["Momentumx"].numpy(force=True)[0],
"Momentumy": residuals["Momentumy"].numpy(force=True)[0],
"Momentum": np.linalg.norm(residuals["Momentum"].numpy(force=True)[0], axis=1),
"div": residuals["div"].numpy(force=True)[0]}
porous_ids = dataset[0]["cellToRegion"].flatten().numpy(force=True)
grid = get_interpolation_grid(c, 50)
grid_points = {"x": grid[0].flatten(), "y": grid[1].flatten()}
grid_pred = interpolate_on_grid(grid, c, pred["Ux"], pred["Uy"], pred["U"], pred["p"])
grid_pred = dict(zip(pred.keys(), grid_pred))
grid_target = interpolate_on_grid(grid, c, target["Ux"], target["Uy"], target["U"], target["p"])
grid_target = dict(zip(target.keys(), grid_target))
grid_error = interpolate_on_grid(grid, c, error["Ux"], error["Uy"], error["U"], error["p"])
grid_error = dict(zip(pred.keys(), grid_error))
internal_c = dataset.normalizers["C"].inverse_transform(dataset[0]["internal"]["C"].numpy(force=True))
grid_residuals = interpolate_on_grid(grid, internal_c, residuals["Momentumx"],
residuals["Momentumy"],
residuals["Momentum"],
residuals["div"])
grid_residuals = dict(zip(residuals.keys(), grid_residuals))
raw_data = Response2d(points=ndarrays_to_list(points),
target=ndarrays_to_list(target),
porous_ids=porous_ids.tolist(),
predicted=ndarrays_to_list(pred),
error=ndarrays_to_list(error),
residuals=ndarrays_to_list(residuals))
grid_data = Response2d(points=ndarrays_to_list(grid_points),
target=ndarrays_to_list(grid_target),
predicted=ndarrays_to_list(grid_pred),
error=ndarrays_to_list(grid_error),
residuals=ndarrays_to_list(grid_residuals))
return {"raw_data": raw_data, "grid_data": grid_data}
@asynccontextmanager
async def lifespan(app: FastAPI):
app.models['pipn'] = PipnFoam.load_from_checkpoint("assets/pipn/pipn.ckpt")
app.models['pipn'].verbose_predict = True
app.models['pipn_pp'] = PipnFoamPp.load_from_checkpoint("assets/pipn/pipn-pp.ckpt")
app.models['pipn_pp'].verbose_predict = True
app.models['pipn_pp_mrg'] = PipnFoamPpMrg.load_from_checkpoint("assets/pipn/pipn-pp-mrg.ckpt")
app.models['pipn_pp_mrg'].verbose_predict = True
app.models['pi_gano'] = PiGano.load_from_checkpoint("assets/pi-gano/pi_gano.ckpt")
app.models['pi_gano'].verbose_predict = True
app.models['pi_gano_pp'] = PiGanoPp.load_from_checkpoint("assets/pi-gano/pi_gano_pp.ckpt")
app.models['pi_gano_pp'].verbose_predict = True
yield
app.process_pool.shutdown()
settings = AppSettings()
app = FastAPI(lifespan=lifespan)
openfoam_cmd = f'{settings.openfoam_dir}/etc/openfoam'
app.process_pool = ProcessPoolExecutor(mp_context=get_context("forkserver"))
app.models = {}
@app.post("/predict", summary="Predict flow from porous object", response_model=dict[str, Response2d])
async def predict(input_data: Predict2dInput):
session_dir = f"sessions/{input_data.uuid}"
try:
event_loop = asyncio.get_running_loop()
partial_f = partial(generate_data, input_data, session_dir)
await event_loop.run_in_executor(app.process_pool, partial_f)
dataset = FoamDataset(f"{session_dir}/data/split", 1000, 200, 500,
np.random.default_rng(8421))
torch.manual_seed(8421)
data_loader = DataLoader(dataset,
1,
num_workers=settings.n_procs,
persistent_workers=True,
shuffle=False,
pin_memory=True,
collate_fn=collate_fn)
trainer = Trainer(logger=False,
enable_checkpointing=False,
inference_mode=False,
callbacks=[RichProgressBar()])
# No lock as async coroutines run on the main thread
model = app.models[input_data.model]
predicted, residuals = trainer.predict(model, dataloaders=data_loader)[0]
shutil.rmtree(session_dir)
# Detach predicted data as autograd computation graphs are not supported with multiprocessing
partial_f = partial(postprocess, dataset, predicted.detach(), residuals.detach())
return await event_loop.run_in_executor(app.process_pool, partial_f)
except:
traceback.print_exc()
if os.path.exists(session_dir):
shutil.rmtree(session_dir)
raise HTTPException(status_code=500)
app.mount("/", StaticFiles(directory="static", html=True), name="static")