| from __future__ import annotations |
|
|
| import copy |
| import math |
| import pickle |
| import threading |
| from dataclasses import dataclass |
| from typing import Any, Dict, List, Optional, Tuple, Union |
|
|
| import numpy as np |
| import cv2 |
| import torch |
|
|
|
|
| # ============================================================ |
| # ComfyUI Node (pose_data + PKL) |
| # ============================================================ |
| |
| _GLOBAL_LOCK = threading.Lock() |
| |
| |
class KPSSmoothPoseDataAndRender:
    """
    Smooth and render a pose sequence (ComfyUI node).

    Input: POSEDATA (object or dict; typically produced by TSLoadPoseDataPickle).
    Output: IMAGE (torch [T, H, W, 3] float in 0..1) and POSEDATA in the same
    format as the input but with smoothed keypoints.
    """

    @classmethod
    def INPUT_TYPES(cls):
        return {
            "required": {
                "pose_data": ("POSEDATA",),  # <-- IMPORTANT: must be exactly POSEDATA
                "filter_extra_people": ("BOOLEAN", {"default": True}),
                # shared smoothing parameter set (replaces separate body + face_hands sets)
                "smooth_alpha": ("FLOAT", {"default": 0.7, "min": 0.01, "max": 0.99, "step": 0.01}),
                "gap_frames": ("INT", {"default": 12, "min": 0, "max": 100, "step": 1}),
                "min_run_frames": ("INT", {"default": 2, "min": 1, "max": 60, "step": 1}),
                # draw thresholds (body/hands exposed as inputs; face deliberately NOT exposed)
                "conf_thresh_body": ("FLOAT", {"default": 0.20, "min": 0.0, "max": 1.0, "step": 0.01}),
                "conf_thresh_hands": ("FLOAT", {"default": 0.50, "min": 0.0, "max": 1.0, "step": 0.01}),
            }
        }

    RETURN_TYPES = ("IMAGE", "POSEDATA")  # <-- IMPORTANT: must be exactly POSEDATA
    RETURN_NAMES = ("IMAGE", "pose_data")
    FUNCTION = "run"
    CATEGORY = "posedata"

    def run(self, pose_data, **kwargs):
        """Smooth pose_data, render every frame, return (image batch, smoothed pose_data)."""
        filter_extra_people = bool(kwargs.get("filter_extra_people", True))

        # shared smoothing parameter set
        smooth_alpha = float(kwargs.get("smooth_alpha", 0.7))
        gap_frames = int(kwargs.get("gap_frames", 12))
        min_run_frames = int(kwargs.get("min_run_frames", 2))

        # render thresholds
        conf_thresh_body = float(kwargs.get("conf_thresh_body", 0.20))
        conf_thresh_hands = float(kwargs.get("conf_thresh_hands", 0.50))
        conf_thresh_face = 0.20  # <- intentionally NOT exposed as an input; fixed by design

        # hidden option: not in INPUT_TYPES, only honored when passed explicitly
        force_body_18 = bool(kwargs.get("force_body_18", False))

        pose_data = _coerce_pose_data_to_obj(pose_data)

        # pose_data -> frames_json_like (OpenPose-JSON-shaped frame list)
        frames_json_like, meta_ref = _pose_data_to_kps_frames(pose_data, force_body_18=force_body_18)

        # The smoothing pipeline is tuned via module globals; override them
        # under the lock and restore afterwards so concurrent runs don't clash.
        with _GLOBAL_LOCK:
            old = _snapshot_tunable_globals()
            try:
                # BODY
                globals()["ALPHA_BODY"] = smooth_alpha
                globals()["SUPER_SMOOTH_ALPHA"] = smooth_alpha
                globals()["MAX_GAP_FRAMES"] = gap_frames
                globals()["MIN_RUN_FRAMES"] = min_run_frames

                # FACE+HANDS (dense) driven by the same shared parameter set
                globals()["DENSE_SUPER_SMOOTH_ALPHA"] = smooth_alpha
                globals()["DENSE_MAX_GAP_FRAMES"] = gap_frames
                globals()["DENSE_MIN_RUN_FRAMES"] = min_run_frames

                globals()["FILTER_EXTRA_PEOPLE"] = filter_extra_people

                smoothed_frames = smooth_KPS_json_obj(
                    frames_json_like,
                    keep_face_untouched=False,
                    keep_hands_untouched=False,
                    filter_extra_people=filter_extra_people,
                )
            finally:
                _restore_tunable_globals(old)

        # frames_json_like -> pose_data (written back into pose_metas)
        out_pose_data = _kps_frames_to_pose_data(pose_data, smoothed_frames, meta_ref, force_body_18=force_body_18)

        # render: frames without people become black canvases
        w, h = _extract_canvas_wh(smoothed_frames, default_w=720, default_h=1280)
        frames_np = []
        for fr in smoothed_frames:
            if isinstance(fr, dict) and fr.get("people"):
                img = _draw_pose_frame_full(
                    w,
                    h,
                    fr["people"][0],
                    conf_thresh_body=conf_thresh_body,
                    conf_thresh_hands=conf_thresh_hands,
                    conf_thresh_face=conf_thresh_face,
                )
            else:
                img = np.zeros((h, w, 3), dtype=np.uint8)
            frames_np.append(img)

        # [T, H, W, 3] uint8 -> float tensor in 0..1 (ComfyUI IMAGE convention)
        frames_t = torch.from_numpy(np.stack(frames_np, axis=0)).float() / 255.0
        return (frames_t, out_pose_data)
| |
|
|
| # ============================================================ |
| # PKL / pose_data IO |
| # ============================================================ |
| |
| |
| class _PoseDummyObj: |
| def __init__(self, *a, **k): |
| pass |
| |
| def __setstate__(self, state): |
| # поддержка dict и (dict, slotstate) |
| if isinstance(state, dict): |
| self.__dict__.update(state) |
| elif isinstance(state, (list, tuple)) and len(state) == 2 and isinstance(state[0], dict): |
| self.__dict__.update(state[0]) |
| if isinstance(state[1], dict): |
| self.__dict__.update(state[1]) |
| else: |
| self.__dict__["_slotstate"] = state[1] |
| else: |
| self.__dict__["_state"] = state |
| |
|
|
| class _SafeUnpickler(pickle.Unpickler): |
| """ |
| Безопасно грузим PKL из ComfyUI окружения: |
| - ремап numpy._core -> numpy.core |
| - неизвестные классы (WanAnimatePreprocess.*) превращаем в простые объекты с __dict__ |
| """ |
| |
| def find_class(self, module, name): |
| # ремап внутренних путей numpy (частая проблема между версиями) |
| if module.startswith("numpy._core"): |
| module = module.replace("numpy._core", "numpy.core", 1) |
| if module.startswith("numpy._globals"): |
| module = module.replace("numpy._globals", "numpy", 1) |
| |
| # конкретные классы метаданных (если встречаются) |
| if name in {"AAPoseMeta"}: |
| return _PoseDummyObj |
| |
| try: |
| return super().find_class(module, name) |
| except Exception: |
| return _PoseDummyObj |
| |
| |
def _load_pose_data_pkl(path: str) -> Any:
    """Load a pose-data pickle from `path` via the tolerant unpickler."""
    with open(path, "rb") as fh:
        unpickler = _SafeUnpickler(fh)
        return unpickler.load()
| |
| |
| def _coerce_pose_data_to_obj(pd: Any) -> Any: |
| """ |
| Accepts: |
| - dict pose_data |
| - object with attributes like .pose_metas (AAPoseMeta-like) |
| - str path to .pkl |
| - dict wrapper with 'pose_data' |
| """ |
| if isinstance(pd, str): |
| obj = _load_pose_data_pkl(pd) |
| return obj |
| |
| if isinstance(pd, dict) and "pose_data" in pd: |
| return pd["pose_data"] |
| |
| return pd |
| |
| |
| # ============================================================ |
| # pose_data <-> JSON-like KPS frames |
| # ============================================================ |
| |
| |
| def _as_attr(x: Any, key: str, default=None): |
| if isinstance(x, dict): |
| return x.get(key, default) |
| return getattr(x, key, default) |
| |
| |
| def _set_attr(x: Any, key: str, value: Any): |
| if isinstance(x, dict): |
| x[key] = value |
| else: |
| setattr(x, key, value) |
| |
| |
| def _xy_p_to_flat(xy: Optional[np.ndarray], p: Optional[np.ndarray]) -> Optional[List[float]]: |
| if xy is None: |
| return None |
| arr = np.asarray(xy) |
| if arr.ndim != 2 or arr.shape[1] < 2: |
| return None |
| N = arr.shape[0] |
| if p is None: |
| pp = np.ones((N,), dtype=np.float32) |
| else: |
| pp = np.asarray(p).reshape(-1) |
| if pp.shape[0] != N: |
| # если вдруг не совпали — подстрахуемся |
| pp = np.ones((N,), dtype=np.float32) |
| |
| out: List[float] = [] |
| for i in range(N): |
| out.extend([float(arr[i, 0]), float(arr[i, 1]), float(pp[i])]) |
| return out |
| |
| |
| def _flat_to_xy_p(flat: Optional[List[float]]) -> Tuple[Optional[np.ndarray], Optional[np.ndarray]]: |
| if not isinstance(flat, list) or len(flat) % 3 != 0: |
| return None, None |
| N = len(flat) // 3 |
| xy = np.zeros((N, 2), dtype=np.float32) |
| p = np.zeros((N,), dtype=np.float32) |
| for i in range(N): |
| xy[i, 0] = float(flat[3 * i + 0]) |
| xy[i, 1] = float(flat[3 * i + 1]) |
| p[i] = float(flat[3 * i + 2]) |
| return xy, p |
| |
|
|
def _pose_data_to_kps_frames(pose_data: Any, *, force_body_18: bool) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
    """
    Build an OpenPose-JSON-like frame list from pose_data.

    Each frame:
        {"people": [{pose_keypoints_2d, face_keypoints_2d,
                     hand_left_keypoints_2d, hand_right_keypoints_2d}],
         "canvas_width": W, "canvas_height": H}

    Returns (frames, meta_ref); meta_ref keeps a reference to the original
    pose_metas list so the smoothed result can be written back later.
    """
    metas = _as_attr(pose_data, "pose_metas", None)
    if metas is None:
        # some producers store the list under a different key
        metas = _as_attr(pose_data, "frames", None)

    if metas is None or not isinstance(metas, list):
        raise ValueError("pose_data does not contain 'pose_metas' list.")

    def flat_of(meta: Any, xy_key: str, p_key: str) -> Optional[List[float]]:
        # read one keypoint group (coords + confidences) as a flat list
        return _xy_p_to_flat(_as_attr(meta, xy_key, None), _as_attr(meta, p_key, None))

    frames: List[Dict[str, Any]] = []
    for meta in metas:
        height = _as_attr(meta, "height", 1280)
        width = _as_attr(meta, "width", 720)

        body = flat_of(meta, "kps_body", "kps_body_p")
        face = flat_of(meta, "kps_face", "kps_face_p")
        left = flat_of(meta, "kps_lhand", "kps_lhand_p")
        right = flat_of(meta, "kps_rhand", "kps_rhand_p")

        if force_body_18 and isinstance(body, list) and len(body) >= 18 * 3:
            body = body[: 18 * 3]

        person = {
            "pose_keypoints_2d": body if body is not None else [],
            "face_keypoints_2d": face if face is not None else [],
            # hands intentionally stay None when absent (body/face fall back to [])
            "hand_left_keypoints_2d": left,
            "hand_right_keypoints_2d": right,
        }
        frames.append({"people": [person], "canvas_height": int(height), "canvas_width": int(width)})

    return frames, {"pose_metas": metas, "len": len(metas)}
