forma-3d-review-api / src /api /router.py
lomit's picture
Sync from forma-3d-review@413ffa07c6dd2d81a67f3e811883bcdd79f684a3
d57d0d8 verified
"""API endpoint definitions for the CAD review tool."""
from __future__ import annotations
import asyncio
import json
import logging
from pathlib import Path
from fastapi import APIRouter, HTTPException, UploadFile, File
from fastapi.responses import FileResponse, Response, StreamingResponse
from src.api.job_manager import JobManager, PIPELINE_STEPS
from src.api.samples import discover_samples, get_sample
from src.api.schemas import (
AssemblyNodeResponse,
AssemblyTreeResponse,
ComplianceResponse,
ComplianceResultResponse,
ComplianceRuleResponse,
DistanceMeasurementResponse,
DistanceRequest,
GroupInfoResponse,
JobStatus,
ProximityPairResponse,
ProximityResponse,
SampleFileResponse,
SamplePairResponse,
StepProgress,
UploadResponse,
)
from src.compliance.kmvss_checker import check_compliance
from src.compliance.rule_loader import load_rules
from src.geometry.measurement import measure_distance, scan_proximity
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Router that collects every endpoint declared below; all paths are served
# under the "/api" prefix.
router = APIRouter(prefix="/api")

# Singleton job manager. Starts out as None and is replaced by a JobManager
# instance the first time get_manager() is called.
_manager: JobManager | None = None
def get_manager() -> JobManager:
    """Return the module-wide JobManager, constructing it on first access.

    Returns:
        The instance stored in the module global ``_manager``. When that
        global is still ``None`` a new ``JobManager()`` is created, stored
        and returned; later calls return the stored instance.
    """
    global _manager
    if _manager is not None:
        # Already initialised by an earlier call.
        return _manager
    _manager = JobManager()
    return _manager
def _node_to_response(node_dict: dict) -> AssemblyNodeResponse:
    """Build an AssemblyNodeResponse from a (possibly nested) node dict.

    Args:
        node_dict: Mapping describing one assembly node. The keys ``id``,
            ``name``, ``is_assembly`` and ``is_leaf`` are required; the
            remaining keys fall back to defaults when absent.

    Returns:
        The response model for this node, with every entry of
        ``node_dict["children"]`` converted recursively.

    Raises:
        KeyError: If one of the required keys is missing on this node or on
            any descendant.
    """
    # Descendants are converted first, before this node's own fields are read.
    child_responses = []
    for child_dict in node_dict.get("children", []):
        child_responses.append(_node_to_response(child_dict))

    node_id = node_dict["id"]
    node_name = node_dict["name"]
    assembly_flag = node_dict["is_assembly"]
    leaf_flag = node_dict["is_leaf"]

    return AssemblyNodeResponse(
        id=node_id,
        name=node_name,
        is_assembly=assembly_flag,
        is_leaf=leaf_flag,
        classification=node_dict.get("classification", "unknown"),
        num_faces=node_dict.get("num_faces", 0),
        num_solids=node_dict.get("num_solids", 0),
        bounding_box=node_dict.get("bounding_box"),
        children=child_responses,
    )
def _safe_upload_name(raw_name: str | None, fallback: str) -> str:
    """Reduce a client-supplied upload name to a bare file name.

    Args:
        raw_name: File name exactly as sent by the client; may be empty or
            ``None`` and may contain directory components.
        fallback: Name to use when ``raw_name`` is missing or has no usable
            final component.

    Returns:
        The last path component of ``raw_name`` (with backslashes treated as
        separators), or ``fallback``.
    """
    if not raw_name:
        return fallback
    # Treat backslashes as separators too, so "..\\..\\x.step" cannot smuggle
    # directory components through on any platform.
    return Path(raw_name.replace("\\", "/")).name or fallback


def _store_upload(destination: Path, content: bytes) -> None:
    """Create the parent directory of ``destination`` and write ``content`` to it."""
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_bytes(content)


