Spaces:
Sleeping
Sleeping
| """ | |
| PP-Human Pedestrian Analysis — PaddlePaddle Inference. | |
| Downloads Paddle models, then uses the Paddle Inference API directly for CPU inference. | |
| """ | |
| import os | |
| import urllib.request | |
| import zipfile | |
| import numpy as np | |
| import cv2 | |
| from tqdm import tqdm | |
| # --------------------------------------------------------------------------- | |
| # Constants | |
| # --------------------------------------------------------------------------- | |
# Root directory under the user's home where model archives are extracted.
MODEL_CACHE = os.path.join(os.path.expanduser("~"), "models")
# NOTE(review): an earlier comment here said the SMALL model is used for speed,
# but the URL below points at the `_l_` archive — confirm which variant is
# intended before relying on either speed or accuracy expectations.
DET_MODEL_URL = "https://bj.bcebos.com/v1/paddledet/models/pipeline/mot_ppyoloe_l_36e_pipeline.zip"
ATTR_MODEL_URL = "https://bj.bcebos.com/v1/paddledet/models/pipeline/PPLCNet_x1_0_person_attribute_945_infer.zip"
DET_INPUT_SHAPE = (640, 640)  # H, W the detector input is resized to
ATTR_INPUT_SHAPE = (256, 192)  # H, W
MAX_IMAGE_SIDE = 960  # downscale large images
# Display group -> [(label, index into the attribute model's output vector)].
# "Age", "Gender" and "Direction" are decoded as pick-the-best-label groups;
# every other group reports each label whose probability exceeds 0.5.
# A negative index means "use 1 - probs[abs(index)]" (see "Male" below, which
# is scored as the complement of the "Female" output).
ATTR_GROUPS = {
    "Age": [("Over 60", 19), ("18-60", 20), ("Under 18", 21)],
    "Gender": [("Female", 22), ("Male", -22)],
    "Direction": [("Front", 23), ("Side", 24), ("Back", 25)],
    "Accessories": [("Hat", 0), ("Glasses", 1)],
    "Upper Body": [("Short Sleeve", 2), ("Long Sleeve", 3)],
    "Upper Style": [("Stride", 4), ("Logo", 5), ("Plaid", 6), ("Splice", 7)],
    "Lower Body": [("Trousers", 11), ("Shorts", 12), ("Skirt/Dress", 13)],
    "Lower Style": [("Stripe", 8), ("Pattern", 9)],
    "Coat": [("Long Coat", 10)],
    "Shoes": [("Boots", 14)],
    "Bag": [("Handbag", 15), ("Shoulder Bag", 16), ("Backpack", 17)],
    "Action": [("Holding Objects", 18)],
}
| # --------------------------------------------------------------------------- | |
| # Download helpers | |
| # --------------------------------------------------------------------------- | |
class _ProgressBar(tqdm):
    """tqdm bar whose ``update_to`` can be used as a ``urlretrieve`` reporthook."""

    def update_to(self, b=1, bsize=1, tsize=None):
        """Advance the bar to ``b * bsize`` bytes transferred.

        Args:
            b: Number of blocks transferred so far.
            bsize: Size of each block in bytes.
            tsize: Total download size in bytes. ``None`` or a non-positive
                value means the size is unknown.
        """
        # urlretrieve passes -1 when the server reports no size; storing that
        # as the total would give tqdm a negative length, so only accept
        # positive totals and otherwise leave the bar indeterminate.
        if tsize is not None and tsize > 0:
            self.total = tsize
        # tqdm.update() takes an increment, while the hook reports an absolute
        # position, hence the subtraction of the current count.
        self.update(b * bsize - self.n)
def _find_model_subdir(dest_dir: str):
    """Return the first subdirectory of *dest_dir* holding a ``.pdmodel`` file, or None."""
    for entry in sorted(os.listdir(dest_dir)):
        candidate = os.path.join(dest_dir, entry)
        if os.path.isdir(candidate) and any(
            name.endswith(".pdmodel") for name in os.listdir(candidate)
        ):
            return candidate
    return None


