# Pedestrian_Analysis / model_utils.py
# Last change: "Update model_utils.py" by gajjukhan (commit d26e221, verified)
"""
PP-Human Pedestrian Analysis — PaddlePaddle Inference.
Downloads the Paddle models, then runs them through the Paddle Inference API directly on CPU.
"""
import os
import urllib.request
import zipfile
import numpy as np
import cv2
from tqdm import tqdm
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Directory under the user's home where model archives are downloaded and unpacked.
MODEL_CACHE = os.path.join(os.path.expanduser("~"), "models")
# NOTE(review): the detection URL below points at the large (`_l_`) archive,
# although this comment previously claimed the SMALL model was in use.
# Confirm which variant is intended before swapping it.
DET_MODEL_URL = "https://bj.bcebos.com/v1/paddledet/models/pipeline/mot_ppyoloe_l_36e_pipeline.zip"
ATTR_MODEL_URL = "https://bj.bcebos.com/v1/paddledet/models/pipeline/PPLCNet_x1_0_person_attribute_945_infer.zip"
# Detector input size, unpacked as (height, width) by _preprocess_det.
DET_INPUT_SHAPE = (640, 640)
ATTR_INPUT_SHAPE = (256, 192) # H, W
MAX_IMAGE_SIDE = 960 # downscale large images
# Display group -> list of (label, output index) pairs into the attribute
# model's output vector.
#   * "Age", "Gender" and "Direction" are decoded as exclusive choices: the
#     label with the highest score in the group wins.
#   * Every other group is multi-label: a label is reported when its
#     probability exceeds 0.5.
#   * A NEGATIVE index means "use 1 - probability at abs(index)"; this is how
#     "Male" is derived from the "Female" output (index 22).
ATTR_GROUPS = {
    "Age": [("Over 60", 19), ("18-60", 20), ("Under 18", 21)],
    "Gender": [("Female", 22), ("Male", -22)],
    "Direction": [("Front", 23), ("Side", 24), ("Back", 25)],
    "Accessories": [("Hat", 0), ("Glasses", 1)],
    "Upper Body": [("Short Sleeve", 2), ("Long Sleeve", 3)],
    "Upper Style": [("Stride", 4), ("Logo", 5), ("Plaid", 6), ("Splice", 7)],
    "Lower Body": [("Trousers", 11), ("Shorts", 12), ("Skirt/Dress", 13)],
    "Lower Style": [("Stripe", 8), ("Pattern", 9)],
    "Coat": [("Long Coat", 10)],
    "Shoes": [("Boots", 14)],
    "Bag": [("Handbag", 15), ("Shoulder Bag", 16), ("Backpack", 17)],
    "Action": [("Holding Objects", 18)],
}
# ---------------------------------------------------------------------------
# Download helpers
# ---------------------------------------------------------------------------
class _ProgressBar(tqdm):
    """tqdm bar exposing a callback shaped like a ``urlretrieve`` reporthook."""

    def update_to(self, b=1, bsize=1, tsize=None):
        """Move the bar to ``b * bsize`` units.

        Args:
            b: Number of blocks transferred so far.
            bsize: Size of each block.
            tsize: Total size; when not None it replaces the bar's total.
        """
        if tsize is not None:
            self.total = tsize
        transferred = b * bsize
        # tqdm.update() takes an increment, so subtract what is already shown.
        self.update(transferred - self.n)
def _has_model_file(path: str) -> bool:
    """Return True when *path* directly contains a ``.pdmodel`` file."""
    return any(name.endswith(".pdmodel") for name in os.listdir(path))


def _locate_model_dir(dest_dir: str):
    """Return the directory under *dest_dir* that holds a ``.pdmodel`` file, or None.

    Sub-directories are inspected first, in sorted order so the choice is
    deterministic; *dest_dir* itself is inspected last to cover archives that
    unpack their files flat instead of into a folder.
    """
    for entry in sorted(os.listdir(dest_dir)):
        candidate = os.path.join(dest_dir, entry)
        if os.path.isdir(candidate) and _has_model_file(candidate):
            return candidate
    if _has_model_file(dest_dir):
        return dest_dir
    return None