@router.post("/upload", response_model=UploadResponse, status_code=201)
async def upload_files(
    exterior: UploadFile = File(..., description="Exterior (design) STEP file"),
    interior: UploadFile = File(..., description="Interior (engineering) STEP file"),
) -> UploadResponse:
    """Upload exterior and interior STEP files and create a review job.

    Both files are stored below ``manager.upload_dir`` in a directory unique
    to this request (``<upload_dir>/<random hex>/exterior|interior/<name>``),
    so that neither identical file names within one request nor uploads from
    other requests can overwrite each other. Only the final component of the
    client-supplied file name is used, which prevents writes outside the
    upload directory.

    Args:
        exterior: Uploaded exterior (design) STEP file.
        interior: Uploaded interior (engineering) STEP file.

    Returns:
        The id and status of the newly started job together with the file
        names as sent by the client (or the default names when none were
        sent).

    Raises:
        HTTPException: 400 if a supplied file name does not end in ``.stp``
            or ``.step`` (case-insensitive).
    """
    # Function-scope import keeps this block self-contained; uuid is stdlib.
    import uuid

    manager = get_manager()

    for label, f in [("Exterior", exterior), ("Interior", interior)]:
        if f.filename and not f.filename.lower().endswith((".stp", ".step")):
            raise HTTPException(400, f"Invalid {label} file type: {f.filename}. Expected .stp or .step")

    # One fresh directory per request; the role sub-directories keep the two
    # files apart even when the client used the same name for both.
    batch_dir = manager.upload_dir / uuid.uuid4().hex
    exterior_path = batch_dir / "exterior" / _safe_upload_name(exterior.filename, "exterior.step")
    interior_path = batch_dir / "interior" / _safe_upload_name(interior.filename, "interior.step")

    for upload, destination in ((exterior, exterior_path), (interior, interior_path)):
        content = await upload.read()
        # Disk writes run in a worker thread so a large STEP file does not
        # stall the event loop.
        await asyncio.to_thread(_store_upload, destination, content)

    job = manager.create_job(exterior_path, interior_path)
    # Inside a coroutine the running loop is the one to hand over.
    loop = asyncio.get_running_loop()
    manager.start_job(job, loop)

    return UploadResponse(
        job_id=job.job_id,
        status=job.status,
        exterior_filename=exterior.filename or "exterior.step",
        interior_filename=interior.filename or "interior.step",
    )
@router.get("/samples", response_model=list[SamplePairResponse])
async def list_samples() -> list[SamplePairResponse]:
    """Return every sample STEP pair found in the configured samples directory.

    Returns:
        One ``SamplePairResponse`` per pair reported by ``discover_samples``,
        carrying the pair id, its name and the file name and size of both the
        exterior and the interior file.
    """
    samples_dir = get_manager().settings.samples_dir

    listing: list[SamplePairResponse] = []
    for pair in discover_samples(samples_dir):
        exterior_info = SampleFileResponse(
            filename=pair.exterior.filename,
            size=pair.exterior.size,
        )
        interior_info = SampleFileResponse(
            filename=pair.interior.filename,
            size=pair.interior.size,
        )
        listing.append(
            SamplePairResponse(
                id=pair.id,
                name=pair.name,
                exterior=exterior_info,
                interior=interior_info,
            )
        )
    return listing
@router.get("/samples/{sample_id}/files/{kind}")
async def download_sample_file(sample_id: str, kind: str) -> FileResponse:
    """Serve one STEP file of a sample pair as a download.

    Args:
        sample_id: Identifier of the sample pair.
        kind: Which file of the pair to send, ``"exterior"`` or
            ``"interior"``.

    Returns:
        A ``FileResponse`` for the selected file, sent as
        ``application/octet-stream`` under the file's own name.

    Raises:
        HTTPException: 400 for any other ``kind``; 404 when no sample pair
            with ``sample_id`` exists.
    """
    # The kind is validated before any sample lookup takes place.
    if kind != "exterior" and kind != "interior":
        raise HTTPException(400, f"Invalid kind: {kind}. Must be 'exterior' or 'interior'.")

    samples_dir = get_manager().settings.samples_dir
    pair = get_sample(samples_dir, sample_id)
    if pair is None:
        raise HTTPException(404, f"Sample not found: {sample_id}")

    if kind == "exterior":
        selected = pair.exterior
    else:
        selected = pair.interior

    return FileResponse(
        path=selected.path,
        filename=selected.filename,
        media_type="application/octet-stream",
    )