| |
|
|
def _kps_frames_to_pose_data(
    pose_data_in: Any,
    frames_kps: List[Dict[str, Any]],
    meta_ref: Dict[str, Any],
    *,
    force_body_18: bool,
) -> Any:
    """
    Write smoothed keypoints back into pose_metas[*].kps_* / kps_*_p.

    All other pose_data fields are preserved (the whole structure is
    deep-copied first).  Frames and metas are paired index-by-index up to
    the shorter of the two; a group (body/face/hand) is only overwritten
    when its flat list decodes cleanly.
    """
    out_pd = copy.deepcopy(pose_data_in)
    pose_metas_out = _as_attr(out_pd, "pose_metas", None)
    if pose_metas_out is None:
        # Fallback when the deepcopy does not expose 'pose_metas'.
        # Deep-copy the list referenced by meta_ref so we never mutate the
        # caller's input metas (the previous code aliased the input list here).
        pose_metas_out = copy.deepcopy(meta_ref.get("pose_metas"))

    if pose_metas_out is None or not isinstance(pose_metas_out, list):
        raise ValueError("Failed to locate pose_metas in output pose_data.")

    T = min(len(pose_metas_out), len(frames_kps))
    for t in range(T):
        meta = pose_metas_out[t]
        fr = frames_kps[t]
        people = fr.get("people", []) if isinstance(fr, dict) else []
        p0 = people[0] if people else None
        if not isinstance(p0, dict):
            continue

        pose_flat = p0.get("pose_keypoints_2d")
        face_flat = p0.get("face_keypoints_2d")
        lh_flat = p0.get("hand_left_keypoints_2d")
        rh_flat = p0.get("hand_right_keypoints_2d")

        if force_body_18 and isinstance(pose_flat, list) and len(pose_flat) >= 18 * 3:
            pose_flat = pose_flat[: 18 * 3]

        body_xy, body_p = _flat_to_xy_p(pose_flat if isinstance(pose_flat, list) else None)
        face_xy, face_p = _flat_to_xy_p(face_flat if isinstance(face_flat, list) else None)
        lh_xy, lh_p = _flat_to_xy_p(lh_flat if isinstance(lh_flat, list) else None)
        rh_xy, rh_p = _flat_to_xy_p(rh_flat if isinstance(rh_flat, list) else None)

        if body_xy is not None and body_p is not None:
            _set_attr(meta, "kps_body", body_xy.astype(np.float32, copy=False))
            _set_attr(meta, "kps_body_p", body_p.astype(np.float32, copy=False))

        if face_xy is not None and face_p is not None:
            _set_attr(meta, "kps_face", face_xy.astype(np.float32, copy=False))
            _set_attr(meta, "kps_face_p", face_p.astype(np.float32, copy=False))

        if lh_xy is not None and lh_p is not None:
            _set_attr(meta, "kps_lhand", lh_xy.astype(np.float32, copy=False))
            _set_attr(meta, "kps_lhand_p", lh_p.astype(np.float32, copy=False))

        if rh_xy is not None and rh_p is not None:
            _set_attr(meta, "kps_rhand", rh_xy.astype(np.float32, copy=False))
            _set_attr(meta, "kps_rhand_p", rh_p.astype(np.float32, copy=False))

        # keep canvas size in sync when the frame carries it
        if isinstance(fr, dict):
            if "canvas_width" in fr:
                _set_attr(meta, "width", int(fr["canvas_width"]))
            if "canvas_height" in fr:
                _set_attr(meta, "height", int(fr["canvas_height"]))

    # make sure the (possibly fallback) metas list ends up on the output object
    _set_attr(out_pd, "pose_metas", pose_metas_out)
    return out_pd
| |
|
|
| def _extract_canvas_wh(data: Any, default_w: int, default_h: int) -> Tuple[int, int]: |
| w, h = int(default_w), int(default_h) |
| if isinstance(data, list): |
| for fr in data: |
| if isinstance(fr, dict) and "canvas_width" in fr and "canvas_height" in fr: |
| try: |
| w = int(fr["canvas_width"]) |
| h = int(fr["canvas_height"]) |
| break |
| except Exception: |
| pass |
| return w, h |
| |
|
|
| # ============================================================ |
| # === START: smooth_KPS_json.py logic (ported as-is) |
| # ============================================================ |
|
|
# ------------------------------------------------------------------
# Tunable module globals.  KPSSmoothPoseDataAndRender.run() overrides a
# subset of these under _GLOBAL_LOCK (see _snapshot_tunable_globals).
# ------------------------------------------------------------------

# --- Root+Scale carry (when torso disappears on close-up) ---
ROOTSCALE_CARRY_ENABLED = True
CARRY_MAX_FRAMES = 48  # how long (frames) a stale pose may be carried forward
CARRY_MIN_ANCHORS = 2  # minimum visible anchor joints to estimate root+scale
CARRY_ANCHOR_JOINTS = [0, 1, 2, 5, 3, 6, 4, 7]  # presumably COCO18 upper-body indices — TODO confirm
CARRY_CONF_GATE = 0.20  # confidence gate for anchor visibility


# --- Main person selection / multi-person filtering ---
FILTER_EXTRA_PEOPLE = True
MAIN_PERSON_MODE = "longest_track"
TRACK_MATCH_MIN_PX = 80.0  # floor (px) for the track-to-detection match threshold
TRACK_MATCH_FACTOR = 3.0  # match threshold = factor * estimated torso scale
TRACK_MAX_FRAME_GAP = 32  # tracks idle longer than this are not extended

# --- Spatial outlier suppression ---
SPATIAL_OUTLIER_FIX = True
BONE_MAX_FACTOR = 2.3  # bone longer than factor * torso scale -> weaker endpoint dropped
TORSO_RADIUS_FACTOR = 4.0  # joint farther than factor * torso scale from center -> dropped

# EMA smoothing for BODY only (online)
ALPHA_BODY = 0.70  # EMA weight on the new observation in _smooth_body_pose
MAX_STEP_BODY = 60.0  # per-frame displacement clamp (px)
VEL_ALPHA = 0.45  # EMA weight for the per-joint velocity estimate
EPS = 0.3  # displacements below this (px) are treated as zero (jitter dead-zone)
CONF_GATE_BODY = 0.20
CONF_FLOOR_BODY = 0.00

TRACK_DIST_PENALTY = 1.5  # score penalty per px from the previous center (_choose_single_person)
FACE_WEIGHT_IN_SCORE = 0.15
HAND_WEIGHT_IN_SCORE = 0.35

ALLOW_DISAPPEAR_JOINTS = {3, 4, 6, 7}  # joints excluded from carry-fill (see _carry_pose_when_torso_missing)

GAP_FILL_ENABLED = True
MAX_GAP_FRAMES = 12  # gap-length limit consumed by the smoothing pipeline
MIN_RUN_FRAMES = 2  # minimum visible-run length consumed by the smoothing pipeline

TORSO_SYNC_ENABLED = True
TORSO_JOINTS = {1, 2, 5, 8, 11}
TORSO_LOOKAHEAD_FRAMES = 32


SUPER_SMOOTH_ENABLED = True
SUPER_SMOOTH_ALPHA = 0.7  # forward-backward EMA alpha (body)
SUPER_SMOOTH_MIN_CONF = 0.20

MEDIAN3_ENABLED = True


FACE_SMOOTH_ENABLED = True
HANDS_SMOOTH_ENABLED = False


CONF_GATE_FACE = 0.20
CONF_GATE_HAND = 0.50


HAND_MIN_POINTS_PRESENT = 7
MIN_HAND_RUN_FRAMES = 6


# DENSE_* mirror the body parameters for the dense face/hand keypoints
DENSE_GAP_FILL_ENABLED = False
DENSE_MAX_GAP_FRAMES = 8
DENSE_MIN_RUN_FRAMES = 2

DENSE_MEDIAN3_ENABLED = False
DENSE_SUPER_SMOOTH_ENABLED = False
DENSE_SUPER_SMOOTH_ALPHA = 0.7
| |
| |
| def _snapshot_tunable_globals() -> Dict[str, Any]: |
| keys = [ |
| "FILTER_EXTRA_PEOPLE", |
| "SUPER_SMOOTH_ALPHA", |
| "MAX_GAP_FRAMES", |
| "MIN_RUN_FRAMES", |
| "DENSE_SUPER_SMOOTH_ALPHA", |
| "DENSE_MAX_GAP_FRAMES", |
| "DENSE_MIN_RUN_FRAMES", |
| ] |
| return {k: globals().get(k) for k in keys} |
| |
|
|
| def _restore_tunable_globals(old: Dict[str, Any]) -> None: |
| for k, v in old.items(): |
| globals()[k] = v |
| |
| |
| def _is_valid_xyc(x: float, y: float, c: float) -> bool: |
| if c is None: |
| return False |
| if c <= 0: |
| return False |
| if x == 0 and y == 0: |
| return False |
| if math.isnan(x) or math.isnan(y) or math.isnan(c): |
| return False |
| return True |
| |
|
|
| def _reshape_keypoints_2d(arr: List[float]) -> List[Tuple[float, float, float]]: |
| if arr is None: |
| return [] |
| if len(arr) % 3 != 0: |
| raise ValueError(f"keypoints length not multiple of 3: {len(arr)}") |
| out = [] |
| for i in range(0, len(arr), 3): |
| out.append((float(arr[i]), float(arr[i + 1]), float(arr[i + 2]))) |
| return out |
| |
| |
| def _flatten_keypoints_2d(kps: List[Tuple[float, float, float]]) -> List[float]: |
| out: List[float] = [] |
| for x, y, c in kps: |
| out.extend([float(x), float(y), float(c)]) |
| return out |
| |
|
|
| def _sum_conf(arr: Optional[List[float]], sample_step: int = 1) -> float: |
| if not arr: |
| return 0.0 |
| s = 0.0 |
| for i in range(2, len(arr), 3 * sample_step): |
| try: |
| c = float(arr[i]) |
| except Exception: |
| c = 0.0 |
| if c > 0: |
| s += c |
| return s |
| |
|
|
def _body_center_from_pose(pose_arr: Optional[List[float]]) -> Optional[Tuple[float, float]]:
    """
    Mean position of the torso joints (indices 2, 5, 8, 11, 1), falling back
    to the mean of all valid joints.  None when nothing is valid.
    """
    if not pose_arr:
        return None
    kps = _reshape_keypoints_2d(pose_arr)

    torso = [kps[i] for i in (2, 5, 8, 11, 1) if i < len(kps)]
    pts = [(x, y) for x, y, c in torso if _is_valid_xyc(x, y, c)]
    if not pts:
        # torso missing: average whatever joints are valid
        pts = [(x, y) for x, y, c in kps if _is_valid_xyc(x, y, c)]
    if not pts:
        return None

    n = len(pts)
    return (sum(x for x, _ in pts) / n, sum(y for _, y in pts) / n)
| |
|
|
| def _dist(a: Tuple[float, float], b: Tuple[float, float]) -> float: |
| return math.hypot(a[0] - b[0], a[1] - b[1]) |
| |
| |
def _choose_single_person(
    people: List[Dict[str, Any]], prev_center: Optional[Tuple[float, float]]
) -> Optional[Dict[str, Any]]:
    """
    Pick the most likely "main" person from a frame's detections.

    Score = body confidence mass + weighted face/hand confidence mass,
    minus a distance penalty to the previously tracked center when both
    centers are known.  Returns None for an empty list.
    """
    if not people:
        return None

    def score_of(person: Dict[str, Any]) -> float:
        body = person.get("pose_keypoints_2d")
        s = _sum_conf(body)
        s += FACE_WEIGHT_IN_SCORE * _sum_conf(person.get("face_keypoints_2d"), sample_step=4)
        s += HAND_WEIGHT_IN_SCORE * (
            _sum_conf(person.get("hand_left_keypoints_2d"), sample_step=2)
            + _sum_conf(person.get("hand_right_keypoints_2d"), sample_step=2)
        )
        center = _body_center_from_pose(body)
        if prev_center is not None and center is not None:
            s -= TRACK_DIST_PENALTY * _dist(prev_center, center)
        return s

    chosen: Optional[Dict[str, Any]] = None
    chosen_score = -1e18
    for person in people:
        s = score_of(person)
        if s > chosen_score:
            chosen_score = s
            chosen = person
    return chosen
| |
|
|
@dataclass
class _Track:
    """One person's trajectory: per-frame detections plus matching state."""

    # frame index -> the person dict observed at that frame
    frames: Dict[int, Dict[str, Any]]
    # frame index -> body center (x, y) at that frame
    centers: Dict[int, Tuple[float, float]]
    # most recent frame index this track was extended at
    last_t: int
    # body center at last_t (anchor for matching the next detection)
    last_center: Tuple[float, float]
| |
|
|
def _estimate_torso_scale(pose: List[Tuple[float, float, float]]) -> Optional[float]:
    """
    Rough body scale in pixels: mean of the measurable torso segments
    (2-5 shoulder span, 8-11 hip span, 1-8 and 1-11 neck-to-hip).
    None when no segment can be measured.
    """
    def seg_len(a: int, b: int) -> Optional[float]:
        if a >= len(pose) or b >= len(pose):
            return None
        xa, ya, ca = pose[a]
        xb, yb, cb = pose[b]
        if _is_valid_xyc(xa, ya, ca) and _is_valid_xyc(xb, yb, cb):
            return math.hypot(xa - xb, ya - yb)
        return None

    lengths = [seg_len(a, b) for a, b in ((2, 5), (8, 11), (1, 8), (1, 11))]
    lengths = [v for v in lengths if v is not None and v > 1e-3]
    if not lengths:
        return None
    return float(sum(lengths) / len(lengths))
| |
| |
def _track_match_threshold_from_pose(pose_arr: Optional[List[float]]) -> float:
    """
    Distance threshold (px) for matching a detection to a track, scaled by
    the person's torso size when it can be estimated; a fixed fallback
    otherwise.
    """
    if isinstance(pose_arr, list):
        torso = _estimate_torso_scale(_reshape_keypoints_2d(pose_arr))
        if torso is not None:
            return max(float(TRACK_MATCH_MIN_PX), float(TRACK_MATCH_FACTOR) * float(torso))
    return float(max(TRACK_MATCH_MIN_PX, 120.0))