def _download_and_extract(url: str, dest_dir: str) -> str:
    """Download *url* into *dest_dir*, unpack it if it is a zip, and return the model directory.

    Nothing is downloaded when *dest_dir* already contains a model (a
    ``.pdmodel`` file either in one of its sub-directories or directly inside
    it).

    Args:
        url: Location of the archive; the last path component is used as the
            local file name.
        dest_dir: Directory to download and extract into (created if missing).

    Returns:
        The directory that contains the ``.pdmodel`` file. If no such file is
        found after extraction, the first sub-directory of *dest_dir* in
        sorted order, or *dest_dir* itself when it has no sub-directories.
    """
    os.makedirs(dest_dir, exist_ok=True)
    fname = url.split("/")[-1]
    local_path = os.path.join(dest_dir, fname)

    cached = _locate_model_dir(dest_dir)
    if cached is not None:
        print(f" [cached] {cached}")
        return cached

    print(f" [download] {fname}")
    is_zip = fname.endswith(".zip")
    try:
        with _ProgressBar(unit="B", unit_scale=True, miniters=1, desc=fname) as t:
            urllib.request.urlretrieve(url, filename=local_path, reporthook=t.update_to)
        if is_zip:
            with zipfile.ZipFile(local_path, "r") as z:
                z.extractall(dest_dir)
    finally:
        # The archive is only a transport artefact: drop it after extraction,
        # and also when the download or extraction failed part-way so a
        # truncated file is never left behind.
        if is_zip and os.path.exists(local_path):
            os.remove(local_path)

    extracted = _locate_model_dir(dest_dir)
    if extracted is not None:
        return extracted

    # No .pdmodel anywhere: keep the historical fallback so callers still get
    # a path (and a clear error later from the predictor factory).
    for entry in sorted(os.listdir(dest_dir)):
        candidate = os.path.join(dest_dir, entry)
        if os.path.isdir(candidate):
            return candidate
    return dest_dir
# ---------------------------------------------------------------------------
# Find Paddle model files
# ---------------------------------------------------------------------------
def _find_paddle_files(model_dir: str):
    """Locate the ``.pdmodel`` and ``.pdiparams`` files directly inside *model_dir*.

    Returns:
        A ``(pdmodel_path, pdiparams_path)`` tuple; either element is None
        when no file with that suffix exists. If several files share a
        suffix, the one listed last by ``os.listdir`` is returned.
    """
    found = {".pdmodel": None, ".pdiparams": None}
    for entry in os.listdir(model_dir):
        for suffix in found:
            if entry.endswith(suffix):
                found[suffix] = os.path.join(model_dir, entry)
                break
    return found[".pdmodel"], found[".pdiparams"]
# ---------------------------------------------------------------------------
# Paddle Inference predictors
# ---------------------------------------------------------------------------
# Module-level predictor singletons. Both start as None and are assigned by
# load_models(); the inference functions call load_models() on first use when
# they find them unset.
_det_predictor = None
_attr_predictor = None
def _create_predictor(model_dir: str):
    """Build a CPU Paddle Inference predictor from the files in *model_dir*.

    Raises:
        FileNotFoundError: If *model_dir* lacks a ``.pdmodel`` or a
            ``.pdiparams`` file.
    """
    # Imported lazily so the module can be imported without paddle installed.
    from paddle.inference import Config, create_predictor

    model_file, params_file = _find_paddle_files(model_dir)
    if None in (model_file, params_file):
        raise FileNotFoundError(f"No .pdmodel/.pdiparams in {model_dir}")

    cfg = Config(model_file, params_file)
    # CPU-only execution with 6 math-library threads.
    cfg.disable_gpu()
    cfg.set_cpu_math_library_num_threads(6)
    # Graph and memory optimisations.
    cfg.switch_ir_optim(True)
    cfg.enable_memory_optim()
    # MKL-DNN kernels with a cache capacity of 10.
    cfg.enable_mkldnn()
    cfg.set_mkldnn_cache_capacity(10)
    # Silence glog INFO output.
    cfg.disable_glog_info()
    return create_predictor(cfg)
