"""
PP-Human Pedestrian Analysis — PaddlePaddle Inference.

Downloads Paddle models → uses Paddle Inference API directly for CPU inference.
"""

import os
import urllib.request
import zipfile

import numpy as np
import cv2
from tqdm import tqdm

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
MODEL_CACHE = os.path.join(os.path.expanduser("~"), "models")

# The detector URL below is the LARGE ("_l_") PP-YOLOE variant, i.e. it favours
# accuracy over speed.
# NOTE(review): a smaller variant can presumably be swapped in for speed —
# confirm the exact archive name upstream before changing the URL.
DET_MODEL_URL = "https://bj.bcebos.com/v1/paddledet/models/pipeline/mot_ppyoloe_l_36e_pipeline.zip"
ATTR_MODEL_URL = "https://bj.bcebos.com/v1/paddledet/models/pipeline/PPLCNet_x1_0_person_attribute_945_infer.zip"

DET_INPUT_SHAPE = (640, 640)   # H, W
ATTR_INPUT_SHAPE = (256, 192)  # H, W
MAX_IMAGE_SIDE = 960           # downscale large images

# Maps a display group to (label, output index) pairs of the attribute model.
# A NEGATIVE index means "complement of that output": the score used is
# 1 - probs[abs(index)] (this is how "Male" is derived from the "Female" unit).
# NOTE(review): the index -> label mapping presumably follows the label order
# of the upstream attribute model — verify (in particular the Age ordering).
ATTR_GROUPS = {
    "Age": [("Over 60", 19), ("18-60", 20), ("Under 18", 21)],
    "Gender": [("Female", 22), ("Male", -22)],
    "Direction": [("Front", 23), ("Side", 24), ("Back", 25)],
    "Accessories": [("Hat", 0), ("Glasses", 1)],
    "Upper Body": [("Short Sleeve", 2), ("Long Sleeve", 3)],
    "Upper Style": [("Stride", 4), ("Logo", 5), ("Plaid", 6), ("Splice", 7)],
    "Lower Body": [("Trousers", 11), ("Shorts", 12), ("Skirt/Dress", 13)],
    "Lower Style": [("Stripe", 8), ("Pattern", 9)],
    "Coat": [("Long Coat", 10)],
    "Shoes": [("Boots", 14)],
    "Bag": [("Handbag", 15), ("Shoulder Bag", 16), ("Backpack", 17)],
    "Action": [("Holding Objects", 18)],
}

# Groups reported as a single best label instead of independent flags.
_EXCLUSIVE_GROUPS = ("Age", "Gender", "Direction")

_FONT = cv2.FONT_HERSHEY_SIMPLEX


# ---------------------------------------------------------------------------
# Download helpers
# ---------------------------------------------------------------------------
class _ProgressBar(tqdm):
    """tqdm bar usable as a ``urllib.request.urlretrieve`` report hook."""

    def update_to(self, b=1, bsize=1, tsize=None):
        """Advance the bar to ``b * bsize`` bytes; ``tsize`` sets the total."""
        if tsize is not None:
            self.total = tsize
        self.update(b * bsize - self.n)


def _find_model_dir(root: str):
    """Return the first sub-directory of *root* (sorted) holding a complete model.

    "Complete" means both a ``.pdmodel`` and a ``.pdiparams`` file are present,
    so a half-extracted archive is not mistaken for a usable cache entry.
    Returns ``None`` when no such directory exists.
    """
    for entry in sorted(os.listdir(root)):
        candidate = os.path.join(root, entry)
        if os.path.isdir(candidate) and all(_find_paddle_files(candidate)):
            return candidate
    return None