def _download_and_extract(url: str, dest_dir: str) -> str:
    """Download a model archive into *dest_dir*, extract it, and return the model directory.

    If a subdirectory of *dest_dir* already contains a ``.pdmodel`` file, it is
    returned immediately and nothing is downloaded.

    Args:
        url: Archive URL; the last path component is used as the file name.
        dest_dir: Directory to download and extract into (created if missing).

    Returns:
        The subdirectory containing a ``.pdmodel`` file when one exists;
        otherwise the first subdirectory in sorted order, or *dest_dir* itself
        when there are no subdirectories.

    Raises:
        Whatever ``urllib.request.urlretrieve`` or ``zipfile`` raise on failure;
        the partially downloaded or unreadable archive is removed first.
    """
    os.makedirs(dest_dir, exist_ok=True)

    cached = _find_model_subdir(dest_dir)
    if cached is not None:
        print(f" [cached] {cached}")
        return cached

    fname = url.split("/")[-1]
    local_path = os.path.join(dest_dir, fname)
    print(f" [download] {fname}")
    try:
        with _ProgressBar(unit="B", unit_scale=True, miniters=1, desc=fname) as t:
            urllib.request.urlretrieve(url, filename=local_path, reporthook=t.update_to)
    except BaseException:
        # Do not leave a truncated archive behind (also on Ctrl-C).
        if os.path.exists(local_path):
            os.remove(local_path)
        raise

    if fname.endswith(".zip"):
        try:
            with zipfile.ZipFile(local_path, "r") as z:
                z.extractall(dest_dir)
        finally:
            # The archive is not needed after extraction, and a corrupt one
            # must not linger either.
            os.remove(local_path)

    # Prefer the directory that actually holds the model: archives may ship
    # extra folders (e.g. "__MACOSX") that sort before the model folder.
    extracted = _find_model_subdir(dest_dir)
    if extracted is not None:
        return extracted

    # Fallback kept from the original behaviour: first subdirectory, else dest_dir.
    for entry in sorted(os.listdir(dest_dir)):
        candidate = os.path.join(dest_dir, entry)
        if os.path.isdir(candidate):
            return candidate
    return dest_dir
| # --------------------------------------------------------------------------- | |
| # Find Paddle model files | |
| # --------------------------------------------------------------------------- | |
def _find_paddle_files(model_dir: str):
    """Locate the ``.pdmodel`` / ``.pdiparams`` files directly inside *model_dir*.

    Returns:
        A ``(pdmodel_path, pdiparams_path)`` tuple; either element is ``None``
        when no file with that suffix exists. If several files share a suffix,
        the one listed last by ``os.listdir`` wins.
    """
    located = {".pdmodel": None, ".pdiparams": None}
    for entry in os.listdir(model_dir):
        for suffix in located:
            if entry.endswith(suffix):
                located[suffix] = os.path.join(model_dir, entry)
                break
    return located[".pdmodel"], located[".pdiparams"]
| # --------------------------------------------------------------------------- | |
| # Paddle Inference predictors | |
| # --------------------------------------------------------------------------- | |
# Module-level predictor singletons. Both stay None until load_models()
# assigns them; the inference functions call load_models() lazily when they
# find None here.
_det_predictor = None  # pedestrian detection predictor
_attr_predictor = None  # pedestrian attribute predictor
def _create_predictor(model_dir: str):
    """Build a CPU Paddle Inference predictor from the model files in *model_dir*.

    Raises:
        FileNotFoundError: if *model_dir* lacks a ``.pdmodel`` or ``.pdiparams`` file.
    """
    # Imported lazily so the module can be imported without paddle installed.
    from paddle.inference import Config, create_predictor

    model_file, params_file = _find_paddle_files(model_dir)
    if not (model_file is not None and params_file is not None):
        raise FileNotFoundError(f"No .pdmodel/.pdiparams in {model_dir}")

    cfg = Config(model_file, params_file)
    # CPU-only execution with 6 math-library threads.
    cfg.disable_gpu()
    cfg.set_cpu_math_library_num_threads(6)
    # Graph / memory optimisations and the MKL-DNN backend.
    cfg.switch_ir_optim(True)
    cfg.enable_memory_optim()
    cfg.enable_mkldnn()
    cfg.set_mkldnn_cache_capacity(10)
    # Silence Paddle's glog info output.
    cfg.disable_glog_info()
    return create_predictor(cfg)
