Spaces:
Sleeping
Sleeping
| import asyncio | |
| import os.path | |
| import shutil | |
| import traceback | |
| from concurrent.futures import ProcessPoolExecutor | |
| from contextlib import asynccontextmanager | |
| from functools import partial | |
| from multiprocessing import get_context | |
| import numpy as np | |
| import torch | |
| from fastapi import FastAPI, HTTPException | |
| from lightning import Trainer | |
| from lightning.pytorch.callbacks import RichProgressBar | |
| from scipy.interpolate import griddata | |
| from starlette.staticfiles import StaticFiles | |
| from torch.utils.data import DataLoader | |
| from app.config import AppSettings | |
| from porous_cfd.dataset.foam_data import FoamData | |
| from porous_cfd.dataset.foam_dataset import FoamDataset, collate_fn | |
| from porous_cfd.models.pi_gano.pi_gano import PiGano | |
| from porous_cfd.models.pi_gano.pi_gano_pp import PiGanoPp | |
| from porous_cfd.models.pipn.pipn_foam import PipnFoam, PipnFoamPp, PipnFoamPpMrg | |
| from app.api_models import Predict2dInput, Response2d | |
def get_interpolation_grid(points, grid_res) -> list[np.ndarray]:
    """Build a regular `grid_res` x `grid_res` meshgrid spanning the bounding box of *points*.

    :param points: array whose first two columns are x and y coordinates.
    :param grid_res: number of samples per axis.
    :return: ``[xx, yy]`` meshgrid arrays (numpy default 'xy' indexing).
    """
    x_coords = points[:, 0].flatten()
    y_coords = points[:, 1].flatten()
    x_axis = np.linspace(x_coords.min(), x_coords.max(), grid_res)
    y_axis = np.linspace(y_coords.min(), y_coords.max(), grid_res)
    return np.meshgrid(x_axis, y_axis)
def interpolate_on_grid(grid, points, *data) -> list:
    """Cubic-interpolate each field in *data* from scattered *points* onto *grid*.

    Points outside the convex hull of the data are filled with 0.

    :param grid: meshgrid pair as returned by :func:`get_interpolation_grid`.
    :param points: scattered sample coordinates.
    :param data: one or more per-point field arrays.
    :return: one flattened array per input field, in the same order.
    """
    xi = tuple(grid)
    interpolated = []
    for field in data:
        sampled = griddata(points, field, xi, method='cubic', fill_value=0)
        interpolated.append(sampled.flatten())
    return interpolated
def ndarrays_to_list(data: dict[str, np.ndarray]) -> dict[str, list]:
    """Flatten every array value of *data* to a plain Python list, in place.

    The original annotation `dict[str:np.ndarray]` was a slice expression, not
    a valid generic type; fixed to `dict[str, np.ndarray]`.

    :param data: mapping of field name to numpy array; mutated in place.
    :return: the same dict instance, with list values (JSON-serializable).
    """
    for key, array in data.items():
        data[key] = array.flatten().tolist()
    return data
def inverse_transform_output(dataset: FoamDataset, data: FoamData, *fields) -> list[np.ndarray]:
    """Denormalize the requested *fields* of *data* with the dataset's fitted normalizers.

    :param dataset: dataset holding the per-field normalizers.
    :param data: sample whose tensors are converted to numpy before inversion.
    :param fields: field names to denormalize, e.g. "C", "U", "p".
    :return: one denormalized array per field, in the same order.
    """
    denormalized = []
    for field in fields:
        raw = data[field].numpy(force=True)
        denormalized.append(dataset.normalizers[field].inverse_transform(raw))
    return denormalized