def _download_and_extract(url: str, dest_dir: str) -> str:
    """Download zip, extract, return model directory path.

    Args:
        url: Archive URL; the last path component is used as the file name.
        dest_dir: Directory the archive is downloaded to and extracted in
            (created if missing).

    Returns:
        The cached or freshly extracted model directory. If no directory with
        model files is found after extraction, the first sub-directory (sorted)
        is returned, and ``dest_dir`` itself as a last resort.

    Raises:
        OSError: Download failed (includes ``urllib.error.URLError``).
        zipfile.BadZipFile: The downloaded archive is corrupt.
    """
    os.makedirs(dest_dir, exist_ok=True)
    fname = url.split("/")[-1]
    local_path = os.path.join(dest_dir, fname)

    # Check if already extracted
    cached = _find_model_dir(dest_dir)
    if cached is not None:
        print(f"  [cached] {cached}")
        return cached

    print(f"  [download] {fname}")
    try:
        with _ProgressBar(unit="B", unit_scale=True, miniters=1, desc=fname) as t:
            urllib.request.urlretrieve(url, filename=local_path, reporthook=t.update_to)
    except OSError:
        # Do not leave a truncated archive behind.
        if os.path.exists(local_path):
            os.remove(local_path)
        raise

    if fname.endswith(".zip"):
        try:
            with zipfile.ZipFile(local_path, "r") as z:
                z.extractall(dest_dir)
        finally:
            # The archive is never reused (the cache check looks at extracted
            # directories only), so drop it even when extraction failed.
            os.remove(local_path)

    extracted = _find_model_dir(dest_dir)
    if extracted is not None:
        return extracted
    for entry in sorted(os.listdir(dest_dir)):
        full = os.path.join(dest_dir, entry)
        if os.path.isdir(full):
            return full
    return dest_dir


# ---------------------------------------------------------------------------
# Find Paddle model files
# ---------------------------------------------------------------------------
def _find_paddle_files(model_dir: str):
    """Return ``(pdmodel_path, pdiparams_path)`` found directly in *model_dir*.

    Either element is ``None`` when no file with that suffix exists. Entries
    are scanned in sorted order so the choice is deterministic when several
    candidates are present (the last one wins).
    """
    pdmodel, pdiparams = None, None
    for f in sorted(os.listdir(model_dir)):
        if f.endswith(".pdmodel"):
            pdmodel = os.path.join(model_dir, f)
        elif f.endswith(".pdiparams"):
            pdiparams = os.path.join(model_dir, f)
    return pdmodel, pdiparams


# ---------------------------------------------------------------------------
# Paddle Inference predictors
# ---------------------------------------------------------------------------
_det_predictor = None
_attr_predictor = None


def _create_predictor(model_dir: str, num_threads: int = 6):
    """Create a Paddle Inference predictor from a model directory.

    Args:
        model_dir: Directory containing the ``.pdmodel`` / ``.pdiparams`` pair.
        num_threads: Value passed to ``set_cpu_math_library_num_threads``.

    Raises:
        FileNotFoundError: Either model file is missing from *model_dir*.
    """
    # Imported lazily so the module can be imported without Paddle installed.
    from paddle.inference import Config, create_predictor

    pdmodel, pdiparams = _find_paddle_files(model_dir)
    if pdmodel is None or pdiparams is None:
        raise FileNotFoundError(f"No .pdmodel/.pdiparams in {model_dir}")

    config = Config(pdmodel, pdiparams)
    config.disable_gpu()
    config.set_cpu_math_library_num_threads(num_threads)
    config.switch_ir_optim(True)
    config.enable_memory_optim()
    config.enable_mkldnn()
    config.set_mkldnn_cache_capacity(10)
    config.disable_glog_info()
    return create_predictor(config)


def load_models():
    """Download and load detection + attribute models with Paddle Inference.

    Populates the module-level ``_det_predictor`` and ``_attr_predictor`` and
    runs one warmup inference on each.
    """
    global _det_predictor, _attr_predictor

    print("[1/3] Downloading detection model...")
    det_dir = _download_and_extract(DET_MODEL_URL, os.path.join(MODEL_CACHE, "det"))

    print("[2/3] Downloading attribute model...")
    attr_dir = _download_and_extract(ATTR_MODEL_URL, os.path.join(MODEL_CACHE, "attr"))

    print("[3/3] Loading Paddle Inference predictors...")
    _det_predictor = _create_predictor(det_dir)
    _attr_predictor = _create_predictor(attr_dir)

    # Warmup
    _warmup()
    print("[OK] Models ready.")


def _feed_det_inputs(img: np.ndarray, scale: np.ndarray) -> None:
    """Copy *img*, *scale* and the fixed ``im_shape`` into the detector inputs.

    Inputs are matched by name substring ("image", "scale", "im_shape");
    inputs with any other name are left untouched.
    """
    im_shape = np.array([DET_INPUT_SHAPE], dtype=np.float32)
    for name in _det_predictor.get_input_names():
        key = name.lower()
        if "image" in key:
            tensor = img
        elif "scale" in key:
            tensor = scale
        elif "im_shape" in key:
            tensor = im_shape
        else:
            continue
        handle = _det_predictor.get_input_handle(name)
        handle.reshape(tensor.shape)
        handle.copy_from_cpu(tensor)