def load_models():
    """Fetch both model archives, build their predictors, and warm them up.

    Populates the module-level ``_det_predictor`` and ``_attr_predictor``.
    """
    global _det_predictor, _attr_predictor

    downloads = (
        ("[1/3] Downloading detection model...", DET_MODEL_URL, "det"),
        ("[2/3] Downloading attribute model...", ATTR_MODEL_URL, "attr"),
    )
    model_dirs = []
    for banner, url, subdir in downloads:
        print(banner)
        model_dirs.append(_download_and_extract(url, os.path.join(MODEL_CACHE, subdir)))
    det_dir, attr_dir = model_dirs

    print("[3/3] Loading Paddle Inference predictors...")
    _det_predictor = _create_predictor(det_dir)
    _attr_predictor = _create_predictor(attr_dir)

    # One dummy pass through each predictor before real requests arrive.
    _warmup()
    print("[OK] Models ready.")
def _warmup():
    """Run one dummy inference through each predictor.

    Warmup is best-effort: a failing run is reported with a printed warning
    and never raised, so model loading still completes.
    """
    # Detection warmup. Feed every input that detect_pedestrians feeds
    # (image, scale factor, im_shape) so the dummy run has all its inputs;
    # inputs are matched by substring of the lower-cased input name.
    det_h, det_w = DET_INPUT_SHAPE
    det_feeds = (
        ("image", np.zeros((1, 3, det_h, det_w), dtype=np.float32)),
        ("scale", np.array([[1.0, 1.0]], dtype=np.float32)),
        ("im_shape", np.array([[det_h, det_w]], dtype=np.float32)),
    )
    for name in _det_predictor.get_input_names():
        lowered = name.lower()
        for key, tensor in det_feeds:
            if key in lowered:
                handle = _det_predictor.get_input_handle(name)
                handle.reshape(tensor.shape)
                handle.copy_from_cpu(tensor)
                break
    try:
        _det_predictor.run()
    except Exception as exc:  # best-effort: report, do not abort loading
        print(f"[warn] Detection warmup failed: {exc}")

    # Attribute warmup: the model's first input takes a 1 x 3 x H x W tensor.
    attr_h, attr_w = ATTR_INPUT_SHAPE
    dummy_attr = np.zeros((1, 3, attr_h, attr_w), dtype=np.float32)
    attr_names = _attr_predictor.get_input_names()
    handle = _attr_predictor.get_input_handle(attr_names[0])
    handle.reshape(dummy_attr.shape)
    handle.copy_from_cpu(dummy_attr)
    try:
        _attr_predictor.run()
    except Exception as exc:  # best-effort: report, do not abort loading
        print(f"[warn] Attribute warmup failed: {exc}")
| # --------------------------------------------------------------------------- | |
| # Preprocessing | |
| # --------------------------------------------------------------------------- | |
# Per-channel mean / std subtracted and divided after pixels are scaled to
# [0, 1] in _preprocess_det and _preprocess_attr.
# NOTE(review): these look like the ImageNet RGB statistics — confirm that the
# channel order of the arrays being normalised matches.
_MEAN = np.array([0.485, 0.456, 0.406], dtype=np.float32)
_STD = np.array([0.229, 0.224, 0.225], dtype=np.float32)
def _preprocess_det(image: np.ndarray):
    """Prepare an image for the detector.

    Resizes to ``DET_INPUT_SHAPE`` (aspect ratio is not preserved), scales to
    [0, 1], normalises with ``_MEAN`` / ``_STD`` and reorders to 1 x C x H x W.

    Returns:
        ``(tensor, scale_factor)`` where ``scale_factor`` is a 1 x 2 float32
        array of ``[target_h / source_h, target_w / source_w]``.
    """
    src_h, src_w = image.shape[:2]
    dst_h, dst_w = DET_INPUT_SHAPE
    # cv2.resize takes its target size as (width, height).
    scaled = cv2.resize(image, (dst_w, dst_h), interpolation=cv2.INTER_LINEAR)
    normalised = (scaled.astype(np.float32) * (1.0 / 255.0) - _MEAN) / _STD
    tensor = normalised.transpose(2, 0, 1)[np.newaxis]
    scale_factor = np.array(
        [[float(dst_h) / src_h, float(dst_w) / src_w]], dtype=np.float32
    )
    return tensor, scale_factor
