Spaces:
Sleeping
Sleeping
| """Asynchronous job management for the review pipeline.""" | |
| from __future__ import annotations | |
| import asyncio | |
| import logging | |
| import uuid | |
| from concurrent.futures import ThreadPoolExecutor | |
| from dataclasses import dataclass, field | |
| from pathlib import Path | |
| from typing import Any | |
| import numpy as np | |
| from config.settings import Settings | |
| from src.api.mesh_export import mesh_to_glb | |
| from src.geometry.tessellator import tessellate_faces | |
| from src.loader.assembly_tree import AssemblyNode, extract_assembly_tree, extract_assembly_from_shape | |
| from src.loader.step_loader import load_step | |
logger = logging.getLogger(__name__)

# Names of the pipeline stages, in the order the job runner executes them.
# JobData seeds its per-step status map from this list, and progress events
# are reported under the same names.
PIPELINE_STEPS = [
    "loading", "assembly_tree", "tessellation",
    "classification", "mesh_export",
]
# eq=False keeps identity comparison/hashing: a generated __eq__ would compare
# the numpy array fields element-wise and raise on the ambiguous truth value.
@dataclass(eq=False)
class PartMeshData:
    """Mesh data for a single part.

    Declared as a dataclass because the pipeline constructs instances with
    keyword arguments (``PartMeshData(part_id=..., vertices=..., ...)``).
    """

    # Id of the assembly leaf, or "<group>_combined" for merged group meshes.
    part_id: str
    # Vertex array as returned by the tessellator.
    vertices: np.ndarray
    # Triangle array; entries index into ``vertices``.
    triangles: np.ndarray
    # GLB-encoded mesh; b"" until the export step has filled it in.
    glb: bytes
@dataclass
class GroupInfo:
    """Info about a part group (exterior / interior).

    Declared as a dataclass because the pipeline constructs instances with
    keyword arguments (``GroupInfo(name=..., part_ids=..., part_names=...)``).
    """

    # Group key, e.g. "exterior" or "interior".
    name: str
    # Ids of the leaf parts belonging to the group.
    part_ids: list[str]
    # Display names of the same parts, in the same order as ``part_ids``.
    part_names: list[str]
# eq=False: a job is a live, mutable record (it holds subscriber queues and an
# event loop), so identity comparison and hashing are the meaningful ones.
@dataclass(eq=False)
class JobData:
    """Stores all data associated with a review job.

    Must be a dataclass: the fields below rely on ``field(default_factory=...)``
    and ``__post_init__``, and the job manager constructs instances with
    keyword arguments.
    """

    job_id: str
    exterior_path: Path
    interior_path: Path
    status: str = "pending"  # pending, running, completed, failed
    error: str | None = None

    # Step tracking: pipeline step name -> status string.
    steps: dict[str, str] = field(default_factory=dict)

    # Results
    assembly_tree: AssemblyNode | None = None
    part_meshes: dict[str, PartMeshData] = field(default_factory=dict)
    group_meshes: dict[str, PartMeshData] = field(default_factory=dict)
    groups: dict[str, GroupInfo] = field(default_factory=dict)
    exterior_step_info: Any = None
    interior_step_info: Any = None

    # SSE subscribers: one queue per listener, plus the event loop that the
    # worker thread uses to deliver events to those queues.
    _listeners: list[asyncio.Queue] = field(default_factory=list)
    _loop: asyncio.AbstractEventLoop | None = None

    def __post_init__(self) -> None:
        """Mark every known pipeline step as pending for the new job."""
        for step in PIPELINE_STEPS:
            self.steps[step] = "pending"
