Spaces:
Sleeping
Sleeping
| import copy | |
| import io | |
| import logging | |
| import os | |
| import gradio as gr | |
| import gymnasium as gym | |
| import numpy as np | |
| import matplotlib | |
| matplotlib.use('Agg') # Non-interactive backend for Gradio | |
| import matplotlib.pyplot as plt | |
| from stable_baselines3 import PPO | |
| from PIL import Image | |
| from huggingface_hub import hf_hub_download | |
| from mesh_model.reader import read_gmsh | |
| from mesh_model.mesh_analysis.quadmesh_analysis import QuadMeshTopoAnalysis | |
| from environment.actions.smoothing import smoothing_mean | |
| from view.mesh_plotter.mesh_plots import subplot_mesh | |
| import environment.quadmesh_env # To register the Gymnasium environment | |
# Root logger setup; module-level logger per logging best practice.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Default configuration for evaluation
EVAL_CONFIG = {
    "eval": {
        # Environment id registered by the `environment.quadmesh_env` import above.
        "eval_env_id": "Quadmesh-v0",
        "max_episode_steps": 300,
        "n_darts_selected": 5,
        "deep": 36,
        "action_restriction": False,
        "with_quality_observation": False,
        "render_mode": None,  # no rendering during evaluation
    },
    "env": {
        # "topo" selects topological scoring (cf. QuadMeshTopoAnalysis usage below).
        "analysis_type": "topo"
    }
}

# Model will be loaded on first use (lazy loading)
model = None
def get_model():
    """Return the cached PPO model, downloading it from the Hub on first call.

    The model is cached in the module-level ``model`` global so the download
    and deserialization happen at most once per process.

    Returns:
        The loaded Stable-Baselines3 ``PPO`` model, or ``None`` when the
        download or load fails (the failure is logged, not raised).
    """
    global model
    if model is None:
        try:
            model_path = hf_hub_download(
                repo_id="arzhela/QuadOpt-RL-ppo-sb3",
                filename="test_model.zip"
            )
            model = PPO.load(model_path)
            # Plain string: the message contains no interpolated values.
            logger.info("PPO model loaded from Hugging Face Hub: arzhela/QuadOpt-RL-ppo-sb3")
        except Exception as e:
            # Lazy %-args: the message is only built if the record is emitted.
            logger.warning("Unable to load model from Hugging Face Hub: %s", e)
            return None
    return model
def mesh_to_image(mesh, title="Mesh", scores=True):
    """Render a mesh into a PIL image.

    Draws *mesh* with matplotlib (optionally annotated with scores), captures
    the figure as an in-memory PNG and returns it as a PIL image.
    """
    # Slightly taller figure when score annotations are shown.
    fig_size = (10, 11) if scores else (10, 10)
    fig, ax = plt.subplots(figsize=fig_size)
    subplot_mesh(mesh, debug=False, scores=scores)
    if title:
        plt.suptitle(title, fontsize=14, y=1.02)
    png_buffer = io.BytesIO()
    plt.savefig(png_buffer, format='png', dpi=150, bbox_inches='tight', pad_inches=0.1)
    png_buffer.seek(0)
    rendered = Image.open(png_buffer)
    # Close the figure to release matplotlib memory (Agg backend, no GUI).
    plt.close(fig)
    return rendered