def load_models():
    """Fetch both models, build their predictors, and warm them up.

    Populates the module-level ``_det_predictor`` and ``_attr_predictor``.
    Archives are cached under ``MODEL_CACHE/det`` and ``MODEL_CACHE/attr``.
    """
    global _det_predictor, _attr_predictor

    det_cache = os.path.join(MODEL_CACHE, "det")
    attr_cache = os.path.join(MODEL_CACHE, "attr")

    print("[1/3] Downloading detection model...")
    det_dir = _download_and_extract(DET_MODEL_URL, det_cache)

    print("[2/3] Downloading attribute model...")
    attr_dir = _download_and_extract(ATTR_MODEL_URL, attr_cache)

    print("[3/3] Loading Paddle Inference predictors...")
    _det_predictor = _create_predictor(det_dir)
    _attr_predictor = _create_predictor(attr_dir)

    # One dummy pass per predictor before reporting readiness.
    _warmup()
    print("[OK] Models ready.")
def _warmup():
    """Run one dummy inference through each predictor.

    Warmup is best-effort: a failure is reported on stdout and otherwise
    ignored, so it can never prevent the models from being used.

    The detector is fed the same three kinds of input that
    ``detect_pedestrians`` provides (image, scale factor and ``im_shape``),
    matched by substring against the predictor's input names.
    """
    # Detection warmup. Keys are matched in insertion order, mirroring the
    # if/elif chain used when feeding real images.
    det_feeds = {
        "image": np.zeros((1, 3, *DET_INPUT_SHAPE), dtype=np.float32),
        "scale": np.array([[1.0, 1.0]], dtype=np.float32),
        "im_shape": np.array(
            [[DET_INPUT_SHAPE[0], DET_INPUT_SHAPE[1]]], dtype=np.float32
        ),
    }
    for name in _det_predictor.get_input_names():
        lowered = name.lower()
        tensor = next(
            (value for key, value in det_feeds.items() if key in lowered), None
        )
        if tensor is None:
            continue
        handle = _det_predictor.get_input_handle(name)
        handle.reshape(tensor.shape)
        handle.copy_from_cpu(tensor)
    try:
        _det_predictor.run()
    except Exception as exc:  # best-effort: report, never raise
        print(f" [warn] detection warmup failed: {exc}")

    # Attribute warmup: the model is fed through its first input only.
    dummy_attr = np.zeros(
        (1, 3, ATTR_INPUT_SHAPE[0], ATTR_INPUT_SHAPE[1]), dtype=np.float32
    )
    attr_names = _attr_predictor.get_input_names()
    handle = _attr_predictor.get_input_handle(attr_names[0])
    handle.reshape(dummy_attr.shape)
    handle.copy_from_cpu(dummy_attr)
    try:
        _attr_predictor.run()
    except Exception as exc:  # best-effort: report, never raise
        print(f" [warn] attribute warmup failed: {exc}")
# ---------------------------------------------------------------------------
# Preprocessing
# ---------------------------------------------------------------------------
# Per-channel mean and standard deviation applied by both preprocessors after
# pixel values are scaled to [0, 1]; they broadcast over the last (channel)
# axis of an H x W x 3 image.
# NOTE(review): these look like the usual ImageNet statistics in RGB order —
# confirm that callers pass RGB data.
_MEAN = np.array([0.485, 0.456, 0.406], dtype=np.float32)
_STD = np.array([0.229, 0.224, 0.225], dtype=np.float32)
def _preprocess_det(image: np.ndarray):
    """Turn an H x W x 3 image into the detector's input tensors.

    The image is stretched to ``DET_INPUT_SHAPE`` (aspect ratio is not
    preserved), scaled to [0, 1], normalised with ``_MEAN``/``_STD`` and laid
    out as a 1 x 3 x H x W float32 batch.

    Returns:
        ``(tensor, scale)`` where ``scale`` is a 1 x 2 float32 array holding
        the ``[height, width]`` resize ratios.
    """
    src_h, src_w = image.shape[:2]
    dst_h, dst_w = DET_INPUT_SHAPE

    # cv2.resize expects the target size as (width, height).
    stretched = cv2.resize(image, (dst_w, dst_h), interpolation=cv2.INTER_LINEAR)
    normalised = stretched.astype(np.float32) * (1.0 / 255.0)
    normalised = (normalised - _MEAN) / _STD

    # HWC -> CHW, then add the leading batch axis.
    tensor = np.expand_dims(np.transpose(normalised, (2, 0, 1)), 0)

    ratios = [[float(dst_h) / src_h, float(dst_w) / src_w]]
    return tensor, np.array(ratios, dtype=np.float32)