@router.post("/samples/{sample_id}/load", response_model=UploadResponse, status_code=201)
async def load_sample(sample_id: str) -> UploadResponse:
    """Start a review job using a server-side sample pair (no upload required).

    Args:
        sample_id: Identifier of the sample pair to review.

    Returns:
        The id and status of the newly started job together with the file
        names of the sample's exterior and interior files.

    Raises:
        HTTPException: 404 when no sample pair with ``sample_id`` exists.
    """
    manager = get_manager()
    pair = get_sample(manager.settings.samples_dir, sample_id)
    if pair is None:
        raise HTTPException(404, f"Sample not found: {sample_id}")

    job = manager.create_job(pair.exterior.path, pair.interior.path)
    # get_running_loop() is the documented way to obtain the loop from inside
    # a coroutine; get_event_loop() has deprecated fallback behaviour.
    loop = asyncio.get_running_loop()
    manager.start_job(job, loop)

    return UploadResponse(
        job_id=job.job_id,
        status=job.status,
        exterior_filename=pair.exterior.filename,
        interior_filename=pair.interior.filename,
    )
@router.get("/jobs/{job_id}", response_model=JobStatus)
async def get_job_status(job_id: str) -> JobStatus:
    """Report the overall status and per-step progress of a job.

    Args:
        job_id: Identifier of the job to inspect.

    Returns:
        A ``JobStatus`` with one ``StepProgress`` entry per name in
        ``PIPELINE_STEPS`` (in that order), plus the job's status and error.

    Raises:
        HTTPException: 404 when the job does not exist.
    """
    job = get_manager().get_job(job_id)
    if job is None:
        raise HTTPException(404, f"Job not found: {job_id}")

    progress: list[StepProgress] = []
    for step_name in PIPELINE_STEPS:
        # Steps the job has no entry for are reported as "pending".
        step_state = job.steps.get(step_name, "pending")
        progress.append(StepProgress(step=step_name, status=step_state))

    return JobStatus(
        job_id=job.job_id,
        status=job.status,
        steps=progress,
        error=job.error,
    )
@router.get("/jobs/{job_id}/events")
async def job_events(job_id: str) -> StreamingResponse:
    """Stream pipeline progress events for a job as Server-Sent Events.

    Args:
        job_id: Identifier of the job to follow.

    Returns:
        A ``text/event-stream`` response. Each event obtained from the
        subscription is sent as one JSON-encoded ``data:`` frame. The stream
        ends after an event whose ``overall_status`` is ``"completed"`` or
        ``"failed"``.

    Raises:
        HTTPException: 404 when the job does not exist.
    """
    manager = get_manager()
    job = manager.get_job(job_id)
    if job is None:
        raise HTTPException(404, f"Job not found: {job_id}")

    # The subscription is opened before the response body starts streaming.
    # NOTE(review): whether a job that already finished still delivers a
    # terminal event to a late subscriber depends on manager.subscribe();
    # confirm, otherwise such a stream only ever emits keep-alives.
    subscription = manager.subscribe(job)

    async def sse_frames():
        # Initial comment forces the client to see headers and the first byte
        # immediately, which defeats reverse-proxy buffering (HF Spaces, nginx).
        yield ": connected\n\n"
        try:
            finished = False
            while not finished:
                try:
                    payload = await asyncio.wait_for(subscription.get(), timeout=10)
                except asyncio.TimeoutError:
                    # Periodic keep-alive ping so the proxy flushes the buffer.
                    yield ": keep-alive\n\n"
                else:
                    yield f"data: {json.dumps(payload)}\n\n"
                    finished = payload.get("overall_status") in ("completed", "failed")
        finally:
            # Runs on normal completion and when the generator is closed early.
            manager.unsubscribe(job, subscription)

    sse_headers = {
        "Cache-Control": "no-cache, no-transform",
        "X-Accel-Buffering": "no",
        "Connection": "keep-alive",
    }
    return StreamingResponse(
        sse_frames(),
        media_type="text/event-stream",
        headers=sse_headers,
    )
@router.get("/jobs/{job_id}/assembly-tree", response_model=AssemblyTreeResponse)
async def get_assembly_tree(job_id: str) -> AssemblyTreeResponse:
    """Return the assembly tree of a job together with its part groups.

    Args:
        job_id: Identifier of the job.

    Returns:
        An ``AssemblyTreeResponse`` whose ``root`` is the converted tree and
        whose ``groups`` holds one entry per value of ``job.groups``.

    Raises:
        HTTPException: 404 when the job does not exist; 409 while the job has
            no assembly tree yet.
    """
    job = get_manager().get_job(job_id)
    if job is None:
        raise HTTPException(404, f"Job not found: {job_id}")
    if job.assembly_tree is None:
        raise HTTPException(409, "Assembly tree not available yet")

    root_response = _node_to_response(job.assembly_tree.to_dict())

    group_responses = []
    for group in job.groups.values():
        group_responses.append(
            GroupInfoResponse(
                name=group.name,
                part_count=len(group.part_ids),
                part_ids=group.part_ids,
                part_names=group.part_names,
            )
        )

    return AssemblyTreeResponse(root=root_response, groups=group_responses)