def evaluate_mesh_with_agent(mesh, n_episodes=10, max_steps=300):
    """Evaluate a mesh with the pre-trained PPO agent.

    Runs ``n_episodes`` independent rollouts of the cached PPO policy on deep
    copies of *mesh* and keeps the lowest-scoring (best) mesh seen across all
    episodes.  Lower score means fewer topological irregularities.

    Args:
        mesh: quad mesh to optimize (project type); each episode resets the
            environment with a ``copy.deepcopy`` so the input is not consumed.
        n_episodes: number of evaluation rollouts.
        max_steps: Gymnasium per-episode step limit.

    Returns:
        Tuple ``(best_mesh, log_text, metrics)``.  When the model cannot be
        loaded, returns ``(mesh, error_message, {})`` unchanged.
    """
    model = get_model()  # local name deliberately shadows the module-level cache
    if model is None:
        return mesh, "β Model not available", {}
    config = EVAL_CONFIG
    best_mesh = mesh
    best_mesh_score = float('inf')
    log_lines = []
    log_lines.append("π€ Evaluating with PPO agent...\n")
    # Build the evaluation environment around the input mesh.
    env = gym.make(
        config["eval"]["eval_env_id"],
        max_episode_steps=max_steps,
        learning_mesh=mesh,
        n_darts_selected=config["eval"]["n_darts_selected"],
        deep=config["eval"]["deep"],
        action_restriction=config["eval"]["action_restriction"],
        with_degree_obs=config["eval"]["with_quality_observation"],
        render_mode=config["eval"]["render_mode"],
        analysis_type=config["env"]["analysis_type"],
        debug=False,
    )
    total_rewards = []
    normalized_returns = []
    # Overwritten on every reset; stay 0 only if n_episodes == 0.
    mesh_init_score = 0
    mesh_ideal_score = 0
    for episode in range(n_episodes):
        # Fresh deep copy per episode so rollouts do not interfere.
        obs, info = env.reset(options={"mesh": copy.deepcopy(mesh)})
        mesh_init_score = info["mesh_score"]
        mesh_ideal_score = info["mesh_ideal_score"]
        best_episode_score = mesh_init_score
        best_episode_mesh = mesh
        episode_reward = 0
        step = 0
        terminated = False
        truncated = False
        no_improvement_count = 0
        while not terminated and not truncated:
            # Stochastic policy (deterministic=False) so episodes explore differently.
            action, _states = model.predict(obs, deterministic=False)
            if action is None:
                break
            obs, reward, terminated, truncated, info = env.step(action)
            # Prefer the env-reported mesh reward when present.
            episode_reward += info.get('mesh_reward', reward)
            if info["mesh_score"] < best_episode_score:
                # New episode best: snapshot the mesh.
                best_episode_mesh = copy.deepcopy(info['mesh'])
                best_episode_score = info["mesh_score"]
                no_improvement_count = 0
            else:
                no_improvement_count += 1
                # NOTE(review): indentation reconstructed from a flattened source —
                # the extra penalty below is assumed to apply only on non-improving
                # steps with a non-positive cumulative reward; confirm vs. original.
                if episode_reward <= 0:
                    no_improvement_count += 1
            # Early stop after 30+ consecutive steps without improvement.
            if no_improvement_count > 30:
                truncated = True
            step += 1
        # Normalized return: 1.0 means the ideal score was reached.
        if mesh_init_score != mesh_ideal_score:
            normalized_return = (mesh_init_score - best_episode_score) / (mesh_init_score - mesh_ideal_score)
        else:
            normalized_return = 0.0
        normalized_returns.append(normalized_return)
        total_rewards.append(episode_reward)
        if best_episode_score < best_mesh_score:
            best_mesh = copy.deepcopy(best_episode_mesh)
            best_mesh_score = best_episode_score
        log_lines.append(f"Episode {episode + 1}/{n_episodes}: steps={step}, reward={episode_reward:.2f}, normalized_return={normalized_return:.3f}")
    env.close()
    metrics = {
        "score_initial": mesh_init_score,
        "score_final": best_mesh_score,
        "score_ideal": mesh_ideal_score,
        "avg_normalized_return": float(np.mean(normalized_returns)),
        "std_normalized_return": float(np.std(normalized_returns)),
        "avg_reward": float(np.mean(total_rewards)),
        "n_episodes": n_episodes
    }
    log_lines.append(f"\nπ Summary:")
    log_lines.append(f" - Initial score: {mesh_init_score:.0f}")
    log_lines.append(f" - Final score: {best_mesh_score:.0f}")
    log_lines.append(f" - Ideal score: {mesh_ideal_score:.0f}")
    log_lines.append(f" - Mean normalized return: {metrics['avg_normalized_return']:.3f} Β± {metrics['std_normalized_return']:.3f}")
    return best_mesh, "\n".join(log_lines), metrics