class JobManager:
    """Manages review jobs with thread-based execution and SSE streaming.

    Jobs are kept in an in-memory dict keyed by job id. ``start_job`` hands
    the pipeline to a two-worker thread pool; progress events are delivered
    to subscriber queues through the event loop recorded on the job.
    """

    def __init__(self, upload_dir: Path | None = None) -> None:
        """Create the manager and make sure the upload directory exists.

        Args:
            upload_dir: Directory for uploaded files. Defaults to
                ``/tmp/cad-review-uploads``.
        """
        self._jobs: dict[str, JobData] = {}
        self._executor = ThreadPoolExecutor(max_workers=2)
        self._upload_dir = upload_dir or Path("/tmp/cad-review-uploads")
        self._upload_dir.mkdir(parents=True, exist_ok=True)
        self._settings = Settings()

    # NOTE(review): upload_dir and settings read like @property accessors whose
    # decorator may be missing. They are kept as plain methods because the
    # callers are not visible from this file -- confirm before changing.
    def upload_dir(self) -> Path:
        """Return the directory used for uploaded files."""
        return self._upload_dir

    def settings(self) -> Settings:
        """Return the settings object created at construction time."""
        return self._settings

    def create_job(self, exterior_path: Path, interior_path: Path) -> JobData:
        """Create a new review job with exterior and interior STEP files.

        The job is registered under a 12-character hex id and returned in
        its initial state; it is not started.
        """
        job_id = uuid.uuid4().hex[:12]
        job = JobData(job_id=job_id, exterior_path=exterior_path, interior_path=interior_path)
        self._jobs[job_id] = job
        return job

    def get_job(self, job_id: str) -> JobData | None:
        """Return the job registered under *job_id*, or ``None`` if unknown."""
        return self._jobs.get(job_id)

    def subscribe(self, job: JobData) -> asyncio.Queue:
        """Attach a new event queue to *job* and return it."""
        queue: asyncio.Queue = asyncio.Queue()
        job._listeners.append(queue)
        return queue

    def unsubscribe(self, job: JobData, queue: asyncio.Queue) -> None:
        """Detach *queue* from *job*; a queue that is not attached is ignored."""
        if queue in job._listeners:
            job._listeners.remove(queue)

    def _emit(self, job: JobData, event: dict) -> None:
        """Deliver *event* to every subscriber queue via the job's event loop.

        Does nothing when the job has no loop yet. Delivery is best-effort:
        if the loop has already been closed the event is dropped with a
        warning instead of raising into the pipeline.
        """
        loop = job._loop
        if loop is None:
            return
        # Iterate a snapshot: this runs on the worker thread while the event
        # loop thread may be subscribing or unsubscribing listeners.
        for listener in tuple(job._listeners):
            try:
                loop.call_soon_threadsafe(listener.put_nowait, event)
            except RuntimeError as exc:
                # call_soon_threadsafe raises RuntimeError on a closed loop.
                # All listeners share that loop, so nobody can receive
                # events any more; a dead stream must not fail the job.
                logger.warning("Dropping event for job %s: %s", job.job_id, exc)
                return

    def _emit_step(self, job: JobData, step: str, status: str, pct: int = 0, **kwargs) -> None:
        """Record *status* for *step* on the job and broadcast a progress event.

        Extra keyword arguments are merged into the event payload.
        """
        job.steps[step] = status
        event = {"step": step, "status": status, "progress_pct": pct, **kwargs}
        self._emit(job, event)

    def start_job(self, job: JobData, loop: asyncio.AbstractEventLoop) -> None:
        """Start job execution in a background thread.

        Args:
            job: The job to run.
            loop: Event loop used to deliver progress events to subscribers.
        """
        job.status = "running"
        job._loop = loop
        self._executor.submit(self._run_pipeline, job)

    def _extract_tree(self, step_info, id_prefix: str = "") -> AssemblyNode:
        """Extract assembly tree from a loaded STEP file, trying XDE first.

        Falls back to building the tree from the shape topology when no
        shape tool is available or the XDE extraction raises.
        """
        tree = None
        if step_info.shape_tool is not None:
            try:
                # NOTE(review): id_prefix is only applied on the fallback path
                # below; ids from this path are not prefixed. If exterior and
                # interior ids can collide here, part_meshes entries would
                # overwrite each other -- confirm.
                tree = extract_assembly_tree(step_info.shape_tool)
            except Exception as e:
                logger.warning("XDE tree extraction failed: %s, falling back to shape topology", e)
        if tree is None:
            tree = extract_assembly_from_shape(
                step_info.shape, step_info.path.stem,
                reader=step_info.reader, doc=step_info.doc,
                step_path=step_info.path,
                id_prefix=id_prefix,
            )
        return tree

    def _run_pipeline(self, job: JobData) -> None:
        """Execute the review pipeline in a worker thread.

        Runs the five pipeline steps in order. Any exception marks the job
        as failed, stores the message on ``job.error`` and broadcasts an
        error event; nothing is re-raised.
        """
        try:
            exterior_info, interior_info = self._load_inputs(job)
            exterior_tree, interior_tree = self._build_assembly_tree(
                job, exterior_info, interior_info
            )
            self._tessellate_parts(job, job.assembly_tree)
            self._classify_parts(job, exterior_tree, interior_tree)
            self._export_meshes(job)
            # Publish the final status before the final event so a subscriber
            # reacting to the event never observes a still-"running" job.
            job.status = "completed"
            self._emit_step(job, "mesh_export", "completed", 100, overall_status="completed")
        except Exception as e:
            logger.exception("Job %s failed", job.job_id)
            job.status = "failed"
            job.error = str(e)
            self._emit(job, {
                "step": "error",
                "status": "failed",
                "error": str(e),
                "overall_status": "failed",
            })

    def _load_inputs(self, job: JobData) -> tuple[Any, Any]:
        """Step 1: load both STEP files and store the results on the job."""
        self._emit_step(job, "loading", "running")
        exterior_info = load_step(job.exterior_path)
        job.exterior_step_info = exterior_info
        self._emit_step(job, "loading", "running", 50)
        interior_info = load_step(job.interior_path)
        job.interior_step_info = interior_info
        self._emit_step(job, "loading", "completed", 100)
        return exterior_info, interior_info

    def _build_assembly_tree(
        self, job: JobData, exterior_info, interior_info
    ) -> tuple[AssemblyNode, AssemblyNode]:
        """Step 2: extract both trees, tag their leaves, build the combined root."""
        self._emit_step(job, "assembly_tree", "running")
        exterior_tree = self._extract_tree(exterior_info, id_prefix="ext_")
        exterior_tree.name = f"Exterior - {exterior_tree.name}"
        interior_tree = self._extract_tree(interior_info, id_prefix="int_")
        interior_tree.name = f"Interior - {interior_tree.name}"

        # Tag all leaves with the file they came from.
        for leaf in exterior_tree.iter_leaves():
            leaf.classification = "exterior"
        for leaf in interior_tree.iter_leaves():
            leaf.classification = "interior"

        job.assembly_tree = AssemblyNode(
            id="root",
            name="Combined Assembly",
            is_assembly=True,
            children=[exterior_tree, interior_tree],
        )
        self._emit_step(job, "assembly_tree", "completed", 100)
        return exterior_tree, interior_tree

    def _tessellate_parts(self, job: JobData, tree: AssemblyNode) -> None:
        """Step 3: tessellate every leaf part that has a usable shape.

        A part whose tessellation fails is logged and skipped; it simply has
        no entry in ``job.part_meshes``.
        """
        settings = self._settings
        self._emit_step(job, "tessellation", "running")
        leaves = list(tree.iter_leaves())
        total_leaves = len(leaves)
        for idx, leaf in enumerate(leaves):
            if leaf.shape is None or leaf.shape.IsNull():
                continue
            try:
                verts, tris = tessellate_faces(
                    leaf.shape, settings.tessellation,
                    target_tolerance_mm=settings.tolerance.g0_position_mm,
                )
                # Stores the length of the triangle array, not a CAD face count.
                leaf.num_faces = len(tris)
                pct = int((idx + 1) / total_leaves * 100)
                self._emit_step(job, "tessellation", "running", pct)
                job.part_meshes[leaf.id] = PartMeshData(
                    part_id=leaf.id,
                    vertices=verts,
                    triangles=tris,
                    glb=b"",  # Filled in by the mesh export step.
                )
            except Exception as e:
                logger.warning("Failed to tessellate part '%s': %s", leaf.name, e)
        self._emit_step(job, "tessellation", "completed", 100)

    def _classify_parts(
        self, job: JobData, exterior_tree: AssemblyNode, interior_tree: AssemblyNode
    ) -> None:
        """Step 4: record the exterior/interior groups (by file of origin)."""
        self._emit_step(job, "classification", "running")
        for group_name, subtree in (("exterior", exterior_tree), ("interior", interior_tree)):
            shaped_leaves = [leaf for leaf in subtree.iter_leaves() if leaf.shape is not None]
            job.groups[group_name] = GroupInfo(
                name=group_name,
                part_ids=[leaf.id for leaf in shaped_leaves],
                part_names=[leaf.name for leaf in shaped_leaves],
            )
        self._emit_step(job, "classification", "completed", 100)

    def _export_meshes(self, job: JobData) -> None:
        """Step 5: export each part and each combined group mesh to GLB.

        Export failures are logged and skipped. The step's "completed" event
        is emitted by the caller together with the overall job status.
        """
        self._emit_step(job, "mesh_export", "running")
        for part_id, mesh_data in job.part_meshes.items():
            try:
                mesh_data.glb = mesh_to_glb(mesh_data.vertices, mesh_data.triangles)
            except Exception as e:
                logger.warning("Failed to export GLB for part %s: %s", part_id, e)

        # Combined group meshes; parts without a tessellated mesh are skipped.
        for group_name, group_info in job.groups.items():
            part_meshes = (job.part_meshes.get(pid) for pid in group_info.part_ids)
            merged = self._merge_meshes(md for md in part_meshes if md is not None)
            if merged is None:
                continue
            combined_verts, combined_tris = merged
            try:
                combined_glb = mesh_to_glb(combined_verts, combined_tris)
                job.group_meshes[group_name] = PartMeshData(
                    part_id=f"{group_name}_combined",
                    vertices=combined_verts,
                    triangles=combined_tris,
                    glb=combined_glb,
                )
            except Exception as e:
                logger.warning("Failed to create combined GLB for %s: %s", group_name, e)

    @staticmethod
    def _merge_meshes(meshes) -> tuple[np.ndarray, np.ndarray] | None:
        """Concatenate meshes into one vertex/triangle pair, or ``None`` if empty.

        Triangle indices of each mesh are shifted by the number of vertices
        that precede it so they keep pointing at the right vertices.
        """
        vert_chunks = []
        tri_chunks = []
        vert_offset = 0
        for mesh in meshes:
            vert_chunks.append(mesh.vertices)
            tri_chunks.append(mesh.triangles + vert_offset)
            vert_offset += len(mesh.vertices)
        if not vert_chunks:
            return None
        return np.concatenate(vert_chunks, axis=0), np.concatenate(tri_chunks, axis=0)