def _preprocess_attr(crop: np.ndarray):
    """Prepare one pedestrian crop for the attribute model.

    Resizes to ``ATTR_INPUT_SHAPE``, scales to [0, 1], normalises with
    ``_MEAN`` / ``_STD`` and returns a 1 x C x H x W float32 array.
    """
    target_h, target_w = ATTR_INPUT_SHAPE
    # cv2.resize takes its target size as (width, height).
    scaled = cv2.resize(crop, (target_w, target_h), interpolation=cv2.INTER_LINEAR)
    normalised = (scaled.astype(np.float32) * (1.0 / 255.0) - _MEAN) / _STD
    return normalised.transpose(2, 0, 1)[np.newaxis]
| # --------------------------------------------------------------------------- | |
| # Inference | |
| # --------------------------------------------------------------------------- | |
def detect_pedestrians(image: np.ndarray, conf_thresh: float = 0.5):
    """Detect pedestrians in a BGR image.

    Loads the models on first use.

    Args:
        image: H x W x 3 image in BGR channel order (what analyze_image passes).
        conf_thresh: Minimum detection score to keep.

    Returns:
        A list of ``(x1, y1, x2, y2, score)`` tuples; coordinates are ints
        clamped to the image bounds, and degenerate boxes are dropped.
    """
    if _det_predictor is None:
        load_models()

    # The attribute path converts BGR -> RGB before applying the shared
    # _MEAN/_STD normalisation; do the same here so both models see the same
    # channel order.
    # NOTE(review): assumes the exported detector expects RGB input, as the
    # attribute path does — confirm against the model's infer_cfg.yml.
    rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    img, scale = _preprocess_det(rgb)
    im_shape = np.array([[DET_INPUT_SHAPE[0], DET_INPUT_SHAPE[1]]], dtype=np.float32)

    # Inputs are matched by substring of the lower-cased input name, in this
    # priority order.
    feeds = (("image", img), ("scale", scale), ("im_shape", im_shape))
    for name in _det_predictor.get_input_names():
        lowered = name.lower()
        for key, tensor in feeds:
            if key in lowered:
                handle = _det_predictor.get_input_handle(name)
                handle.reshape(tensor.shape)
                handle.copy_from_cpu(tensor)
                break

    _det_predictor.run()
    output_names = _det_predictor.get_output_names()
    dets = _det_predictor.get_output_handle(output_names[0]).copy_to_cpu()

    boxes = []
    if dets is None:
        return boxes

    h, w = image.shape[:2]
    for det in dets:
        if len(det) < 6:
            continue
        # Row layout used here: [class id (ignored), score, x1, y1, x2, y2].
        _, score, x1, y1, x2, y2 = det[:6]
        if not score >= conf_thresh:
            continue
        x1, y1 = max(0, int(x1)), max(0, int(y1))
        x2, y2 = min(w, int(x2)), min(h, int(y2))
        if x2 > x1 and y2 > y1:
            boxes.append((x1, y1, x2, y2, float(score)))
    return boxes
def recognize_attributes(image: np.ndarray, box):
    """Recognise the attributes of a single pedestrian.

    Thin wrapper around ``recognize_attributes_batch`` so the crop,
    preprocessing and decoding logic lives in one place instead of being
    duplicated here.

    Args:
        image: BGR image the box refers to.
        box: ``(x1, y1, x2, y2, ...)``; only the first four values are used.

    Returns:
        The attribute dict for the box, or ``{}`` when the crop is empty.
    """
    return recognize_attributes_batch(image, [box])[0]
