"""Asynchronous job management for the review pipeline.""" from __future__ import annotations import asyncio import logging import uuid from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field from pathlib import Path from typing import Any import numpy as np from config.settings import Settings from src.api.mesh_export import mesh_to_glb from src.geometry.tessellator import tessellate_faces from src.loader.assembly_tree import AssemblyNode, extract_assembly_tree, extract_assembly_from_shape from src.loader.step_loader import load_step logger = logging.getLogger(__name__) PIPELINE_STEPS = [ "loading", "assembly_tree", "tessellation", "classification", "mesh_export", ] @dataclass class PartMeshData: """Mesh data for a single part.""" part_id: str vertices: np.ndarray triangles: np.ndarray glb: bytes @dataclass class GroupInfo: """Info about a part group (exterior / interior).""" name: str part_ids: list[str] part_names: list[str] @dataclass class JobData: """Stores all data associated with a review job.""" job_id: str exterior_path: Path interior_path: Path status: str = "pending" # pending, running, completed, failed error: str | None = None # Step tracking steps: dict[str, str] = field(default_factory=dict) # Results assembly_tree: AssemblyNode | None = None part_meshes: dict[str, PartMeshData] = field(default_factory=dict) group_meshes: dict[str, PartMeshData] = field(default_factory=dict) groups: dict[str, GroupInfo] = field(default_factory=dict) exterior_step_info: Any = None interior_step_info: Any = None # SSE subscribers _listeners: list[asyncio.Queue] = field(default_factory=list) _loop: asyncio.AbstractEventLoop | None = None def __post_init__(self) -> None: for step in PIPELINE_STEPS: self.steps[step] = "pending" class JobManager: """Manages review jobs with thread-based execution and SSE streaming.""" def __init__(self, upload_dir: Path | None = None) -> None: self._jobs: dict[str, JobData] = {} self._executor = ThreadPoolExecutor(max_workers=2) self._upload_dir = upload_dir or Path("/tmp/cad-review-uploads") self._upload_dir.mkdir(parents=True, exist_ok=True) self._settings = Settings() @property def upload_dir(self) -> Path: return self._upload_dir @property def settings(self) -> Settings: return self._settings def create_job(self, exterior_path: Path, interior_path: Path) -> JobData: """Create a new review job with exterior and interior STEP files.""" job_id = uuid.uuid4().hex[:12] job = JobData(job_id=job_id, exterior_path=exterior_path, interior_path=interior_path) self._jobs[job_id] = job return job def get_job(self, job_id: str) -> JobData | None: return self._jobs.get(job_id) def subscribe(self, job: JobData) -> asyncio.Queue: queue: asyncio.Queue = asyncio.Queue() job._listeners.append(queue) return queue def unsubscribe(self, job: JobData, queue: asyncio.Queue) -> None: if queue in job._listeners: job._listeners.remove(queue) def _emit(self, job: JobData, event: dict) -> None: loop = job._loop if loop is None: return for q in job._listeners: loop.call_soon_threadsafe(q.put_nowait, event) def _emit_step(self, job: JobData, step: str, status: str, pct: int = 0, **kwargs) -> None: job.steps[step] = status event = {"step": step, "status": status, "progress_pct": pct, **kwargs} self._emit(job, event) def start_job(self, job: JobData, loop: asyncio.AbstractEventLoop) -> None: """Start job execution in a background thread.""" job.status = "running" job._loop = loop self._executor.submit(self._run_pipeline, job) def _extract_tree(self, step_info, id_prefix: str = "") -> AssemblyNode: """Extract assembly tree from a loaded STEP file, trying XDE first.""" tree = None if step_info.shape_tool is not None: try: tree = extract_assembly_tree(step_info.shape_tool) except Exception as e: logger.warning("XDE tree extraction failed: %s, falling back to shape topology", e) if tree is None: tree = extract_assembly_from_shape( step_info.shape, step_info.path.stem, reader=step_info.reader, doc=step_info.doc, step_path=step_info.path, id_prefix=id_prefix, ) return tree def _run_pipeline(self, job: JobData) -> None: """Execute the review pipeline in a worker thread.""" try: settings = self._settings # Step 1: Load both STEP files self._emit_step(job, "loading", "running") exterior_info = load_step(job.exterior_path) job.exterior_step_info = exterior_info self._emit_step(job, "loading", "running", 50) interior_info = load_step(job.interior_path) job.interior_step_info = interior_info self._emit_step(job, "loading", "completed", 100) # Step 2: Extract assembly trees and build combined root self._emit_step(job, "assembly_tree", "running") exterior_tree = self._extract_tree(exterior_info, id_prefix="ext_") exterior_tree.name = f"Exterior - {exterior_tree.name}" interior_tree = self._extract_tree(interior_info, id_prefix="int_") interior_tree.name = f"Interior - {interior_tree.name}" # Tag all leaves with their classification for leaf in exterior_tree.iter_leaves(): leaf.classification = "exterior" for leaf in interior_tree.iter_leaves(): leaf.classification = "interior" # Build combined root tree = AssemblyNode( id="root", name="Combined Assembly", is_assembly=True, children=[exterior_tree, interior_tree], ) job.assembly_tree = tree self._emit_step(job, "assembly_tree", "completed", 100) # Step 3: Tessellate each leaf part self._emit_step(job, "tessellation", "running") leaves = list(tree.iter_leaves()) total_leaves = len(leaves) for idx, leaf in enumerate(leaves): if leaf.shape is None or leaf.shape.IsNull(): continue try: verts, tris = tessellate_faces( leaf.shape, settings.tessellation, target_tolerance_mm=settings.tolerance.g0_position_mm, ) leaf.num_faces = len(tris) pct = int((idx + 1) / total_leaves * 100) self._emit_step(job, "tessellation", "running", pct) # Store mesh data job.part_meshes[leaf.id] = PartMeshData( part_id=leaf.id, vertices=verts, triangles=tris, glb=b"", # Generate later ) except Exception as e: logger.warning("Failed to tessellate part '%s': %s", leaf.name, e) self._emit_step(job, "tessellation", "completed", 100) # Step 4: Classification (by file origin - already tagged above) self._emit_step(job, "classification", "running") for group_name, subtree in [("exterior", exterior_tree), ("interior", interior_tree)]: group_leaves = list(subtree.iter_leaves()) job.groups[group_name] = GroupInfo( name=group_name, part_ids=[leaf.id for leaf in group_leaves if leaf.shape is not None], part_names=[leaf.name for leaf in group_leaves if leaf.shape is not None], ) self._emit_step(job, "classification", "completed", 100) # Step 5: Export meshes to GLB (individual + combined groups) self._emit_step(job, "mesh_export", "running") for part_id, mesh_data in job.part_meshes.items(): try: mesh_data.glb = mesh_to_glb(mesh_data.vertices, mesh_data.triangles) except Exception as e: logger.warning("Failed to export GLB for part %s: %s", part_id, e) # Create combined group meshes for group_name, group_info in job.groups.items(): group_verts_list = [] group_tris_list = [] vert_offset = 0 for pid in group_info.part_ids: md = job.part_meshes.get(pid) if md is None: continue group_verts_list.append(md.vertices) group_tris_list.append(md.triangles + vert_offset) vert_offset += len(md.vertices) if group_verts_list: combined_verts = np.concatenate(group_verts_list, axis=0) combined_tris = np.concatenate(group_tris_list, axis=0) try: combined_glb = mesh_to_glb(combined_verts, combined_tris) job.group_meshes[group_name] = PartMeshData( part_id=f"{group_name}_combined", vertices=combined_verts, triangles=combined_tris, glb=combined_glb, ) except Exception as e: logger.warning("Failed to create combined GLB for %s: %s", group_name, e) self._emit_step(job, "mesh_export", "completed", 100, overall_status="completed") job.status = "completed" except Exception as e: logger.exception("Job %s failed", job.job_id) job.status = "failed" job.error = str(e) self._emit(job, { "step": "error", "status": "failed", "error": str(e), "overall_status": "failed", })