def _run_attr_model(batch: np.ndarray) -> np.ndarray:
    """Run the attribute predictor on *batch* and return its first output."""
    handle = _attr_predictor.get_input_handle(_attr_predictor.get_input_names()[0])
    handle.reshape(batch.shape)
    handle.copy_from_cpu(batch)
    _attr_predictor.run()
    out_name = _attr_predictor.get_output_names()[0]
    return _attr_predictor.get_output_handle(out_name).copy_to_cpu()


def _warmup():
    """Run dummy inference to trigger optimizations.

    Warmup is best-effort: a failure is reported on stdout but does not
    prevent the models from being used.
    """
    # Detection warmup — feeds the same set of inputs as detect_pedestrians().
    dummy_img = np.zeros((1, 3, *DET_INPUT_SHAPE), dtype=np.float32)
    dummy_scale = np.array([[1.0, 1.0]], dtype=np.float32)
    try:
        _feed_det_inputs(dummy_img, dummy_scale)
        _det_predictor.run()
    except Exception as exc:  # best-effort by design; report instead of hiding
        print(f"  [warn] detection warmup failed: {exc}")

    # Attribute warmup
    dummy_attr = np.zeros((1, 3, ATTR_INPUT_SHAPE[0], ATTR_INPUT_SHAPE[1]), dtype=np.float32)
    try:
        _run_attr_model(dummy_attr)
    except Exception as exc:  # best-effort by design; report instead of hiding
        print(f"  [warn] attribute warmup failed: {exc}")


# ---------------------------------------------------------------------------
# Preprocessing
# ---------------------------------------------------------------------------
# Per-channel statistics applied to the last axis after scaling to [0, 1].
# Callers in this file pass RGB-ordered pixels.
_MEAN = np.array([0.485, 0.456, 0.406], dtype=np.float32)
_STD = np.array([0.229, 0.224, 0.225], dtype=np.float32)


def _normalize_to_nchw(resized: np.ndarray) -> np.ndarray:
    """Scale an HxWx3 image to [0, 1], normalise, and return a 1x3xHxW float32 array."""
    img = resized.astype(np.float32) * (1.0 / 255.0)
    img = (img - _MEAN) / _STD
    # The transpose yields a strided view; hand the predictor a C-contiguous
    # buffer. NOTE(review): presumably required by copy_from_cpu on some Paddle
    # versions — harmless otherwise.
    return np.ascontiguousarray(img.transpose(2, 0, 1)[np.newaxis])


def _preprocess_det(image: np.ndarray):
    """Resize *image* to ``DET_INPUT_SHAPE`` and normalise it for the detector.

    Returns:
        ``(img, scale)`` where ``img`` is 1x3xHxW float32 and ``scale`` is a
        1x2 float32 array ``[[target_h / h, target_w / w]]``.
    """
    h, w = image.shape[:2]
    th, tw = DET_INPUT_SHAPE
    resized = cv2.resize(image, (tw, th), interpolation=cv2.INTER_LINEAR)
    scale = np.array([[float(th) / h, float(tw) / w]], dtype=np.float32)
    return _normalize_to_nchw(resized), scale


def _preprocess_attr(crop: np.ndarray):
    """Resize *crop* to ``ATTR_INPUT_SHAPE`` and return a 1x3xHxW float32 array."""
    h, w = ATTR_INPUT_SHAPE
    resized = cv2.resize(crop, (w, h), interpolation=cv2.INTER_LINEAR)
    return _normalize_to_nchw(resized)


# ---------------------------------------------------------------------------
# Inference
# ---------------------------------------------------------------------------
def detect_pedestrians(image: np.ndarray, conf_thresh: float = 0.5):
    """Detect pedestrians in a BGR image.

    Args:
        image: HxWx3 image in BGR channel order (same convention as
            ``recognize_attributes``).
        conf_thresh: Minimum score for a detection to be kept.

    Returns:
        List of ``(x1, y1, x2, y2, score)`` tuples with integer coordinates
        clamped to the image bounds; degenerate boxes are dropped.
    """
    if _det_predictor is None:
        load_models()

    # The normalisation statistics are RGB-ordered, so convert before
    # preprocessing — the attribute path does the same on its crops.
    rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    img, scale = _preprocess_det(rgb)
    _feed_det_inputs(img, scale)
    _det_predictor.run()

    output_names = _det_predictor.get_output_names()
    dets = _det_predictor.get_output_handle(output_names[0]).copy_to_cpu()

    boxes = []
    if dets is None or getattr(dets, "ndim", 0) != 2 or dets.shape[1] < 6:
        return boxes

    h, w = image.shape[:2]
    # Each row is read as (_, score, x1, y1, x2, y2); the first column is ignored.
    for det in dets:
        score = float(det[1])
        if score < conf_thresh:
            continue
        x1, y1 = max(0, int(det[2])), max(0, int(det[3]))
        x2, y2 = min(w, int(det[4])), min(h, int(det[5]))
        if x2 > x1 and y2 > y1:
            boxes.append((x1, y1, x2, y2, score))
    return boxes


