Spaces:
Sleeping
Sleeping
| """API endpoint definitions for the CAD review tool.""" | |
| from __future__ import annotations | |
| import asyncio | |
| import json | |
| import logging | |
| from pathlib import Path | |
| from fastapi import APIRouter, HTTPException, UploadFile, File | |
| from fastapi.responses import FileResponse, Response, StreamingResponse | |
| from src.api.job_manager import JobManager, PIPELINE_STEPS | |
| from src.api.samples import discover_samples, get_sample | |
| from src.api.schemas import ( | |
| AssemblyNodeResponse, | |
| AssemblyTreeResponse, | |
| ComplianceResponse, | |
| ComplianceResultResponse, | |
| ComplianceRuleResponse, | |
| DistanceMeasurementResponse, | |
| DistanceRequest, | |
| GroupInfoResponse, | |
| JobStatus, | |
| ProximityPairResponse, | |
| ProximityResponse, | |
| SampleFileResponse, | |
| SamplePairResponse, | |
| StepProgress, | |
| UploadResponse, | |
| ) | |
| from src.compliance.kmvss_checker import check_compliance | |
| from src.compliance.rule_loader import load_rules | |
| from src.geometry.measurement import measure_distance, scan_proximity | |
| logger = logging.getLogger(__name__) | |
| router = APIRouter(prefix="/api") | |
| # Singleton job manager | |
| _manager: JobManager | None = None | |
| def get_manager() -> JobManager: | |
| global _manager | |
| if _manager is None: | |
| _manager = JobManager() | |
| return _manager | |
| def _node_to_response(node_dict: dict) -> AssemblyNodeResponse: | |
| """Convert assembly node dict to response model.""" | |
| children = [_node_to_response(c) for c in node_dict.get("children", [])] | |
| return AssemblyNodeResponse( | |
| id=node_dict["id"], | |
| name=node_dict["name"], | |
| is_assembly=node_dict["is_assembly"], | |
| is_leaf=node_dict["is_leaf"], | |
| classification=node_dict.get("classification", "unknown"), | |
| num_faces=node_dict.get("num_faces", 0), | |
| num_solids=node_dict.get("num_solids", 0), | |
| bounding_box=node_dict.get("bounding_box"), | |
| children=children, | |
| ) | |
| async def upload_files( | |
| exterior: UploadFile = File(..., description="Exterior (design) STEP file"), | |
| interior: UploadFile = File(..., description="Interior (engineering) STEP file"), | |
| ) -> UploadResponse: | |
| """Upload exterior and interior STEP files and create a review job.""" | |
| manager = get_manager() | |
| for label, f in [("Exterior", exterior), ("Interior", interior)]: | |
| if f.filename and not f.filename.lower().endswith((".stp", ".step")): | |
| raise HTTPException(400, f"Invalid {label} file type: {f.filename}. Expected .stp or .step") | |
| exterior_path = manager.upload_dir / (exterior.filename or "exterior.step") | |
| exterior_content = await exterior.read() | |
| exterior_path.write_bytes(exterior_content) | |
| interior_path = manager.upload_dir / (interior.filename or "interior.step") | |
| interior_content = await interior.read() | |
| interior_path.write_bytes(interior_content) | |
| job = manager.create_job(exterior_path, interior_path) | |
| loop = asyncio.get_event_loop() | |
| manager.start_job(job, loop) | |
| return UploadResponse( | |
| job_id=job.job_id, | |
| status=job.status, | |
| exterior_filename=exterior.filename or "exterior.step", | |
| interior_filename=interior.filename or "interior.step", | |
| ) | |
| async def list_samples() -> list[SamplePairResponse]: | |
| """List available sample STEP pairs discoverable in the samples directory.""" | |
| manager = get_manager() | |
| pairs = discover_samples(manager.settings.samples_dir) | |
| return [ | |
| SamplePairResponse( | |
| id=p.id, | |
| name=p.name, | |
| exterior=SampleFileResponse(filename=p.exterior.filename, size=p.exterior.size), | |
| interior=SampleFileResponse(filename=p.interior.filename, size=p.interior.size), | |
| ) | |
| for p in pairs | |
| ] | |
| async def download_sample_file(sample_id: str, kind: str) -> FileResponse: | |
| """Download a sample STEP file (kind: 'exterior' or 'interior').""" | |
| if kind not in ("exterior", "interior"): | |
| raise HTTPException(400, f"Invalid kind: {kind}. Must be 'exterior' or 'interior'.") | |
| manager = get_manager() | |
| pair = get_sample(manager.settings.samples_dir, sample_id) | |
| if pair is None: | |
| raise HTTPException(404, f"Sample not found: {sample_id}") | |
| file = pair.exterior if kind == "exterior" else pair.interior | |
| return FileResponse( | |
| path=file.path, | |
| filename=file.filename, | |
| media_type="application/octet-stream", | |
| ) | |
| async def load_sample(sample_id: str) -> UploadResponse: | |
| """Start a review job using a server-side sample pair (no upload required).""" | |
| manager = get_manager() | |
| pair = get_sample(manager.settings.samples_dir, sample_id) | |
| if pair is None: | |
| raise HTTPException(404, f"Sample not found: {sample_id}") | |
| job = manager.create_job(pair.exterior.path, pair.interior.path) | |
| loop = asyncio.get_event_loop() | |
| manager.start_job(job, loop) | |
| return UploadResponse( | |
| job_id=job.job_id, | |
| status=job.status, | |
| exterior_filename=pair.exterior.filename, | |
| interior_filename=pair.interior.filename, | |
| ) | |
| async def get_job_status(job_id: str) -> JobStatus: | |
| """Get the current status of a job.""" | |
| manager = get_manager() | |
| job = manager.get_job(job_id) | |
| if job is None: | |
| raise HTTPException(404, f"Job not found: {job_id}") | |
| steps = [ | |
| StepProgress(step=name, status=job.steps.get(name, "pending")) | |
| for name in PIPELINE_STEPS | |
| ] | |
| return JobStatus( | |
| job_id=job.job_id, | |
| status=job.status, | |
| steps=steps, | |
| error=job.error, | |
| ) | |
| async def job_events(job_id: str) -> StreamingResponse: | |
| """SSE stream of pipeline progress events.""" | |
| manager = get_manager() | |
| job = manager.get_job(job_id) | |
| if job is None: | |
| raise HTTPException(404, f"Job not found: {job_id}") | |
| queue = manager.subscribe(job) | |
| async def event_stream(): | |
| # Initial comment forces the client to see headers and the first byte | |
| # immediately, which defeats reverse-proxy buffering (HF Spaces, nginx). | |
| yield ": connected\n\n" | |
| try: | |
| while True: | |
| try: | |
| event = await asyncio.wait_for(queue.get(), timeout=10) | |
| except asyncio.TimeoutError: | |
| # Periodic keep-alive ping so the proxy flushes the buffer. | |
| yield ": keep-alive\n\n" | |
| continue | |
| yield f"data: {json.dumps(event)}\n\n" | |
| if event.get("overall_status") in ("completed", "failed"): | |
| break | |
| finally: | |
| manager.unsubscribe(job, queue) | |
| return StreamingResponse( | |
| event_stream(), | |
| media_type="text/event-stream", | |
| headers={ | |
| "Cache-Control": "no-cache, no-transform", | |
| "X-Accel-Buffering": "no", | |
| "Connection": "keep-alive", | |
| }, | |
| ) | |
| async def get_assembly_tree(job_id: str) -> AssemblyTreeResponse: | |
| """Get the assembly tree for a job.""" | |
| manager = get_manager() | |
| job = manager.get_job(job_id) | |
| if job is None: | |
| raise HTTPException(404, f"Job not found: {job_id}") | |
| if job.assembly_tree is None: | |
| raise HTTPException(409, "Assembly tree not available yet") | |
| tree_dict = job.assembly_tree.to_dict() | |
| root = _node_to_response(tree_dict) | |
| groups = [ | |
| GroupInfoResponse( | |
| name=g.name, | |
| part_count=len(g.part_ids), | |
| part_ids=g.part_ids, | |
| part_names=g.part_names, | |
| ) | |
| for g in job.groups.values() | |
| ] | |
| return AssemblyTreeResponse(root=root, groups=groups) | |
| async def get_part_mesh(job_id: str, part_id: str) -> Response: | |
| """Get the GLB mesh for a specific part.""" | |
| manager = get_manager() | |
| job = manager.get_job(job_id) | |
| if job is None: | |
| raise HTTPException(404, f"Job not found: {job_id}") | |
| mesh_data = job.part_meshes.get(part_id) | |
| if mesh_data is None: | |
| raise HTTPException(404, f"Part mesh not found: {part_id}") | |
| if not mesh_data.glb: | |
| raise HTTPException(409, "Mesh not exported yet") | |
| return Response(content=mesh_data.glb, media_type="model/gltf-binary") | |
| async def get_group_mesh(job_id: str, group: str) -> Response: | |
| """Get the combined GLB mesh for a group (exterior or interior).""" | |
| manager = get_manager() | |
| job = manager.get_job(job_id) | |
| if job is None: | |
| raise HTTPException(404, f"Job not found: {job_id}") | |
| if group not in ("exterior", "interior"): | |
| raise HTTPException(400, f"Invalid group: {group}. Must be 'exterior' or 'interior'.") | |
| mesh_data = job.group_meshes.get(group) | |
| if mesh_data is None: | |
| raise HTTPException(404, f"Group mesh not found: {group}") | |
| if not mesh_data.glb: | |
| raise HTTPException(409, "Group mesh not exported yet") | |
| return Response(content=mesh_data.glb, media_type="model/gltf-binary") | |
| async def analyze_proximity(job_id: str) -> ProximityResponse: | |
| """Run proximity/collision analysis on all parts.""" | |
| manager = get_manager() | |
| job = manager.get_job(job_id) | |
| if job is None: | |
| raise HTTPException(404, f"Job not found: {job_id}") | |
| if job.assembly_tree is None: | |
| raise HTTPException(409, "Assembly tree not available yet") | |
| parts = [] | |
| for leaf in job.assembly_tree.iter_leaves(): | |
| if leaf.shape is not None and not leaf.shape.IsNull(): | |
| parts.append({"id": leaf.id, "name": leaf.name, "shape": leaf.shape}) | |
| results = scan_proximity( | |
| parts, | |
| collision_threshold_mm=manager.settings.tolerance.gap_tolerance_mm, | |
| near_threshold_mm=5.0, | |
| ) | |
| pairs = [ | |
| ProximityPairResponse( | |
| part_a_id=r.part_a_id, | |
| part_a_name=r.part_a_name, | |
| part_b_id=r.part_b_id, | |
| part_b_name=r.part_b_name, | |
| min_distance_mm=r.min_distance_mm, | |
| point_a=r.point_a, | |
| point_b=r.point_b, | |
| status=r.status, | |
| ) | |
| for r in results | |
| ] | |
| return ProximityResponse( | |
| pairs=pairs, | |
| collision_count=sum(1 for p in pairs if p.status == "collision"), | |
| near_count=sum(1 for p in pairs if p.status == "near"), | |
| ) | |
| async def analyze_distance(job_id: str, request: DistanceRequest) -> DistanceMeasurementResponse: | |
| """Measure distance between two specific parts.""" | |
| manager = get_manager() | |
| job = manager.get_job(job_id) | |
| if job is None: | |
| raise HTTPException(404, f"Job not found: {job_id}") | |
| if job.assembly_tree is None: | |
| raise HTTPException(409, "Assembly tree not available yet") | |
| node_a = job.assembly_tree.find_by_id(request.part_a_id) | |
| node_b = job.assembly_tree.find_by_id(request.part_b_id) | |
| if node_a is None or node_a.shape is None: | |
| raise HTTPException(404, f"Part not found: {request.part_a_id}") | |
| if node_b is None or node_b.shape is None: | |
| raise HTTPException(404, f"Part not found: {request.part_b_id}") | |
| result = measure_distance(node_a.shape, node_b.shape) | |
| return DistanceMeasurementResponse( | |
| distance_mm=result.distance_mm, | |
| point_a=result.point_a, | |
| point_b=result.point_b, | |
| ) | |
| async def get_compliance_rules() -> list[ComplianceRuleResponse]: | |
| """Get available compliance rules.""" | |
| rules_path = Path(__file__).parent.parent.parent / "config" / "kmvss_rules.yaml" | |
| rules = load_rules(rules_path) | |
| return [ | |
| ComplianceRuleResponse( | |
| id=r.id, name=r.name, description=r.description, | |
| type=r.type, severity=r.severity, | |
| ) | |
| for r in rules | |
| ] | |
| async def analyze_compliance(job_id: str) -> ComplianceResponse: | |
| """Run compliance checks on the assembly.""" | |
| manager = get_manager() | |
| job = manager.get_job(job_id) | |
| if job is None: | |
| raise HTTPException(404, f"Job not found: {job_id}") | |
| if job.assembly_tree is None: | |
| raise HTTPException(409, "Assembly tree not available yet") | |
| rules_path = Path(__file__).parent.parent.parent / "config" / "kmvss_rules.yaml" | |
| rules = load_rules(rules_path) | |
| results = check_compliance(rules, job.assembly_tree) | |
| return ComplianceResponse( | |
| results=[ | |
| ComplianceResultResponse( | |
| rule_id=r.rule_id, | |
| rule_name=r.rule_name, | |
| passed=r.passed, | |
| severity=r.severity, | |
| measured_value=r.measured_value, | |
| threshold_value=r.threshold_value, | |
| unit=r.unit, | |
| message=r.message, | |
| affected_parts=r.affected_parts, | |
| ) | |
| for r in results | |
| ], | |
| pass_count=sum(1 for r in results if r.passed), | |
| fail_count=sum(1 for r in results if not r.passed), | |
| ) | |