def generate_data(input_data: Predict2dInput, session_root: str):
    """Build and run an OpenFOAM case for the submitted 2-D geometry.

    Writes the generated case under ``{session_root}/data`` and then copies the
    training set's ``min_points.json``/``meta.json`` over it so preprocessing
    matches the selected model's training configuration.
    """
    # Only import blender in a subprocess, as the path is passed from the main
    # process (see https://projects.blender.org/blender/blender/issues/98534).
    # This has to be done here otherwise the context is incorrect.
    from porous_cfd.examples.duct_variable_boundary.generator_2d_variable import Generator2DVariable
    from app.preprocessing import path_to_obj, create_session_folders

    create_session_folders("assets", session_root, input_data)
    path_to_obj(input_data.points["x"], input_data.points["y"], f"{session_root}/assets/meshes/split")

    generator = Generator2DVariable(f"{session_root}/assets", openfoam_cmd, settings.n_procs, 0, False)
    generator.write_momentum = False
    generator.save_plots = False
    generator.generate(f"{session_root}/data")

    # Override meta and min_points from training set
    if "pipn" in input_data.model:
        model_dir = "pipn"
    else:
        model_dir = "pi-gano"
    shutil.copy(f"assets/{model_dir}/min_points.json", f"{session_root}/data")
    shutil.copy(f"assets/{model_dir}/meta.json", f"{session_root}/data/split")
