ISR / models /isr /utils.py
Zhen Ye
refactor: migrate to uv, prompt-tune BAML mission planner and assessor
18a11bc
"""Shared utilities for ISR LLM modules (assessor, explainer)."""
import base64
import json
import logging
import cv2
import numpy as np
logger = logging.getLogger(__name__)
def crop_and_encode(frame: np.ndarray, bbox: list, max_dim: int = 256, quality: int = 70) -> str | None:
"""Crop detection from frame and return base64 JPEG string.
Args:
frame: BGR numpy array.
bbox: [x1, y1, x2, y2] pixel coordinates.
max_dim: Max pixel size on longest side.
quality: JPEG quality (0-100).
Returns:
Base64-encoded JPEG string, or None on failure.
"""
try:
h, w = frame.shape[:2]
x1 = max(0, int(bbox[0]))
y1 = max(0, int(bbox[1]))
x2 = min(w, int(bbox[2]))
y2 = min(h, int(bbox[3]))
if x2 <= x1 or y2 <= y1:
return None
crop = frame[y1:y2, x1:x2]
longest = max(crop.shape[0], crop.shape[1])
if longest > max_dim:
scale = max_dim / longest
crop = cv2.resize(crop, None, fx=scale, fy=scale)
_, buf = cv2.imencode(".jpg", crop, [cv2.IMWRITE_JPEG_QUALITY, quality])
return base64.b64encode(buf).decode("utf-8")
except Exception:
logger.warning("Failed to crop detection at bbox %s", bbox)
return None
def encode_frame(frame: np.ndarray, max_dim: int = 1024, quality: int = 70) -> str | None:
    """Encode a full frame as base64 JPEG, resized to max_dim.

    Non-uint8 input is converted to uint8 first: when the frame's maximum is
    at most 1.0 it is multiplied by 255, otherwise it is used as-is; either
    way the values are clipped to [0, 255]. 2-D frames are expanded to three
    channels and 4-channel frames have the fourth channel dropped. The frame
    is only ever downscaled, never enlarged.

    Args:
        frame: Image array (2-D, or 3-D with a channel axis last).
        max_dim: Max pixel size on longest side.
        quality: JPEG quality (0-100).

    Returns:
        Base64-encoded JPEG string, or None when the encoder reports failure
        or any step raises.
    """
    try:
        # Ensure uint8 3-channel BGR for JPEG encoding
        if frame.dtype != np.uint8:
            scaled = frame * 255 if frame.max() <= 1.0 else frame
            frame = np.clip(scaled, 0, 255).astype(np.uint8)
        if frame.ndim == 2:
            frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)
        elif frame.shape[2] == 4:
            frame = cv2.cvtColor(frame, cv2.COLOR_BGRA2BGR)

        h, w = frame.shape[:2]
        longest = max(h, w)
        if longest > max_dim:
            scale = max_dim / longest
            frame = cv2.resize(frame, None, fx=scale, fy=scale)

        ok, buf = cv2.imencode(".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, quality])
        if not ok:
            logger.warning("cv2.imencode failed for frame shape %s", frame.shape)
            return None
        return base64.b64encode(buf.tobytes()).decode("utf-8")
    except Exception:
        # Best-effort helper: keep returning None, but attach the traceback
        # so the failure can be diagnosed from the logs.
        logger.warning("Failed to encode frame", exc_info=True)
        return None
def _drop_inline_language_tag(body: str) -> str:
    """Remove a leading alphanumeric tag (e.g. ``json``) sitting before the first ``{`` or ``[``."""
    positions = [pos for pos in (body.find("{"), body.find("[")) if pos != -1]
    if not positions:
        return body
    start = min(positions)
    if start > 0 and body[:start].strip().isalnum():
        return body[start:]
    return body


def strip_json_fences(raw: str) -> str:
    """Strip markdown code fences from LLM responses.

    Handles an opening fence with or without a language tag, and a closing
    fence. When the fenced reply spans several lines the whole opening line
    is discarded. When everything is on one line, the three backticks are
    removed and an alphanumeric language tag directly preceding the first
    ``{`` or ``[`` is dropped as well.

    Args:
        raw: Raw text returned by the model.

    Returns:
        The text with surrounding whitespace and fences removed. Input that
        carries no fences is returned stripped of surrounding whitespace.
    """
    text = raw.strip()
    single_line_fence = False

    if text.startswith("```"):
        if "\n" in text:
            # The opening line holds only the fence and optional language tag.
            text = text.split("\n", 1)[1]
        else:
            text = text[3:]
            single_line_fence = True

    if text.endswith("```"):
        text = text[:-3]
    text = text.strip()

    if single_line_fence:
        # With no newline after the fence, a tag such as "json" is glued to
        # the payload and would otherwise make it unparsable.
        text = _drop_inline_language_tag(text)
    return text
def parse_llm_json(raw: str) -> dict | list | None:
    """Strip fences + parse JSON. Returns None on failure.

    Failure covers three cases: ``raw`` is not a string (for example None),
    the cleaned text is not valid JSON, or the JSON is valid but its top
    level is not an object or array.

    Args:
        raw: Raw text returned by the model, possibly wrapped in markdown
            code fences.

    Returns:
        The decoded dict or list, or None on failure.
    """
    if not isinstance(raw, str):
        # Without this guard a None reply raises AttributeError instead of
        # honouring the "None on failure" contract.
        logger.warning("Failed to parse LLM JSON response")
        return None

    try:
        parsed = json.loads(strip_json_fences(raw))
    except ValueError:  # json.JSONDecodeError is a ValueError subclass
        logger.warning("Failed to parse LLM JSON response")
        return None

    if not isinstance(parsed, (dict, list)):
        # Scalars (numbers, strings, true/false/null) are valid JSON but do
        # not satisfy the declared return type.
        logger.warning("Failed to parse LLM JSON response")
        return None
    return parsed