pvs_backend / src /utils /opencv_utils.py
adnankhan-11's picture
PVD System - Initial deployment
d2885a7
import base64
import json
import time
from typing import Any, List, Tuple, Union
import cv2
import numpy as np
try:
import websocket
except ImportError:
websocket = None
COLOR_BGR = {
"green": (0, 255, 0),
"orange": (51, 140, 232),
"red": (0, 0, 255),
"pink": (93, 57, 240),
"gray": (155, 155, 155),
"white": (255, 255, 255),
}
UI_TEXT_STYLE = {
"fontFace": cv2.FONT_HERSHEY_SIMPLEX,
"fontScale": 0.5,
"color": COLOR_BGR["white"],
"thickness": 2,
}
DETECTION_RECT_STYLE = {
"thickness": 2,
}
def render_detection_rectangle(
frame: np.ndarray,
text: str,
xyxy: Union[List[float], np.ndarray],
color: str,
) -> None:
"""
Draw one detection rectangle and label text on frame.
"""
if xyxy is None:
return
cv2.putText(
frame,
text,
org=(int(xyxy[0]), int(xyxy[1]) - 5),
fontFace=UI_TEXT_STYLE["fontFace"],
fontScale=0.6,
color=COLOR_BGR[color],
thickness=UI_TEXT_STYLE["thickness"],
)
cv2.rectangle(
frame,
pt1=(int(xyxy[0]), int(xyxy[1])),
pt2=(int(xyxy[2]), int(xyxy[3])),
color=COLOR_BGR[color],
thickness=DETECTION_RECT_STYLE["thickness"],
)
def render_ui_text(
frame: np.ndarray,
text: str,
frame_wh: Tuple[int, int],
margin_wh: Tuple[int, int],
align: str,
order: int,
) -> None:
"""
Draw UI text on frame.
This is the cleaned version of your current UI renderer.
"""
frame_w, _ = frame_wh
margin_w, margin_h = margin_wh
(text_width, text_height), _ = cv2.getTextSize(
text=text,
fontFace=UI_TEXT_STYLE["fontFace"],
fontScale=UI_TEXT_STYLE["fontScale"],
thickness=UI_TEXT_STYLE["thickness"],
)
if align == "left":
origin = (margin_w, margin_h + order * (text_height + 5))
elif align == "right":
origin = (
frame_w - margin_w - int((0.01 if frame_w <= 600 else 1.0) * text_width),
margin_h + order * (text_height + 5),
)
else:
raise ValueError("align must be either 'left' or 'right'")
cv2.putText(
frame,
text,
org=origin,
fontFace=UI_TEXT_STYLE["fontFace"],
fontScale=UI_TEXT_STYLE["fontScale"],
color=UI_TEXT_STYLE["color"],
thickness=UI_TEXT_STYLE["thickness"],
)
def crop_frame(
frame: np.ndarray,
center_xy: np.ndarray,
crop_hw: Tuple[int, int],
) -> tuple[np.ndarray | None, list[int] | None]:
"""
Crop a sub-frame using center coordinate and crop size.
Returns:
- cropped frame
- absolute xyxy box
"""
frame_h, frame_w, _ = frame.shape
x_center, y_center = center_xy
crop_h, crop_w = crop_hw
if not (0 <= x_center <= frame_w and 0 <= y_center <= frame_h):
return None, None
xs = np.array([x_center - (crop_w // 2), x_center + (crop_w // 2)], dtype=np.int32)
ys = np.array([y_center - (crop_h // 2), y_center + (crop_h // 2)], dtype=np.int32)
np.clip(xs, 0, frame_w, out=xs)
np.clip(ys, 0, frame_h, out=ys)
xyxy = [int(xs[0]), int(ys[0]), int(xs[1]), int(ys[1])]
cropped = frame[ys[0] : ys[1], xs[0] : xs[1], :]
return cropped, xyxy
def resize_frame_to_square(
frame: np.ndarray,
edge_length: int,
ratio_threshold: float = 9 / 16,
) -> np.ndarray:
"""
Resize a frame to square.
If frame ratio is too wide/tall, center-crop first.
Otherwise, direct resize.
This follows the same idea used in your current project.
"""
if edge_length <= 0:
raise ValueError("edge_length must be greater than 0")
if not (0 < ratio_threshold <= 1):
raise ValueError("ratio_threshold must be in (0, 1]")
height, width = frame.shape[:2]
if height <= 0 or width <= 0:
raise ValueError(f"Invalid frame shape: {frame.shape}")
ratio = height / (width + np.finfo(np.float32).eps)
if ratio_threshold < ratio < 1 / ratio_threshold:
return cv2.resize(
frame, (edge_length, edge_length), interpolation=cv2.INTER_AREA
)
if width > height:
start_x = (width - height) // 2
cropped = frame[:, start_x : start_x + height]
else:
start_y = (height - width) // 2
cropped = frame[start_y : start_y + width, :]
return cv2.resize(cropped, (edge_length, edge_length), interpolation=cv2.INTER_AREA)
def relative_to_absolute(
from_mother_wh: Tuple[int, int],
to_mother_wh: Tuple[int, int],
from_child_xyxy: Union[List[float], np.ndarray],
to_mother_xy: Tuple[int, int] = (0, 0),
) -> list[int]:
"""
Convert relative box coordinates from resized sub-frame to absolute coordinates.
Example:
- detection happens on resized square hand frame
- convert detected phone box back to original frame coordinates
"""
from_mother_w, from_mother_h = from_mother_wh
to_mother_w, to_mother_h = to_mother_wh
offset_x, offset_y = to_mother_xy
scale_x = to_mother_w / (from_mother_w + np.finfo(np.float32).eps)
scale_y = to_mother_h / (from_mother_h + np.finfo(np.float32).eps)
x1, y1, x2, y2 = from_child_xyxy[:4]
abs_x1 = int(x1 * scale_x + offset_x)
abs_y1 = int(y1 * scale_y + offset_y)
abs_x2 = int(x2 * scale_x + offset_x)
abs_y2 = int(y2 * scale_y + offset_y)
return [abs_x1, abs_y1, abs_x2, abs_y2]
def init_websocket(server_url: str) -> Any | None:
"""
Initialize websocket connection safely.
"""
if websocket is None:
return None
try:
ws = websocket.WebSocket()
ws.connect(server_url)
return ws
except Exception:
return None
def yield_video_feed(frame_to_yield: np.ndarray, title: str = "", ws=None) -> None:
"""
Show local OpenCV window and optionally send frame through websocket.
"""
if ws is not None:
_, jpeg_encoded = cv2.imencode(".jpg", frame_to_yield)
jpeg_bytes = jpeg_encoded.tobytes()
jpeg_base64 = base64.b64encode(jpeg_bytes).decode("utf-8")
ws.send(
json.dumps(
{
"frameBase64": jpeg_base64,
"timestamp": f"{float(time.time()):.3f}",
}
)
)
cv2.imshow(title, frame_to_yield)
def announce_face_frame(face_frames: list[np.ndarray], ws) -> None:
"""
Send face crops to websocket client.
"""
encoded_frames = []
for frame in face_frames:
try:
_, jpeg_encoded = cv2.imencode(".jpg", frame)
jpeg_bytes = jpeg_encoded.tobytes()
jpeg_base64 = base64.b64encode(jpeg_bytes).decode("utf-8")
encoded_frames.append(jpeg_base64)
except Exception:
continue
if len(encoded_frames) <= 0:
return
ws.send(
json.dumps(
{
"announced_face_frames": encoded_frames,
"timestamp": f"{float(time.time()):.3f}",
}
)
)