| # --------------------------------------------------------------------------- | |
| # Batch attribute recognition (faster for multiple pedestrians) | |
| # --------------------------------------------------------------------------- | |
# Groups decoded by picking the single best-scoring label.
_EXCLUSIVE_ATTR_GROUPS = ("Age", "Gender", "Direction")


def _decode_attributes(probs):
    """Convert one probability vector into an attribute dict using ATTR_GROUPS.

    Exclusive groups map ``group -> (label, score)``; every other group
    contributes ``label -> probability`` for each label above 0.5.
    """
    n = len(probs)
    result = {}
    for group, attrs in ATTR_GROUPS.items():
        if group in _EXCLUSIVE_ATTR_GROUPS:
            best_label, best_score = None, -1
            for label, idx in attrs:
                pos = abs(idx)
                if pos >= n:
                    # Output vector too short for this label: score it 0,
                    # for negative (complement) indices as well.
                    score = 0.0
                elif idx < 0:
                    # Negative index: complement of the referenced output.
                    score = 1.0 - probs[pos]
                else:
                    score = probs[pos]
                if score > best_score:
                    best_score, best_label = score, label
            if best_label:
                result[group] = (best_label, float(best_score))
        else:
            for label, idx in attrs:
                if 0 <= idx < n and probs[idx] > 0.5:
                    result[label] = float(probs[idx])
    return result


def recognize_attributes_batch(image: np.ndarray, boxes):
    """Recognise attributes for all detected pedestrians in one batched run.

    Args:
        image: BGR image the boxes refer to.
        boxes: Sequence of ``(x1, y1, x2, y2, ...)``; only the first four
            values of each box are used.

    Returns:
        One attribute dict per input box, in input order. Boxes whose crop is
        empty get ``{}``.
    """
    if not boxes:
        return []

    crops = []
    valid_indices = []
    for i, box in enumerate(boxes):
        x1, y1, x2, y2 = box[:4]
        crop = image[y1:y2, x1:x2]
        if crop.size > 0:
            crop_rgb = cv2.cvtColor(crop, cv2.COLOR_BGR2RGB)
            crops.append(_preprocess_attr(crop_rgb))
            valid_indices.append(i)

    all_attrs = [{} for _ in boxes]
    if not crops:
        return all_attrs

    # Load lazily, and only once there is something to run.
    if _attr_predictor is None:
        load_models()

    # Each crop is already 1 x C x H x W, so concatenating stacks the batch axis.
    batch = np.concatenate(crops, axis=0)
    input_names = _attr_predictor.get_input_names()
    handle = _attr_predictor.get_input_handle(input_names[0])
    handle.reshape(batch.shape)
    handle.copy_from_cpu(batch)
    _attr_predictor.run()

    output_names = _attr_predictor.get_output_names()
    all_logits = _attr_predictor.get_output_handle(output_names[0]).copy_to_cpu()
    # NOTE(review): assumes the first output holds raw logits; if the exported
    # model already applies sigmoid, this applies it twice — confirm.
    all_probs = 1.0 / (1.0 + np.exp(-all_logits))

    for batch_idx, orig_idx in enumerate(valid_indices):
        all_attrs[orig_idx] = _decode_attributes(all_probs[batch_idx])
    return all_attrs
| # --------------------------------------------------------------------------- | |
| # Drawing | |
| # --------------------------------------------------------------------------- | |
# Box / header colours; draw_results cycles through them by detection index.
# The tuples are handed to OpenCV as-is, and analyze_image draws on a BGR
# canvas, so they are presumably in BGR order — verify if exact hues matter.
PALETTE = [
    (78, 172, 248), (120, 230, 150), (255, 180, 80), (200, 130, 255),
    (255, 120, 120), (100, 220, 220), (255, 200, 150), (180, 180, 255),
]
def _attribute_lines(attrs):
    """Build the text lines for one pedestrian, one attribute per line."""
    lines = []
    if "Gender" in attrs:
        lines.append(f"Gender: {attrs['Gender'][0]}")
    if "Age" in attrs:
        lines.append(f"Age: {attrs['Age'][0]}")
    if "Direction" in attrs:
        lines.append(f"Dir: {attrs['Direction'][0]}")
    # Every remaining attribute gets its own line (never joined).
    for key, value in attrs.items():
        if key in ("Gender", "Age", "Direction"):
            continue
        lines.append(f"{value[0]}" if isinstance(value, tuple) else f"{key}")
    return lines