def postprocess(dataset: FoamDataset, predicted: FoamData, residuals: FoamData):
    """Denormalize predictions and assemble raw plus grid-interpolated responses.

    :param dataset: single-sample dataset used for the prediction.
    :param predicted: predicted fields (detached tensors).
    :param residuals: PDE residual fields (detached tensors).
    :return: dict with "raw_data" (per-cell values, incl. porous region ids)
        and "grid_data" (values interpolated onto a 50x50 regular grid).
    """
    c, tgt_u, tgt_p = inverse_transform_output(dataset, dataset[0], "C", "U", "p")
    pred_u, pred_p = inverse_transform_output(dataset, predicted, "U", "p")
    # Element-wise absolute error; predicted arrays carry a leading batch dim.
    err_u = np.abs(pred_u - tgt_u)
    err_p = np.abs(pred_p - tgt_p)

    points = {"x": c[..., 0],
              "y": c[..., 1]}
    target = {"Ux": tgt_u[..., 0],
              "Uy": tgt_u[..., 1],
              "U": np.linalg.norm(tgt_u, axis=1),
              "p": tgt_p}
    pred = {"Ux": pred_u[0, ..., 0],
            "Uy": pred_u[0, ..., 1],
            "U": np.linalg.norm(pred_u[0], axis=1),
            "p": pred_p[0]}
    error = {"Ux": err_u[0, ..., 0],
             "Uy": err_u[0, ..., 1],
             "U": np.linalg.norm(err_u[0], axis=1),
             "p": err_p[0]}
    # Local name differs from the parameter to avoid shadowing it.
    residual_fields = {"Momentumx": residuals["Momentumx"].numpy(force=True)[0],
                       "Momentumy": residuals["Momentumy"].numpy(force=True)[0],
                       "Momentum": np.linalg.norm(residuals["Momentum"].numpy(force=True)[0], axis=1),
                       "div": residuals["div"].numpy(force=True)[0]}
    porous_ids = dataset[0]["cellToRegion"].flatten().numpy(force=True)

    grid = get_interpolation_grid(c, 50)
    grid_points = {"x": grid[0].flatten(), "y": grid[1].flatten()}

    def on_grid(fields: dict, source) -> dict:
        # Interpolate every field onto the regular grid, preserving key order.
        return dict(zip(fields.keys(), interpolate_on_grid(grid, source, *fields.values())))

    grid_pred = on_grid(pred, c)
    grid_target = on_grid(target, c)
    grid_error = on_grid(error, c)
    # Residuals only exist on internal points, so use the internal coordinates.
    internal_c = dataset.normalizers["C"].inverse_transform(
        dataset[0]["internal"]["C"].numpy(force=True))
    grid_residuals = on_grid(residual_fields, internal_c)

    raw_data = Response2d(points=ndarrays_to_list(points),
                          target=ndarrays_to_list(target),
                          porous_ids=porous_ids.tolist(),
                          predicted=ndarrays_to_list(pred),
                          error=ndarrays_to_list(error),
                          residuals=ndarrays_to_list(residual_fields))
    grid_data = Response2d(points=ndarrays_to_list(grid_points),
                           target=ndarrays_to_list(grid_target),
                           predicted=ndarrays_to_list(grid_pred),
                           error=ndarrays_to_list(grid_error),
                           residuals=ndarrays_to_list(grid_residuals))
    return {"raw_data": raw_data, "grid_data": grid_data}
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Load all model checkpoints at startup; shut the worker pool down on exit.

    Restored the `@asynccontextmanager` decorator: `asynccontextmanager` is
    imported but was unused, and FastAPI's `lifespan=` expects an async context
    manager (a bare async generator only works via a deprecated Starlette
    fallback).
    """
    checkpoints = {
        'pipn': (PipnFoam, "assets/pipn/pipn.ckpt"),
        'pipn_pp': (PipnFoamPp, "assets/pipn/pipn-pp.ckpt"),
        'pipn_pp_mrg': (PipnFoamPpMrg, "assets/pipn/pipn-pp-mrg.ckpt"),
        'pi_gano': (PiGano, "assets/pi-gano/pi_gano.ckpt"),
        'pi_gano_pp': (PiGanoPp, "assets/pi-gano/pi_gano_pp.ckpt"),
    }
    for name, (model_cls, ckpt_path) in checkpoints.items():
        model = model_cls.load_from_checkpoint(ckpt_path)
        # verbose_predict makes predict return residuals alongside predictions
        # — presumably; confirm against the model classes.
        model.verbose_predict = True
        app.models[name] = model
    yield
    app.process_pool.shutdown()
# Module-level application wiring; `settings.n_procs` and `settings.openfoam_dir`
# are read by the generation and inference code below.
settings = AppSettings()
app = FastAPI(lifespan=lifespan)
# Path to the OpenFOAM entry script, passed to the case generator.
openfoam_cmd = f'{settings.openfoam_dir}/etc/openfoam'
# Pool for CPU-heavy generation/postprocessing; "forkserver" presumably chosen
# to avoid fork-safety issues with the torch-initialized main process — confirm.
app.process_pool = ProcessPoolExecutor(mp_context=get_context("forkserver"))
# Populated by `lifespan` at startup; keyed by the model name sent in requests.
app.models = {}
async def predict(input_data: Predict2dInput):
    """Generate a CFD case for the submitted geometry, run inference, postprocess.

    Heavy work (case generation, postprocessing) runs in the process pool so the
    event loop stays responsive. The per-request session directory is removed on
    both the success and failure paths.

    :param input_data: geometry points, model name, and session uuid.
    :return: dict with "raw_data" and "grid_data" Response2d payloads.
    :raises HTTPException: status 500 on any failure.
    """
    # NOTE(review): no route decorator is visible here — confirm this coroutine
    # is registered on the app (e.g. via @app.post) elsewhere.
    session_dir = f"sessions/{input_data.uuid}"
    try:
        event_loop = asyncio.get_running_loop()
        partial_f = partial(generate_data, input_data, session_dir)
        await event_loop.run_in_executor(app.process_pool, partial_f)
        # Fixed seeds keep sampling and inference deterministic per request.
        dataset = FoamDataset(f"{session_dir}/data/split", 1000, 200, 500,
                              np.random.default_rng(8421))
        torch.manual_seed(8421)
        data_loader = DataLoader(dataset,
                                 1,
                                 num_workers=settings.n_procs,
                                 persistent_workers=True,
                                 shuffle=False,
                                 pin_memory=True,
                                 collate_fn=collate_fn)
        # inference_mode=False because residual computation needs autograd.
        trainer = Trainer(logger=False,
                          enable_checkpointing=False,
                          inference_mode=False,
                          callbacks=[RichProgressBar()])
        # No lock as async coroutines run on the main thread
        model = app.models[input_data.model]
        predicted, residuals = trainer.predict(model, dataloaders=data_loader)[0]
        shutil.rmtree(session_dir)
        # Detach predicted data as autograd computation graphs are not supported with multiprocessing
        partial_f = partial(postprocess, dataset, predicted.detach(), residuals.detach())
        return await event_loop.run_in_executor(app.process_pool, partial_f)
    except Exception as exc:
        # Was a bare `except:`, which also swallowed BaseExceptions such as
        # asyncio.CancelledError and broke client-disconnect cancellation.
        traceback.print_exc()
        if os.path.exists(session_dir):
            shutil.rmtree(session_dir)
        raise HTTPException(status_code=500) from exc
# Serve the static frontend from ./static at the root path (html=True serves
# index.html for directory requests).
app.mount("/", StaticFiles(directory="static", html=True), name="static")