def _decode_attributes(logits: np.ndarray) -> dict:
    """Turn one attribute-model output vector into a result dict.

    For the groups in ``_EXCLUSIVE_GROUPS`` the highest-scoring label is stored
    as ``result[group] = (label, score)``. For all other groups every label
    whose score exceeds 0.5 is stored as ``result[label] = score``. Indices
    outside the output vector score 0.0.
    """
    # NOTE(review): the output is passed through a sigmoid here. If the
    # exported model already emits probabilities this squashes them into
    # [0.5, 0.73] and every flag passes the 0.5 threshold — confirm against
    # the upstream post-processing before relying on the flag attributes.
    probs = 1.0 / (1.0 + np.exp(-logits))
    n = len(probs)

    def score_of(idx):
        pos = abs(idx)
        if pos >= n:
            return 0.0
        # Negative index => complement of that output (see ATTR_GROUPS).
        return (1.0 - probs[pos]) if idx < 0 else probs[pos]

    result = {}
    for group, attrs in ATTR_GROUPS.items():
        if group in _EXCLUSIVE_GROUPS:
            best_label, best_score = None, -1
            for label, idx in attrs:
                s = score_of(idx)
                if s > best_score:
                    best_score, best_label = s, label
            if best_label:
                result[group] = (best_label, float(best_score))
        else:
            for label, idx in attrs:
                if idx < n and probs[idx] > 0.5:
                    result[label] = float(probs[idx])
    return result


def recognize_attributes(image: np.ndarray, box):
    """Recognise attributes of one pedestrian.

    Args:
        image: Full BGR image.
        box: Sequence whose first four items are ``x1, y1, x2, y2``.

    Returns:
        Attribute dict as produced by ``_decode_attributes``; empty when the
        crop is empty.
    """
    if _attr_predictor is None:
        load_models()

    x1, y1, x2, y2 = box[:4]
    crop = image[y1:y2, x1:x2]
    if crop.size == 0:
        return {}

    crop_rgb = cv2.cvtColor(crop, cv2.COLOR_BGR2RGB)
    logits = _run_attr_model(_preprocess_attr(crop_rgb))[0]
    return _decode_attributes(logits)


# ---------------------------------------------------------------------------
# Batch attribute recognition (faster for multiple pedestrians)
# ---------------------------------------------------------------------------
def recognize_attributes_batch(image: np.ndarray, boxes):
    """Process all detected pedestrians in one batch for speed.

    Args:
        image: Full BGR image.
        boxes: Sequence of boxes; the first four items of each are
            ``x1, y1, x2, y2``.

    Returns:
        One attribute dict per input box, in order. Boxes with an empty crop
        get an empty dict.
    """
    if not boxes:
        return []
    if _attr_predictor is None:
        load_models()

    crops = []
    valid_indices = []
    for i, box in enumerate(boxes):
        x1, y1, x2, y2 = box[:4]
        crop = image[y1:y2, x1:x2]
        if crop.size > 0:
            crop_rgb = cv2.cvtColor(crop, cv2.COLOR_BGR2RGB)
            crops.append(_preprocess_attr(crop_rgb))
            valid_indices.append(i)

    all_attrs = [{} for _ in boxes]
    if not crops:
        return all_attrs

    # Stack into batch
    all_logits = _run_attr_model(np.concatenate(crops, axis=0))
    for batch_idx, orig_idx in enumerate(valid_indices):
        all_attrs[orig_idx] = _decode_attributes(all_logits[batch_idx])
    return all_attrs