def _preprocess_attr(crop: np.ndarray):
    """Turn a person crop into a 1 x 3 x H x W float32 tensor for the attribute model.

    The crop is stretched to ``ATTR_INPUT_SHAPE``, scaled to [0, 1] and
    normalised with ``_MEAN``/``_STD``.
    """
    dst_h, dst_w = ATTR_INPUT_SHAPE

    # cv2.resize expects the target size as (width, height).
    stretched = cv2.resize(crop, (dst_w, dst_h), interpolation=cv2.INTER_LINEAR)
    normalised = stretched.astype(np.float32) * (1.0 / 255.0)
    normalised = (normalised - _MEAN) / _STD

    # HWC -> CHW, then add the leading batch axis.
    return np.expand_dims(np.transpose(normalised, (2, 0, 1)), 0)
# ---------------------------------------------------------------------------
# Inference
# ---------------------------------------------------------------------------
def detect_pedestrians(image: np.ndarray, conf_thresh: float = 0.5):
    """Detect pedestrians in a BGR image.

    Models are loaded on first use. The frame is converted to RGB before
    normalisation, matching what the attribute path does with its crops, so
    the RGB-ordered ``_MEAN``/``_STD`` statistics line up with the channels.

    Args:
        image: H x W x 3 BGR image (a 2-D grayscale image is also accepted).
        conf_thresh: Minimum score for a detection to be kept.

    Returns:
        A list of ``(x1, y1, x2, y2, score)`` tuples with integer corner
        coordinates clamped to the bounds of *image* and a Python ``float``
        score. Boxes that are empty after clamping are dropped.
    """
    if _det_predictor is None:
        load_models()

    if image.ndim == 2:
        rgb = cv2.cvtColor(image, cv2.COLOR_GRAY2RGB)
    else:
        rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    img, scale = _preprocess_det(rgb)
    im_shape = np.array(
        [[DET_INPUT_SHAPE[0], DET_INPUT_SHAPE[1]]], dtype=np.float32
    )

    # Inputs are matched by substring because the exported model decides the
    # exact names; unknown inputs are left untouched.
    for name in _det_predictor.get_input_names():
        lowered = name.lower()
        if "image" in lowered:
            tensor = img
        elif "scale" in lowered:
            tensor = scale
        elif "im_shape" in lowered:
            tensor = im_shape
        else:
            continue
        handle = _det_predictor.get_input_handle(name)
        handle.reshape(tensor.shape)
        handle.copy_from_cpu(tensor)

    _det_predictor.run()

    output_names = _det_predictor.get_output_names()
    dets = _det_predictor.get_output_handle(output_names[0]).copy_to_cpu()

    boxes = []
    if dets is None:
        return boxes

    h, w = image.shape[:2]
    for det in dets:
        if len(det) < 6:
            continue
        # Row layout: the first value is ignored (presumably a class id —
        # TODO confirm), followed by score and the two box corners.
        _, score, x1, y1, x2, y2 = det[:6]
        if score >= conf_thresh:
            x1, y1 = max(0, int(x1)), max(0, int(y1))
            x2, y2 = min(w, int(x2)), min(h, int(y2))
            if x2 > x1 and y2 > y1:
                boxes.append((x1, y1, x2, y2, float(score)))
    return boxes
def recognize_attributes(image: np.ndarray, box):
    """Recognise the attributes of a single pedestrian.

    This is the one-box case of ``recognize_attributes_batch`` and delegates
    to it, so cropping, preprocessing and decoding live in one place and the
    two entry points cannot drift apart.

    Args:
        image: BGR image the box refers to.
        box: Sequence whose first four items are ``x1, y1, x2, y2``.

    Returns:
        A dict of decoded attributes; empty when the crop is empty. The
        "Age", "Gender" and "Direction" keys map to ``(label, score)``
        tuples; every other key is an attribute label mapping to its
        probability.
    """
    return recognize_attributes_batch(image, [box])[0]
