Spaces:
Configuration error
Configuration error
| from typing import Optional | |
| from fastapi import FastAPI | |
| from fastapi import FastAPI, File, UploadFile, Form, HTTPException, Request | |
| from pydantic import BaseModel | |
| import base64, cv2, numpy as np, math | |
| from pupil_apriltags import Detector | |
| from scipy.spatial.transform import Rotation as R | |
| from fastapi.middleware.cors import CORSMiddleware | |
| app = FastAPI() | |
| app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]) | |
| detector = Detector( | |
| families="tagStandard41h12", | |
| nthreads=4, | |
| quad_decimate=1.0, | |
| quad_sigma=0.0, | |
| refine_edges=True, | |
| decode_sharpening=0.25, | |
| debug=False, | |
| ) | |
| # --- models ------------------------------------------------------------ | |
| class CameraParams(BaseModel): | |
| fx: float | |
| fy: float | |
| cx: float | |
| cy: float | |
| class DetectionRequest(BaseModel): | |
| image_base64: str | |
| camera_params: CameraParams | |
| tag_size: float | |
| class TagDetection(BaseModel): | |
| tag_id: int | |
| corners: list[list[float]] | |
| center: list[float] | |
| translation_x: float | |
| translation_y: float | |
| translation_z: float | |
| quaternion_w: float | |
| quaternion_x: float | |
| quaternion_y: float | |
| quaternion_z: float | |
| class DetectionResponse(BaseModel): | |
| detections: list[TagDetection] | |
| # Pose with explicit float fields for Unity/C# consumption | |
| class Pose(BaseModel): | |
| x: float | |
| y: float | |
| z: float | |
| qw: float | |
| qx: float | |
| qy: float | |
| qz: float | |
| class TransformRequest(BaseModel): | |
| headset_local: Pose # headset pose in local frame | |
| detection: TagDetection # single tag detection | |
| global_tag_pose: Pose # known tag pose in world frame | |
| house_id : Optional[str] = None # optional house ID for future use | |
| class TransformResponse(BaseModel): | |
| headset_global: Pose # headset pose in world frame | |
| local_to_global: Pose # transform local->world | |
| global_to_local: Pose # transform world->local | |
| async def log_requests(request: Request, call_next): | |
| print(f"[REQ] {request.method} {request.url}") | |
| return await call_next(request) | |
| async def root(): | |
| return {"message": "Hello, Localization!"} | |
| # --- endpoint: detect ------------------------------------------------- | |
| def detect_apriltag(req: DetectionRequest): | |
| img = cv2.imdecode( | |
| np.frombuffer(base64.b64decode(req.image_base64), np.uint8), | |
| cv2.IMREAD_GRAYSCALE | |
| ) | |
| #img_ud = cv2.undistort(img, cameraMatrix=np.array([req.camera_params.fx, 0, req.camera_params.cx])) | |
| tags = detector.detect( | |
| img, | |
| estimate_tag_pose=True, | |
| camera_params=( | |
| req.camera_params.fx, | |
| req.camera_params.fy, | |
| req.camera_params.cx, | |
| req.camera_params.cy | |
| ), | |
| tag_size=req.tag_size | |
| ) | |
| out = [] | |
| for t in tags: | |
| tx, ty, tz = t.pose_t.flatten().tolist() | |
| mat = t.pose_R | |
| qw = math.sqrt(max(0.0, 1 + mat[0,0] + mat[1,1] + mat[2,2]))/2 | |
| qx = math.copysign(math.sqrt(max(0.0, 1 + mat[0,0] - mat[1,1] - mat[2,2]))/2, mat[2,1] - mat[1,2]) | |
| qy = math.copysign(math.sqrt(max(0.0, 1 - mat[0,0] + mat[1,1] - mat[2,2]))/2, mat[0,2] - mat[2,0]) | |
| qz = math.copysign(math.sqrt(max(0.0, 1 - mat[0,0] - mat[1,1] + mat[2,2]))/2, mat[1,0] - mat[0,1]) | |
| out.append(TagDetection( | |
| tag_id=t.tag_id, | |
| corners=t.corners.tolist(), | |
| center=t.center.tolist(), | |
| translation_x=tx, | |
| translation_y=ty, | |
| translation_z=tz, | |
| quaternion_w=qw, | |
| quaternion_x=qx, | |
| quaternion_y=qy, | |
| quaternion_z=qz, | |
| )) | |
| return DetectionResponse(detections=out) | |
| # --- endpoint: compute transform --------------------------------------- | |
| def compute_transform(req: TransformRequest): | |
| # unpack detection: tag->camera | |
| d = req.detection | |
| t_ct = np.array([d.translation_x, d.translation_y, d.translation_z]) | |
| R_ct = R.from_quat([d.quaternion_x, d.quaternion_y, d.quaternion_z, d.quaternion_w]).as_matrix() | |
| # invert: camera->tag | |
| R_tc = R_ct.T | |
| t_tc = -R_tc.dot(t_ct) | |
| # unpack global tag pose: tag->world | |
| gt = req.global_tag_pose | |
| t_wt = np.array([gt.x, gt.y, gt.z]) | |
| R_wt = R.from_quat([gt.qx, gt.qy, gt.qz, gt.qw]).as_matrix() | |
| # compute camera->world | |
| R_wc = R_wt.dot(R_tc) | |
| t_wc = R_wt.dot(t_tc) + t_wt | |
| headset_global = Pose( | |
| x=float(t_wc[0]), y=float(t_wc[1]), z=float(t_wc[2]), | |
| qw=float(R.from_matrix(R_wc).as_quat()[3]), | |
| qx=float(R.from_matrix(R_wc).as_quat()[0]), | |
| qy=float(R.from_matrix(R_wc).as_quat()[1]), | |
| qz=float(R.from_matrix(R_wc).as_quat()[2]) | |
| ) | |
| # unpack local headset pose: local->headset | |
| hl = req.headset_local | |
| t_lh = np.array([hl.x, hl.y, hl.z]) | |
| R_lh = R.from_quat([hl.qx, hl.qy, hl.qz, hl.qw]).as_matrix() | |
| # invert: headset->local | |
| R_hl = R_lh.T | |
| t_hl = -R_hl.dot(t_lh) | |
| # compute local->world | |
| R_wl = R_wc.dot(R_hl) | |
| t_wl = R_wc.dot(t_hl) + t_wc | |
| local_to_global = Pose( | |
| x=float(t_wl[0]), y=float(t_wl[1]), z=float(t_wl[2]), | |
| qw=float(R.from_matrix(R_wl).as_quat()[3]), | |
| qx=float(R.from_matrix(R_wl).as_quat()[0]), | |
| qy=float(R.from_matrix(R_wl).as_quat()[1]), | |
| qz=float(R.from_matrix(R_wl).as_quat()[2]) | |
| ) | |
| # compute global->local | |
| R_lw = R_wl.T | |
| t_lw = -R_lw.dot(t_wl) | |
| global_to_local = Pose( | |
| x=float(t_lw[0]), y=float(t_lw[1]), z=float(t_lw[2]), | |
| qw=float(R.from_matrix(R_lw).as_quat()[3]), | |
| qx=float(R.from_matrix(R_lw).as_quat()[0]), | |
| qy=float(R.from_matrix(R_lw).as_quat()[1]), | |
| qz=float(R.from_matrix(R_lw).as_quat()[2]) | |
| ) | |
| return TransformResponse( | |
| headset_global=headset_global, | |
| local_to_global=local_to_global, | |
| global_to_local=global_to_local | |
| ) | |
| if __name__ == "__main__": | |
| import uvicorn | |
| uvicorn.run("api.mapping_server:app", host="0.0.0.0", port=8000) |