# ---------------------------------------------------------------------------
# Drawing
# ---------------------------------------------------------------------------
PALETTE = [
    (78, 172, 248), (120, 230, 150), (255, 180, 80), (200, 130, 255),
    (255, 120, 120), (100, 220, 220), (255, 200, 150), (180, 180, 255),
]


def _text_width(text, font_scale):
    """Pixel width of *text* at *font_scale* with thickness 1."""
    return cv2.getTextSize(text, _FONT, font_scale, 1)[0][0]


def _attribute_lines(attrs):
    """Build the display lines for one pedestrian (ONE attribute per line)."""
    lines = []
    for key, prefix in (("Gender", "Gender"), ("Age", "Age"), ("Direction", "Dir")):
        if key in attrs:
            lines.append(f"{prefix}: {attrs[key][0]}")
    # Each remaining attribute gets its OWN line (not joined).
    for k, v in attrs.items():
        if k in _EXCLUSIVE_GROUPS:
            continue
        lines.append(f"{v[0]}" if isinstance(v, tuple) else f"{k}")
    return lines


def _truncate_to_width(line, max_w, font_scale):
    """Shorten *line* with a ".." suffix until it fits in *max_w* pixels.

    Lines are left untouched when they already fit or when the available
    width is 30 px or less (too narrow to truncate meaningfully).
    """
    if max_w <= 30 or _text_width(line, font_scale) <= max_w:
        return line
    while len(line) > 3:
        line = line[:-1]
        if _text_width(line + "..", font_scale) <= max_w:
            return line + ".."
    return line


def _draw_text_panel(canvas, lines, bx, by, block_w, block_h, opacity, lh, pad, font_scale):
    """Draw *lines* on a semi-transparent dark panel whose top-left is (bx, by)."""
    h, w = canvas.shape[:2]
    # Blend only the panel region instead of copying the whole frame.
    # The +1 keeps the bottom/right edge inclusive, as cv2.rectangle does.
    rx1, ry1 = max(0, bx), max(0, by)
    rx2, ry2 = min(w, bx + block_w + 1), min(h, by + block_h + 1)
    if rx2 > rx1 and ry2 > ry1:
        roi = np.ascontiguousarray(canvas[ry1:ry2, rx1:rx2])
        shade = np.full_like(roi, 20)
        canvas[ry1:ry2, rx1:rx2] = cv2.addWeighted(shade, opacity, roi, 1.0 - opacity, 0)

    for j, line in enumerate(lines):
        ty = by + pad + (j + 1) * lh - 3
        cv2.putText(canvas, line, (bx + pad, ty), _FONT, font_scale,
                    (255, 255, 255), 1, cv2.LINE_AA)


