"""API endpoint definitions for the CAD review tool.""" from __future__ import annotations import asyncio import json import logging from pathlib import Path from fastapi import APIRouter, HTTPException, UploadFile, File from fastapi.responses import FileResponse, Response, StreamingResponse from src.api.job_manager import JobManager, PIPELINE_STEPS from src.api.samples import discover_samples, get_sample from src.api.schemas import ( AssemblyNodeResponse, AssemblyTreeResponse, ComplianceResponse, ComplianceResultResponse, ComplianceRuleResponse, DistanceMeasurementResponse, DistanceRequest, GroupInfoResponse, JobStatus, ProximityPairResponse, ProximityResponse, SampleFileResponse, SamplePairResponse, StepProgress, UploadResponse, ) from src.compliance.kmvss_checker import check_compliance from src.compliance.rule_loader import load_rules from src.geometry.measurement import measure_distance, scan_proximity logger = logging.getLogger(__name__) router = APIRouter(prefix="/api") # Singleton job manager _manager: JobManager | None = None def get_manager() -> JobManager: global _manager if _manager is None: _manager = JobManager() return _manager def _node_to_response(node_dict: dict) -> AssemblyNodeResponse: """Convert assembly node dict to response model.""" children = [_node_to_response(c) for c in node_dict.get("children", [])] return AssemblyNodeResponse( id=node_dict["id"], name=node_dict["name"], is_assembly=node_dict["is_assembly"], is_leaf=node_dict["is_leaf"], classification=node_dict.get("classification", "unknown"), num_faces=node_dict.get("num_faces", 0), num_solids=node_dict.get("num_solids", 0), bounding_box=node_dict.get("bounding_box"), children=children, ) @router.post("/upload", response_model=UploadResponse, status_code=201) async def upload_files( exterior: UploadFile = File(..., description="Exterior (design) STEP file"), interior: UploadFile = File(..., description="Interior (engineering) STEP file"), ) -> UploadResponse: """Upload exterior and interior STEP files and create a review job.""" manager = get_manager() for label, f in [("Exterior", exterior), ("Interior", interior)]: if f.filename and not f.filename.lower().endswith((".stp", ".step")): raise HTTPException(400, f"Invalid {label} file type: {f.filename}. Expected .stp or .step") exterior_path = manager.upload_dir / (exterior.filename or "exterior.step") exterior_content = await exterior.read() exterior_path.write_bytes(exterior_content) interior_path = manager.upload_dir / (interior.filename or "interior.step") interior_content = await interior.read() interior_path.write_bytes(interior_content) job = manager.create_job(exterior_path, interior_path) loop = asyncio.get_event_loop() manager.start_job(job, loop) return UploadResponse( job_id=job.job_id, status=job.status, exterior_filename=exterior.filename or "exterior.step", interior_filename=interior.filename or "interior.step", ) @router.get("/samples", response_model=list[SamplePairResponse]) async def list_samples() -> list[SamplePairResponse]: """List available sample STEP pairs discoverable in the samples directory.""" manager = get_manager() pairs = discover_samples(manager.settings.samples_dir) return [ SamplePairResponse( id=p.id, name=p.name, exterior=SampleFileResponse(filename=p.exterior.filename, size=p.exterior.size), interior=SampleFileResponse(filename=p.interior.filename, size=p.interior.size), ) for p in pairs ] @router.get("/samples/{sample_id}/files/{kind}") async def download_sample_file(sample_id: str, kind: str) -> FileResponse: """Download a sample STEP file (kind: 'exterior' or 'interior').""" if kind not in ("exterior", "interior"): raise HTTPException(400, f"Invalid kind: {kind}. Must be 'exterior' or 'interior'.") manager = get_manager() pair = get_sample(manager.settings.samples_dir, sample_id) if pair is None: raise HTTPException(404, f"Sample not found: {sample_id}") file = pair.exterior if kind == "exterior" else pair.interior return FileResponse( path=file.path, filename=file.filename, media_type="application/octet-stream", ) @router.post("/samples/{sample_id}/load", response_model=UploadResponse, status_code=201) async def load_sample(sample_id: str) -> UploadResponse: """Start a review job using a server-side sample pair (no upload required).""" manager = get_manager() pair = get_sample(manager.settings.samples_dir, sample_id) if pair is None: raise HTTPException(404, f"Sample not found: {sample_id}") job = manager.create_job(pair.exterior.path, pair.interior.path) loop = asyncio.get_event_loop() manager.start_job(job, loop) return UploadResponse( job_id=job.job_id, status=job.status, exterior_filename=pair.exterior.filename, interior_filename=pair.interior.filename, ) @router.get("/jobs/{job_id}", response_model=JobStatus) async def get_job_status(job_id: str) -> JobStatus: """Get the current status of a job.""" manager = get_manager() job = manager.get_job(job_id) if job is None: raise HTTPException(404, f"Job not found: {job_id}") steps = [ StepProgress(step=name, status=job.steps.get(name, "pending")) for name in PIPELINE_STEPS ] return JobStatus( job_id=job.job_id, status=job.status, steps=steps, error=job.error, ) @router.get("/jobs/{job_id}/events") async def job_events(job_id: str) -> StreamingResponse: """SSE stream of pipeline progress events.""" manager = get_manager() job = manager.get_job(job_id) if job is None: raise HTTPException(404, f"Job not found: {job_id}") queue = manager.subscribe(job) async def event_stream(): # Initial comment forces the client to see headers and the first byte # immediately, which defeats reverse-proxy buffering (HF Spaces, nginx). yield ": connected\n\n" try: while True: try: event = await asyncio.wait_for(queue.get(), timeout=10) except asyncio.TimeoutError: # Periodic keep-alive ping so the proxy flushes the buffer. yield ": keep-alive\n\n" continue yield f"data: {json.dumps(event)}\n\n" if event.get("overall_status") in ("completed", "failed"): break finally: manager.unsubscribe(job, queue) return StreamingResponse( event_stream(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache, no-transform", "X-Accel-Buffering": "no", "Connection": "keep-alive", }, ) @router.get("/jobs/{job_id}/assembly-tree", response_model=AssemblyTreeResponse) async def get_assembly_tree(job_id: str) -> AssemblyTreeResponse: """Get the assembly tree for a job.""" manager = get_manager() job = manager.get_job(job_id) if job is None: raise HTTPException(404, f"Job not found: {job_id}") if job.assembly_tree is None: raise HTTPException(409, "Assembly tree not available yet") tree_dict = job.assembly_tree.to_dict() root = _node_to_response(tree_dict) groups = [ GroupInfoResponse( name=g.name, part_count=len(g.part_ids), part_ids=g.part_ids, part_names=g.part_names, ) for g in job.groups.values() ] return AssemblyTreeResponse(root=root, groups=groups) @router.get("/jobs/{job_id}/parts/{part_id}/mesh") async def get_part_mesh(job_id: str, part_id: str) -> Response: """Get the GLB mesh for a specific part.""" manager = get_manager() job = manager.get_job(job_id) if job is None: raise HTTPException(404, f"Job not found: {job_id}") mesh_data = job.part_meshes.get(part_id) if mesh_data is None: raise HTTPException(404, f"Part mesh not found: {part_id}") if not mesh_data.glb: raise HTTPException(409, "Mesh not exported yet") return Response(content=mesh_data.glb, media_type="model/gltf-binary") @router.get("/jobs/{job_id}/groups/{group}/mesh") async def get_group_mesh(job_id: str, group: str) -> Response: """Get the combined GLB mesh for a group (exterior or interior).""" manager = get_manager() job = manager.get_job(job_id) if job is None: raise HTTPException(404, f"Job not found: {job_id}") if group not in ("exterior", "interior"): raise HTTPException(400, f"Invalid group: {group}. Must be 'exterior' or 'interior'.") mesh_data = job.group_meshes.get(group) if mesh_data is None: raise HTTPException(404, f"Group mesh not found: {group}") if not mesh_data.glb: raise HTTPException(409, "Group mesh not exported yet") return Response(content=mesh_data.glb, media_type="model/gltf-binary") @router.post("/jobs/{job_id}/analyze/proximity", response_model=ProximityResponse) async def analyze_proximity(job_id: str) -> ProximityResponse: """Run proximity/collision analysis on all parts.""" manager = get_manager() job = manager.get_job(job_id) if job is None: raise HTTPException(404, f"Job not found: {job_id}") if job.assembly_tree is None: raise HTTPException(409, "Assembly tree not available yet") parts = [] for leaf in job.assembly_tree.iter_leaves(): if leaf.shape is not None and not leaf.shape.IsNull(): parts.append({"id": leaf.id, "name": leaf.name, "shape": leaf.shape}) results = scan_proximity( parts, collision_threshold_mm=manager.settings.tolerance.gap_tolerance_mm, near_threshold_mm=5.0, ) pairs = [ ProximityPairResponse( part_a_id=r.part_a_id, part_a_name=r.part_a_name, part_b_id=r.part_b_id, part_b_name=r.part_b_name, min_distance_mm=r.min_distance_mm, point_a=r.point_a, point_b=r.point_b, status=r.status, ) for r in results ] return ProximityResponse( pairs=pairs, collision_count=sum(1 for p in pairs if p.status == "collision"), near_count=sum(1 for p in pairs if p.status == "near"), ) @router.post("/jobs/{job_id}/analyze/distance", response_model=DistanceMeasurementResponse) async def analyze_distance(job_id: str, request: DistanceRequest) -> DistanceMeasurementResponse: """Measure distance between two specific parts.""" manager = get_manager() job = manager.get_job(job_id) if job is None: raise HTTPException(404, f"Job not found: {job_id}") if job.assembly_tree is None: raise HTTPException(409, "Assembly tree not available yet") node_a = job.assembly_tree.find_by_id(request.part_a_id) node_b = job.assembly_tree.find_by_id(request.part_b_id) if node_a is None or node_a.shape is None: raise HTTPException(404, f"Part not found: {request.part_a_id}") if node_b is None or node_b.shape is None: raise HTTPException(404, f"Part not found: {request.part_b_id}") result = measure_distance(node_a.shape, node_b.shape) return DistanceMeasurementResponse( distance_mm=result.distance_mm, point_a=result.point_a, point_b=result.point_b, ) @router.get("/compliance/rules") async def get_compliance_rules() -> list[ComplianceRuleResponse]: """Get available compliance rules.""" rules_path = Path(__file__).parent.parent.parent / "config" / "kmvss_rules.yaml" rules = load_rules(rules_path) return [ ComplianceRuleResponse( id=r.id, name=r.name, description=r.description, type=r.type, severity=r.severity, ) for r in rules ] @router.post("/jobs/{job_id}/analyze/compliance", response_model=ComplianceResponse) async def analyze_compliance(job_id: str) -> ComplianceResponse: """Run compliance checks on the assembly.""" manager = get_manager() job = manager.get_job(job_id) if job is None: raise HTTPException(404, f"Job not found: {job_id}") if job.assembly_tree is None: raise HTTPException(409, "Assembly tree not available yet") rules_path = Path(__file__).parent.parent.parent / "config" / "kmvss_rules.yaml" rules = load_rules(rules_path) results = check_compliance(rules, job.assembly_tree) return ComplianceResponse( results=[ ComplianceResultResponse( rule_id=r.rule_id, rule_name=r.rule_name, passed=r.passed, severity=r.severity, measured_value=r.measured_value, threshold_value=r.threshold_value, unit=r.unit, message=r.message, affected_parts=r.affected_parts, ) for r in results ], pass_count=sum(1 for r in results if r.passed), fail_count=sum(1 for r in results if not r.passed), )