# Example meshes available
# Paths are relative to the app's working directory; existence is checked
# in process_mesh_file before the mesh is read.
EXAMPLE_MESHES = {
    "π° Bunny": "mesh_files_examples/bunny.msh",
    "β Star": "mesh_files_examples/star.msh",
    "β Cup of coffee": "mesh_files_examples/cup.msh",
}
def process_mesh_file(mesh_choice, mesh_file, n_episodes, apply_smoothing):
    """
    Process a .msh file with the RL agent.

    Args:
        mesh_choice: selected example mesh name or "π Upload custom file".
        mesh_file: uploaded file path or file-like object (used only when
            mesh_choice is "π Upload custom file").
        n_episodes: number of evaluation episodes (slider value, may be float).
        apply_smoothing: whether to run geometric smoothing on the result.

    Returns:
        Tuple ``(initial_image, optimized_image, log_text)``; the image slots
        are ``None`` on error.
    """
    # Determine mesh path
    if mesh_choice and mesh_choice != "π Upload custom file":
        mesh_path = EXAMPLE_MESHES.get(mesh_choice)
        if not mesh_path or not os.path.exists(mesh_path):
            return None, None, f"β Example mesh not found: {mesh_choice}"
    elif mesh_file:
        # Gradio may hand over either a filepath string or a file wrapper.
        if isinstance(mesh_file, str):
            mesh_path = mesh_file
        elif hasattr(mesh_file, "name"):
            mesh_path = mesh_file.name
        else:
            return None, None, f"β Unsupported mesh_file type: {type(mesh_file)}"
    else:
        return None, None, "β Please select an example mesh or upload a .msh file"
    # Sliders may deliver floats; the episode count must be an int.
    n_episodes = int(n_episodes)
    try:
        logger.info("Reading file: %s", mesh_path)
        mesh = read_gmsh(mesh_path)
        ma_initial = QuadMeshTopoAnalysis(mesh)
        _, initial_score, ideal_score = ma_initial.global_score()
        log_output = "β Mesh file loaded\n"
        log_output += f"π Number of nodes: {len([n for n in mesh.nodes if n[2] >= 0])}\n"
        log_output += f"π Initial score: {initial_score:.0f} (ideal: {ideal_score:.0f})\n\n"
        image_initial = mesh_to_image(
            mesh,
            title=f"Initial Mesh (score: {initial_score:.0f})",
            scores=True
        )
        # Bail out early (but still show the initial mesh) if no model.
        if get_model() is None:
            return image_initial, None, log_output + "β RL model not available"
        best_mesh, eval_log, metrics = evaluate_mesh_with_agent(
            mesh,
            n_episodes=n_episodes,
            max_steps=300
        )
        log_output += eval_log
        # Apply smoothing if requested (in-place geometric relaxation).
        if apply_smoothing:
            log_output += "\n\nπ Applying smoothing..."
            smoothing_mean(best_mesh)
            log_output += " β "
        ma_final = QuadMeshTopoAnalysis(best_mesh)
        _, final_score, _ = ma_final.global_score()
        image_final = mesh_to_image(
            best_mesh,
            title=f"Optimized Mesh (score: {final_score:.0f})",
            scores=True
        )
        # Guard against division by zero when the initial score is already 0.
        improvement = ((initial_score - final_score) / initial_score * 100) if initial_score > 0 else 0
        log_output += f"\n\nπ― Improvement: {improvement:.1f}%"
        return image_initial, image_final, log_output
    except Exception as e:
        # logger.exception records the traceback; no manual traceback print needed.
        logger.exception("Error during processing: %s", e)
        return None, None, f"β Error: {str(e)}"