def draw_results(image, detections, attributes, show_attrs=True):
    """Draw boxes, header labels, attribute panels and a count badge.

    Args:
        image: Image to annotate (not modified; a copy is drawn on).
        detections: Sequence of ``(x1, y1, x2, y2, score)`` tuples.
        attributes: Attribute dicts parallel to *detections*.
        show_attrs: When false, only boxes, headers and the badge are drawn.

    Returns:
        The annotated copy of *image*.
    """
    canvas = image.copy()
    h, w = canvas.shape[:2]
    s = max(w, h) / 1000.0
    fs = max(0.45, s * 0.55)
    thick = max(1, int(s * 2))
    bthick = max(2, int(s * 2.5))
    attr_fs = fs * 0.75
    is_single = len(detections) == 1

    pad = 5
    (_, lth), _ = cv2.getTextSize("Ag", _FONT, attr_fs, 1)
    lh = lth + 8

    for i, (det, attrs) in enumerate(zip(detections, attributes)):
        x1, y1, x2, y2, score = det
        c = PALETTE[i % len(PALETTE)]
        cv2.rectangle(canvas, (x1, y1), (x2, y2), c, bthick)

        # --- Detection header label (on top of the box) ---
        hdr = f"#{i+1} {score:.0%}"
        (htw, hth), _ = cv2.getTextSize(hdr, _FONT, fs, thick)
        hdr_h = hth + 10
        # Keep the header on-canvas for boxes touching the top edge; in the
        # normal case hdr_bottom == y1, i.e. the label sits just above the box.
        hdr_top = max(0, y1 - hdr_h)
        hdr_bottom = hdr_top + hdr_h
        cv2.rectangle(canvas, (x1, hdr_top), (x1 + htw + 6, hdr_bottom), c, -1)
        cv2.putText(canvas, hdr, (x1 + 3, hdr_bottom - 5), _FONT, fs,
                    (255, 255, 255), thick)

        if not show_attrs or not attrs:
            continue
        lines = _attribute_lines(attrs)
        if not lines:
            continue

        if is_single:
            # ─── SINGLE PEDESTRIAN: attributes beneath header label ───
            # Limit to max 8 lines to keep it clean
            display_lines = lines[:8]
            block_w = max(_text_width(t, attr_fs) for t in display_lines) + pad * 2
            block_h = len(display_lines) * lh + pad * 2
            # Clamp to image edges (never above/left of the canvas).
            bx = max(0, min(x1, w - block_w))
            by = max(0, min(hdr_bottom, h - block_h))
            opacity = 0.8
        else:
            # ─── MULTIPLE PEDESTRIANS: attributes inside box (upper-left) ───
            box_w = x2 - x1
            box_h = y2 - y1
            # Limit lines to fit inside box height
            max_lines = max(1, (box_h - pad * 2) // lh)
            # Truncate any line that's too wide for the box
            max_text_w = box_w - pad * 2
            display_lines = [
                _truncate_to_width(t, max_text_w, attr_fs) for t in lines[:max_lines]
            ]
            widest = max(_text_width(t, attr_fs) for t in display_lines)
            block_w = min(widest + pad * 2, box_w)
            block_h = min(len(display_lines) * lh + pad * 2, box_h)
            bx = x1 + 2
            by = hdr_bottom + 2
            opacity = 0.75

        _draw_text_panel(canvas, display_lines, bx, by, block_w, block_h,
                         opacity, lh, pad, attr_fs)

    # Pedestrian count badge
    ct = f"Pedestrians: {len(detections)}"
    (tw, th), _ = cv2.getTextSize(ct, _FONT, fs, thick)
    cv2.rectangle(canvas, (8, 8), (tw + 24, th + 24), (30, 30, 30), -1)
    cv2.putText(canvas, ct, (16, th + 16), _FONT, fs, (78, 172, 248), thick, cv2.LINE_AA)
    return canvas


# ---------------------------------------------------------------------------
# High-level API
# ---------------------------------------------------------------------------
def analyze_image(image: np.ndarray, conf_thresh=0.5, show_attrs=True):
    """Detect pedestrians, recognise their attributes and annotate the image.

    Args:
        image: RGB image (HxWx3), RGBA image (HxWx4) or grayscale image (HxW
            or HxWx1). ``None`` is tolerated and reported in the summary.
        conf_thresh: Minimum detection score.
        show_attrs: Whether attribute panels are drawn on the image.

    Returns:
        ``(annotated_rgb, summary)``: the annotated image in RGB order (it is
        downscaled when its longer side exceeds ``MAX_IMAGE_SIDE``) and a
        Markdown summary with one paragraph per person.
    """
    if image is None:
        return None, "No image provided."

    # Bring every accepted layout to 3-channel BGR for the OpenCV-based steps.
    if image.ndim == 2 or (image.ndim == 3 and image.shape[2] == 1):
        bgr = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
    elif image.ndim == 3 and image.shape[2] == 4:
        bgr = cv2.cvtColor(image, cv2.COLOR_RGBA2BGR)
    else:
        bgr = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

    # Downscale large images
    h, w = bgr.shape[:2]
    if max(h, w) > MAX_IMAGE_SIDE:
        r = MAX_IMAGE_SIDE / max(h, w)
        # max(1, ...) keeps extreme aspect ratios from producing a 0-pixel side.
        bgr = cv2.resize(bgr, (max(1, int(w * r)), max(1, int(h * r))))

    detections = detect_pedestrians(bgr, conf_thresh)
    all_attrs = recognize_attributes_batch(bgr, detections)
    annotated = draw_results(bgr, detections, all_attrs, show_attrs)
    annotated_rgb = cv2.cvtColor(annotated, cv2.COLOR_BGR2RGB)

    lines = []
    for i, (det, attrs) in enumerate(zip(detections, all_attrs)):
        parts = [
            f"{k}: {v[0]}" if isinstance(v, tuple) else f"{k} ({v:.0%})"
            for k, v in attrs.items()
        ]
        line = f"**Person #{i+1}** ({det[4]:.0%})"
        if parts:
            line += " — " + ", ".join(parts)
        lines.append(line)

    summary = "\n\n".join(lines) if lines else "No pedestrians detected."
    return annotated_rgb, summary