# ---------------------------------------------------------------------------
# Batch attribute recognition (faster for multiple pedestrians)
# ---------------------------------------------------------------------------
def _decode_attributes(probs):
    """Convert one vector of attribute probabilities into a result dict using ATTR_GROUPS."""
    count = len(probs)

    def score_of(idx):
        # A negative index selects the complement of the probability at
        # abs(idx). Indices outside the vector score 0.0 for both signs
        # instead of raising IndexError.
        pos = abs(idx)
        if pos >= count:
            return 0.0
        return (1.0 - probs[pos]) if idx < 0 else probs[pos]

    result = {}
    for group, attrs in ATTR_GROUPS.items():
        if group in ("Age", "Gender", "Direction"):
            # Exclusive groups: keep the single best-scoring label
            # (the first one wins on ties).
            best_label, best_score = None, -1
            for label, idx in attrs:
                s = score_of(idx)
                if s > best_score:
                    best_score, best_label = s, label
            if best_label:
                result[group] = (best_label, float(best_score))
        else:
            # Multi-label groups: report every label above 0.5.
            for label, idx in attrs:
                if idx < count and probs[idx] > 0.5:
                    result[label] = float(probs[idx])
    return result


def recognize_attributes_batch(image: np.ndarray, boxes):
    """Recognise attributes for all boxes with a single model invocation.

    Args:
        image: BGR image the boxes refer to.
        boxes: Sequence of boxes whose first four items are
            ``x1, y1, x2, y2`` (integers usable as slice bounds).

    Returns:
        One dict per input box, in the same order. Boxes whose crop is empty
        get an empty dict. In each dict the "Age", "Gender" and "Direction"
        keys map to ``(label, score)`` tuples; every other key is an
        attribute label mapping to its probability.
    """
    if not boxes:
        return []
    if _attr_predictor is None:
        load_models()

    crops = []
    valid_indices = []
    for i, box in enumerate(boxes):
        x1, y1, x2, y2 = box[:4]
        crop = image[y1:y2, x1:x2]
        if crop.size > 0:
            crop_rgb = cv2.cvtColor(crop, cv2.COLOR_BGR2RGB)
            crops.append(_preprocess_attr(crop_rgb))
            valid_indices.append(i)

    all_attrs = [{} for _ in boxes]
    if not crops:
        return all_attrs

    # Each preprocessed crop is 1 x 3 x H x W, so concatenating on axis 0
    # yields an N x 3 x H x W batch.
    batch = np.concatenate(crops, axis=0)
    input_names = _attr_predictor.get_input_names()
    handle = _attr_predictor.get_input_handle(input_names[0])
    handle.reshape(batch.shape)
    handle.copy_from_cpu(batch)
    _attr_predictor.run()

    output_names = _attr_predictor.get_output_names()
    all_logits = _attr_predictor.get_output_handle(output_names[0]).copy_to_cpu()

    # NOTE(review): the output is treated as raw logits and passed through a
    # sigmoid. If the exported model already ends in a sigmoid, every value
    # would land above 0.5 here — confirm against the model's export config.
    all_probs = 1.0 / (1.0 + np.exp(-all_logits))

    for batch_idx, orig_idx in enumerate(valid_indices):
        all_attrs[orig_idx] = _decode_attributes(all_probs[batch_idx])
    return all_attrs
# ---------------------------------------------------------------------------
# Drawing
# ---------------------------------------------------------------------------
# Box/label colours; draw_results picks PALETTE[i % len(PALETTE)] for the
# i-th detection, so colours repeat after eight pedestrians. The tuples are
# handed to OpenCV as-is, i.e. in the canvas's channel order (BGR when called
# through analyze_image).
PALETTE = [
    (78, 172, 248), (120, 230, 150), (255, 180, 80), (200, 130, 255),
    (255, 120, 120), (100, 220, 220), (255, 200, 150), (180, 180, 255),
]
def _attribute_lines(attrs):
    """Return the text lines for one pedestrian: Gender, Age, Direction first, then one line per extra attribute."""
    lines = []
    if "Gender" in attrs:
        lines.append(f"Gender: {attrs['Gender'][0]}")
    if "Age" in attrs:
        lines.append(f"Age: {attrs['Age'][0]}")
    if "Direction" in attrs:
        lines.append(f"Dir: {attrs['Direction'][0]}")
    for k, v in attrs.items():
        if k in ("Gender", "Age", "Direction"):
            continue
        if isinstance(v, tuple):
            lines.append(f"{v[0]}")
        else:
            lines.append(f"{k}")
    return lines