def _text_width(text, font_scale):
    """Return the pixel width of *text* at *font_scale* (thickness 1)."""
    return cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, font_scale, 1)[0][0]


def _fit_line(line, max_width, font_scale):
    """Shorten *line* with a trailing ".." until it fits in *max_width* pixels."""
    # With 30 px or less of text room, lines are left untruncated.
    if max_width <= 30 or _text_width(line, font_scale) <= max_width:
        return line
    while len(line) > 3:
        line = line[:-1]
        if _text_width(line + "..", font_scale) <= max_width:
            return line + ".."
    return line


def _draw_text_block(canvas, lines, bx, by, block_w, block_h, line_h, pad, font_scale, alpha):
    """Blend a dark panel onto *canvas* in place and write *lines* on top of it."""
    overlay = canvas.copy()
    cv2.rectangle(overlay, (bx, by), (bx + block_w, by + block_h), (20, 20, 20), -1)
    cv2.addWeighted(overlay, alpha, canvas, 1.0 - alpha, 0, canvas)
    for row, text in enumerate(lines):
        baseline_y = by + pad + (row + 1) * line_h - 3
        cv2.putText(canvas, text, (bx + pad, baseline_y),
                    cv2.FONT_HERSHEY_SIMPLEX, font_scale,
                    (255, 255, 255), 1, cv2.LINE_AA)


