Spaces:
Runtime error
Runtime error
| """Export functions for VTK, MP4, CSV, PNG, and HTML output.""" | |
| from __future__ import annotations | |
| import base64 | |
| import tempfile | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import TYPE_CHECKING | |
| import numpy as np | |
| import pyvista as pv | |
| import plotly.graph_objects as go | |
| from .state import state, _server | |
| from .globals import ( | |
| plotter, simulation_data, current_mesh, | |
| surface_clims, data_frames, X_grids, Y_grids, z_scale, snapshot_times, | |
| qpu_ts_cache, sim_ts_cache, | |
| ) | |
| if TYPE_CHECKING: | |
| pass | |
| # --------------------------------------------------------------------------- | |
| # Surface / VTK Exports | |
| # --------------------------------------------------------------------------- | |
| def export_vtk(): | |
| """Export current surface mesh to user's Downloads as .vtp and notify via snackbar.""" | |
| if current_mesh is None: | |
| state.export_status_message = "No mesh to export." | |
| state.show_export_status = True | |
| return | |
| try: | |
| field = state.surface_field or "Ez" | |
| nx = int(state.nx or 16) | |
| suffix = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| filename = f"surface_{field}_nx{nx}_{suffix}.vtp" | |
| # Write to a temporary file first | |
| with tempfile.NamedTemporaryFile(suffix=".vtp", delete=False) as tmp: | |
| current_mesh.save(tmp.name) | |
| content = Path(tmp.name).read_bytes() | |
| # Encode content as base64 for browser download | |
| content_b64 = base64.b64encode(content).decode("ascii") | |
| # Trigger browser download via JavaScript | |
| if _server is not None: | |
| _server.js_call("utils", "download", filename, f"data:application/octet-stream;base64,{content_b64}") | |
| Path(tmp.name).unlink() # Clean up | |
| state.export_status_message = f"Exported VTK to {filename}" | |
| except Exception as e: | |
| state.export_status_message = f"Export failed: {e}" | |
| state.show_export_status = True | |
| def export_vtk_all_frames(): | |
| """Export a .vtp file for each time frame of the selected component into a zip file.""" | |
| try: | |
| if not state.simulation_has_run: | |
| raise ValueError("Run a simulation before exporting all frames.") | |
| field = state.surface_field or "Ez" | |
| frames = data_frames.get(field) | |
| if not frames: | |
| raise ValueError(f"No frames available for {field}.") | |
| if snapshot_times is None: | |
| raise ValueError("Snapshot times are unavailable.") | |
| nx = int(state.nx or 16) | |
| suffix = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| zip_filename = f"vtk_sequence_{field}_nx{nx}_{suffix}.zip" | |
| import zipfile | |
| with tempfile.NamedTemporaryFile(suffix=".zip", delete=False) as tmp_zip: | |
| temp_zip_path = tmp_zip.name | |
| with zipfile.ZipFile(temp_zip_path, 'w', zipfile.ZIP_DEFLATED) as zf: | |
| times = np.asarray(snapshot_times) | |
| for i, (z_data, t) in enumerate(zip(frames, times)): | |
| points = np.c_[X_grids[field].ravel(), Y_grids[field].ravel(), z_data.ravel() * z_scale] | |
| poly = pv.PolyData(points) | |
| mesh = poly.delaunay_2d() | |
| mesh["scalars"] = z_data.ravel() | |
| fname = f"{field}_frame_{i:04d}_t{t:.3f}s.vtp" | |
| with tempfile.NamedTemporaryFile(suffix=".vtp", delete=False) as tmp_vtp: | |
| tmp_vtp_path = tmp_vtp.name | |
| mesh.save(tmp_vtp_path) | |
| zf.write(tmp_vtp_path, arcname=fname) | |
| Path(tmp_vtp_path).unlink() | |
| content = Path(temp_zip_path).read_bytes() | |
| content_b64 = base64.b64encode(content).decode("ascii") | |
| if _server is not None: | |
| _server.js_call("utils", "download", zip_filename, f"data:application/zip;base64,{content_b64}") | |
| Path(temp_zip_path).unlink() | |
| state.export_status_message = f"Exported {len(frames)} frames to {zip_filename}" | |
| except Exception as e: | |
| state.export_status_message = f"Export failed: {e}" | |
| finally: | |
| state.show_export_status = True | |
| def export_mp4(): | |
| """Export the surface plot time slider animation to MP4 using a dedicated off-screen plotter.""" | |
| try: | |
| if not state.simulation_has_run: | |
| raise ValueError("Run a simulation before exporting MP4.") | |
| field = state.surface_field or "Ez" | |
| frames = data_frames.get(field) | |
| if not frames: | |
| raise ValueError(f"No frames available for {field}.") | |
| if len(frames) < 2: | |
| raise ValueError("Only one frame available; increase T or simulation steps.") | |
| nx = int(state.nx or 16) | |
| suffix = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| filename = f"surface_anim_{field}_nx{nx}_{suffix}.mp4" | |
| # Create a temporary file for the MP4 | |
| with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp: | |
| temp_path = tmp.name | |
| # Build with a dedicated off-screen plotter at a macro-block friendly size | |
| movie_plotter = pv.Plotter(off_screen=True, window_size=(1280, 720)) | |
| # Initial mesh from first frame | |
| X = X_grids.get(field) | |
| Y = Y_grids.get(field) | |
| if X is None or Y is None: | |
| raise ValueError(f"Grid data not available for {field}.") | |
| first = frames[0] | |
| points = np.c_[X.ravel(), Y.ravel(), first.ravel() * z_scale] | |
| poly = pv.PolyData(points) | |
| mesh = poly.delaunay_2d() | |
| mesh['scalars'] = first.ravel() | |
| actor = movie_plotter.add_mesh( | |
| mesh, | |
| scalars='scalars', | |
| clim=surface_clims.get(field, (-1, 1)), | |
| cmap="RdBu", | |
| show_scalar_bar=False, | |
| show_edges=True, | |
| edge_color='grey', | |
| line_width=0.5, | |
| ) | |
| movie_plotter.add_axes() | |
| # Use similar camera if available, else default | |
| try: | |
| if hasattr(plotter, 'camera_position') and plotter.camera_position: | |
| movie_plotter.camera_position = plotter.camera_position | |
| else: | |
| movie_plotter.view_isometric() | |
| except Exception: | |
| movie_plotter.view_isometric() | |
| movie_plotter.open_movie(temp_path, framerate=20) | |
| for z_data in frames: | |
| if mesh.n_points != z_data.size: | |
| # Rebuild mesh if topology changes (unlikely here) | |
| points = np.c_[X.ravel(), Y.ravel(), z_data.ravel() * z_scale] | |
| poly = pv.PolyData(points) | |
| mesh = poly.delaunay_2d() | |
| mesh['scalars'] = z_data.ravel() | |
| movie_plotter.clear() | |
| actor = movie_plotter.add_mesh( | |
| mesh, | |
| scalars='scalars', | |
| clim=surface_clims.get(field, (-1, 1)), | |
| cmap="RdBu", | |
| show_scalar_bar=False, | |
| show_edges=True, | |
| edge_color='grey', | |
| line_width=0.5, | |
| ) | |
| else: | |
| mesh.points[:, 2] = z_data.ravel() * z_scale | |
| mesh['scalars'] = z_data.ravel() | |
| movie_plotter.render() | |
| movie_plotter.write_frame() | |
| movie_plotter.close() | |
| # Read the file content and trigger download | |
| content = Path(temp_path).read_bytes() | |
| content_b64 = base64.b64encode(content).decode("ascii") | |
| if _server is not None: | |
| _server.js_call("utils", "download", filename, f"data:video/mp4;base64,{content_b64}") | |
| Path(temp_path).unlink() # Clean up | |
| state.export_status_message = f"Exported MP4 to {filename}" | |
| except Exception as e: | |
| state.export_status_message = f"Export failed: {e}" | |
| finally: | |
| state.show_export_status = True | |
| # --------------------------------------------------------------------------- | |
| # Simulator Time-Series Exports | |
| # --------------------------------------------------------------------------- | |
| def export_sim_timeseries_csv(): | |
| """Export Simulator time-series to CSV.""" | |
| try: | |
| times = sim_ts_cache.get("times") | |
| series_map = sim_ts_cache.get("series_map") | |
| if not times or not series_map: | |
| state.export_status_message = "No Simulator time series data to export." | |
| state.show_export_status = True | |
| return | |
| suffix = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| filename = f"sim_timeseries_{suffix}.csv" | |
| # Build CSV content | |
| lines = ["time"] | |
| keys = list(series_map.keys()) | |
| for k in keys: | |
| field, px, py = k | |
| lines[0] += f",{field}_({px},{py})" | |
| for i, t in enumerate(times): | |
| row = [str(t)] | |
| for k in keys: | |
| vals = series_map.get(k, []) | |
| row.append(str(vals[i]) if i < len(vals) else "") | |
| lines.append(",".join(row)) | |
| content = "\n".join(lines) | |
| content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii") | |
| if _server is not None: | |
| _server.js_call("utils", "download", filename, f"data:text/csv;base64,{content_b64}") | |
| state.export_status_message = f"Exported CSV to {filename}" | |
| except Exception as e: | |
| state.export_status_message = f"Export failed: {e}" | |
| finally: | |
| state.show_export_status = True | |
| def export_sim_timeseries_png(): | |
| """Export Simulator time-series plot to PNG.""" | |
| try: | |
| fig = sim_ts_cache.get("figure") | |
| if fig is None: | |
| state.export_status_message = "No Simulator time series figure to export." | |
| state.show_export_status = True | |
| return | |
| suffix = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| filename = f"sim_timeseries_{suffix}.png" | |
| # Export to PNG | |
| content = fig.to_image(format="png", width=1200, height=800) | |
| content_b64 = base64.b64encode(content).decode("ascii") | |
| if _server is not None: | |
| _server.js_call("utils", "download", filename, f"data:image/png;base64,{content_b64}") | |
| state.export_status_message = f"Exported PNG to {filename}" | |
| except Exception as e: | |
| state.export_status_message = f"Export failed: {e}" | |
| finally: | |
| state.show_export_status = True | |
| def export_sim_timeseries_html(): | |
| """Export Simulator time-series plot to interactive HTML.""" | |
| try: | |
| fig = sim_ts_cache.get("figure") | |
| if fig is None: | |
| state.export_status_message = "No Simulator time series figure to export." | |
| state.show_export_status = True | |
| return | |
| suffix = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| filename = f"sim_timeseries_{suffix}.html" | |
| # Export to HTML | |
| content = fig.to_html(include_plotlyjs="cdn", full_html=True) | |
| content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii") | |
| if _server is not None: | |
| _server.js_call("utils", "download", filename, f"data:text/html;base64,{content_b64}") | |
| state.export_status_message = f"Exported HTML to {filename}" | |
| except Exception as e: | |
| state.export_status_message = f"Export failed: {e}" | |
| finally: | |
| state.show_export_status = True | |
| # --------------------------------------------------------------------------- | |
| # QPU Time-Series Exports | |
| # --------------------------------------------------------------------------- | |
| def export_qpu_timeseries_csv(): | |
| """Export QPU time-series to CSV.""" | |
| try: | |
| times = qpu_ts_cache.get("times") | |
| series_map = qpu_ts_cache.get("series_map") | |
| if not times or not series_map: | |
| state.export_status_message = "No QPU time series data to export." | |
| state.show_export_status = True | |
| return | |
| suffix = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| filename = f"qpu_timeseries_{suffix}.csv" | |
| # Build CSV content | |
| lines = ["time"] | |
| keys = list(series_map.keys()) | |
| for k in keys: | |
| field, px, py = k | |
| lines[0] += f",{field}_({px},{py})" | |
| for i, t in enumerate(times): | |
| row = [str(t)] | |
| for k in keys: | |
| vals = series_map.get(k, []) | |
| row.append(str(vals[i]) if i < len(vals) else "") | |
| lines.append(",".join(row)) | |
| content = "\n".join(lines) | |
| content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii") | |
| if _server is not None: | |
| _server.js_call("utils", "download", filename, f"data:text/csv;base64,{content_b64}") | |
| state.export_status_message = f"Exported CSV to {filename}" | |
| except Exception as e: | |
| state.export_status_message = f"Export failed: {e}" | |
| finally: | |
| state.show_export_status = True | |
| def export_qpu_timeseries_png(): | |
| """Export QPU time-series plot to PNG.""" | |
| try: | |
| fig = qpu_ts_cache.get("figure") | |
| if fig is None: | |
| state.export_status_message = "No QPU time series figure to export." | |
| state.show_export_status = True | |
| return | |
| suffix = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| filename = f"qpu_timeseries_{suffix}.png" | |
| # Export to PNG | |
| content = fig.to_image(format="png", width=1200, height=800) | |
| content_b64 = base64.b64encode(content).decode("ascii") | |
| if _server is not None: | |
| _server.js_call("utils", "download", filename, f"data:image/png;base64,{content_b64}") | |
| state.export_status_message = f"Exported PNG to {filename}" | |
| except Exception as e: | |
| state.export_status_message = f"Export failed: {e}" | |
| finally: | |
| state.show_export_status = True | |
| def export_qpu_timeseries_html(): | |
| """Export QPU time-series plot to interactive HTML.""" | |
| try: | |
| fig = qpu_ts_cache.get("figure") | |
| if fig is None: | |
| state.export_status_message = "No QPU time series figure to export." | |
| state.show_export_status = True | |
| return | |
| suffix = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| filename = f"qpu_timeseries_{suffix}.html" | |
| # Export to HTML | |
| content = fig.to_html(include_plotlyjs="cdn", full_html=True) | |
| content_b64 = base64.b64encode(content.encode("utf-8")).decode("ascii") | |
| if _server is not None: | |
| _server.js_call("utils", "download", filename, f"data:text/html;base64,{content_b64}") | |
| state.export_status_message = f"Exported HTML to {filename}" | |
| except Exception as e: | |
| state.export_status_message = f"Export failed: {e}" | |
| finally: | |
| state.show_export_status = True | |