# QuadOpt-RL / exploit_SB3_policy.py
import copy

import gymnasium as gym
import numpy as np
import pandas as pd
import yaml
from stable_baselines3 import PPO
from tqdm import tqdm

from mesh_model.mesh_analysis.quadmesh_analysis import QuadMeshTopoAnalysis
from mesh_model.mesh_struct.mesh import Mesh
from mesh_model.reader import read_gmsh, read_dataset
from mesh_model.writer import write_dataset
from view.mesh_plotter.create_plots import plot_test_results
from view.mesh_plotter.mesh_plots import plot_dataset, save_dataset_plot
from environment.actions.smoothing import smoothing_mean
from environment.quadmesh_env.wrappers import CleanupWrapper
from environment import quadmesh_env  # noqa: F401  (importing registers the quadmesh envs with gymnasium)


def testPolicy(
model,
n_eval_episodes: int,
config,
dataset: list[Mesh]
) -> pd.DataFrame:
"""
Tests policy on each mesh of a dataset with n_eval_episodes.
:param model: the model to test
:param n_eval_episodes: number of evaluation episodes on each mesh
:param config: configuration
:param dataset: list of mesh objects
:return: average length of evaluation episodes, number of wins,average reward per mesh, dataset with the modified meshes
"""
print('------------Testing policy-------------')
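    # Per-mesh accumulators; each is averaged over n_eval_episodes at the end of
    # the corresponding mesh loop.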
avg_length = np.zeros(len(dataset))
avg_mesh_rewards = np.zeros(len(dataset))
avg_normalized_return = np.zeros(len(dataset))
    std_normalized_return = np.zeros(len(dataset))
    list_normalized_return = np.zeros(n_eval_episodes)  # per-episode returns, reused for each mesh
    nb_wins = np.zeros(len(dataset))
final_meshes = []
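    # Build one evaluation environment per mesh, configured from the "eval"
    # section of the YAML config.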
    for i, mesh in tqdm(enumerate(dataset, 1), total=len(dataset)):
best_mesh = mesh
        env = gym.make(
            config["eval"]["eval_env_id"],
            max_episode_steps=config["eval"]["max_episode_steps"],
            learning_mesh=mesh,
            n_darts_selected=config["eval"]["n_darts_selected"],
            deep=config["eval"]["deep"],
            action_restriction=config["eval"]["action_restriction"],
            with_degree_obs=config["eval"]["with_quality_observation"],
            render_mode=config["eval"]["render_mode"],
            analysis_type=config["env"]["analysis_type"],
            debug=False,
        )
# env = CleanupWrapper(env)
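        # Roll the policy out n_eval_episodes times, keeping the best mesh
        # (lowest mesh score) found across all episodes.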
for ne in range(n_eval_episodes):
            terminated = False
            truncated = False
            ep_mesh_rewards: float = 0.0
            ep_length: int = 0
            cpt = 0  # number of steps spent with a non-positive cumulative mesh reward
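            # Reset on a deep copy so the environment never mutates the original
            # mesh, and record the initial and ideal mesh scores for normalization.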
obs, info = env.reset(options={"mesh": copy.deepcopy(mesh)})
best_mesh_episode = mesh
mesh_init_score = info["mesh_score"]
mesh_ideal_score = info["mesh_ideal_score"]
best_mesh_episode_score = info["mesh_score"]
            while not (terminated or truncated):
                action, _states = model.predict(obs, deterministic=False)
                if action is None:  # no valid action could be selected
                    env.terminal = True
                    break
                obs, reward, terminated, truncated, info = env.step(action)
                if info["mesh_score"] < best_mesh_episode_score:
                    best_mesh_episode = copy.deepcopy(info["mesh"])
                    best_mesh_episode_score = info["mesh_score"]
                ep_mesh_rewards += info["mesh_reward"]
                # Truncate once the cumulative mesh reward has been non-positive
                # for more than 30 steps in total.
                if ep_mesh_rewards <= 0:
                    cpt += 1
                    if cpt > 30:
                        truncated = True
ep_length += 1
            if terminated:  # a terminated (non-truncated) episode counts as a win
                nb_wins[i-1] += 1
            if isBetterMesh(best_mesh, best_mesh_episode, config["env"]["analysis_type"]):
                best_mesh = copy.deepcopy(best_mesh_episode)
            avg_length[i-1] += ep_length
            avg_mesh_rewards[i-1] += ep_mesh_rewards
            # Normalized return: fraction of the gap between the initial and the
            # ideal mesh score that the episode managed to close.
            if mesh_init_score == mesh_ideal_score:
                normalized_return = 0.0
            else:
                normalized_return = (mesh_init_score - best_mesh_episode_score) / (mesh_init_score - mesh_ideal_score)
            list_normalized_return[ne] = normalized_return
            avg_normalized_return[i-1] += normalized_return
        final_meshes.append(best_mesh)
        avg_length[i-1] /= n_eval_episodes
        avg_mesh_rewards[i-1] /= n_eval_episodes
        avg_normalized_return[i-1] /= n_eval_episodes
        std_normalized_return[i-1] = np.std(list_normalized_return)
    # Build the results DataFrame
df_results = pd.DataFrame({
"mesh_id": range(len(dataset)),
"avg_length": avg_length,
"nb_wins": nb_wins,
"avg_mesh_rewards": avg_mesh_rewards,
"avg_normalized_return": avg_normalized_return,
"std_normalized_return": avg_mean_std,
"final_mesh": final_meshes
})
    return df_results


def isBetterPolicy(actual_best_policy, policy_to_test):
    """Return True when policy_to_test should replace actual_best_policy."""
    if actual_best_policy is None:
        return True
    # No policy-vs-policy comparison is implemented yet: keep the current best.
    return False


def isBetterMesh(best_mesh, actual_mesh, analysis_type):
    """Return True when actual_mesh has a strictly lower (better) global score than best_mesh."""
    if best_mesh is None:
        return True
    # NOTE: analysis_type is currently ignored; a topological analysis is always used.
    ma_best_mesh = QuadMeshTopoAnalysis(best_mesh)
    ma_actual_mesh = QuadMeshTopoAnalysis(actual_mesh)
    return ma_best_mesh.global_score()[1] > ma_actual_mesh.global_score()[1]


if __name__ == '__main__':
    # PARAMETERS CONFIGURATION
    with open("environment/config.yaml", "r") as f:
        config = yaml.safe_load(f)

    print("------------Reading dataset---------------")
    dataset = [read_gmsh("mesh_files/imr3.msh")]
    # Alternative datasets:
    # dataset = read_dataset(config["dataset"]["exploit_dataset_dir"])
    # dataset = [read_gmsh("../mesh_files/bunny.msh")]
    # plot_dataset(dataset)
print("------------Loading Model-----------------")
#Load the model
model = PPO.load("trained_models/full_dataset_ob36-v0.zip")
df_results = testPolicy(model, 50, config, dataset)
    # plot_test_results(df_results["avg_mesh_rewards"], df_results["nb_wins"], df_results["avg_length"], df_results["avg_normalized_return"])
    final_meshes = df_results["final_mesh"]
    # plot_dataset(final_meshes)

    # Smooth each resulting mesh in place before plotting and saving it.
    for m in final_meshes:
        smoothing_mean(m)
    save_dataset_plot(final_meshes, "training/results_IMR/imr3_results.png")

    # print(df_results[["mesh_id", "avg_normalized_return", "std_normalized_return"]])
    df_results.drop(columns=["final_mesh"], inplace=True)
    df_results = df_results.transpose()
    df_results.to_csv("training/results_IMR/imr3_results.csv", index=True)
    write_dataset("training/dataset/results/imr3_results", final_meshes)