import base64 import json import time from typing import Any, List, Tuple, Union import cv2 import numpy as np try: import websocket except ImportError: websocket = None COLOR_BGR = { "green": (0, 255, 0), "orange": (51, 140, 232), "red": (0, 0, 255), "pink": (93, 57, 240), "gray": (155, 155, 155), "white": (255, 255, 255), } UI_TEXT_STYLE = { "fontFace": cv2.FONT_HERSHEY_SIMPLEX, "fontScale": 0.5, "color": COLOR_BGR["white"], "thickness": 2, } DETECTION_RECT_STYLE = { "thickness": 2, } def render_detection_rectangle( frame: np.ndarray, text: str, xyxy: Union[List[float], np.ndarray], color: str, ) -> None: """ Draw one detection rectangle and label text on frame. """ if xyxy is None: return cv2.putText( frame, text, org=(int(xyxy[0]), int(xyxy[1]) - 5), fontFace=UI_TEXT_STYLE["fontFace"], fontScale=0.6, color=COLOR_BGR[color], thickness=UI_TEXT_STYLE["thickness"], ) cv2.rectangle( frame, pt1=(int(xyxy[0]), int(xyxy[1])), pt2=(int(xyxy[2]), int(xyxy[3])), color=COLOR_BGR[color], thickness=DETECTION_RECT_STYLE["thickness"], ) def render_ui_text( frame: np.ndarray, text: str, frame_wh: Tuple[int, int], margin_wh: Tuple[int, int], align: str, order: int, ) -> None: """ Draw UI text on frame. This is the cleaned version of your current UI renderer. """ frame_w, _ = frame_wh margin_w, margin_h = margin_wh (text_width, text_height), _ = cv2.getTextSize( text=text, fontFace=UI_TEXT_STYLE["fontFace"], fontScale=UI_TEXT_STYLE["fontScale"], thickness=UI_TEXT_STYLE["thickness"], ) if align == "left": origin = (margin_w, margin_h + order * (text_height + 5)) elif align == "right": origin = ( frame_w - margin_w - int((0.01 if frame_w <= 600 else 1.0) * text_width), margin_h + order * (text_height + 5), ) else: raise ValueError("align must be either 'left' or 'right'") cv2.putText( frame, text, org=origin, fontFace=UI_TEXT_STYLE["fontFace"], fontScale=UI_TEXT_STYLE["fontScale"], color=UI_TEXT_STYLE["color"], thickness=UI_TEXT_STYLE["thickness"], ) def crop_frame( frame: np.ndarray, center_xy: np.ndarray, crop_hw: Tuple[int, int], ) -> tuple[np.ndarray | None, list[int] | None]: """ Crop a sub-frame using center coordinate and crop size. Returns: - cropped frame - absolute xyxy box """ frame_h, frame_w, _ = frame.shape x_center, y_center = center_xy crop_h, crop_w = crop_hw if not (0 <= x_center <= frame_w and 0 <= y_center <= frame_h): return None, None xs = np.array([x_center - (crop_w // 2), x_center + (crop_w // 2)], dtype=np.int32) ys = np.array([y_center - (crop_h // 2), y_center + (crop_h // 2)], dtype=np.int32) np.clip(xs, 0, frame_w, out=xs) np.clip(ys, 0, frame_h, out=ys) xyxy = [int(xs[0]), int(ys[0]), int(xs[1]), int(ys[1])] cropped = frame[ys[0] : ys[1], xs[0] : xs[1], :] return cropped, xyxy def resize_frame_to_square( frame: np.ndarray, edge_length: int, ratio_threshold: float = 9 / 16, ) -> np.ndarray: """ Resize a frame to square. If frame ratio is too wide/tall, center-crop first. Otherwise, direct resize. This follows the same idea used in your current project. """ if edge_length <= 0: raise ValueError("edge_length must be greater than 0") if not (0 < ratio_threshold <= 1): raise ValueError("ratio_threshold must be in (0, 1]") height, width = frame.shape[:2] if height <= 0 or width <= 0: raise ValueError(f"Invalid frame shape: {frame.shape}") ratio = height / (width + np.finfo(np.float32).eps) if ratio_threshold < ratio < 1 / ratio_threshold: return cv2.resize( frame, (edge_length, edge_length), interpolation=cv2.INTER_AREA ) if width > height: start_x = (width - height) // 2 cropped = frame[:, start_x : start_x + height] else: start_y = (height - width) // 2 cropped = frame[start_y : start_y + width, :] return cv2.resize(cropped, (edge_length, edge_length), interpolation=cv2.INTER_AREA) def relative_to_absolute( from_mother_wh: Tuple[int, int], to_mother_wh: Tuple[int, int], from_child_xyxy: Union[List[float], np.ndarray], to_mother_xy: Tuple[int, int] = (0, 0), ) -> list[int]: """ Convert relative box coordinates from resized sub-frame to absolute coordinates. Example: - detection happens on resized square hand frame - convert detected phone box back to original frame coordinates """ from_mother_w, from_mother_h = from_mother_wh to_mother_w, to_mother_h = to_mother_wh offset_x, offset_y = to_mother_xy scale_x = to_mother_w / (from_mother_w + np.finfo(np.float32).eps) scale_y = to_mother_h / (from_mother_h + np.finfo(np.float32).eps) x1, y1, x2, y2 = from_child_xyxy[:4] abs_x1 = int(x1 * scale_x + offset_x) abs_y1 = int(y1 * scale_y + offset_y) abs_x2 = int(x2 * scale_x + offset_x) abs_y2 = int(y2 * scale_y + offset_y) return [abs_x1, abs_y1, abs_x2, abs_y2] def init_websocket(server_url: str) -> Any | None: """ Initialize websocket connection safely. """ if websocket is None: return None try: ws = websocket.WebSocket() ws.connect(server_url) return ws except Exception: return None def yield_video_feed(frame_to_yield: np.ndarray, title: str = "", ws=None) -> None: """ Show local OpenCV window and optionally send frame through websocket. """ if ws is not None: _, jpeg_encoded = cv2.imencode(".jpg", frame_to_yield) jpeg_bytes = jpeg_encoded.tobytes() jpeg_base64 = base64.b64encode(jpeg_bytes).decode("utf-8") ws.send( json.dumps( { "frameBase64": jpeg_base64, "timestamp": f"{float(time.time()):.3f}", } ) ) cv2.imshow(title, frame_to_yield) def announce_face_frame(face_frames: list[np.ndarray], ws) -> None: """ Send face crops to websocket client. """ encoded_frames = [] for frame in face_frames: try: _, jpeg_encoded = cv2.imencode(".jpg", frame) jpeg_bytes = jpeg_encoded.tobytes() jpeg_base64 = base64.b64encode(jpeg_bytes).decode("utf-8") encoded_frames.append(jpeg_base64) except Exception: continue if len(encoded_frames) <= 0: return ws.send( json.dumps( { "announced_face_frames": encoded_frames, "timestamp": f"{float(time.time()):.3f}", } ) )