@router.get("/jobs/{job_id}/parts/{part_id}/mesh")
async def get_part_mesh(job_id: str, part_id: str) -> Response:
    """Return the GLB mesh of a single part.

    Args:
        job_id: Identifier of the job.
        part_id: Identifier of the part within that job.

    Returns:
        The part's GLB payload with media type ``model/gltf-binary``.

    Raises:
        HTTPException: 404 when the job or the part mesh is unknown; 409 when
            the mesh entry exists but its GLB payload is still empty.
    """
    job = get_manager().get_job(job_id)
    if job is None:
        raise HTTPException(404, f"Job not found: {job_id}")

    part_mesh = job.part_meshes.get(part_id)
    if part_mesh is None:
        raise HTTPException(404, f"Part mesh not found: {part_id}")

    glb_payload = part_mesh.glb
    if not glb_payload:
        raise HTTPException(409, "Mesh not exported yet")

    return Response(content=glb_payload, media_type="model/gltf-binary")
@router.get("/jobs/{job_id}/groups/{group}/mesh")
async def get_group_mesh(job_id: str, group: str) -> Response:
    """Return the combined GLB mesh of one part group.

    Args:
        job_id: Identifier of the job.
        group: Group to fetch, ``"exterior"`` or ``"interior"``.

    Returns:
        The group's GLB payload with media type ``model/gltf-binary``.

    Raises:
        HTTPException: 404 when the job or the group mesh is unknown; 400 for
            any other group name; 409 when the mesh entry exists but its GLB
            payload is still empty.
    """
    job = get_manager().get_job(job_id)
    if job is None:
        raise HTTPException(404, f"Job not found: {job_id}")

    # The group name is only validated once the job is known to exist.
    if group != "exterior" and group != "interior":
        raise HTTPException(400, f"Invalid group: {group}. Must be 'exterior' or 'interior'.")

    group_mesh = job.group_meshes.get(group)
    if group_mesh is None:
        raise HTTPException(404, f"Group mesh not found: {group}")

    glb_payload = group_mesh.glb
    if not glb_payload:
        raise HTTPException(409, "Group mesh not exported yet")

    return Response(content=glb_payload, media_type="model/gltf-binary")
@router.post("/jobs/{job_id}/analyze/proximity", response_model=ProximityResponse)
async def analyze_proximity(job_id: str) -> ProximityResponse:
    """Run the proximity/collision scan over the leaf parts of a job.

    Only leaves that carry a non-null shape take part in the scan. The
    collision threshold comes from ``settings.tolerance.gap_tolerance_mm``;
    the near threshold is fixed at 5.0 mm.

    Args:
        job_id: Identifier of the job.

    Returns:
        A ``ProximityResponse`` with one entry per result of
        ``scan_proximity`` and the number of entries whose status is
        ``"collision"`` and ``"near"`` respectively.

    Raises:
        HTTPException: 404 when the job does not exist; 409 while the job has
            no assembly tree yet.
    """
    manager = get_manager()
    job = manager.get_job(job_id)
    if job is None:
        raise HTTPException(404, f"Job not found: {job_id}")
    if job.assembly_tree is None:
        raise HTTPException(409, "Assembly tree not available yet")

    # Leaves without geometry (missing or null shape) are left out.
    candidates = [
        {"id": leaf.id, "name": leaf.name, "shape": leaf.shape}
        for leaf in job.assembly_tree.iter_leaves()
        if leaf.shape is not None and not leaf.shape.IsNull()
    ]

    scan_results = scan_proximity(
        candidates,
        collision_threshold_mm=manager.settings.tolerance.gap_tolerance_mm,
        near_threshold_mm=5.0,
    )

    pair_responses = []
    for hit in scan_results:
        pair_responses.append(
            ProximityPairResponse(
                part_a_id=hit.part_a_id,
                part_a_name=hit.part_a_name,
                part_b_id=hit.part_b_id,
                part_b_name=hit.part_b_name,
                min_distance_mm=hit.min_distance_mm,
                point_a=hit.point_a,
                point_b=hit.point_b,
                status=hit.status,
            )
        )

    collisions = len([entry for entry in pair_responses if entry.status == "collision"])
    near_misses = len([entry for entry in pair_responses if entry.status == "near"])

    return ProximityResponse(
        pairs=pair_responses,
        collision_count=collisions,
        near_count=near_misses,
    )