| |
|
|
def _build_tracks_over_video(frames_data: List[Any]) -> List[_Track]:
    """
    Greedy frame-to-frame person tracking over the whole video.

    Per frame, every detection with a computable body center is offered to
    existing tracks (most recently updated first); a track claims the
    nearest unclaimed detection within its per-candidate threshold, as long
    as the track is not staler than TRACK_MAX_FRAME_GAP frames.  Leftover
    detections open new tracks.

    Change vs. the previous version: the best candidate tuple is kept
    directly instead of re-scanning the candidate list with next() after
    choosing an index (same behavior, one less O(n) pass).
    """
    tracks: List[_Track] = []

    for t, frame in enumerate(frames_data):
        if not isinstance(frame, dict):
            continue
        people = frame.get("people", [])
        if not isinstance(people, list) or not people:
            continue

        # (index, person, center) for every detection with a valid center
        cand: List[Tuple[int, Dict[str, Any], Tuple[float, float]]] = []
        for i, p in enumerate(people):
            if not isinstance(p, dict):
                continue
            c = _body_center_from_pose(p.get("pose_keypoints_2d"))
            if c is not None:
                cand.append((i, p, c))

        if not cand:
            continue

        used = set()
        # most recently updated tracks get first pick
        track_order = sorted(range(len(tracks)), key=lambda k: tracks[k].last_t, reverse=True)

        for k in track_order:
            tr = tracks[k]
            if t - tr.last_t > int(TRACK_MAX_FRAME_GAP):
                continue  # track too stale to extend

            best: Optional[Tuple[int, Dict[str, Any], Tuple[float, float]]] = None
            best_d = 1e18
            for entry in cand:
                i, p, cc = entry
                if i in used:
                    continue
                # threshold scales with this candidate's torso size
                thr = _track_match_threshold_from_pose(p.get("pose_keypoints_2d"))
                d = _dist(tr.last_center, cc)
                if d <= thr and d < best_d:
                    best_d = d
                    best = entry

            if best is not None:
                i, p, cc = best
                used.add(i)
                tr.frames[t] = p
                tr.centers[t] = cc
                tr.last_t = t
                tr.last_center = cc

        # unmatched detections start fresh tracks
        for i, p, cc in cand:
            if i not in used:
                tracks.append(_Track(frames={t: p}, centers={t: cc}, last_t=t, last_center=cc))

    return tracks
| |
|
|
def _track_presence_score(tr: _Track) -> Tuple[int, float, float]:
    """
    Ranking key for a track, compared lexicographically:
    (frames present, face confidence mass, body confidence mass).
    """
    persons = list(tr.frames.values())
    face_total = sum((_sum_conf(p.get("face_keypoints_2d"), sample_step=4) for p in persons), 0.0)
    body_total = sum((_sum_conf(p.get("pose_keypoints_2d"), sample_step=1) for p in persons), 0.0)
    return (len(persons), face_total, body_total)
| |
|
|
def _pick_main_track(tracks: List[_Track]) -> Optional[_Track]:
    """Return the track with the best presence score (first wins ties), or None."""
    if not tracks:
        return None
    return max(tracks, key=_track_presence_score)
| |
|
|
@dataclass
class BodyState:
    """Per-joint state for the online body EMA filter (_smooth_body_pose)."""

    # last smoothed (x, y) per joint; None until the joint is first seen
    last_xy: List[Optional[Tuple[float, float]]]
    # last smoothed per-joint velocity (dx, dy)
    last_v: List[Tuple[float, float]]

    # NOTE(review): the hand-written __init__ overrides the dataclass-generated
    # one; @dataclass still supplies __repr__/__eq__.
    def __init__(self, joints: int):
        self.last_xy = [None] * joints
        self.last_v = [(0.0, 0.0)] * joints
| |
|
|
def _smooth_body_pose(pose_arr: Optional[List[float]], state: BodyState) -> Optional[List[float]]:
    """
    One online smoothing step for a body pose (flat [x, y, c, ...] list).

    Velocity-assisted EMA per joint: blend the raw observation with a
    velocity-predicted position, then clamp the per-frame step to
    MAX_STEP_BODY pixels.  `state` is mutated in place and carries per-joint
    position/velocity between calls.  Joints failing the validity/confidence
    gate pass through unchanged and do not update the state.
    Returns None when pose_arr is None.
    """
    if pose_arr is None:
        return None

    kps = _reshape_keypoints_2d(pose_arr)
    J = len(kps)
    # reset the state if the joint count changed since the last call
    if len(state.last_xy) != J:
        state.last_xy = [None] * J
        state.last_v = [(0.0, 0.0)] * J

    out: List[Tuple[float, float, float]] = []

    for j in range(J):
        x, y, c = kps[j]
        last = state.last_xy[j]
        vx_last, vy_last = state.last_v[j]

        valid_in = _is_valid_xyc(x, y, c) and (c >= CONF_GATE_BODY)

        if valid_in:
            if last is None:
                # first sighting: adopt the raw position with zero velocity
                nx, ny = x, y
                state.last_xy[j] = (nx, ny)
                state.last_v[j] = (0.0, 0.0)
                out.append((nx, ny, float(c)))
                continue

            # raw displacement with a small dead-zone (EPS) to kill jitter
            dx_raw = x - last[0]
            dy_raw = y - last[1]
            if abs(dx_raw) < EPS:
                dx_raw = 0.0
            if abs(dy_raw) < EPS:
                dy_raw = 0.0

            # EMA-smoothed velocity estimate
            vx = VEL_ALPHA * dx_raw + (1.0 - VEL_ALPHA) * vx_last
            vy = VEL_ALPHA * dy_raw + (1.0 - VEL_ALPHA) * vy_last

            # velocity-predicted position
            px = last[0] + vx
            py = last[1] + vy

            # blend observation with prediction
            nx = ALPHA_BODY * x + (1.0 - ALPHA_BODY) * px
            ny = ALPHA_BODY * y + (1.0 - ALPHA_BODY) * py

            # clamp the resulting step; on clamp, re-derive velocity from it
            ddx = nx - last[0]
            ddy = ny - last[1]
            d = math.hypot(ddx, ddy)
            if d > MAX_STEP_BODY and d > 1e-6:
                scale = MAX_STEP_BODY / d
                nx = last[0] + ddx * scale
                ny = last[1] + ddy * scale
                vx = nx - last[0]
                vy = ny - last[1]

            state.last_xy[j] = (nx, ny)
            state.last_v[j] = (vx, vy)

            out.append((nx, ny, float(c)))
        else:
            # invalid / low-confidence joint: pass through, state untouched
            out.append((float(x), float(y), float(c)))

    return _flatten_keypoints_2d(out)
| |
|
|
# COCO-18 skeleton edges (joint-index pairs) used for rendering and for the
# adjacency-based outlier/isolation checks below.
COCO18_EDGES = [
    (1, 2),
    (2, 3),
    (3, 4),
    (1, 5),
    (5, 6),
    (6, 7),
    (1, 8),
    (8, 9),
    (9, 10),
    (1, 11),
    (11, 12),
    (12, 13),
    (8, 11),
    (1, 0),
    (0, 14),
    (14, 16),
    (0, 15),
    (15, 17),
]

# 21-point hand skeleton edges (point 0 fans out into four-segment chains).
HAND21_EDGES = [
    (0, 1),
    (1, 2),
    (2, 3),
    (3, 4),
    (0, 5),
    (5, 6),
    (6, 7),
    (7, 8),
    (0, 9),
    (9, 10),
    (10, 11),
    (11, 12),
    (0, 13),
    (13, 14),
    (14, 15),
    (15, 16),
    (0, 17),
    (17, 18),
    (18, 19),
    (19, 20),
]

# Lazy cache for the COCO18 adjacency map; populated by _build_neighbors().
_NEIGHBORS = None
| |
| |
def _build_neighbors():
    """Lazily build the joint adjacency map from COCO18_EDGES into _NEIGHBORS."""
    global _NEIGHBORS
    if _NEIGHBORS is None:
        adjacency = {}
        for a, b in COCO18_EDGES:
            adjacency.setdefault(a, set()).add(b)
            adjacency.setdefault(b, set()).add(a)
        _NEIGHBORS = adjacency
| |
|
|
def _suppress_spatial_outliers_in_pose_arr(
    pose_arr: Optional[List[float]], *, conf_gate: float
) -> Optional[List[float]]:
    """
    Zero out spatially implausible joints in a flat [x, y, c, ...] body pose.

    Two passes (order matters — pass 1 can hide joints from pass 2):
      1) any joint farther than TORSO_RADIUS_FACTOR * torso_scale from the
         body center is zeroed;
      2) for each COCO18 edge longer than BONE_MAX_FACTOR * torso_scale,
         the lower-confidence endpoint is zeroed.

    Returns the input unchanged when it is malformed or when the body
    center / torso scale cannot be estimated.
    """
    if not isinstance(pose_arr, list) or len(pose_arr) % 3 != 0:
        return pose_arr

    pose = _reshape_keypoints_2d(pose_arr)
    J = len(pose)

    center = _body_center_from_pose(pose_arr)
    scale = _estimate_torso_scale(pose)
    if center is None or scale is None:
        return pose_arr

    cx, cy = center
    max_r = TORSO_RADIUS_FACTOR * scale
    max_bone = BONE_MAX_FACTOR * scale

    out = [list(p) for p in pose]

    def visible(j: int) -> bool:
        # reads `out`, so removals made earlier in this call affect later checks
        if j >= J:
            return False
        x, y, c = out[j]
        return (c >= conf_gate) and not (x == 0 and y == 0)

    # pass 1: distance-from-center gate
    for j in range(J):
        x, y, c = out[j]
        if c >= conf_gate and not (x == 0 and y == 0):
            if math.hypot(x - cx, y - cy) > max_r:
                out[j] = [0.0, 0.0, 0.0]

    # pass 2: over-stretched bones — drop the weaker endpoint
    for a, b in COCO18_EDGES:
        if a >= J or b >= J:
            continue
        if not visible(a) or not visible(b):
            continue
        ax, ay, ac = out[a]
        bx, by, bc = out[b]
        d = math.hypot(ax - bx, ay - by)
        if d > max_bone:
            if ac <= bc:
                out[a] = [0.0, 0.0, 0.0]
            else:
                out[b] = [0.0, 0.0, 0.0]

    flat: List[float] = []
    for x, y, c in out:
        flat.extend([float(x), float(y), float(c)])
    return flat
| |
|
|
def _suppress_isolated_joints_in_pose_arr(
    pose_arr: Optional[List[float]], *, conf_gate: float, keep: Optional[set[int]] = None
) -> Optional[List[float]]:
    """
    Zero out visible joints whose COCO18 neighbors are all invisible —
    a joint with no visible neighbor is likely a spurious detection.

    Joints in `keep` are never removed.  Malformed input is returned
    unchanged.
    """
    if not isinstance(pose_arr, list) or len(pose_arr) % 3 != 0:
        return pose_arr

    _build_neighbors()
    pose = _reshape_keypoints_2d(pose_arr)
    J = len(pose)
    out = [list(p) for p in pose]

    if keep is None:
        keep = set()

    def vis(j: int) -> bool:
        if j >= J:
            return False
        x, y, c = out[j]
        return (c >= conf_gate) and not (x == 0 and y == 0)

    for j in range(J):
        if j in keep:
            continue
        if not vis(j):
            continue
        neighs = _NEIGHBORS.get(j, set())
        # removals are written into `out`, so suppression can cascade along
        # the scan order within a single call
        if not any((n < J and vis(n)) for n in neighs):
            out[j] = [0.0, 0.0, 0.0]

    flat = []
    for x, y, c in out:
        flat.extend([float(x), float(y), float(c)])
    return flat