def _fit_line(line, max_text_w, font_scale):
    """Shorten *line* with a trailing ".." until it is at most *max_text_w* pixels wide."""
    (ltw, _), _ = cv2.getTextSize(line, cv2.FONT_HERSHEY_SIMPLEX, font_scale, 1)
    # Boxes narrower than 30 px of text area are left alone: truncating there
    # would leave nothing readable.
    if ltw <= max_text_w or max_text_w <= 30:
        return line
    while len(line) > 3:
        line = line[:-1]
        (ltw, _), _ = cv2.getTextSize(
            line + "..", cv2.FONT_HERSHEY_SIMPLEX, font_scale, 1
        )
        if ltw <= max_text_w:
            return line + ".."
    return line


def _draw_text_panel(canvas, lines, bx, by, block_w, block_h, alpha,
                     font_scale, line_h, pad):
    """Blend a dark panel onto *canvas* in place and write *lines* on it, one per row."""
    h, w = canvas.shape[:2]
    # cv2.rectangle includes both corners, hence the +1 on the far edges.
    rx0, ry0 = max(0, bx), max(0, by)
    rx1, ry1 = min(w, bx + block_w + 1), min(h, by + block_h + 1)
    if rx1 > rx0 and ry1 > ry0:
        # Only the panel region is copied and blended; pixels outside it are
        # unchanged by the blend, so copying the whole frame is unnecessary.
        region = canvas[ry0:ry1, rx0:rx1]
        shade = region.copy()
        cv2.rectangle(
            shade, (0, 0), (shade.shape[1] - 1, shade.shape[0] - 1),
            (20, 20, 20), -1,
        )
        canvas[ry0:ry1, rx0:rx1] = cv2.addWeighted(
            shade, alpha, region, 1.0 - alpha, 0
        )
    for row, text in enumerate(lines):
        ty = by + pad + (row + 1) * line_h - 3
        cv2.putText(canvas, text, (bx + pad, ty),
                    cv2.FONT_HERSHEY_SIMPLEX, font_scale,
                    (255, 255, 255), 1, cv2.LINE_AA)


