"""Shared utilities for ISR LLM modules (assessor, explainer)."""

import base64
import json
import logging

import cv2
import numpy as np

logger = logging.getLogger(__name__)

_FENCE = "```"


def _resize_to_max_dim(image: np.ndarray, max_dim: int) -> np.ndarray:
    """Downscale *image* so its longest side is at most *max_dim* pixels.

    Images that already fit are returned unchanged (never upscaled).

    Raises:
        ValueError: If *max_dim* is smaller than 1.
    """
    if max_dim < 1:
        raise ValueError(f"max_dim must be >= 1, got {max_dim}")
    height, width = image.shape[:2]
    longest = max(height, width)
    if longest <= max_dim:
        return image
    scale = max_dim / longest
    # Compute the target size explicitly and clamp to 1 px: with fx/fy scaling
    # the short side of a very elongated image can round to 0, which makes
    # cv2.resize raise.
    target_size = (max(1, round(width * scale)), max(1, round(height * scale)))
    return cv2.resize(image, target_size)


def _jpeg_base64(image: np.ndarray, quality: int) -> str | None:
    """JPEG-encode *image* and return it base64-encoded, or None if encoding fails."""
    ok, buf = cv2.imencode(".jpg", image, [cv2.IMWRITE_JPEG_QUALITY, quality])
    if not ok:
        logger.warning("cv2.imencode failed for frame shape %s", image.shape)
        return None
    return base64.b64encode(buf.tobytes()).decode("utf-8")


def crop_and_encode(frame: np.ndarray, bbox: list, max_dim: int = 256, quality: int = 70) -> str | None:
    """Crop detection from frame and return base64 JPEG string.

    The bounding box is clamped to the frame bounds before cropping, and the
    crop is downscaled so that its longest side does not exceed ``max_dim``.

    Args:
        frame: BGR numpy array.
        bbox: [x1, y1, x2, y2] pixel coordinates.
        max_dim: Max pixel size on longest side.
        quality: JPEG quality (0-100).

    Returns:
        Base64-encoded JPEG string, or None on failure (including a bbox that
        is empty after clamping).
    """
    try:
        frame_h, frame_w = frame.shape[:2]
        x1 = max(0, int(bbox[0]))
        y1 = max(0, int(bbox[1]))
        x2 = min(frame_w, int(bbox[2]))
        y2 = min(frame_h, int(bbox[3]))
        if x2 <= x1 or y2 <= y1:
            return None
        crop = _resize_to_max_dim(frame[y1:y2, x1:x2], max_dim)
        return _jpeg_base64(crop, quality)
    except Exception:
        # Best-effort helper: never propagate, but keep the traceback in the log.
        logger.warning("Failed to crop detection at bbox %s", bbox, exc_info=True)
        return None


def encode_frame(frame: np.ndarray, max_dim: int = 1024, quality: int = 70) -> str | None:
    """Encode a full frame as base64 JPEG, resized to max_dim.

    Non-uint8 input is converted to uint8 first: values are multiplied by 255
    when the frame's maximum is <= 1.0, then clipped to [0, 255]. 2-D frames
    are converted from grayscale to BGR and 4-channel frames from BGRA to BGR.

    Args:
        frame: Image array to encode.
        max_dim: Max pixel size on longest side.
        quality: JPEG quality (0-100).

    Returns:
        Base64-encoded JPEG string, or None on failure.
    """
    try:
        # Ensure uint8 3-channel BGR for JPEG encoding
        if frame.dtype != np.uint8:
            frame = np.clip(frame * 255 if frame.max() <= 1.0 else frame, 0, 255).astype(np.uint8)
        if frame.ndim == 2:
            frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)
        elif frame.shape[2] == 4:
            frame = cv2.cvtColor(frame, cv2.COLOR_BGRA2BGR)
        return _jpeg_base64(_resize_to_max_dim(frame, max_dim), quality)
    except Exception:
        # Best-effort helper: never propagate, but keep the traceback in the log.
        logger.warning("Failed to encode frame", exc_info=True)
        return None


def strip_json_fences(raw: str) -> str:
    """Strip markdown code fences from LLM responses.

    Handles both the multi-line form (the opening fence line, including any
    language tag, is dropped) and the single-line form, where a leading
    ``json`` language tag directly after the opening fence is removed as well.
    Text without fences is returned stripped of surrounding whitespace.
    """
    text = raw.strip()
    if text.startswith(_FENCE):
        _, newline, remainder = text.partition("\n")
        if newline:
            # Drop the whole opening line ("```" or "```json").
            text = remainder
        else:
            text = text[len(_FENCE):]
            # Single-line fence: the language tag is glued to the payload.
            if text[:4].lower() == "json":
                text = text[4:]
    if text.endswith(_FENCE):
        text = text[:-len(_FENCE)]
    return text.strip()


def parse_llm_json(raw: str) -> dict | list | None:
    """Strip fences + parse JSON. Returns None on failure.

    Failure covers non-string input, invalid JSON, and valid JSON whose top
    level is not an object or array (e.g. a bare number or ``null``).
    """
    if not isinstance(raw, str):
        logger.warning("Expected str LLM response, got %s", type(raw).__name__)
        return None
    try:
        parsed = json.loads(strip_json_fences(raw))
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        logger.warning("Failed to parse LLM JSON response")
        return None
    if not isinstance(parsed, (dict, list)):
        logger.warning("LLM JSON response is not an object or array")
        return None
    return parsed