# Gradio interface built with gr.Blocks
with gr.Blocks(
    title="QuadOpt-RL - RL Mesh Optimization",
    theme=gr.themes.Soft()
) as demo:
    # Header and usage instructions.
    gr.Markdown("""
# QuadOpt-RL β 2D Quadrangular Mesh Optimization
This application uses a pre-trained **Reinforcement Learning (PPO)** agent
to topologically optimize quadrangular meshes.
> **Note:** Only 2D quadrangular meshes are supported (no triangles or mixed elements). For best results, use TQuad meshes (meshes where each triangle has been split into three quads).
### π How to use
1. Select an example mesh (Bunny, Star, Cup of coffee) **or** upload your own `.msh` file
2. Choose the number of evaluation episodes
3. Choose whether to apply geometric smoothing after optimization
4. Click **Optimize mesh**
5. View the results!
""")
    with gr.Row():
        # Left column: input controls.
        with gr.Column(scale=1):
            gr.Markdown("### π€ Configuration")
            mesh_choice = gr.Dropdown(
                choices=["π Upload custom file"] + list(EXAMPLE_MESHES.keys()),
                value="π Upload custom file",
                label="Select a mesh"
            )
            mesh_input = gr.File(
                label=".msh file (Gmsh)",
                file_types=[".msh"],
                type="filepath",
                visible=True
            )
            n_episodes_input = gr.Slider(
                minimum=1,
                maximum=50,
                value=5,
                step=1,
                label="Number of evaluation episodes"
            )
            smoothing_input = gr.Checkbox(
                value=True,
                label="Apply geometric smoothing"
            )
            # Disabled until a mesh is chosen/uploaded (see update_button_state).
            submit_btn = gr.Button(
                "π Optimize mesh",
                variant="primary",
                size="lg",
                interactive=False
            )
        # Right column: result images and logs.
        with gr.Column(scale=2):
            gr.Markdown("### π Results")
            with gr.Row():
                image_before = gr.Image(label="π· Initial Mesh", type="pil")
                image_after = gr.Image(label="π’ Optimized Mesh", type="pil")
            with gr.Row():
                log_output = gr.Textbox(
                    label="π Execution logs",
                    lines=15,
                    interactive=False
                )
    # Show/hide file upload based on mesh choice
    def update_file_visibility(choice):
        # The upload widget is only relevant for the "custom file" option.
        return gr.update(visible=(choice == "π Upload custom file"))
    mesh_choice.change(
        fn=update_file_visibility,
        inputs=mesh_choice,
        outputs=mesh_input
    )
    # Enable button when an example is selected or a file is uploaded
    def update_button_state(choice, file):
        if choice != "π Upload custom file":
            return gr.update(interactive=True)
        return gr.update(interactive=bool(file))
    # Both events can change the button's enabled state.
    mesh_choice.change(
        fn=update_button_state,
        inputs=[mesh_choice, mesh_input],
        outputs=submit_btn
    )
    mesh_input.change(
        fn=update_button_state,
        inputs=[mesh_choice, mesh_input],
        outputs=submit_btn
    )
    # Button connection
    submit_btn.click(
        fn=process_mesh_file,
        inputs=[mesh_choice, mesh_input, n_episodes_input, smoothing_input],
        outputs=[image_before, image_after, log_output]
    )
    # Footer: agent/environment description.
    gr.Markdown("""
---
### π¬ About
- **Agent**: pre-trained PPO (Proximal Policy Optimization) from Stable-Baselines3
- **Environment**: Custom Gymnasium for quadrangular mesh optimization
- **Available actions**: Flip clockwise, Flip counterclockwise, Split, Collapse
- **Observation**: Local topological view of the mesh
The score represents the sum of topological irregularities. Lower is better.
The ideal score is when all internal nodes have degree 4.
""")
# Alias for Hugging Face Spaces
app = demo

if __name__ == "__main__":
    # Bind on all interfaces; port 7860 is the Hugging Face Spaces convention.
    demo.launch(server_name="0.0.0.0", server_port=7860)