Spaces:
Sleeping
Sleeping
| import copy | |
| import yaml | |
| import gymnasium as gym | |
| import numpy as np | |
| import pandas as pd | |
| import random | |
| from tqdm import tqdm | |
| from numpy import ndarray | |
| from stable_baselines3 import PPO | |
| from mesh_model.mesh_analysis.quadmesh_analysis import QuadMeshTopoAnalysis | |
| from mesh_model.mesh_struct.mesh import Mesh | |
| from mesh_model.mesh_struct.mesh_elements import Dart | |
| from mesh_model.reader import read_gmsh, read_dataset, read_json | |
| from mesh_model.writer import write_dataset | |
| from view.mesh_plotter.create_plots import plot_test_results, plot_density | |
| from view.mesh_plotter.mesh_plots import plot_dataset, plot_mesh, save_dataset_plot | |
| from environment.actions.smoothing import smoothing_mean | |
| from environment.quadmesh_env.wrappers import MeanRewardWrapper, WeightedRewardWrapper, CleanupWrapper | |
| from environment import quadmesh_env | |
def testPolicy(
        model,
        n_eval_episodes: int,
        config,
        dataset: list[Mesh]
) -> pd.DataFrame:
    """
    Tests policy on each mesh of a dataset with n_eval_episodes.

    For every mesh an evaluation environment is created, n_eval_episodes episodes are played
    with stochastic actions (deterministic=False), and the best mesh met during those
    episodes is kept.

    :param model: the model to test; only model.predict(obs, deterministic=False) is used
    :param n_eval_episodes: number of evaluation episodes on each mesh
    :param config: configuration mapping; the "eval" section and config["env"]["analysis_type"] are read
    :param dataset: list of mesh objects
    :return: DataFrame with one row per mesh and the columns
        mesh_id (position of the mesh in dataset),
        avg_length (mean number of steps per episode),
        nb_wins (number of episodes that ended with terminated=True),
        avg_mesh_rewards (mean cumulated info['mesh_reward'] per episode),
        avg_normalized_return (mean of (init - best) / (init - ideal) mesh scores, 0 when init == ideal),
        std_normalized_return (standard deviation of that normalized return over the episodes),
        final_mesh (best mesh found over all episodes of that mesh)
    """
    print('------------Testing policy-------------')
    # An episode is cut short once more than this many steps ended with a
    # non-positive cumulated mesh reward.
    max_non_positive_steps = 30

    nb_meshes = len(dataset)
    eval_config = config["eval"]
    analysis_type = config["env"]["analysis_type"]

    avg_length = np.zeros(nb_meshes)
    avg_mesh_rewards = np.zeros(nb_meshes)
    avg_normalized_return = np.zeros(nb_meshes)
    std_normalized_return = np.zeros(nb_meshes)
    nb_wins = np.zeros(nb_meshes)
    final_meshes = []

    # total= lets tqdm display a real progress bar: enumerate() has no length.
    for i, mesh in tqdm(enumerate(dataset), total=nb_meshes):
        best_mesh = mesh
        # One slot per episode, re-created for each mesh so that no value can
        # leak from one mesh to the next.
        normalized_returns = np.zeros(n_eval_episodes)
        env = gym.make(
            eval_config["eval_env_id"],
            max_episode_steps=eval_config["max_episode_steps"],
            learning_mesh=mesh,
            n_darts_selected=eval_config["n_darts_selected"],
            deep=eval_config["deep"],
            action_restriction=eval_config["action_restriction"],
            # NOTE(review): with_degree_obs is fed from the "with_quality_observation"
            # key - confirm this mapping is intended.
            with_degree_obs=eval_config["with_quality_observation"],
            render_mode=eval_config["render_mode"],
            analysis_type=analysis_type,
            debug=False,
        )
        # env = CleanupWrapper(env)
        try:
            for ne in range(n_eval_episodes):
                terminated = False
                truncated = False
                ep_mesh_rewards = 0
                ep_length = 0
                non_positive_steps = 0
                # Each episode works on its own copy so the dataset mesh stays untouched.
                obs, info = env.reset(options={"mesh": copy.deepcopy(mesh)})
                best_mesh_episode = mesh
                mesh_init_score = info["mesh_score"]
                mesh_ideal_score = info["mesh_ideal_score"]
                best_mesh_episode_score = mesh_init_score

                while not terminated and not truncated:
                    action, _states = model.predict(obs, deterministic=False)
                    if action is None:
                        # NOTE(review): this sets the attribute on the object returned by
                        # gym.make, which may be a wrapper - confirm the env reads it.
                        env.terminal = True
                        break
                    obs, _reward, terminated, truncated, info = env.step(action)
                    # A lower mesh score is treated as an improvement.
                    if info["mesh_score"] < best_mesh_episode_score:
                        best_mesh_episode = copy.deepcopy(info['mesh'])
                        best_mesh_episode_score = info["mesh_score"]
                    ep_mesh_rewards += info['mesh_reward']
                    if ep_mesh_rewards <= 0:
                        non_positive_steps += 1
                    if non_positive_steps > max_non_positive_steps:
                        truncated = True
                    ep_length += 1

                if terminated:
                    nb_wins[i] += 1
                if isBetterMesh(best_mesh, best_mesh_episode, analysis_type):
                    best_mesh = copy.deepcopy(best_mesh_episode)

                # Share of the gap between initial and ideal score closed by the
                # best mesh of the episode; 0 when there was no gap to close.
                if mesh_init_score == mesh_ideal_score:
                    normalized_return = 0
                else:
                    normalized_return = (
                        (mesh_init_score - best_mesh_episode_score)
                        / (mesh_init_score - mesh_ideal_score)
                    )

                avg_length[i] += ep_length
                avg_mesh_rewards[i] += ep_mesh_rewards
                normalized_returns[ne] = normalized_return
                avg_normalized_return[i] += normalized_return
        finally:
            # Release the environment resources even if an episode fails.
            env.close()

        final_meshes.append(best_mesh)
        avg_length[i] /= n_eval_episodes
        avg_mesh_rewards[i] /= n_eval_episodes
        avg_normalized_return[i] /= n_eval_episodes
        std_normalized_return[i] = np.std(normalized_returns)

    # Build the results DataFrame
    df_results = pd.DataFrame({
        "mesh_id": range(nb_meshes),
        "avg_length": avg_length,
        "nb_wins": nb_wins,
        "avg_mesh_rewards": avg_mesh_rewards,
        "avg_normalized_return": avg_normalized_return,
        "std_normalized_return": std_normalized_return,
        "final_mesh": final_meshes,
    })
    return df_results