def draw_results(image, detections, attributes, show_attrs=True):
    """Draw boxes, score headers, attribute panels and a count badge on a copy of *image*.

    Args:
        image: Image to annotate; it is not modified.
        detections: Sequence of ``(x1, y1, x2, y2, score)`` tuples with
            integer coordinates.
        attributes: One attribute dict per detection, paired by position.
        show_attrs: When False, only boxes, headers and the badge are drawn.

    Returns:
        The annotated copy.

    Layout:
        * Font size and line thickness scale with the longer image side.
        * Each header sits just above its box; when that would fall outside
          the top of the frame it is drawn just inside the box instead, and
          the attribute panel starts below it.
        * With exactly one detection the panel shows up to 8 lines and is
          clamped to the image; with several detections the panel is limited
          to what fits inside each box and over-wide lines are truncated.
    """
    canvas = image.copy()
    h, w = canvas.shape[:2]
    s = max(w, h) / 1000.0
    fs = max(0.45, s * 0.55)
    thick = max(1, int(s * 2))
    bthick = max(2, int(s * 2.5))
    attr_fs = fs * 0.75
    is_single = len(detections) == 1
    font = cv2.FONT_HERSHEY_SIMPLEX
    pad = 5
    # Line height depends only on the font scale, so measure it once.
    (_, lth), _ = cv2.getTextSize("Ag", font, attr_fs, 1)
    lh = lth + 8

    for i, (det, attrs) in enumerate(zip(detections, attributes)):
        x1, y1, x2, y2, score = det
        c = PALETTE[i % len(PALETTE)]
        cv2.rectangle(canvas, (x1, y1), (x2, y2), c, bthick)

        # --- Detection header label ---
        hdr = f"#{i+1} {score:.0%}"
        (htw, hth), _ = cv2.getTextSize(hdr, font, fs, thick)
        hdr_h = hth + 10
        # Above the box when there is room, otherwise inside its top edge so
        # the label stays visible for boxes touching the top of the frame.
        hdr_top = y1 - hdr_h if y1 >= hdr_h else y1
        hdr_bottom = hdr_top + hdr_h
        cv2.rectangle(canvas, (x1, hdr_top), (x1 + htw + 6, hdr_bottom), c, -1)
        cv2.putText(canvas, hdr, (x1 + 3, hdr_bottom - 5),
                    font, fs, (255, 255, 255), thick)

        if not show_attrs or not attrs:
            continue
        lines = _attribute_lines(attrs)
        if not lines:
            continue

        # Panels start at the box top, or below the header when the header
        # had to be moved inside the box.
        panel_top = max(y1, hdr_bottom)

        if is_single:
            # Single pedestrian: full-width lines, capped at 8 for clarity.
            display_lines = lines[:8]
            mw = max(
                cv2.getTextSize(l, font, attr_fs, 1)[0][0]
                for l in display_lines
            )
            block_w = mw + pad * 2
            block_h = len(display_lines) * lh + pad * 2
            # Clamp the panel to the image on every side.
            bx = max(0, min(x1, w - block_w))
            by = max(0, min(panel_top, h - block_h))
            _draw_text_panel(canvas, display_lines, bx, by, block_w, block_h,
                             0.8, attr_fs, lh, pad)
        else:
            # Multiple pedestrians: keep each panel inside its own box.
            box_w = x2 - x1
            box_h = max(1, y2 - panel_top)
            max_lines = max(1, (box_h - pad * 2) // lh)
            max_text_w = box_w - pad * 2
            fitted_lines = [
                _fit_line(line, max_text_w, attr_fs)
                for line in lines[:max_lines]
            ]
            mw = max(
                cv2.getTextSize(l, font, attr_fs, 1)[0][0]
                for l in fitted_lines
            )
            block_w = min(mw + pad * 2, box_w)
            block_h = min(len(fitted_lines) * lh + pad * 2, box_h)
            _draw_text_panel(canvas, fitted_lines, x1 + 2, panel_top + 2,
                             block_w, block_h, 0.75, attr_fs, lh, pad)

    # Pedestrian count badge
    ct = f"Pedestrians: {len(detections)}"
    (tw, th), _ = cv2.getTextSize(ct, font, fs, thick)
    cv2.rectangle(canvas, (8, 8), (tw + 24, th + 24), (30, 30, 30), -1)
    cv2.putText(canvas, ct, (16, th + 16),
                font, fs, (78, 172, 248), thick, cv2.LINE_AA)
    return canvas
# ---------------------------------------------------------------------------
# High-level API
# ---------------------------------------------------------------------------
def analyze_image(image: np.ndarray, conf_thresh=0.5, show_attrs=True):
    """Detect pedestrians in an RGB image, recognise their attributes and annotate the image.

    Args:
        image: RGB image (H x W x 3). A grayscale image is also accepted and
            is expanded to three channels first.
        conf_thresh: Minimum detection score.
        show_attrs: Whether attribute panels are drawn on the annotated image.

    Returns:
        ``(annotated_rgb, summary)``: the annotated image in RGB order (at
        the downscaled size when the input exceeded ``MAX_IMAGE_SIDE``) and a
        Markdown summary with one paragraph per person, or
        "No pedestrians detected." when nothing was found.
    """
    # The rest of the pipeline works on 3-channel BGR frames.
    if image.ndim == 2 or (image.ndim == 3 and image.shape[2] == 1):
        bgr = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
    else:
        bgr = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

    # Downscale large images
    h, w = bgr.shape[:2]
    if max(h, w) > MAX_IMAGE_SIDE:
        r = MAX_IMAGE_SIDE / max(h, w)
        # Keep both sides at least 1 px so extreme aspect ratios do not ask
        # cv2.resize for an empty image.
        bgr = cv2.resize(bgr, (max(1, int(w * r)), max(1, int(h * r))))

    detections = detect_pedestrians(bgr, conf_thresh)
    all_attrs = recognize_attributes_batch(bgr, detections)
    annotated = draw_results(bgr, detections, all_attrs, show_attrs)
    annotated_rgb = cv2.cvtColor(annotated, cv2.COLOR_BGR2RGB)

    lines = []
    for i, (det, attrs) in enumerate(zip(detections, all_attrs)):
        # Exclusive groups are stored as (label, score) tuples; multi-label
        # attributes are stored as a bare probability.
        parts = [
            f"{k}: {v[0]}" if isinstance(v, tuple) else f"{k} ({v:.0%})"
            for k, v in attrs.items()
        ]
        line = f"**Person #{i+1}** ({det[4]:.0%})"
        if parts:
            line += " — " + ", ".join(parts)
        lines.append(line)

    summary = "\n\n".join(lines) if lines else "No pedestrians detected."
    return annotated_rgb, summary