@router.post("/jobs/{job_id}/analyze/distance", response_model=DistanceMeasurementResponse)
async def analyze_distance(job_id: str, request: DistanceRequest) -> DistanceMeasurementResponse:
    """Measure the distance between two parts of a job.

    Args:
        job_id: Identifier of the job.
        request: Body naming the two parts via ``part_a_id`` and
            ``part_b_id``.

    Returns:
        The distance and the two points reported by ``measure_distance``.

    Raises:
        HTTPException: 404 when the job does not exist, or when either part
            is not in the tree or has no shape (part A is reported first);
            409 while the job has no assembly tree yet.
    """
    job = get_manager().get_job(job_id)
    if job is None:
        raise HTTPException(404, f"Job not found: {job_id}")
    if job.assembly_tree is None:
        raise HTTPException(409, "Assembly tree not available yet")

    # Both lookups happen before either result is checked.
    first = job.assembly_tree.find_by_id(request.part_a_id)
    second = job.assembly_tree.find_by_id(request.part_b_id)

    for node, wanted_id in ((first, request.part_a_id), (second, request.part_b_id)):
        if node is None or node.shape is None:
            raise HTTPException(404, f"Part not found: {wanted_id}")

    measurement = measure_distance(first.shape, second.shape)

    return DistanceMeasurementResponse(
        distance_mm=measurement.distance_mm,
        point_a=measurement.point_a,
        point_b=measurement.point_b,
    )
@router.get("/compliance/rules")
async def get_compliance_rules() -> list[ComplianceRuleResponse]:
    """List the compliance rules defined in ``config/kmvss_rules.yaml``.

    The rules file is resolved relative to this module: three directory
    levels up from this file, then ``config/kmvss_rules.yaml``.

    Returns:
        One ``ComplianceRuleResponse`` per loaded rule, carrying its id,
        name, description, type and severity.
    """
    api_dir = Path(__file__).parent
    src_dir = api_dir.parent
    rules_file = src_dir.parent / "config" / "kmvss_rules.yaml"

    rule_responses: list[ComplianceRuleResponse] = []
    for rule in load_rules(rules_file):
        rule_responses.append(
            ComplianceRuleResponse(
                id=rule.id,
                name=rule.name,
                description=rule.description,
                type=rule.type,
                severity=rule.severity,
            )
        )
    return rule_responses
@router.post("/jobs/{job_id}/analyze/compliance", response_model=ComplianceResponse)
async def analyze_compliance(job_id: str) -> ComplianceResponse:
    """Check a job's assembly tree against the rules in ``config/kmvss_rules.yaml``.

    The rules file is resolved relative to this module: three directory
    levels up from this file, then ``config/kmvss_rules.yaml``.

    Args:
        job_id: Identifier of the job.

    Returns:
        A ``ComplianceResponse`` with one entry per result of
        ``check_compliance`` plus the number of passed and failed results.

    Raises:
        HTTPException: 404 when the job does not exist; 409 while the job has
            no assembly tree yet.
    """
    job = get_manager().get_job(job_id)
    if job is None:
        raise HTTPException(404, f"Job not found: {job_id}")
    if job.assembly_tree is None:
        raise HTTPException(409, "Assembly tree not available yet")

    api_dir = Path(__file__).parent
    src_dir = api_dir.parent
    rules_file = src_dir.parent / "config" / "kmvss_rules.yaml"

    outcomes = check_compliance(load_rules(rules_file), job.assembly_tree)

    # The outcomes are walked three times, in this order: once to build the
    # response entries, once per counter.
    entries = []
    for outcome in outcomes:
        entries.append(
            ComplianceResultResponse(
                rule_id=outcome.rule_id,
                rule_name=outcome.rule_name,
                passed=outcome.passed,
                severity=outcome.severity,
                measured_value=outcome.measured_value,
                threshold_value=outcome.threshold_value,
                unit=outcome.unit,
                message=outcome.message,
                affected_parts=outcome.affected_parts,
            )
        )
    passed_total = len([outcome for outcome in outcomes if outcome.passed])
    failed_total = len([outcome for outcome in outcomes if not outcome.passed])

    return ComplianceResponse(
        results=entries,
        pass_count=passed_total,
        fail_count=failed_total,
    )