| |
|
|
def _denoise_and_fill_gaps_pose_seq(
    pose_arr_seq: List[Optional[List[float]]],
    *,
    conf_gate: float,
    min_run: int,
    max_gap: int,
) -> List[Optional[List[float]]]:
    """
    Temporal clean-up of a sequence of flat [x, y, c, ...] keypoint arrays.

    Phase 1: per joint, visible runs shorter than `min_run` frames are
    zeroed (removes one/two-frame detection flashes).
    Phase 2: per joint, invisible gaps of at most `max_gap` frames that are
    bounded by visible frames on BOTH sides are filled by linear
    interpolation; the filled confidence is min(conf_before, conf_after).
    Trailing gaps (the joint never returns) are left untouched.

    Non-list frames pass through unchanged; list frames are assumed to have
    length J*3, where J is taken from the first well-formed frame — TODO
    confirm mixed-length sequences cannot occur upstream.
    """
    if not pose_arr_seq:
        return pose_arr_seq

    # infer the joint count from the first non-empty well-formed frame
    J = None
    for arr in pose_arr_seq:
        if isinstance(arr, list) and len(arr) % 3 == 0 and len(arr) > 0:
            J = len(arr) // 3
            break
    if J is None:
        return pose_arr_seq

    T = len(pose_arr_seq)
    # shallow-copy well-formed frames so the caller's lists are not mutated
    out_seq: List[Optional[List[float]]] = []
    for arr in pose_arr_seq:
        if isinstance(arr, list) and len(arr) == J * 3:
            out_seq.append(list(arr))
        else:
            out_seq.append(arr)

    def is_vis(arr: List[float], j: int) -> bool:
        x = float(arr[3 * j + 0])
        y = float(arr[3 * j + 1])
        c = float(arr[3 * j + 2])
        return (c >= conf_gate) and not (x == 0 and y == 0)

    # 1) remove short flashes
    for j in range(J):
        start = None
        # t runs to T inclusive so a run touching the sequence end is closed out
        for t in range(T + 1):
            cur = False
            if t < T and isinstance(out_seq[t], list):
                cur = is_vis(out_seq[t], j)
            if cur and start is None:
                start = t
            if (not cur) and start is not None:
                run_len = t - start
                if run_len < min_run:
                    for k in range(start, t):
                        if not isinstance(out_seq[k], list):
                            continue
                        out_seq[k][3 * j + 0] = 0.0
                        out_seq[k][3 * j + 1] = 0.0
                        out_seq[k][3 * j + 2] = 0.0
                start = None

    # 2) gap fill only if returns
    for j in range(J):
        last_vis_t = None
        t = 0
        while t < T:
            arr = out_seq[t]
            if not isinstance(arr, list):
                t += 1
                continue

            cur_vis = is_vis(arr, j)
            if cur_vis:
                last_vis_t = t
                t += 1
                continue

            # never been visible yet: nothing to interpolate from
            if last_vis_t is None:
                t += 1
                continue

            # scan ahead for the frame where the joint becomes visible again
            gap_start = t
            t2 = t
            while t2 < T:
                arr2 = out_seq[t2]
                if isinstance(arr2, list) and is_vis(arr2, j):
                    break
                t2 += 1

            if t2 >= T:
                break  # joint never returns — leave the trailing gap as-is

            gap_len = t2 - gap_start
            if gap_len <= 0:
                t = t2
                continue

            if gap_len <= max_gap:
                a = out_seq[last_vis_t]
                b = out_seq[t2]
                if isinstance(a, list) and isinstance(b, list):
                    ax, ay, ac = float(a[3 * j + 0]), float(a[3 * j + 1]), float(a[3 * j + 2])
                    bx, by, bc = float(b[3 * j + 0]), float(b[3 * j + 1]), float(b[3 * j + 2])
                    if not (ax == 0 and ay == 0) and not (bx == 0 and by == 0):
                        conf_fill = min(ac, bc)
                        for k in range(gap_len):
                            tt = gap_start + k
                            if not isinstance(out_seq[tt], list):
                                continue
                            # linear interpolation between the bounding frames
                            r = (k + 1) / (gap_len + 1)
                            x = ax + (bx - ax) * r
                            y = ay + (by - ay) * r
                            out_seq[tt][3 * j + 0] = float(x)
                            out_seq[tt][3 * j + 1] = float(y)
                            out_seq[tt][3 * j + 2] = float(conf_fill)

            t = t2

    return out_seq
| |
|
|
| def _zero_lag_ema_pose_seq( |
| pose_seq: List[Optional[List[float]]], *, alpha: float, conf_gate: float |
| ) -> List[Optional[List[float]]]: |
| if not pose_seq: |
| return pose_seq |
| |
| J = None |
| for arr in pose_seq: |
| if isinstance(arr, list) and len(arr) % 3 == 0 and len(arr) > 0: |
| J = len(arr) // 3 |
| break |
| if J is None: |
| return pose_seq |
| |
| T = len(pose_seq) |
| |
| def is_vis(arr: List[float], j: int) -> bool: |
| x = float(arr[3 * j + 0]) |
| y = float(arr[3 * j + 1]) |
| c = float(arr[3 * j + 2]) |
| return (c >= conf_gate) and not (x == 0 and y == 0) |
| |
| fwd = [None] * T |
| last = [None] * J |
| for t in range(T): |
| arr = pose_seq[t] |
| if not isinstance(arr, list) or len(arr) != J * 3: |
| fwd[t] = arr |
| continue |
| out = list(arr) |
| for j in range(J): |
| if is_vis(arr, j): |
| x = float(arr[3 * j + 0]) |
| y = float(arr[3 * j + 1]) |
| if last[j] is None: |
| sx, sy = x, y |
| else: |
| sx = alpha * x + (1 - alpha) * last[j][0] |
| sy = alpha * y + (1 - alpha) * last[j][1] |
| last[j] = (sx, sy) |
| out[3 * j + 0] = float(sx) |
| out[3 * j + 1] = float(sy) |
| fwd[t] = out |
| |
| bwd = [None] * T |
| last = [None] * J |
| for t in range(T - 1, -1, -1): |
| arr = fwd[t] |
| if not isinstance(arr, list) or len(arr) != J * 3: |
| bwd[t] = arr |
| continue |
| out = list(arr) |
| for j in range(J): |
| if is_vis(arr, j): |
| x = float(arr[3 * j + 0]) |
| y = float(arr[3 * j + 1]) |
| if last[j] is None: |
| sx, sy = x, y |
| else: |
| sx = alpha * x + (1 - alpha) * last[j][0] |
| sy = alpha * y + (1 - alpha) * last[j][1] |
| last[j] = (sx, sy) |
| out[3 * j + 0] = float(sx) |
| out[3 * j + 1] = float(sy) |
| bwd[t] = out |
| |
| return bwd |
| |
|
|
def _apply_root_scale(
    pose_arr: Optional[List[float]],
    *,
    src_root: Tuple[float, float],
    src_scale: float,
    dst_root: Tuple[float, float],
    dst_scale: float,
) -> Optional[List[float]]:
    """
    Similarity-transform a flat [x, y, c, ...] pose from one root+scale frame
    to another: translate by the root offset and scale by dst/src.

    Invalid points (c <= 0 or the (0, 0) sentinel) are copied unchanged.
    Malformed input or a degenerate scale returns the input as-is.
    """
    if not isinstance(pose_arr, list) or len(pose_arr) % 3 != 0:
        return pose_arr
    if src_scale <= 1e-6 or dst_scale <= 1e-6:
        return pose_arr

    ratio = dst_scale / src_scale
    transformed: List[Tuple[float, float, float]] = []
    for x, y, c in _reshape_keypoints_2d(pose_arr):
        if c <= 0 or (x == 0 and y == 0):
            transformed.append((x, y, c))
        else:
            transformed.append(
                (
                    dst_root[0] + (x - src_root[0]) * ratio,
                    dst_root[1] + (y - src_root[1]) * ratio,
                    c,
                )
            )
    return _flatten_keypoints_2d(transformed)
| |
|
|
def _carry_pose_when_torso_missing(
    pose_seq: List[Optional[List[float]]],
    *,
    conf_gate: float,
    max_carry: int,
    anchor_joints: List[int],
    min_anchors: int,
) -> List[Optional[List[float]]]:
    """Carry torso/leg joints forward from the last fully-visible frame.

    For frames where the anchor joints are still visible (so a root point and
    scale can be estimated) but some torso/hip/leg joints dropped out, the
    missing joints are copied from the most recent "good" frame after being
    re-rooted/re-scaled to the current frame via ``_apply_root_scale``.
    Carrying is limited to ``max_carry`` frames after the last good frame.

    Args:
        pose_seq: per-frame flat keypoint arrays ``[x0, y0, c0, x1, ...]`` or None.
        conf_gate: minimum confidence for a joint to count as visible.
        max_carry: maximum number of frames a stale pose may be propagated.
        anchor_joints: joint indices used to estimate the frame's root/scale.
        min_anchors: minimum number of visible anchors required.

    Returns:
        A new list (frames shallow-copied); the input is not mutated.
    """
    if not pose_seq:
        return pose_seq

    # Infer the joint count J from the first well-formed frame.
    J = None
    for arr in pose_seq:
        if isinstance(arr, list) and len(arr) % 3 == 0 and len(arr) > 0:
            J = len(arr) // 3
            break
    if J is None:
        return pose_seq

    out = [a if a is None else list(a) for a in pose_seq]

    # Neck, hips, knees, ankles (BODY-18 indexing, cf. R_HIP/L_HIP below),
    # minus joints the module config allows to legitimately disappear.
    FILL_JOINTS = {1, 8, 9, 10, 11, 12, 13}
    FILL_JOINTS -= set(ALLOW_DISAPPEAR_JOINTS)

    def is_vis_flat(arr: List[float], j: int) -> bool:
        # Visible = confident and not at the (0, 0) "missing" sentinel.
        x = float(arr[3 * j + 0])
        y = float(arr[3 * j + 1])
        c = float(arr[3 * j + 2])
        return (c >= conf_gate) and not (x == 0 and y == 0)

    def count_visible(arr: List[float], joints: List[int]) -> int:
        c = 0
        for j in joints:
            if j < J and is_vis_flat(arr, j):
                c += 1
        return c

    def root_scale_from_anchors(arr: List[float]) -> Optional[Tuple[Tuple[float, float], float]]:
        # Root = centroid of the visible anchors; scale = their bbox extent.
        pts = []
        for j in anchor_joints:
            if j >= J:
                continue
            if is_vis_flat(arr, j):
                x = float(arr[3 * j + 0])
                y = float(arr[3 * j + 1])
                pts.append((x, y))
        if len(pts) < min_anchors:
            return None

        rx = sum(p[0] for p in pts) / len(pts)
        ry = sum(p[1] for p in pts) / len(pts)

        xs = [p[0] for p in pts]
        ys = [p[1] for p in pts]
        scale = max(max(xs) - min(xs), max(ys) - min(ys))
        if scale <= 1e-3:
            return None

        return (rx, ry), float(scale)

    last_good: Optional[List[float]] = None
    last_good_rs: Optional[Tuple[Tuple[float, float], float]] = None
    carry_left = 0

    for t in range(len(out)):
        arr = out[t]
        if not isinstance(arr, list) or len(arr) != J * 3:
            continue

        anchors_ok = count_visible(arr, anchor_joints) >= min_anchors
        fill_vis = sum(1 for j in FILL_JOINTS if j < J and is_vis_flat(arr, j))
        rs = root_scale_from_anchors(arr)

        # "Good" frame: anchors + root/scale + at least 2 fill joints
        # visible.  It becomes the new carry source and refills the budget.
        if anchors_ok and rs is not None and fill_vis >= 2:
            last_good = list(arr)
            last_good_rs = rs
            carry_left = max_carry
            continue

        # Anchors still trackable but torso joints missing: re-map the last
        # good pose into this frame's root/scale and fill only the holes.
        if anchors_ok and rs is not None and last_good is not None and last_good_rs is not None and carry_left > 0:
            dst_root, dst_scale = rs
            src_root, src_scale = last_good_rs

            carried_full = _apply_root_scale(
                last_good,
                src_root=src_root,
                src_scale=src_scale,
                dst_root=dst_root,
                dst_scale=dst_scale,
            )
            if isinstance(carried_full, list) and len(carried_full) == J * 3:
                for j in FILL_JOINTS:
                    if j >= J:
                        continue
                    if is_vis_flat(arr, j):
                        continue  # keep the real detection

                    cx = float(carried_full[3 * j + 0])
                    cy = float(carried_full[3 * j + 1])
                    cc = float(carried_full[3 * j + 2])

                    if (cx == 0 and cy == 0) or cc <= 0:
                        continue

                    arr[3 * j + 0] = cx
                    arr[3 * j + 1] = cy
                    # Carried points get a clamped confidence: at least the
                    # gate, at most 0.60 so they read as "synthetic".
                    arr[3 * j + 2] = max(min(cc, 0.60), conf_gate)

                out[t] = arr
                carry_left -= 1
                continue

        # Untrackable frame: burn carry budget without filling anything.
        carry_left = max(carry_left - 1, 0)

    return out