def isBetterPolicy(actual_best_policy, policy_to_test):
    """
    Tells whether policy_to_test should replace actual_best_policy.

    Only the "no reference policy yet" case is handled: the two policies are never
    actually compared and policy_to_test is not used.

    :param actual_best_policy: the best policy so far, or None when there is none yet
    :param policy_to_test: the candidate policy (currently unused)
    :return: True when actual_best_policy is None, None otherwise
    """
    has_reference_policy = actual_best_policy is not None
    if has_reference_policy:
        # No comparison is implemented; the result is the falsy None.
        return None
    return True
def isBetterMesh(best_mesh, actual_mesh, analysis_type):
    """
    Tells whether actual_mesh should replace best_mesh as the best mesh found so far.

    Both meshes are analysed with QuadMeshTopoAnalysis and the second component of
    global_score() is compared (presumably a score where lower is better - verify
    against QuadMeshTopoAnalysis).

    :param best_mesh: the best mesh so far, or None when there is none yet
    :param actual_mesh: the candidate mesh
    :param analysis_type: analysis type from the configuration; currently unused,
        the comparison always relies on QuadMeshTopoAnalysis
    :return: True if best_mesh is None or if its score is strictly greater than the
        score of actual_mesh, False otherwise
    """
    # Guard first: no analysis must be built on a missing reference mesh.
    if best_mesh is None:
        return True
    ma_best_mesh = QuadMeshTopoAnalysis(best_mesh)
    ma_actual_mesh = QuadMeshTopoAnalysis(actual_mesh)
    return bool(ma_best_mesh.global_score()[1] > ma_actual_mesh.global_score()[1])
if __name__ == '__main__':
    # PARAMETERS CONFIGURATION
    with open("environment/config.yaml", "r") as config_file:
        config = yaml.safe_load(config_file)

    print("------------Reading dataset---------------")
    # A single mesh is evaluated here. A whole directory can be evaluated instead
    # with read_dataset(config["dataset"]["exploit_dataset_dir"]).
    dataset = [read_gmsh("mesh_files/imr3.msh")]

    print("------------Loading Model-----------------")
    # Load the model
    model = PPO.load("trained_models/full_dataset_ob36-v0.zip")

    # 50 evaluation episodes per mesh.
    df_results = testPolicy(model, 50, config, dataset)
    final_meshes = df_results["final_mesh"]

    # Smooth every best mesh before plotting and saving it (smoothing_mean's return
    # value is ignored, so it presumably works in place - verify).
    for final_mesh in final_meshes:
        smoothing_mean(final_mesh)
    save_dataset_plot(final_meshes, "training/results_IMR/imr3_results.png")

    # The mesh objects are not written to the CSV: only the metrics are kept,
    # transposed so that each metric is a row and each mesh a column.
    metrics_per_mesh = df_results.drop(columns=["final_mesh"])
    df_results = metrics_per_mesh.transpose()
    df_results.to_csv("training/results_IMR/imr3_results.csv", index=True)

    write_dataset("training/dataset/results/imr3_results", final_meshes)