def draw_results(image, detections, attributes, show_attrs=True):
    """Draw boxes, header labels, attribute panels and a count badge.

    Args:
        image: Image to annotate; it is copied, not modified.
        detections: Sequence of ``(x1, y1, x2, y2, score)``.
        attributes: One attribute dict per detection, as produced by
            ``recognize_attributes_batch``.
        show_attrs: When false, only boxes, headers and the badge are drawn.

    Returns:
        The annotated copy of *image*.
    """
    canvas = image.copy()
    h, w = canvas.shape[:2]
    font = cv2.FONT_HERSHEY_SIMPLEX

    # Scale fonts and strokes with the image size.
    s = max(w, h) / 1000.0
    fs = max(0.45, s * 0.55)
    thick = max(1, int(s * 2))
    bthick = max(2, int(s * 2.5))
    attr_fs = fs * 0.75
    pad = 5
    # Line height does not depend on the detection, so measure it once.
    (_, lth), _ = cv2.getTextSize("Ag", font, attr_fs, 1)
    lh = lth + 8
    is_single = len(detections) == 1

    for i, (det, attrs) in enumerate(zip(detections, attributes)):
        x1, y1, x2, y2, score = det
        color = PALETTE[i % len(PALETTE)]
        cv2.rectangle(canvas, (x1, y1), (x2, y2), color, bthick)

        # --- Detection header label ---
        hdr = f"#{i+1} {score:.0%}"
        (htw, hth), _ = cv2.getTextSize(hdr, font, fs, thick)
        hdr_h = hth + 10
        hdr_top = y1 - hdr_h
        if hdr_top < 0:
            # No room above the box (it touches the top edge): tuck the label
            # just inside the box instead of drawing it off-canvas.
            hdr_top = y1
        hdr_bottom = hdr_top + hdr_h
        cv2.rectangle(canvas, (x1, hdr_top), (x1 + htw + 6, hdr_bottom), color, -1)
        cv2.putText(canvas, hdr, (x1 + 3, hdr_bottom - 5),
                    font, fs, (255, 255, 255), thick)

        if not show_attrs or not attrs:
            continue
        lines = _attribute_lines(attrs)
        if not lines:
            continue

        # Attribute text starts right below the header. This equals y1 unless
        # the header had to be tucked inside the box.
        anchor_y = hdr_bottom

        if is_single:
            # --- Single pedestrian: panel beneath the header, max 8 lines ---
            shown = lines[:8]
            block_w = max(_text_width(t, attr_fs) for t in shown) + pad * 2
            block_h = len(shown) * lh + pad * 2
            # Keep the panel on-canvas on every side, including the top when
            # the panel is taller than the space below the anchor.
            bx = max(0, min(x1, w - block_w))
            by = max(0, min(anchor_y, h - block_h))
            _draw_text_block(canvas, shown, bx, by, block_w, block_h,
                             lh, pad, attr_fs, 0.8)
        else:
            # --- Multiple pedestrians: panel inside the box (upper-left) ---
            box_w = x2 - x1
            avail_h = max(1, y2 - anchor_y)
            # Limit the line count to the available height and truncate lines
            # that are wider than the box.
            max_lines = max(1, (avail_h - pad * 2) // lh)
            max_text_w = box_w - pad * 2
            shown = [_fit_line(t, max_text_w, attr_fs) for t in lines[:max_lines]]
            if not shown:
                continue
            block_w = min(max(_text_width(t, attr_fs) for t in shown) + pad * 2, box_w)
            block_h = min(len(shown) * lh + pad * 2, avail_h)
            _draw_text_block(canvas, shown, x1 + 2, anchor_y + 2, block_w, block_h,
                             lh, pad, attr_fs, 0.75)

    # Pedestrian count badge (drawn once, top-left corner).
    ct = f"Pedestrians: {len(detections)}"
    (tw, th), _ = cv2.getTextSize(ct, font, fs, thick)
    cv2.rectangle(canvas, (8, 8), (tw + 24, th + 24), (30, 30, 30), -1)
    cv2.putText(canvas, ct, (16, th + 16),
                font, fs, (78, 172, 248), thick, cv2.LINE_AA)
    return canvas
| # --------------------------------------------------------------------------- | |
| # High-level API | |
| # --------------------------------------------------------------------------- | |
def analyze_image(image: np.ndarray, conf_thresh=0.5, show_attrs=True):
    """Detect pedestrians, recognise their attributes, and annotate the image.

    Args:
        image: RGB (or single-channel grayscale) image array, or ``None``.
        conf_thresh: Minimum detection score to keep.
        show_attrs: Whether attribute panels are drawn on the annotated image.

    Returns:
        ``(annotated_rgb, summary)``: the annotated image in RGB order and a
        Markdown summary with one paragraph per person. For ``None`` input
        returns ``(None, "No image provided.")``.
    """
    if image is None:
        # e.g. a UI callback fired with no image selected.
        return None, "No image provided."

    # The rest of the pipeline works on 3-channel BGR arrays.
    if image.ndim == 2 or (image.ndim == 3 and image.shape[2] == 1):
        bgr = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR)
    else:
        bgr = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

    # Downscale large images; never let a side collapse to 0 pixels.
    h, w = bgr.shape[:2]
    if max(h, w) > MAX_IMAGE_SIDE:
        r = MAX_IMAGE_SIDE / max(h, w)
        bgr = cv2.resize(bgr, (max(1, int(w * r)), max(1, int(h * r))))

    detections = detect_pedestrians(bgr, conf_thresh)
    all_attrs = recognize_attributes_batch(bgr, detections)
    annotated = draw_results(bgr, detections, all_attrs, show_attrs)
    annotated_rgb = cv2.cvtColor(annotated, cv2.COLOR_BGR2RGB)

    lines = []
    for i, (det, attrs) in enumerate(zip(detections, all_attrs)):
        # Tuple values are (label, score) groups; plain values are probabilities.
        parts = [
            f"{k}: {v[0]}" if isinstance(v, tuple) else f"{k} ({v:.0%})"
            for k, v in attrs.items()
        ]
        line = f"**Person #{i+1}** ({det[4]:.0%})"
        if parts:
            line += " — " + ", ".join(parts)
        lines.append(line)
    return annotated_rgb, "\n\n".join(lines) if lines else "No pedestrians detected."