| |
|
|
def _force_full_torso_pair(
    pose_seq: List[Optional[List[float]]],
    *,
    conf_gate: float,
    anchor_joints: List[int],
    min_anchors: int,
    max_lookback: int = 240,
    fill_legs_with_hip: bool = True,
    always_fill_if_one_hip: bool = True,
) -> List[Optional[List[float]]]:
    """Ensure both hips appear together by borrowing the missing side.

    When exactly one hip is visible in a frame, the other hip (and optionally
    its knee/ankle) is copied from the most recent frame that had BOTH hips
    plus a usable root/scale, re-rooted/re-scaled to the current frame.  Only
    frames within ``max_lookback`` of that reference frame are filled.

    Args:
        pose_seq: per-frame flat keypoint arrays ``[x, y, c, ...]`` or None.
        conf_gate: minimum confidence for a joint to count as visible.
        anchor_joints: joints used to estimate the per-frame root/scale.
        min_anchors: minimum visible anchors for a usable root/scale.
        max_lookback: max frame distance to the both-hips reference frame.
        fill_legs_with_hip: also copy the knee/ankle of the missing side.
        always_fill_if_one_hip: gate for the one-hip fill path.

    Returns:
        A new list (frames shallow-copied); the input is not mutated.
    """
    if not pose_seq:
        return pose_seq

    # Infer the joint count J from the first well-formed frame.
    J = None
    for arr in pose_seq:
        if isinstance(arr, list) and len(arr) % 3 == 0 and len(arr) > 0:
            J = len(arr) // 3
            break
    if J is None:
        return pose_seq

    out = [a if a is None else list(a) for a in pose_seq]

    # BODY-18 leg chains: right hip/knee/ankle, left hip/knee/ankle.
    R_HIP, R_KNEE, R_ANK = 8, 9, 10
    L_HIP, L_KNEE, L_ANK = 11, 12, 13

    def is_vis(arr: List[float], j: int) -> bool:
        # Visible = in range, confident, and not at the (0, 0) sentinel.
        if j >= J:
            return False
        x = float(arr[3 * j + 0])
        y = float(arr[3 * j + 1])
        c = float(arr[3 * j + 2])
        return (c >= conf_gate) and not (x == 0 and y == 0)

    def count_visible(arr: List[float], joints: List[int]) -> int:
        c = 0
        for j in joints:
            if is_vis(arr, j):
                c += 1
        return c

    def root_scale_from_anchors(arr: List[float]) -> Optional[Tuple[Tuple[float, float], float]]:
        # Root = centroid of the visible anchors; scale = their bbox extent.
        pts = []
        for j in anchor_joints:
            if j >= J:
                continue
            if is_vis(arr, j):
                pts.append((float(arr[3 * j + 0]), float(arr[3 * j + 1])))
        if len(pts) < min_anchors:
            return None

        rx = sum(p[0] for p in pts) / len(pts)
        ry = sum(p[1] for p in pts) / len(pts)

        xs = [p[0] for p in pts]
        ys = [p[1] for p in pts]
        scale = max(max(xs) - min(xs), max(ys) - min(ys))
        if scale <= 1e-3:
            return None
        return (rx, ry), float(scale)

    # Reference frame that had both hips visible.
    last_full_idx = None
    last_full = None
    last_full_rs = None

    for t in range(len(out)):
        arr = out[t]
        if not isinstance(arr, list) or len(arr) != J * 3:
            continue

        rs = root_scale_from_anchors(arr)

        r_ok = is_vis(arr, R_HIP)
        l_ok = is_vis(arr, L_HIP)

        anchors_ok = count_visible(arr, anchor_joints) >= min_anchors

        # Both hips + anchors usable: record as the new reference frame.
        if anchors_ok and rs is not None and r_ok and l_ok:
            last_full_idx = t
            last_full = list(arr)
            last_full_rs = rs
            continue

        # Guard chain: need a reference, within lookback, exactly one hip
        # visible, fill enabled, and a current root/scale to map into.
        if last_full is None or last_full_rs is None or last_full_idx is None:
            continue
        if (t - last_full_idx) > max_lookback:
            continue
        if not (r_ok or l_ok):
            continue
        if r_ok and l_ok:
            continue
        if not always_fill_if_one_hip:
            continue
        if rs is None:
            continue

        dst_root, dst_scale = rs
        src_root, src_scale = last_full_rs

        # Map the reference pose into the current frame's root/scale.
        carried = _apply_root_scale(
            last_full,
            src_root=src_root,
            src_scale=src_scale,
            dst_root=dst_root,
            dst_scale=dst_scale,
        )
        if not (isinstance(carried, list) and len(carried) == J * 3):
            continue

        def copy_joint(j: int):
            # Copy joint j from the carried pose unless already visible.
            if j >= J:
                return
            if is_vis(arr, j):
                return
            cx = float(carried[3 * j + 0])
            cy = float(carried[3 * j + 1])
            cc = float(carried[3 * j + 2])
            if (cx == 0 and cy == 0) or cc <= 0:
                return
            arr[3 * j + 0] = cx
            arr[3 * j + 1] = cy
            # Synthetic confidence: clamped to [conf_gate, 0.60].
            arr[3 * j + 2] = max(min(cc, 0.60), conf_gate)

        if not r_ok:
            copy_joint(R_HIP)
            if fill_legs_with_hip:
                copy_joint(R_KNEE)
                copy_joint(R_ANK)

        if not l_ok:
            copy_joint(L_HIP)
            if fill_legs_with_hip:
                copy_joint(L_KNEE)
                copy_joint(L_ANK)

        out[t] = arr

    return out
| |
|
|
| def _median3_pose_seq(pose_seq: List[Optional[List[float]]], *, conf_gate: float) -> List[Optional[List[float]]]: |
| if not pose_seq: |
| return pose_seq |
| |
| J = None |
| for arr in pose_seq: |
| if isinstance(arr, list) and len(arr) % 3 == 0 and len(arr) > 0: |
| J = len(arr) // 3 |
| break |
| if J is None: |
| return pose_seq |
| |
| T = len(pose_seq) |
| |
| def is_vis(arr: List[float], j: int) -> bool: |
| x = float(arr[3 * j + 0]) |
| y = float(arr[3 * j + 1]) |
| c = float(arr[3 * j + 2]) |
| return (c >= conf_gate) and not (x == 0 and y == 0) |
| |
| out_seq: List[Optional[List[float]]] = [] |
| for t in range(T): |
| arr = pose_seq[t] |
| if not isinstance(arr, list) or len(arr) != J * 3: |
| out_seq.append(arr) |
| continue |
| |
| out = list(arr) |
| t0 = max(0, t - 1) |
| t1 = t |
| t2 = min(T - 1, t + 1) |
| |
| a0 = pose_seq[t0] |
| a1 = pose_seq[t1] |
| a2 = pose_seq[t2] |
| |
| for j in range(J): |
| if not is_vis(arr, j): |
| continue |
| |
| xs, ys = [], [] |
| for aa in (a0, a1, a2): |
| if isinstance(aa, list) and len(aa) == J * 3 and is_vis(aa, j): |
| xs.append(float(aa[3 * j + 0])) |
| ys.append(float(aa[3 * j + 1])) |
| |
| if len(xs) >= 2: |
| xs.sort() |
| ys.sort() |
| out[3 * j + 0] = float(xs[len(xs) // 2]) |
| out[3 * j + 1] = float(ys[len(ys) // 2]) |
| |
| out_seq.append(out) |
| |
| return out_seq |
| |
|
|
def _sync_group_appearances(
    pose_arr_seq: List[Optional[List[float]]],
    *,
    group: set[int],
    conf_gate: float,
    lookahead: int,
) -> List[Optional[List[float]]]:
    """Make joints of a group appear together instead of popping in one by one.

    For each frame t where some group joints are visible and others are not,
    any missing joint that reappears within ``lookahead`` frames is
    back-filled for frames [t, reappearance): linearly interpolated between
    its last seen position and the upcoming one, or held at the upcoming
    position if it was never seen before.

    Args:
        pose_arr_seq: per-frame flat keypoint arrays ``[x, y, c, ...]`` or None.
        group: joint indices that should (dis)appear together.
        conf_gate: minimum confidence for a joint to count as visible.
        lookahead: how many frames ahead to search for a reappearance.

    Returns:
        A new list with frame arrays copied; the input is not mutated.
    """
    if not pose_arr_seq:
        return pose_arr_seq

    # Infer the joint count J from the first well-formed frame.
    J = None
    for arr in pose_arr_seq:
        if isinstance(arr, list) and len(arr) % 3 == 0 and len(arr) > 0:
            J = len(arr) // 3
            break
    if J is None:
        return pose_arr_seq

    T = len(pose_arr_seq)
    # Copy well-formed frames so the fills below don't mutate the input.
    out_seq: List[Optional[List[float]]] = []
    for arr in pose_arr_seq:
        if isinstance(arr, list) and len(arr) == J * 3:
            out_seq.append(list(arr))
        else:
            out_seq.append(arr)

    def is_vis(arr: List[float], j: int) -> bool:
        # Visible = confident and not at the (0, 0) sentinel.
        x = float(arr[3 * j + 0])
        y = float(arr[3 * j + 1])
        c = float(arr[3 * j + 2])
        return (c >= conf_gate) and not (x == 0 and y == 0)

    for t in range(T):
        arr = out_seq[t]
        if not isinstance(arr, list):
            continue

        # Partition the group into visible vs. missing at frame t.
        vis = {j for j in group if j < J and is_vis(arr, j)}
        if not vis:
            continue

        missing = {j for j in group if j < J and j not in vis}
        if not missing:
            continue

        # For each missing joint, find when (within lookahead) it reappears.
        # NOTE: the scan reads out_seq, so fills from earlier iterations of t
        # can satisfy it too.
        appear_t: dict[int, int] = {}
        for j in list(missing):
            t2 = t + 1
            while t2 < T and t2 <= t + lookahead:
                arr2 = out_seq[t2]
                if isinstance(arr2, list) and is_vis(arr2, j):
                    appear_t[j] = t2
                    break
                t2 += 1

        if not appear_t:
            continue

        for j, t2 in appear_t.items():
            # Last frame before t where this joint was visible (if any).
            last_t = None
            for tb in range(t - 1, -1, -1):
                arrb = out_seq[tb]
                if isinstance(arrb, list) and is_vis(arrb, j):
                    last_t = tb
                    break

            if last_t is None:
                # Never seen before: hold the upcoming position backwards
                # over [t, t2).
                b = out_seq[t2]
                if not isinstance(b, list):
                    continue
                bx, by, bc = float(b[3 * j + 0]), float(b[3 * j + 1]), float(b[3 * j + 2])
                for k in range(t, t2):
                    a = out_seq[k]
                    if not isinstance(a, list):
                        continue
                    a[3 * j + 0] = bx
                    a[3 * j + 1] = by
                    a[3 * j + 2] = bc
                continue

            a0 = out_seq[last_t]
            b0 = out_seq[t2]
            if not (isinstance(a0, list) and isinstance(b0, list)):
                continue

            ax, ay, ac = float(a0[3 * j + 0]), float(a0[3 * j + 1]), float(a0[3 * j + 2])
            bx, by, bc = float(b0[3 * j + 0]), float(b0[3 * j + 1]), float(b0[3 * j + 2])

            if (ax == 0 and ay == 0) or (bx == 0 and by == 0):
                continue

            # Interpolated frames get the weaker of the two confidences.
            conf_fill = min(ac, bc)
            total = t2 - last_t
            if total <= 0:
                continue

            # Linear interpolation from last_t's position to t2's position,
            # filling only the gap frames [t, t2).
            for tt in range(t, t2):
                a = out_seq[tt]
                if not isinstance(a, list):
                    continue
                r = (tt - last_t) / total
                x = ax + (bx - ax) * r
                y = ay + (by - ay) * r
                a[3 * j + 0] = float(x)
                a[3 * j + 1] = float(y)
                a[3 * j + 2] = float(conf_fill)

    return out_seq
| |
|
|
| def _count_valid_points(arr: Optional[List[float]], *, conf_gate: float) -> int: |
| if not isinstance(arr, list) or len(arr) % 3 != 0: |
| return 0 |
| cnt = 0 |
| for i in range(0, len(arr), 3): |
| x, y, c = float(arr[i]), float(arr[i + 1]), float(arr[i + 2]) |
| if c >= conf_gate and not (x == 0 and y == 0): |
| cnt += 1 |
| return cnt |
| |
|
|
| def _zero_out_kps(arr: Optional[List[float]]) -> Optional[List[float]]: |
| if not isinstance(arr, list) or len(arr) % 3 != 0: |
| return arr |
| out = list(arr) |
| for i in range(0, len(out), 3): |
| out[i + 0] = 0.0 |
| out[i + 1] = 0.0 |
| out[i + 2] = 0.0 |
| return out |
| |
| |
| def _pin_body_wrist_to_hand( |
| p_out: Dict[str, Any], |
| *, |
| side: str, |
| conf_gate_body: float = 0.2, |
| conf_gate_hand: float = 0.2, |
| blend: float = 1.0, |
| ) -> None: |
| if side == "right": |
| bw = 4 |
| hk = "hand_right_keypoints_2d" |
| else: |
| bw = 7 |
| hk = "hand_left_keypoints_2d" |
| |
| pose = p_out.get("pose_keypoints_2d") |
| hand = p_out.get(hk) |
| |
| if not (isinstance(pose, list) and isinstance(hand, list)): |
| return |
| if len(pose) < (bw * 3 + 3): |
| return |
| if len(hand) < 3: |
| return |
| |
| hx, hy, hc = float(hand[0]), float(hand[1]), float(hand[2]) |
| if hc < conf_gate_hand or (hx == 0.0 and hy == 0.0): |
| return |
| |
| bx, by, bc = float(pose[bw * 3 + 0]), float(pose[bw * 3 + 1]), float(pose[bw * 3 + 2]) |
| |
| if bc < conf_gate_body or (bx == 0.0 and by == 0.0): |
| pose[bw * 3 + 0] = hx |
| pose[bw * 3 + 1] = hy |
| pose[bw * 3 + 2] = float(max(bc, min(hc, 0.9))) |
| else: |
| nx = bx * (1.0 - blend) + hx * blend |
| ny = by * (1.0 - blend) + hy * blend |
| pose[bw * 3 + 0] = nx |
| pose[bw * 3 + 1] = ny |
| pose[bw * 3 + 2] = float(min(bc, hc)) |
| |
| p_out["pose_keypoints_2d"] = pose |
| |
|
|
| def _fix_elbow_using_wrist(p_out: Dict[str, Any], *, side: str, conf_gate: float = 0.2) -> None: |
| pose = p_out.get("pose_keypoints_2d") |
| if not isinstance(pose, list) or len(pose) % 3 != 0: |
| return |
| |
| if side == "right": |
| sh, el, wr = 2, 3, 4 |
| else: |
| sh, el, wr = 5, 6, 7 |
| |
| def get(j): |
| return float(pose[3 * j + 0]), float(pose[3 * j + 1]), float(pose[3 * j + 2]) |
| |
| def vis(x, y, c): |
| return c >= conf_gate and not (x == 0.0 and y == 0.0) |
| |
| sx, sy, sc = get(sh) |
| ex, ey, ec = get(el) |
| wx, wy, wc = get(wr) |
| |
| if not (vis(sx, sy, sc) and vis(wx, wy, wc)): |
| return |
| |
| if vis(ex, ey, ec): |
| Lse = math.hypot(ex - sx, ey - sy) |
| Lew = math.hypot(wx - ex, wy - ey) |
| else: |
| dsw = math.hypot(wx - sx, wy - sy) |
| if dsw < 1e-3: |
| return |
| Lse = 0.55 * dsw |
| Lew = 0.45 * dsw |
| |
| dx = wx - sx |
| dy = wy - sy |
| d = math.hypot(dx, dy) |
| if d < 1e-6: |
| return |
| |
| d2 = max(min(d, (Lse + Lew) - 1e-3), abs(Lse - Lew) + 1e-3) |
| |
| a = (Lse * Lse - Lew * Lew + d2 * d2) / (2.0 * d2) |
| h2 = max(Lse * Lse - a * a, 0.0) |
| h = math.sqrt(h2) |
| |
| ux = dx / d |
| uy = dy / d |
| px = sx + a * ux |
| py = sy + a * uy |
| |
| rx = -uy |
| ry = ux |
| |
| e1x, e1y = px + h * rx, py + h * ry |
| e2x, e2y = px - h * rx, py - h * ry |
| |
| if vis(ex, ey, ec): |
| if math.hypot(e1x - ex, e1y - ey) <= math.hypot(e2x - ex, e2y - ey): |
| nx, ny = e1x, e1y |
| else: |
| nx, ny = e2x, e2y |
| else: |
| nx, ny = e1x, e1y |
| |
| pose[3 * el + 0] = float(nx) |
| pose[3 * el + 1] = float(ny) |
| pose[3 * el + 2] = float(max(min(ec, 0.8), conf_gate)) |
| |
| p_out["pose_keypoints_2d"] = pose |
| |
|
|
def _remove_short_presence_runs_kps_seq(
    seq: List[Optional[List[float]]],
    *,
    conf_gate: float,
    min_points_present: int,
    min_run: int,
) -> List[Optional[List[float]]]:
    """Zero out bursts of presence shorter than ``min_run`` frames.

    A frame counts as "present" when it has at least ``min_points_present``
    valid keypoints; every maximal run of present frames shorter than
    ``min_run`` is wiped with ``_zero_out_kps`` (likely spurious detections).
    Returns a new list; the input frames are not mutated.
    """
    if not seq:
        return seq

    present = [_count_valid_points(frame, conf_gate=conf_gate) >= min_points_present for frame in seq]
    result = [frame if frame is None else list(frame) for frame in seq]

    run_start: Optional[int] = None
    # Iterate one index past the end so a trailing run is flushed too.
    for idx in range(len(seq) + 1):
        is_present = idx < len(seq) and present[idx]
        if is_present and run_start is None:
            run_start = idx
        elif (not is_present) and run_start is not None:
            if idx - run_start < min_run:
                for k in range(run_start, idx):
                    result[k] = _zero_out_kps(result[k])
            run_start = None

    return result
| |
|
|
def _zero_sparse_frames_kps_seq(
    seq: List[Optional[List[float]]], *, conf_gate: float, min_points_present: int
) -> List[Optional[List[float]]]:
    """Wipe frames that have fewer than ``min_points_present`` valid keypoints.

    Non-list entries pass through untouched; sparse frames are replaced by an
    all-zero copy via ``_zero_out_kps``.  Returns a new list.
    """
    if not seq:
        return seq

    result: List[Optional[List[float]]] = []
    for frame in seq:
        if isinstance(frame, list) and _count_valid_points(frame, conf_gate=conf_gate) < min_points_present:
            result.append(_zero_out_kps(frame))
        else:
            result.append(frame)
    return result
| |
|
|
def _suppress_spatial_outliers_in_hand_arr(
    hand_arr: Optional[List[float]], *, conf_gate: float, max_bone_factor: float = 3.0
) -> Optional[List[float]]:
    """Drop hand keypoints connected by implausibly long bones.

    The visible points' bounding-box extent defines the hand scale; any
    HAND21 edge longer than ``max_bone_factor * scale`` has its lower-
    confidence endpoint zeroed out.  Hands with too few visible points or a
    degenerate extent are returned unchanged.
    """
    if not isinstance(hand_arr, list) or len(hand_arr) % 3 != 0:
        return hand_arr
    pts = _reshape_keypoints_2d(hand_arr)
    n_joints = len(pts)
    if n_joints < 21:
        return hand_arr

    work = [list(p) for p in pts]

    def joint_visible(idx: int) -> bool:
        jx, jy, jc = work[idx]
        return jc >= conf_gate and not (jx == 0 and jy == 0)

    visible_xy = [(jx, jy) for (jx, jy, jc) in work if jc >= conf_gate and not (jx == 0 and jy == 0)]
    if len(visible_xy) < 6:
        return hand_arr

    xs = [p[0] for p in visible_xy]
    ys = [p[1] for p in visible_xy]
    extent = max(max(xs) - min(xs), max(ys) - min(ys))
    if extent <= 1e-3:
        return hand_arr
    bone_limit = max_bone_factor * extent

    for a, b in HAND21_EDGES:
        if a >= n_joints or b >= n_joints:
            continue
        if not (joint_visible(a) and joint_visible(b)):
            continue
        ax, ay, ac = work[a]
        bx, by, bc = work[b]
        if math.hypot(ax - bx, ay - by) > bone_limit:
            # Kill the less confident endpoint of the overlong bone.
            if ac <= bc:
                work[a] = [0.0, 0.0, 0.0]
            else:
                work[b] = [0.0, 0.0, 0.0]

    return _flatten_keypoints_2d([(jx, jy, jc) for jx, jy, jc in work])
| |
|
|
def _body_head_root_scale_from_pose(
    pose_arr: Optional[List[float]], *, conf_gate: float
) -> Optional[Tuple[Tuple[float, float], float]]:
    """Estimate a head-anchored (root, scale) pair from a body pose.

    Root is the centroid of the visible head joints (nose/neck/eyes/ears:
    0, 1, 14-17); scale is the mean of the available reference distances
    (eye-eye, ear-ear, shoulder-shoulder).  Returns None when either part
    cannot be computed.
    """
    if not isinstance(pose_arr, list) or len(pose_arr) % 3 != 0:
        return None
    kps = _reshape_keypoints_2d(pose_arr)

    def visible_pt(idx: int) -> Optional[Tuple[float, float]]:
        if idx >= len(kps):
            return None
        jx, jy, jc = kps[idx]
        if jc >= conf_gate and not (jx == 0 and jy == 0):
            return (float(jx), float(jy))
        return None

    head_pts = [p for p in (visible_pt(j) for j in (0, 1, 14, 15, 16, 17)) if p is not None]
    if not head_pts:
        return None

    root = (
        sum(p[0] for p in head_pts) / len(head_pts),
        sum(p[1] for p in head_pts) / len(head_pts),
    )

    def pair_dist(a: int, b: int) -> Optional[float]:
        pa, pb = visible_pt(a), visible_pt(b)
        if pa is None or pb is None:
            return None
        dist = math.hypot(pa[0] - pb[0], pa[1] - pb[1])
        return dist if dist > 1e-3 else None

    candidates = [d for d in (pair_dist(14, 15), pair_dist(16, 17), pair_dist(2, 5)) if d is not None]
    if not candidates:
        return None

    return root, float(sum(candidates) / len(candidates))
| |
|
|
def _body_wrist_root_scale_from_pose(
    pose_arr: Optional[List[float]], *, side: str, conf_gate: float
) -> Optional[Tuple[Tuple[float, float], float]]:
    """Estimate a wrist-anchored (root, scale) pair for one arm.

    Root is the wrist position; scale is the forearm (wrist-elbow) length,
    falling back to the shoulder-shoulder distance when the elbow is not
    usable.  Returns None when the wrist is missing or no scale exists.
    """
    if not isinstance(pose_arr, list) or len(pose_arr) % 3 != 0:
        return None
    kps = _reshape_keypoints_2d(pose_arr)

    wrist_idx, elbow_idx = (4, 3) if side == "right" else (7, 6)

    def visible_pt(idx: int) -> Optional[Tuple[float, float]]:
        if idx >= len(kps):
            return None
        jx, jy, jc = kps[idx]
        if jc >= conf_gate and not (jx == 0 and jy == 0):
            return (float(jx), float(jy))
        return None

    wrist = visible_pt(wrist_idx)
    if wrist is None:
        return None

    scale: Optional[float] = None

    elbow = visible_pt(elbow_idx)
    if elbow is not None:
        forearm = math.hypot(wrist[0] - elbow[0], wrist[1] - elbow[1])
        if forearm > 1e-3:
            scale = forearm

    if scale is None:
        # Fall back to shoulder-to-shoulder width (joints 2 and 5).
        r_sh = visible_pt(2)
        l_sh = visible_pt(5)
        if r_sh is not None and l_sh is not None:
            width = math.hypot(r_sh[0] - l_sh[0], r_sh[1] - l_sh[1])
            if width > 1e-3:
                scale = width

    if scale is None:
        return None

    return wrist, float(scale)
| |
|
|
def _smooth_dense_seq_anchored_to_body(
    dense_seq: List[Optional[List[float]]],
    body_pose_seq: List[Optional[List[float]]],
    *,
    kind: str,
    conf_gate_dense: float,
    conf_gate_body: float,
    median3: bool,
    zero_lag_alpha: float,
) -> List[Optional[List[float]]]:
    """Smooth dense (face/hand) keypoints in a body-anchored normalized space.

    Per frame, the dense points are mapped into a root/scale frame derived
    from the body pose (head centroid + head width for ``kind="face"``,
    wrist + forearm length for hands), smoothed there (optional 3-tap
    median, then zero-lag EMA), and mapped back.  A point only receives the
    smoothed position when both its original and smoothed samples pass
    ``conf_gate_dense``; frames with no usable body anchor pass through
    unsmoothed.

    Args:
        dense_seq: per-frame flat keypoint arrays ``[x, y, c, ...]`` or None.
        body_pose_seq: matching body pose sequence used for anchoring.
        kind: "face", "hand_left", or anything else for the right hand.
        conf_gate_dense: visibility gate for the dense points.
        conf_gate_body: visibility gate for the body anchor joints.
        median3: whether to median-filter in normalized space.
        zero_lag_alpha: alpha for ``_zero_lag_ema_pose_seq``.

    Returns:
        A new list; the input arrays are not mutated.
    """
    if not dense_seq:
        return dense_seq

    # Joint count of the dense skeleton, from the first well-formed frame.
    Jd = None
    for a in dense_seq:
        if isinstance(a, list) and len(a) % 3 == 0 and len(a) > 0:
            Jd = len(a) // 3
            break
    if Jd is None:
        return dense_seq

    T = len(dense_seq)
    out = [None if a is None else list(a) for a in dense_seq]

    # Pass 1: normalize each frame into its body anchor's root/scale space.
    norm_seq: List[Optional[List[float]]] = [None] * T

    for t in range(T):
        arr = out[t]
        body = body_pose_seq[t] if t < len(body_pose_seq) else None
        if not isinstance(arr, list) or len(arr) != Jd * 3 or not isinstance(body, list):
            norm_seq[t] = arr
            continue

        if kind == "face":
            rs = _body_head_root_scale_from_pose(body, conf_gate=conf_gate_body)
        elif kind == "hand_left":
            rs = _body_wrist_root_scale_from_pose(body, side="left", conf_gate=conf_gate_body)
        else:
            rs = _body_wrist_root_scale_from_pose(body, side="right", conf_gate=conf_gate_body)

        if rs is None:
            # No anchor: leave this frame in absolute coordinates.
            norm_seq[t] = arr
            continue

        (rx, ry), s = rs
        if s <= 1e-6:
            norm_seq[t] = arr
            continue

        nn = list(arr)
        for j in range(Jd):
            x = float(arr[3 * j + 0])
            y = float(arr[3 * j + 1])
            c = float(arr[3 * j + 2])
            if c >= conf_gate_dense and not (x == 0 and y == 0):
                nn[3 * j + 0] = (x - rx) / s
                nn[3 * j + 1] = (y - ry) / s
        norm_seq[t] = nn

    # Temporal smoothing in the normalized space.
    if median3:
        norm_seq = _median3_pose_seq(norm_seq, conf_gate=conf_gate_dense)

    norm_seq = _zero_lag_ema_pose_seq(norm_seq, alpha=zero_lag_alpha, conf_gate=conf_gate_dense)

    # Pass 2: de-normalize back into absolute coordinates, using each
    # frame's own anchor again.
    for t in range(T):
        arrn = norm_seq[t]
        body = body_pose_seq[t] if t < len(body_pose_seq) else None
        if not isinstance(arrn, list) or len(arrn) != Jd * 3 or not isinstance(body, list):
            continue

        if kind == "face":
            rs = _body_head_root_scale_from_pose(body, conf_gate=conf_gate_body)
        elif kind == "hand_left":
            rs = _body_wrist_root_scale_from_pose(body, side="left", conf_gate=conf_gate_body)
        else:
            rs = _body_wrist_root_scale_from_pose(body, side="right", conf_gate=conf_gate_body)

        if rs is None:
            continue

        (rx, ry), s = rs
        if s <= 1e-6:
            continue

        # NOTE(review): `orig` is assumed to be a list whenever `arrn` is —
        # holds as long as _median3_pose_seq/_zero_lag_ema_pose_seq never
        # turn a None entry into a list; verify against those helpers.
        orig = out[t]
        for j in range(Jd):
            x = float(arrn[3 * j + 0])
            y = float(arrn[3 * j + 1])
            c = float(arrn[3 * j + 2])

            ox = float(orig[3 * j + 0])
            oy = float(orig[3 * j + 1])
            oc = float(orig[3 * j + 2])

            # Only overwrite points that were valid before AND after smoothing.
            if oc >= conf_gate_dense and not (ox == 0 and oy == 0) and c >= conf_gate_dense:
                orig[3 * j + 0] = rx + x * s
                orig[3 * j + 1] = ry + y * s

        out[t] = orig

    return out
| |
|
|
def smooth_KPS_json_obj(
    data: Any,
    *,
    keep_face_untouched: bool = True,
    keep_hands_untouched: bool = True,
    filter_extra_people: Optional[bool] = None,
) -> Any:
    """Smooth an OpenPose-style JSON object (list of frames) end to end.

    Pipeline: pick one "main" person per frame, clean and smooth the body
    pose sequence (spatial outlier suppression, gap fill, torso-group sync,
    isolated-joint suppression, median + zero-lag EMA, root/scale carry,
    hip-pair completion), optionally smooth face/hand keypoints anchored to
    the body, then reassemble the frames with wrists pinned to hand roots
    and elbows re-solved.

    Fix over the previous revision: the greedy per-frame person-selection
    loop was duplicated verbatim in two branches; it is now a single nested
    helper.

    Args:
        data: top-level JSON — a list of frame dicts with a "people" list.
        keep_face_untouched: pass the original face keypoints through.
        keep_hands_untouched: pass the original hand keypoints through.
        filter_extra_people: drop all but the chosen person per frame;
            defaults to the module-level FILTER_EXTRA_PEOPLE setting.

    Returns:
        A new list of frames (the input is not mutated).

    Raises:
        ValueError: if ``data`` is not a list.
    """
    if not isinstance(data, list):
        raise ValueError("Expected top-level JSON to be a list of frames.")

    if filter_extra_people is None:
        filter_extra_people = bool(FILTER_EXTRA_PEOPLE)

    chosen_people: List[Optional[Dict[str, Any]]] = [None] * len(data)

    def _greedy_select() -> None:
        # Frame-by-frame greedy choice: pick the person closest to the
        # previously chosen person's body center.
        prev_center: Optional[Tuple[float, float]] = None
        for i, frame in enumerate(data):
            if not isinstance(frame, dict):
                continue
            people = frame.get("people", [])
            if not isinstance(people, list) or len(people) == 0:
                continue
            chosen = _choose_single_person(people, prev_center)
            chosen_people[i] = chosen
            if chosen is not None:
                c = _body_center_from_pose(chosen.get("pose_keypoints_2d"))
                if c is not None:
                    prev_center = c

    if MAIN_PERSON_MODE == "longest_track":
        tracks = _build_tracks_over_video(data)
        main_tr = _pick_main_track(tracks)

        if main_tr is not None:
            for t in range(len(data)):
                if t in main_tr.frames:
                    chosen_people[t] = main_tr.frames[t]
        else:
            # No usable track: fall back to greedy per-frame selection.
            _greedy_select()
    else:
        _greedy_select()

    # ---------------- body keypoints pipeline ----------------
    pose_seq: List[Optional[List[float]]] = []
    for p in chosen_people:
        pose_seq.append(p.get("pose_keypoints_2d") if isinstance(p, dict) else None)

    if SPATIAL_OUTLIER_FIX:
        pose_seq = [
            _suppress_spatial_outliers_in_pose_arr(arr, conf_gate=CONF_GATE_BODY) if arr is not None else None
            for arr in pose_seq
        ]

    if GAP_FILL_ENABLED:
        pose_seq = _denoise_and_fill_gaps_pose_seq(
            pose_seq,
            conf_gate=CONF_GATE_BODY,
            min_run=MIN_RUN_FRAMES,
            max_gap=MAX_GAP_FRAMES,
        )

    if TORSO_SYNC_ENABLED:
        pose_seq = _sync_group_appearances(
            pose_seq,
            group=TORSO_JOINTS,
            conf_gate=CONF_GATE_BODY,
            lookahead=TORSO_LOOKAHEAD_FRAMES,
        )

    pose_seq = [
        (
            _suppress_isolated_joints_in_pose_arr(arr, conf_gate=CONF_GATE_BODY, keep=TORSO_JOINTS)
            if arr is not None
            else None
        )
        for arr in pose_seq
    ]

    if MEDIAN3_ENABLED:
        pose_seq = _median3_pose_seq(pose_seq, conf_gate=CONF_GATE_BODY)

    if SUPER_SMOOTH_ENABLED:
        pose_seq = _zero_lag_ema_pose_seq(pose_seq, alpha=SUPER_SMOOTH_ALPHA, conf_gate=SUPER_SMOOTH_MIN_CONF)

    if ROOTSCALE_CARRY_ENABLED:
        pose_seq = _carry_pose_when_torso_missing(
            pose_seq,
            conf_gate=CARRY_CONF_GATE,
            max_carry=CARRY_MAX_FRAMES,
            anchor_joints=CARRY_ANCHOR_JOINTS,
            min_anchors=CARRY_MIN_ANCHORS,
        )

    pose_seq = _force_full_torso_pair(
        pose_seq,
        conf_gate=CARRY_CONF_GATE,
        anchor_joints=CARRY_ANCHOR_JOINTS,
        min_anchors=CARRY_MIN_ANCHORS,
        max_lookback=240,
        fill_legs_with_hip=True,
        always_fill_if_one_hip=True,
    )

    # ---------------- face / hands sequences ----------------
    face_seq: List[Optional[List[float]]] = []
    lh_seq: List[Optional[List[float]]] = []
    rh_seq: List[Optional[List[float]]] = []

    for p in chosen_people:
        if isinstance(p, dict):
            face_seq.append(p.get("face_keypoints_2d"))
            lh_seq.append(p.get("hand_left_keypoints_2d"))
            rh_seq.append(p.get("hand_right_keypoints_2d"))
        else:
            face_seq.append(None)
            lh_seq.append(None)
            rh_seq.append(None)

    if HANDS_SMOOTH_ENABLED and (not keep_hands_untouched):
        lh_seq = [
            _suppress_spatial_outliers_in_hand_arr(a, conf_gate=CONF_GATE_HAND) if a is not None else None
            for a in lh_seq
        ]
        rh_seq = [
            _suppress_spatial_outliers_in_hand_arr(a, conf_gate=CONF_GATE_HAND) if a is not None else None
            for a in rh_seq
        ]

        lh_seq = _remove_short_presence_runs_kps_seq(
            lh_seq, conf_gate=CONF_GATE_HAND, min_points_present=HAND_MIN_POINTS_PRESENT, min_run=MIN_HAND_RUN_FRAMES
        )
        rh_seq = _remove_short_presence_runs_kps_seq(
            rh_seq, conf_gate=CONF_GATE_HAND, min_points_present=HAND_MIN_POINTS_PRESENT, min_run=MIN_HAND_RUN_FRAMES
        )

        lh_seq = _zero_sparse_frames_kps_seq(
            lh_seq, conf_gate=CONF_GATE_HAND, min_points_present=HAND_MIN_POINTS_PRESENT
        )
        rh_seq = _zero_sparse_frames_kps_seq(
            rh_seq, conf_gate=CONF_GATE_HAND, min_points_present=HAND_MIN_POINTS_PRESENT
        )

        if DENSE_GAP_FILL_ENABLED:
            lh_seq = _denoise_and_fill_gaps_pose_seq(
                lh_seq, conf_gate=CONF_GATE_HAND, min_run=DENSE_MIN_RUN_FRAMES, max_gap=DENSE_MAX_GAP_FRAMES
            )
            rh_seq = _denoise_and_fill_gaps_pose_seq(
                rh_seq, conf_gate=CONF_GATE_HAND, min_run=DENSE_MIN_RUN_FRAMES, max_gap=DENSE_MAX_GAP_FRAMES
            )

    if FACE_SMOOTH_ENABLED and (not keep_face_untouched):
        if DENSE_GAP_FILL_ENABLED:
            face_seq = _denoise_and_fill_gaps_pose_seq(
                face_seq, conf_gate=CONF_GATE_FACE, min_run=DENSE_MIN_RUN_FRAMES, max_gap=DENSE_MAX_GAP_FRAMES
            )

        face_seq = _smooth_dense_seq_anchored_to_body(
            face_seq,
            pose_seq,
            kind="face",
            conf_gate_dense=CONF_GATE_FACE,
            conf_gate_body=CONF_GATE_BODY,
            median3=DENSE_MEDIAN3_ENABLED,
            zero_lag_alpha=DENSE_SUPER_SMOOTH_ALPHA,
        )

    if HANDS_SMOOTH_ENABLED and (not keep_hands_untouched):
        lh_seq = _smooth_dense_seq_anchored_to_body(
            lh_seq,
            pose_seq,
            kind="hand_left",
            conf_gate_dense=CONF_GATE_HAND,
            conf_gate_body=CONF_GATE_BODY,
            median3=DENSE_MEDIAN3_ENABLED,
            zero_lag_alpha=DENSE_SUPER_SMOOTH_ALPHA,
        )
        rh_seq = _smooth_dense_seq_anchored_to_body(
            rh_seq,
            pose_seq,
            kind="hand_right",
            conf_gate_dense=CONF_GATE_HAND,
            conf_gate_body=CONF_GATE_BODY,
            median3=DENSE_MEDIAN3_ENABLED,
            zero_lag_alpha=DENSE_SUPER_SMOOTH_ALPHA,
        )

    # ---------------- reassemble output frames ----------------
    out_frames = []
    body_state: Optional[BodyState] = None

    for i, frame in enumerate(data):
        if not isinstance(frame, dict):
            out_frames.append(frame)
            continue

        frame_out = copy.deepcopy(frame)
        chosen = chosen_people[i]

        if chosen is None:
            if filter_extra_people:
                frame_out["people"] = []
            out_frames.append(frame_out)
            continue

        p_out = copy.deepcopy(chosen)
        p_out["pose_keypoints_2d"] = pose_seq[i]

        # Lazily size the body smoother from the first usable pose
        # (defaults to BODY-18 when the pose array is missing).
        pose_arr = p_out.get("pose_keypoints_2d")
        joints = (len(pose_arr) // 3) if isinstance(pose_arr, list) else 0
        if body_state is None:
            body_state = BodyState(joints if joints > 0 else 18)

        p_out["pose_keypoints_2d"] = _smooth_body_pose(p_out.get("pose_keypoints_2d"), body_state)

        if FACE_SMOOTH_ENABLED and (not keep_face_untouched):
            p_out["face_keypoints_2d"] = face_seq[i]
        else:
            p_out["face_keypoints_2d"] = chosen.get("face_keypoints_2d", p_out.get("face_keypoints_2d"))

        if HANDS_SMOOTH_ENABLED and (not keep_hands_untouched):
            p_out["hand_left_keypoints_2d"] = lh_seq[i]
            p_out["hand_right_keypoints_2d"] = rh_seq[i]
        else:
            p_out["hand_left_keypoints_2d"] = chosen.get("hand_left_keypoints_2d", p_out.get("hand_left_keypoints_2d"))
            p_out["hand_right_keypoints_2d"] = chosen.get(
                "hand_right_keypoints_2d", p_out.get("hand_right_keypoints_2d")
            )

        # Re-attach body wrists to hand roots, then re-solve elbows so the
        # arm chain stays consistent with the pinned wrists.
        _pin_body_wrist_to_hand(
            p_out, side="left", conf_gate_body=CONF_GATE_BODY, conf_gate_hand=CONF_GATE_HAND, blend=1.0
        )
        _pin_body_wrist_to_hand(
            p_out, side="right", conf_gate_body=CONF_GATE_BODY, conf_gate_hand=CONF_GATE_HAND, blend=1.0
        )

        _fix_elbow_using_wrist(p_out, side="left", conf_gate=CONF_GATE_BODY)
        _fix_elbow_using_wrist(p_out, side="right", conf_gate=CONF_GATE_BODY)

        if filter_extra_people:
            frame_out["people"] = [p_out]
        else:
            # Replace the chosen person in the original list (matched by
            # identity); deep-copy the rest so the input stays untouched.
            orig_people = frame.get("people", [])
            if not isinstance(orig_people, list):
                frame_out["people"] = [p_out]
            else:
                replaced = False
                new_people = []
                for op in orig_people:
                    if (not replaced) and (op is chosen):
                        new_people.append(p_out)
                        replaced = True
                    else:
                        new_people.append(copy.deepcopy(op))
                if not replaced:
                    new_people = [p_out] + [copy.deepcopy(op) for op in orig_people]
                frame_out["people"] = new_people

        out_frames.append(frame_out)

    return out_frames
| |
|
|
| # ============================================================ |
| # === END: smooth_KPS_json.py logic |
| # ============================================================ |
|
|
|
|
| # ============================================================ |
| # === START: render_pose_video.py logic (ported to frame render) |
| # ============================================================ |
|
|
# 18-entry rainbow palette, one color triple per BODY-18 joint index.
# (Channel order depends on the canvas the draw code below receives —
# presumably RGB for the torch IMAGE output; confirm against the caller.)
OP_COLORS: List[Tuple[int, int, int]] = [
    (255, 0, 0),
    (255, 85, 0),
    (255, 170, 0),
    (255, 255, 0),
    (170, 255, 0),
    (85, 255, 0),
    (0, 255, 0),
    (0, 255, 85),
    (0, 255, 170),
    (0, 255, 255),
    (0, 170, 255),
    (0, 85, 255),
    (0, 0, 255),
    (85, 0, 255),
    (170, 0, 255),
    (255, 0, 255),
    (255, 0, 170),
    (255, 0, 85),
]

# BODY-18 skeleton limbs as (joint_a, joint_b) index pairs:
# arms (1-2-3-4, 1-5-6-7), legs (1-8-9-10, 1-11-12-13),
# neck-to-nose (1-0) and nose-to-eye-to-ear chains (0-14-16, 0-15-17).
BODY_EDGES: List[Tuple[int, int]] = [
    (1, 2),
    (1, 5),
    (2, 3),
    (3, 4),
    (5, 6),
    (6, 7),
    (1, 8),
    (8, 9),
    (9, 10),
    (1, 11),
    (11, 12),
    (12, 13),
    (1, 0),
    (0, 14),
    (14, 16),
    (0, 15),
    (15, 17),
]

# One palette color per limb, and the full palette indexed per joint.
BODY_EDGE_COLORS = OP_COLORS[: len(BODY_EDGES)]
BODY_JOINT_COLORS = OP_COLORS

# 21-point hand skeleton: wrist (0) to each fingertip, 4 bones per finger.
HAND_EDGES: List[Tuple[int, int]] = [
    (0, 1),
    (1, 2),
    (2, 3),
    (3, 4),
    (0, 5),
    (5, 6),
    (6, 7),
    (7, 8),
    (0, 9),
    (9, 10),
    (10, 11),
    (11, 12),
    (0, 13),
    (13, 14),
    (14, 15),
    (15, 16),
    (0, 17),
    (17, 18),
    (18, 19),
    (19, 20),
]
| |
|
|
| def _valid_pt(x: float, y: float, c: float, conf_thresh: float) -> bool: |
| return (c is not None) and (c >= conf_thresh) and not (x == 0 and y == 0) |
|
|
|
|
def _hsv_to_bgr(h: float, s: float, v: float) -> Tuple[int, int, int]:
    """Convert normalized HSV (each component in [0, 1]) to a BGR int triple via OpenCV."""
    # OpenCV's uint8 convention: H in [0, 179], S and V in [0, 255].
    hsv_pixel = np.uint8([[[
        int(np.clip(h, 0.0, 1.0) * 179.0),
        int(np.clip(s, 0.0, 1.0) * 255.0),
        int(np.clip(v, 0.0, 1.0) * 255.0),
    ]]])
    b, g, r = cv2.cvtColor(hsv_pixel, cv2.COLOR_HSV2BGR)[0, 0]
    return int(b), int(g), int(r)
| |
|
|
def _looks_normalized(points: List[Tuple[float, float, float]], conf_thresh: float) -> bool:
    """Heuristic: True when at least 70% of the valid points lie in the unit square.

    Used to decide whether coordinates are normalized [0, 1] or pixel-space.
    Returns False when no point passes the confidence/sentinel filter.
    """
    valid = [p for p in points if _valid_pt(p[0], p[1], p[2], conf_thresh)]
    if not valid:
        return False
    inside = sum(1 for (x, y, _) in valid if 0.0 <= x <= 1.0 and 0.0 <= y <= 1.0)
    return inside / float(len(valid)) >= 0.7
| |
|
|
def _draw_body(
    canvas: np.ndarray, pose: List[Tuple[float, float, float]], conf_thresh: float, xinsr_stick_scaling: bool = False
) -> None:
    """Draw an OpenPose-style BODY-18 skeleton onto ``canvas`` in place.

    Limbs are rendered as filled rotated ellipses at 60% of the palette
    color, joints as small filled circles at full color.  Coordinates are
    auto-detected as normalized [0, 1] (then scaled by canvas size) when at
    least 70% of the valid points fall inside the unit square.

    Args:
        canvas: HxW(x3) image array, modified in place.
        pose: list of (x, y, conf) triples in BODY-18 joint order.
        conf_thresh: minimum confidence for a point to be drawn.
        xinsr_stick_scaling: thicken limbs on large canvases.
    """
    CH, CW = canvas.shape[:2]
    stickwidth = 2

    # Normalized-coordinate heuristic (same rule as _looks_normalized).
    valid = [(x, y, c) for (x, y, c) in pose if _valid_pt(x, y, c, conf_thresh)]
    norm = False
    if valid:
        in01 = sum(1 for (x, y, _) in valid if 0.0 <= x <= 1.0 and 0.0 <= y <= 1.0)
        norm = (in01 / float(len(valid))) >= 0.7

    def to_px(x: float, y: float) -> Tuple[float, float]:
        if norm:
            return x * CW, y * CH
        return x, y

    max_side = max(CW, CH)
    if xinsr_stick_scaling:
        # Thicker sticks on bigger canvases, capped at 7.
        stick_scale = 1 if max_side < 500 else min(2 + (max_side // 1000), 7)
    else:
        stick_scale = 1

    for idx, (a, b) in enumerate(BODY_EDGES):
        if a >= len(pose) or b >= len(pose):
            continue

        ax, ay, ac = pose[a]
        bx, by, bc = pose[b]
        if not (_valid_pt(ax, ay, ac, conf_thresh) and _valid_pt(bx, by, bc, conf_thresh)):
            continue

        ax, ay = to_px(ax, ay)
        bx, by = to_px(bx, by)

        base = BODY_EDGE_COLORS[idx] if idx < len(BODY_EDGE_COLORS) else (255, 255, 255)

        # NOTE: X holds the two y-coordinates and Y the two x-coordinates —
        # the row/col convention of the original OpenPose drawing code.
        X = np.array([ay, by], dtype=np.float32)
        Y = np.array([ax, bx], dtype=np.float32)

        mX = float(np.mean(X))
        mY = float(np.mean(Y))
        length = float(np.hypot(X[0] - X[1], Y[0] - Y[1]))
        if length < 1.0:
            continue  # degenerate limb, nothing to draw

        angle = math.degrees(math.atan2(X[0] - X[1], Y[0] - Y[1]))

        # Limb = filled ellipse centered on the midpoint, major axis half
        # the limb length, rotated to the limb direction.
        polygon = cv2.ellipse2Poly(
            (int(mY), int(mX)),
            (int(length / 2), int(stickwidth * stick_scale)),
            int(angle),
            0,
            360,
            1,
        )

        # Limbs are dimmed to 60% so the joint dots read on top.
        cv2.fillConvexPoly(
            canvas,
            polygon,
            (int(base[0] * 0.6), int(base[1] * 0.6), int(base[2] * 0.6)),
        )

    # Joints drawn after (on top of) the limbs.
    for j, (x, y, c) in enumerate(pose):
        if not _valid_pt(x, y, c, conf_thresh):
            continue
        x, y = to_px(x, y)
        col = BODY_JOINT_COLORS[j] if j < len(BODY_JOINT_COLORS) else (255, 255, 255)
        cv2.circle(canvas, (int(x), int(y)), 2, col, thickness=-1)
| |
|
|
def _draw_hand(canvas: np.ndarray, hand: List[Tuple[float, float, float]], conf_thresh: float) -> None:
    """Draw a 21-point hand skeleton in place: rainbow bones, red joint dots."""
    if not hand or len(hand) < 21:
        return

    height, width = canvas.shape[:2]
    normalized = _looks_normalized(hand, conf_thresh)

    def scale(px: float, py: float) -> Tuple[float, float]:
        if normalized:
            return px * width, py * height
        return px, py

    edge_count = len(HAND_EDGES)
    for edge_idx, (start, end) in enumerate(HAND_EDGES):
        sx, sy, sc = hand[start]
        ex, ey, ec = hand[end]
        if not _valid_pt(sx, sy, sc, conf_thresh):
            continue
        if not _valid_pt(ex, ey, ec, conf_thresh):
            continue
        sx, sy = scale(sx, sy)
        ex, ey = scale(ex, ey)
        # Hue sweeps once around the color wheel across the bones.
        color = _hsv_to_bgr(edge_idx / float(edge_count), 1.0, 1.0)
        cv2.line(canvas, (int(sx), int(sy)), (int(ex), int(ey)), color, 1, cv2.LINE_AA)

    for px, py, pc in hand:
        if not _valid_pt(px, py, pc, conf_thresh):
            continue
        px, py = scale(px, py)
        cv2.circle(canvas, (int(px), int(py)), 1, (0, 0, 255), -1, cv2.LINE_AA)
| |
|
|
def _draw_face(canvas: np.ndarray, face: List[Tuple[float, float, float]], conf_thresh: float) -> None:
    """Draw face landmarks in place as white dots (radius-0 filled circles)."""
    if not face:
        return

    height, width = canvas.shape[:2]
    normalized = _looks_normalized(face, conf_thresh)

    for px, py, pc in face:
        if not _valid_pt(px, py, pc, conf_thresh):
            continue
        if normalized:
            px, py = px * width, py * height
        cv2.circle(canvas, (int(px), int(py)), 0, (255, 255, 255), -1, cv2.LINE_AA)
| |
|
|
def _draw_pose_frame_full(
    w: int,
    h: int,
    person: Dict[str, Any],
    conf_thresh_body: float = 0.10,
    conf_thresh_hands: float = 0.10,
    conf_thresh_face: float = 0.10,
) -> np.ndarray:
    """Render one person's full skeleton on a black (h, w) canvas.

    Draw order is body, left hand, right hand, face; groups that are absent
    or empty in ``person`` are skipped. Returns a BGR uint8 image.

    NOTE(review): assumes ``person`` follows the OpenPose JSON layout with
    flat ``*_keypoints_2d`` lists — confirm against the loader upstream.
    """
    canvas = np.zeros((h, w, 3), dtype=np.uint8)

    # Each group may be missing or None in the incoming dict.
    groups = {
        key: _reshape_keypoints_2d(person.get(key) or [])
        for key in (
            "pose_keypoints_2d",
            "face_keypoints_2d",
            "hand_left_keypoints_2d",
            "hand_right_keypoints_2d",
        )
    }

    if groups["pose_keypoints_2d"]:
        _draw_body(canvas, groups["pose_keypoints_2d"], conf_thresh_body)
    if groups["hand_left_keypoints_2d"]:
        _draw_hand(canvas, groups["hand_left_keypoints_2d"], conf_thresh_hands)
    if groups["hand_right_keypoints_2d"]:
        _draw_hand(canvas, groups["hand_right_keypoints_2d"], conf_thresh_hands)
    if groups["face_keypoints_2d"]:
        _draw_face(canvas, groups["face_keypoints_2d"], conf_thresh_face)

    return canvas
| |
|
|
| # ============================================================ |
| # === END: render_pose_video.py logic |
| # ============================================================ |
|
|
|
|
| # ============================================================ |
| # ComfyUI mappings |
| # ============================================================ |
|
|
# Registration tables consumed by ComfyUI at import time:
# internal node id -> implementing class, and id -> UI display label.
NODE_CLASS_MAPPINGS = {
    "TSPoseDataSmoother": KPSSmoothPoseDataAndRender,
}

NODE_DISPLAY_NAME_MAPPINGS = {
    "TSPoseDataSmoother": "KPS: Smooth + Render (pose_data/PKL)",
}
|
|