|
|
import numpy as np |
|
|
import cv2 |
|
|
from PIL import Image |
|
|
from typing import Tuple, List, Optional, Dict |
|
|
from .error_handler import PoseDetectionError, ImageProcessingError, safe_execute |
|
|
|
|
|
class DWPoseDetector:
    """Whole-body pose detector: YOLOX person detection + DWPose keypoint estimation."""

    def __init__(self, manager):
        """Store the model manager and default detection settings.

        Args:
            manager: Object exposing the ONNX sessions / input names for the
                YOLOX and DWPose models (not validated here).
        """
        self.manager = manager
        # Square side length used by the YOLOX detector input.
        self.input_size = 640
        # Minimum confidence for accepting persons and keypoints.
        self.detection_threshold = 0.3
|
|
|
|
|
    def detect(self, image) -> Tuple[Optional[Dict], Optional[str]]:
        """Run the full person-detection + pose-estimation pipeline (refs-compatible).

        Args:
            image: PIL image or BGR ndarray; validated in _preprocess_image.

        Returns:
            ``(formatted_json, None)`` on success, ``(None, error_message)`` on
            failure — errors are reported through the tuple, never raised.
        """
        try:
            if not self.manager.is_initialized():
                raise PoseDetectionError("モデルが初期化されていません")

            # Step 1: convert to the normalized 1x3x640x640 YOLOX input tensor.
            processed_image = safe_execute(
                lambda: self._preprocess_image(image),
                "画像の前処理に失敗しました",
                show_error=False
            )
            if processed_image is None:
                raise ImageProcessingError("画像の前処理に失敗しました")

            print(f"[DEBUG] 🖼️ Image preprocessed: {type(processed_image)}, shape: {processed_image.shape}")

            # Step 2: person detection. NOTE: the preprocessed tensor is passed
            # for both arguments; _detect_persons_refs never uses its second one.
            persons = safe_execute(
                lambda: self._detect_persons_refs(processed_image, processed_image),
                "人物検出に失敗しました",
                show_error=False
            )
            if not persons or len(persons) == 0:
                raise PoseDetectionError("人物が検出されませんでした")

            print(f"[DEBUG] 👤 Detected {len(persons)} persons")

            # Step 3: per-person keypoint estimation on the ORIGINAL image.
            pose_results = safe_execute(
                lambda: self._estimate_pose_refs(image, persons),
                "ポーズ検出に失敗しました",
                show_error=False
            )

            if pose_results and len(pose_results) > 0:
                # Step 4: package into the OpenPose-style JSON structure.
                formatted_result = self._format_to_json_refs(pose_results)
                print(f"[DEBUG] ✅ Pose detection successful: {len(pose_results)} poses")
                return formatted_result, None
            else:
                raise PoseDetectionError("ポーズを検出できませんでした")

        except (PoseDetectionError, ImageProcessingError) as e:
            # Expected pipeline failures: surface the message to the caller.
            return None, str(e)
        except Exception as e:
            # Unexpected failure: prefixed so it is distinguishable in the UI.
            return None, f"予期しないエラー: {str(e)}"
|
|
|
|
|
    def _preprocess_image(self, image) -> np.ndarray:
        """Validate the input image and hand it to the refs-compatible preprocessor.

        Accepts a PIL image (converted to a BGR ndarray) or an ndarray that is
        passed through unchanged. Raises ImageProcessingError for ``None`` or
        any other type.
        """
        if image is None:
            raise ImageProcessingError("画像が選択されていません")

        if isinstance(image, Image.Image):
            # PIL images are RGB; convert to OpenCV's BGR channel order.
            image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
        elif isinstance(image, np.ndarray):
            pass  # already an array — assumed to be BGR; TODO confirm callers
        else:
            raise ImageProcessingError("サポートされていない画像形式です")

        return self._preprocess_image_refs(image)
|
|
|
|
|
def _preprocess_image_refs(self, image: np.ndarray, target_size: Tuple[int, int] = (640, 640)) -> np.ndarray: |
|
|
"""refs互換の画像前処理""" |
|
|
if len(image.shape) == 3 and image.shape[2] == 3: |
|
|
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) |
|
|
|
|
|
processed_img = self._resize_with_aspect_ratio(image, target_size) |
|
|
processed_img = processed_img.astype(np.float32) / 255.0 |
|
|
processed_img = processed_img.transpose(2, 0, 1) |
|
|
processed_img = np.expand_dims(processed_img, axis=0) |
|
|
|
|
|
return processed_img |
|
|
|
|
|
def _resize_with_aspect_ratio(self, image: np.ndarray, target_size: Tuple[int, int]) -> np.ndarray: |
|
|
"""アスペクト比を保持したリサイズ処理(refs互換)""" |
|
|
h, w = image.shape[:2] |
|
|
target_w, target_h = target_size |
|
|
|
|
|
scale = min(target_w / w, target_h / h) |
|
|
new_w, new_h = int(w * scale), int(h * scale) |
|
|
|
|
|
resized = cv2.resize(image, (new_w, new_h)) |
|
|
|
|
|
padded = np.zeros((target_h, target_w, 3), dtype=np.uint8) |
|
|
|
|
|
offset_x = (target_w - new_w) // 2 |
|
|
offset_y = (target_h - new_h) // 2 |
|
|
padded[offset_y:offset_y+new_h, offset_x:offset_x+new_w] = resized |
|
|
|
|
|
return padded |
|
|
|
|
|
    def _detect_persons_refs(self, image: np.ndarray, original_image: np.ndarray) -> List[Dict]:
        """Detect persons with the YOLOX ONNX model (refs-compatible).

        Args:
            image: Preprocessed 1x3x640x640 tensor fed to the YOLOX session.
            original_image: Unused — kept for refs signature compatibility.

        Returns:
            List of ``{"bbox": [x1, y1, x2, y2], "confidence": c}`` dicts in the
            letterboxed 640x640 coordinate space; on no detections a centered
            fallback box is returned, and on any exception an empty list.
        """
        try:
            outputs = self.manager.yolox_session.run(None, {self.manager.yolox_input_name: image})
            predictions = outputs[0]

            # Drop the batch dimension if present.
            if predictions.ndim == 3:
                predictions = predictions[0]

            # Decode grid-relative predictions into absolute 640x640 pixels.
            input_shape = (640, 640)
            predictions = self._demo_postprocess(predictions, input_shape)

            # Columns: cx, cy, w, h, objectness, per-class scores.
            boxes = predictions[:, :4]
            scores = predictions[:, 4:5] * predictions[:, 5:]

            # Convert center/size boxes to corner (xyxy) form.
            boxes_xyxy = np.ones_like(boxes)
            boxes_xyxy[:, 0] = boxes[:, 0] - boxes[:, 2] / 2.
            boxes_xyxy[:, 1] = boxes[:, 1] - boxes[:, 3] / 2.
            boxes_xyxy[:, 2] = boxes[:, 0] + boxes[:, 2] / 2.
            boxes_xyxy[:, 3] = boxes[:, 1] + boxes[:, 3] / 2.

            # NOTE(review): h/w are taken from the model INPUT tensor, so for the
            # 640x640 tensor produced by _preprocess_image the ratio is always 1.0
            # and boxes remain in letterboxed 640x640 space — confirm intended.
            if image.ndim == 4:
                _, _, h, w = image.shape
            else:
                h, w = image.shape[0:2]
            ratio = min(640 / w, 640 / h)
            boxes_xyxy /= ratio

            dets = self._multiclass_nms(boxes_xyxy, scores, nms_thr=0.45, score_thr=0.1)

            persons = []
            if dets is not None:
                final_boxes, final_scores, final_cls_inds = dets[:, :4], dets[:, 4], dets[:, 5]

                # Debug-only pass over class 0 (person) candidates.
                person_detections = (final_cls_inds == 0)
                person_scores = final_scores[person_detections]
                if len(person_scores) > 0:
                    print(f"[DEBUG] 人物検出候補: {len(person_scores)}個, 最高スコア: {person_scores.max():.3f}")
                else:
                    print("[DEBUG] 人物検出候補が0個です")

                # Keep persons above the configured confidence threshold.
                is_person = (final_cls_inds == 0) & (final_scores > self.detection_threshold)
                final_boxes = final_boxes[is_person]
                final_scores = final_scores[is_person]

                print(f"[DEBUG] 閾値{self.detection_threshold}以上の人物: {len(final_scores)}個")

                for box, conf in zip(final_boxes, final_scores):
                    x1, y1, x2, y2 = box
                    persons.append({
                        "bbox": [float(x1), float(y1), float(x2), float(y2)],
                        "confidence": float(conf)
                    })

            # Fallback: assume one person covering the central 60% of the frame
            # so downstream pose estimation still gets a crop to work with.
            if len(persons) == 0:
                yolox_w, yolox_h = 640, 640
                x1, y1 = yolox_w * 0.2, yolox_h * 0.2
                x2, y2 = yolox_w * 0.8, yolox_h * 0.8
                persons.append({"bbox": [float(x1), float(y1), float(x2), float(y2)], "confidence": 1.0})
                print(f"[DEBUG] 🔄 Fallback detection: [{x1:.0f}, {y1:.0f}, {x2:.0f}, {y2:.0f}] (YOLOX 640x640基準)")

            return persons

        except Exception as e:
            # Best-effort: log and return no detections instead of propagating.
            print(f"Person detection error: {e}")
            import traceback
            traceback.print_exc()
            return []
|
|
|
|
|
def _demo_postprocess(self, outputs: np.ndarray, img_size: Tuple[int, int], p6: bool = False) -> np.ndarray: |
|
|
"""refs互換のYOLOX後処理""" |
|
|
grids = [] |
|
|
expanded_strides = [] |
|
|
strides = [8, 16, 32] if not p6 else [8, 16, 32, 64] |
|
|
|
|
|
hsizes = [img_size[0] // stride for stride in strides] |
|
|
wsizes = [img_size[1] // stride for stride in strides] |
|
|
|
|
|
for hsize, wsize, stride in zip(hsizes, wsizes, strides): |
|
|
xv, yv = np.meshgrid(np.arange(wsize), np.arange(hsize)) |
|
|
grid = np.stack((xv, yv), 2).reshape(1, -1, 2) |
|
|
grids.append(grid) |
|
|
shape = grid.shape[:2] |
|
|
expanded_strides.append(np.full((*shape, 1), stride)) |
|
|
|
|
|
grids = np.concatenate(grids, 1) |
|
|
expanded_strides = np.concatenate(expanded_strides, 1) |
|
|
outputs[..., :2] = (outputs[..., :2] + grids) * expanded_strides |
|
|
outputs[..., 2:4] = np.exp(outputs[..., 2:4]) * expanded_strides |
|
|
|
|
|
return outputs |
|
|
|
|
|
def _multiclass_nms(self, boxes: np.ndarray, scores: np.ndarray, nms_thr: float, score_thr: float) -> Optional[np.ndarray]: |
|
|
"""refs互換のNMS""" |
|
|
final_dets = [] |
|
|
num_classes = scores.shape[1] |
|
|
for cls_ind in range(num_classes): |
|
|
cls_scores = scores[:, cls_ind] |
|
|
valid_score_mask = cls_scores > score_thr |
|
|
if valid_score_mask.sum() == 0: |
|
|
continue |
|
|
else: |
|
|
valid_scores = cls_scores[valid_score_mask] |
|
|
valid_boxes = boxes[valid_score_mask] |
|
|
keep = self._nms(valid_boxes, valid_scores, nms_thr) |
|
|
if len(keep) > 0: |
|
|
cls_inds = np.ones((len(keep), 1)) * cls_ind |
|
|
dets = np.concatenate( |
|
|
[valid_boxes[keep], valid_scores[keep, None], cls_inds], 1 |
|
|
) |
|
|
final_dets.append(dets) |
|
|
if len(final_dets) == 0: |
|
|
return None |
|
|
return np.concatenate(final_dets, 0) |
|
|
|
|
|
def _nms(self, boxes: np.ndarray, scores: np.ndarray, nms_thr: float) -> List[int]: |
|
|
"""refs互換のNMS""" |
|
|
x1 = boxes[:, 0] |
|
|
y1 = boxes[:, 1] |
|
|
x2 = boxes[:, 2] |
|
|
y2 = boxes[:, 3] |
|
|
|
|
|
areas = (x2 - x1 + 1) * (y2 - y1 + 1) |
|
|
order = scores.argsort()[::-1] |
|
|
|
|
|
keep = [] |
|
|
while order.size > 0: |
|
|
i = order[0] |
|
|
keep.append(i) |
|
|
xx1 = np.maximum(x1[i], x1[order[1:]]) |
|
|
yy1 = np.maximum(y1[i], y1[order[1:]]) |
|
|
xx2 = np.minimum(x2[i], x2[order[1:]]) |
|
|
yy2 = np.minimum(y2[i], y2[order[1:]]) |
|
|
|
|
|
w = np.maximum(0.0, xx2 - xx1 + 1) |
|
|
h = np.maximum(0.0, yy2 - yy1 + 1) |
|
|
inter = w * h |
|
|
ovr = inter / (areas[i] + areas[order[1:]] - inter) |
|
|
|
|
|
inds = np.where(ovr <= nms_thr)[0] |
|
|
order = order[inds + 1] |
|
|
|
|
|
return keep |
|
|
|
|
|
    def _estimate_pose_refs(self, image: np.ndarray, person_boxes: List[Dict]) -> List[Dict]:
        """Estimate whole-body keypoints for every detected person (refs-compatible).

        Args:
            image: Original input image (ndarray or PIL); forcibly resized to
                512x512 before pose estimation.
            person_boxes: Person dicts with bboxes in YOLOX 640x640 letterbox space.

        Returns:
            List of ``{"bbox", "keypoints", "confidence"}`` dicts; persons whose
            estimation raises are skipped.
        """
        pose_results = []

        # Determine the source size; PIL images are converted to BGR ndarrays.
        if hasattr(image, 'shape'):
            orig_h, orig_w = image.shape[:2]
        elif hasattr(image, 'size'):
            orig_w, orig_h = image.size
            image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
            orig_h, orig_w = image.shape[:2]
        else:
            # Unknown input type: assume the YOLOX working resolution.
            orig_w, orig_h = 640, 640

        # Standardize on 512x512: everything downstream (bbox back-projection,
        # recorded image size, output "resolution") works in this space.
        # NOTE: this deliberately OVERWRITES the true original size.
        target_resolution = (512, 512)
        image_resized = cv2.resize(image, target_resolution)
        orig_w, orig_h = target_resolution
        image = image_resized

        # Remembered for later coordinate normalization of face/hand points.
        self._original_image_size = (orig_w, orig_h)
        print(f"[DEBUG] 📷 Original image size recorded: {self._original_image_size}")

        # DWPose ONNX input is NCHW: dims [2], [3] are height and width.
        model_input_shape = self.manager.dwpose_session.get_inputs()[0].shape
        model_h, model_w = model_input_shape[2], model_input_shape[3]
        model_input_size = (model_w, model_h)

        print(f"[DEBUG] 🎯 Model input size: {model_input_size}")

        for person_idx, person in enumerate(person_boxes):
            try:
                bbox = person["bbox"]

                # Invert the 640x640 letterbox transform to map the YOLOX bbox
                # into the (512x512) working image space.
                target_w, target_h = 640, 640
                scale = min(target_w / orig_w, target_h / orig_h)
                new_w, new_h = orig_w * scale, orig_h * scale
                offset_x = (target_w - new_w) / 2
                offset_y = (target_h - new_h) / 2

                x1p, y1p, x2p, y2p = bbox

                x1 = (x1p - offset_x) / scale
                y1 = (y1p - offset_y) / scale
                x2 = (x2p - offset_x) / scale
                y2 = (y2p - offset_y) / scale

                bbox = [x1, y1, x2, y2]

                print(f"[DEBUG] 🔄 Coordinate transform: YOLOX({x1p:.1f},{y1p:.1f},{x2p:.1f},{y2p:.1f}) → Original({x1:.1f},{y1:.1f},{x2:.1f},{y2:.1f})")
                print(f"[DEBUG] 📐 Transform params: scale={scale:.3f}, offset=({offset_x:.1f},{offset_y:.1f}), orig_size=({orig_w},{orig_h})")

                print(f"[DEBUG] 📦 Person {person_idx}: bbox {bbox}")

                keypoints, scores = self._inference_pose_dwpose_refs(image, [bbox], model_input_size)

                if len(keypoints) > 0 and len(scores) > 0:
                    # Merge (x, y) locations with their per-point confidences.
                    combined_keypoints = []
                    for i, (kp, score) in enumerate(zip(keypoints[0], scores[0])):
                        combined_keypoints.append([float(kp[0]), float(kp[1]), float(score)])

                        # Debug-log lower-body joints (COCO indices 12-16).
                        if i in [12, 13, 14, 15, 16]:
                            part_names = {12: "右腰", 13: "左腰", 14: "右膝", 15: "左膝", 16: "右足首"}
                            part_name = part_names.get(i, f"下半身{i}")
                            print(f"[DEBUG] 🦵 生データ {part_name}[{i}]: ({kp[0]:.1f}, {kp[1]:.1f}) 生信頼度:{score:.3f}")

                    # Zero-out points below the confidence threshold.
                    filtered_keypoints = self._filter_by_confidence_refs(combined_keypoints)

                    pose_results.append({
                        "bbox": bbox,
                        "keypoints": filtered_keypoints,
                        "confidence": person["confidence"]
                    })

                    print(f"[DEBUG] ✅ Person {person_idx}: {len(filtered_keypoints)} keypoints, valid: {len([k for k in filtered_keypoints if k[2] > 0])}")

            except Exception as e:
                # A failure for one person should not abort the others.
                print(f"Pose estimation error: {e}")
                import traceback
                traceback.print_exc()
                continue

        return pose_results
|
|
|
|
|
def _filter_by_confidence_refs(self, keypoints: List[List[float]], threshold: float = None) -> List[List[float]]: |
|
|
"""refs互換の信頼度フィルタリング""" |
|
|
if threshold is None: |
|
|
threshold = self.detection_threshold |
|
|
|
|
|
|
|
|
filtered = [] |
|
|
for i, kp in enumerate(keypoints): |
|
|
current_threshold = threshold |
|
|
|
|
|
if kp[2] >= current_threshold: |
|
|
filtered.append(kp) |
|
|
else: |
|
|
filtered.append([0.0, 0.0, 0.0]) |
|
|
|
|
|
return filtered |
|
|
|
|
|
    def _inference_pose_dwpose_refs(self, image: np.ndarray, bboxes: List[List[float]], model_input_size: Tuple[int, int]) -> Tuple[List[np.ndarray], List[np.ndarray]]:
        """Run the DWPose ONNX model on each bbox crop of *image* (refs-compatible).

        Returns:
            ``(keypoints, scores)`` — one entry per bbox, decoded back into
            image-space coordinates by the SimCC postprocessor.
        """
        # Warp each person box to the model input size and normalize channels.
        resized_imgs, centers, scales = self._preprocess_dwpose_refs(image, bboxes, model_input_size)

        all_outputs = []
        for resized_img in resized_imgs:
            # HWC float crop -> 1xCxHxW float32 as expected by the ONNX session.
            input_data = resized_img.transpose(2, 0, 1)[None, ...].astype(np.float32)

            sess_input = {self.manager.dwpose_input_name: input_data}
            outputs = self.manager.dwpose_session.run(None, sess_input)
            all_outputs.append(outputs)

        # Decode SimCC logits into coordinates using the crop centers/scales.
        keypoints, scores = self._postprocess_dwpose_refs(all_outputs, model_input_size, centers, scales)

        return keypoints, scores
|
|
|
|
|
def _preprocess_dwpose_refs(self, image: np.ndarray, bboxes: List[List[float]], input_size: Tuple[int, int]) -> Tuple[List[np.ndarray], List[np.ndarray], List[np.ndarray]]: |
|
|
"""refs互換のDWPose前処理""" |
|
|
img_shape = image.shape[:2] |
|
|
out_img, out_center, out_scale = [], [], [] |
|
|
|
|
|
if len(bboxes) == 0: |
|
|
bboxes = [[0, 0, img_shape[1], img_shape[0]]] |
|
|
|
|
|
for bbox in bboxes: |
|
|
x1, y1, x2, y2 = bbox |
|
|
bbox_array = np.array([x1, y1, x2, y2]) |
|
|
|
|
|
|
|
|
center, scale = self._bbox_xyxy2cs(bbox_array, padding=1.25) |
|
|
resized_img, scale = self._top_down_affine(input_size, scale, center, image) |
|
|
|
|
|
|
|
|
mean = np.array([123.675, 116.28, 103.53]) |
|
|
std = np.array([58.395, 57.12, 57.375]) |
|
|
resized_img = (resized_img - mean) / std |
|
|
|
|
|
out_img.append(resized_img) |
|
|
out_center.append(center) |
|
|
out_scale.append(scale) |
|
|
|
|
|
return out_img, out_center, out_scale |
|
|
|
|
|
def _bbox_xyxy2cs(self, bbox: np.ndarray, padding: float = 1.0) -> Tuple[np.ndarray, np.ndarray]: |
|
|
"""refs互換のbbox変換""" |
|
|
dim = bbox.ndim |
|
|
if dim == 1: |
|
|
bbox = bbox[None, :] |
|
|
|
|
|
x1, y1, x2, y2 = np.hsplit(bbox, [1, 2, 3]) |
|
|
center = np.hstack([x1 + x2, y1 + y2]) * 0.5 |
|
|
scale = np.hstack([x2 - x1, y2 - y1]) * padding |
|
|
|
|
|
if dim == 1: |
|
|
center = center[0] |
|
|
scale = scale[0] |
|
|
|
|
|
return center, scale |
|
|
|
|
|
def _fix_aspect_ratio(self, bbox_scale: np.ndarray, aspect_ratio: float) -> np.ndarray: |
|
|
"""refs互換のアスペクト比修正""" |
|
|
w, h = np.hsplit(bbox_scale, [1]) |
|
|
bbox_scale = np.where(w > h * aspect_ratio, |
|
|
np.hstack([w, w / aspect_ratio]), |
|
|
np.hstack([h * aspect_ratio, h])) |
|
|
return bbox_scale |
|
|
|
|
|
    def _get_warp_matrix(self, center: np.ndarray, scale: np.ndarray, rot: float, output_size: Tuple[int, int]) -> np.ndarray:
        """Build the 2x3 affine matrix mapping a scaled/rotated crop to *output_size* (refs-compatible).

        Three corresponding point pairs are constructed — the box center, a
        point half a width "up" from it (rotated by *rot*), and a third point
        completing a right angle — then solved with cv2.getAffineTransform.
        """
        src_w = scale[0]
        dst_w = output_size[0]
        dst_h = output_size[1]

        # Direction vector from the center, rotated by the requested angle.
        rot_rad = np.deg2rad(rot)
        src_dir = self._rotate_point(np.array([0., src_w * -0.5]), rot_rad)
        dst_dir = np.array([0., dst_w * -0.5])

        # Source triangle in original-image coordinates.
        src = np.zeros((3, 2), dtype=np.float32)
        src[0, :] = center
        src[1, :] = center + src_dir
        src[2, :] = self._get_3rd_point(src[0, :], src[1, :])

        # Destination triangle centered in the output crop.
        dst = np.zeros((3, 2), dtype=np.float32)
        dst[0, :] = [dst_w * 0.5, dst_h * 0.5]
        dst[1, :] = np.array([dst_w * 0.5, dst_h * 0.5]) + dst_dir
        dst[2, :] = self._get_3rd_point(dst[0, :], dst[1, :])

        warp_mat = cv2.getAffineTransform(np.float32(src), np.float32(dst))
        return warp_mat
|
|
|
|
|
def _rotate_point(self, pt: np.ndarray, angle_rad: float) -> np.ndarray: |
|
|
"""refs互換の点回転""" |
|
|
sn, cs = np.sin(angle_rad), np.cos(angle_rad) |
|
|
rot_mat = np.array([[cs, -sn], [sn, cs]]) |
|
|
return rot_mat @ pt |
|
|
|
|
|
def _get_3rd_point(self, a: np.ndarray, b: np.ndarray) -> np.ndarray: |
|
|
"""refs互換の第3点取得""" |
|
|
direction = a - b |
|
|
c = b + np.r_[-direction[1], direction[0]] |
|
|
return c |
|
|
|
|
|
    def _top_down_affine(self, input_size: Tuple[int, int], bbox_scale: np.ndarray, bbox_center: np.ndarray, img: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Warp the bbox region of *img* into a crop of *input_size* (refs-compatible).

        Returns:
            ``(cropped_image, adjusted_bbox_scale)`` — the scale after being
            expanded to match the model input's aspect ratio.
        """
        w, h = input_size
        warp_size = (int(w), int(h))

        # Grow the box so its aspect ratio matches the model input.
        bbox_scale = self._fix_aspect_ratio(bbox_scale, aspect_ratio=w / h)

        center = bbox_center
        scale = bbox_scale
        rot = 0  # no rotation augmentation at inference time
        warp_mat = self._get_warp_matrix(center, scale, rot, output_size=(w, h))

        img = cv2.warpAffine(img, warp_mat, warp_size, flags=cv2.INTER_LINEAR)

        return img, bbox_scale
|
|
|
|
|
    def _postprocess_dwpose_refs(self, all_outputs: List, model_input_size: Tuple[int, int], centers: List[np.ndarray], scales: List[np.ndarray], simcc_split_ratio: float = 2.0) -> Tuple[List[np.ndarray], List[np.ndarray]]:
        """Decode per-crop SimCC outputs back into image-space keypoints (refs-compatible).

        Side effect: stores the decode parameters on ``self._last_dwpose_params``
        (presumably for later debugging/re-projection — no reader visible here).

        Returns:
            ``(all_keypoints, all_scores)`` with one (K, 2) / (K,) array per crop.
        """
        self._last_dwpose_params = {
            'model_input_size': model_input_size,
            'centers': centers,
            'scales': scales,
            'simcc_split_ratio': simcc_split_ratio
        }

        all_keypoints = []
        all_scores = []

        for i, outputs in enumerate(all_outputs):
            # ONNX outputs: per-keypoint SimCC logits over the x and y axes.
            simcc_x, simcc_y = outputs[0], outputs[1]
            keypoints, scores = self._decode_simcc(simcc_x, simcc_y, simcc_split_ratio)

            # Map crop-relative coordinates back to the source image using the
            # crop's center and scale (crop spans center ± scale/2).
            keypoints = keypoints / np.array(model_input_size) * scales[i] + centers[i] - scales[i] / 2

            # NOTE(review): the squeeze/expand sequence below is a no-op shape
            # round-trip left over from a removed normalization step — the
            # appended value equals keypoints[0] (batch of 1) either way.
            if len(keypoints.shape) == 3 and keypoints.shape[0] == 1:
                keypoints_2d = keypoints[0]
            else:
                keypoints_2d = keypoints

            print(f"[DEBUG] 🔄 Before normalization: shape={keypoints_2d.shape}")

            normalized_keypoints = keypoints_2d

            if len(keypoints.shape) == 3 and keypoints.shape[0] == 1:
                normalized_keypoints = np.expand_dims(normalized_keypoints, axis=0)

            all_keypoints.append(normalized_keypoints[0] if len(normalized_keypoints.shape) == 3 else normalized_keypoints)
            all_scores.append(scores[0])

        return all_keypoints, all_scores
|
|
|
|
|
def _decode_simcc(self, simcc_x: np.ndarray, simcc_y: np.ndarray, simcc_split_ratio: float) -> Tuple[np.ndarray, np.ndarray]: |
|
|
"""refs互換のSimCCデコード""" |
|
|
keypoints, scores = self._get_simcc_maximum(simcc_x, simcc_y) |
|
|
keypoints /= simcc_split_ratio |
|
|
return keypoints, scores |
|
|
|
|
|
def _get_simcc_maximum(self, simcc_x: np.ndarray, simcc_y: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: |
|
|
"""refs互換のSimCC最大値取得""" |
|
|
N, K, Wx = simcc_x.shape |
|
|
simcc_x = simcc_x.reshape(N * K, -1) |
|
|
simcc_y = simcc_y.reshape(N * K, -1) |
|
|
|
|
|
x_locs = np.argmax(simcc_x, axis=1) |
|
|
y_locs = np.argmax(simcc_y, axis=1) |
|
|
locs = np.stack((x_locs, y_locs), axis=-1).astype(np.float32) |
|
|
max_val_x = np.amax(simcc_x, axis=1) |
|
|
max_val_y = np.amax(simcc_y, axis=1) |
|
|
|
|
|
mask = max_val_x > max_val_y |
|
|
max_val_x[mask] = max_val_y[mask] |
|
|
vals = max_val_x |
|
|
locs[vals <= 0.] = -1 |
|
|
|
|
|
locs = locs.reshape(N, K, 2) |
|
|
vals = vals.reshape(N, K) |
|
|
|
|
|
return locs, vals |
|
|
|
|
|
    def _format_to_json_refs(self, pose_results: List[Dict]) -> Dict:
        """Package pose results into an OpenPose-style JSON dict (refs-compatible).

        NOTE(review): the "bodies"/"faces"/"hands" sections below reuse
        variables from the LAST loop iteration, so they describe only the last
        person; the function also assumes *pose_results* is non-empty (the
        caller, ``detect``, guarantees this).
        """
        formatted_data = {
            "version": "1.3",
            "people": [],
            "metadata": {}
        }

        for pose_result in pose_results:
            # 20-point OpenPose body (18 joints + 2 synthetic toe points).
            converted_keypoints = self._convert_to_openpose_with_feet_format(pose_result["keypoints"])

            original_keypoints = pose_result["keypoints"]

            # Face/hand sub-arrays are taken raw (no coordinate transform).
            face_keypoints = self._extract_face_keypoints_raw(original_keypoints)
            hand_left_keypoints = self._extract_hand_keypoints_raw(original_keypoints, is_left=True)
            hand_right_keypoints = self._extract_hand_keypoints_raw(original_keypoints, is_left=False)

            print(f"[DEBUG] 😊 Face keypoints (raw): {len(face_keypoints)} points")
            print(f"[DEBUG] 👋 Hand keypoints (raw): Left={len(hand_left_keypoints)}, Right={len(hand_right_keypoints)}")

            person_data = {
                "pose_keypoints_2d": self._flatten_keypoints(converted_keypoints),
                "face_keypoints_2d": self._flatten_keypoints(face_keypoints),
                "hand_left_keypoints_2d": self._flatten_keypoints(hand_left_keypoints),
                "hand_right_keypoints_2d": self._flatten_keypoints(hand_right_keypoints),
                "bbox": pose_result["bbox"],
                "confidence": pose_result["confidence"]
            }
            formatted_data["people"].append(person_data)

        # "bodies": candidate/subset layout used by ControlNet-style consumers
        # (built from the LAST person's converted keypoints only).
        if len(pose_results) > 0:
            candidates = []
            for kp in converted_keypoints:
                candidates.append([float(kp[0]), float(kp[1])])

            formatted_data["bodies"] = {
                "candidate": candidates,
                "subset": [[list(range(len(candidates))), 1.0, len(candidates)]]
            }

        # "faces": last person's face points, flattened, or an empty list.
        if len(face_keypoints) > 0:
            formatted_data["faces"] = [self._flatten_keypoints(face_keypoints)]
        else:
            formatted_data["faces"] = []

        # "hands": last person's available hands, left before right.
        if len(hand_left_keypoints) > 0 or len(hand_right_keypoints) > 0:
            hands_data = []
            if len(hand_left_keypoints) > 0:
                hands_data.append(self._flatten_keypoints(hand_left_keypoints))
            if len(hand_right_keypoints) > 0:
                hands_data.append(self._flatten_keypoints(hand_right_keypoints))
            formatted_data["hands"] = hands_data
        else:
            formatted_data["hands"] = []

        # Fixed working resolution set by _estimate_pose_refs.
        formatted_data["resolution"] = [512, 512]

        return formatted_data
|
|
|
|
|
def _convert_to_openpose_with_feet_format(self, keypoints: List[List[float]]) -> List[List[float]]: |
|
|
"""refs互換のOpenPose+足形式変換(20個)""" |
|
|
|
|
|
converted_18 = self._convert_to_openpose_format(keypoints) |
|
|
|
|
|
|
|
|
converted_20 = converted_18.copy() |
|
|
|
|
|
|
|
|
if len(keypoints) > 19 and keypoints[18][2] > 0 and keypoints[19][2] > 0: |
|
|
left_toe_x = (keypoints[18][0] + keypoints[19][0]) / 2 |
|
|
left_toe_y = (keypoints[18][1] + keypoints[19][1]) / 2 |
|
|
left_toe_conf = min(keypoints[18][2], keypoints[19][2]) |
|
|
converted_20.append([left_toe_x, left_toe_y, left_toe_conf]) |
|
|
else: |
|
|
converted_20.append([0.0, 0.0, 0.0]) |
|
|
|
|
|
|
|
|
if len(keypoints) > 22 and keypoints[21][2] > 0 and keypoints[22][2] > 0: |
|
|
right_toe_x = (keypoints[21][0] + keypoints[22][0]) / 2 |
|
|
right_toe_y = (keypoints[21][1] + keypoints[22][1]) / 2 |
|
|
right_toe_conf = min(keypoints[21][2], keypoints[22][2]) |
|
|
converted_20.append([right_toe_x, right_toe_y, right_toe_conf]) |
|
|
else: |
|
|
converted_20.append([0.0, 0.0, 0.0]) |
|
|
|
|
|
return converted_20 |
|
|
|
|
|
def _convert_to_openpose_format(self, keypoints: List[List[float]]) -> List[List[float]]: |
|
|
"""refs互換のOpenPose形式変換(18個)""" |
|
|
if len(keypoints) < 17: |
|
|
while len(keypoints) < 17: |
|
|
keypoints.append([0.0, 0.0, 0.0]) |
|
|
|
|
|
|
|
|
print(f"[DEBUG] 🎯 DWPose→OpenPose変換開始: {len(keypoints)}キーポイント") |
|
|
for i in range(min(17, len(keypoints))): |
|
|
kp = keypoints[i] |
|
|
conf = kp[2] if len(kp) > 2 else 0.0 |
|
|
|
|
|
if i in [1, 2, 3, 4, 12, 13, 14, 15, 16]: |
|
|
part_names = {1: "左目", 2: "右目", 3: "左耳", 4: "右耳", 12: "下半身12", 13: "下半身13", 14: "下半身14", 15: "下半身15", 16: "下半身16"} |
|
|
part_name = part_names.get(i, f"DWPose[{i}]") |
|
|
print(f"[DEBUG] 🦵 {part_name}: ({kp[0]:.1f}, {kp[1]:.1f}) 信頼度:{conf:.3f}") |
|
|
|
|
|
|
|
|
if keypoints[5][2] > 0.3 and keypoints[6][2] > 0.3: |
|
|
neck_x = (keypoints[5][0] + keypoints[6][0]) / 2 |
|
|
neck_y = (keypoints[5][1] + keypoints[6][1]) / 2 |
|
|
neck_conf = min(keypoints[5][2], keypoints[6][2]) |
|
|
neck = [neck_x, neck_y, neck_conf] |
|
|
else: |
|
|
neck = [0.0, 0.0, 0.0] |
|
|
|
|
|
new_keypoints = keypoints[:17] + [neck] |
|
|
|
|
|
converted = [[0.0, 0.0, 0.0] for _ in range(18)] |
|
|
|
|
|
|
|
|
converted[0] = new_keypoints[0] |
|
|
|
|
|
if len(new_keypoints) > 17: |
|
|
converted[1] = new_keypoints[17] |
|
|
if len(new_keypoints) > 6: |
|
|
converted[2] = new_keypoints[6] |
|
|
if len(new_keypoints) > 8: |
|
|
converted[3] = new_keypoints[8] |
|
|
if len(new_keypoints) > 10: |
|
|
converted[4] = new_keypoints[10] |
|
|
if len(new_keypoints) > 5: |
|
|
converted[5] = new_keypoints[5] |
|
|
if len(new_keypoints) > 7: |
|
|
converted[6] = new_keypoints[7] |
|
|
if len(new_keypoints) > 9: |
|
|
converted[7] = new_keypoints[9] |
|
|
if len(new_keypoints) > 12: |
|
|
converted[8] = new_keypoints[12] |
|
|
if len(new_keypoints) > 14: |
|
|
converted[9] = new_keypoints[14] |
|
|
if len(new_keypoints) > 16: |
|
|
converted[10] = new_keypoints[16] |
|
|
if len(new_keypoints) > 11: |
|
|
converted[11] = new_keypoints[11] |
|
|
if len(new_keypoints) > 13: |
|
|
converted[12] = new_keypoints[13] |
|
|
if len(new_keypoints) > 15: |
|
|
converted[13] = new_keypoints[15] |
|
|
if len(new_keypoints) > 2: |
|
|
converted[14] = new_keypoints[2] |
|
|
if len(new_keypoints) > 1: |
|
|
converted[15] = new_keypoints[1] |
|
|
if len(new_keypoints) > 4: |
|
|
converted[16] = new_keypoints[4] |
|
|
if len(new_keypoints) > 3: |
|
|
converted[17] = new_keypoints[3] |
|
|
|
|
|
|
|
|
print(f"[DEBUG] 🎯 変換後のOpenPose 目・耳キーポイント:") |
|
|
eye_ear_indices = [14, 15, 16, 17] |
|
|
eye_ear_names = ["右目", "左目", "右耳", "左耳"] |
|
|
for idx, name in zip(eye_ear_indices, eye_ear_names): |
|
|
if idx < len(converted): |
|
|
kp = converted[idx] |
|
|
conf = kp[2] if len(kp) > 2 else 0.0 |
|
|
print(f"[DEBUG] 👁️ OpenPose[{idx}] {name}: ({kp[0]:.1f}, {kp[1]:.1f}) 信頼度:{conf:.3f}") |
|
|
|
|
|
return converted |
|
|
|
|
|
def _apply_dwpose_coordinate_transform(self, keypoints: List[List[float]]) -> List[List[float]]: |
|
|
"""手と顔のキーポイントを生データから正しく変換(棒人間と同じ処理)""" |
|
|
if not keypoints or len(keypoints) == 0: |
|
|
return keypoints |
|
|
|
|
|
|
|
|
|
|
|
print(f"[DEBUG] 🔄 Hand/Face coordinate normalization: {len(keypoints)} keypoints") |
|
|
|
|
|
|
|
|
kp_array = np.array(keypoints) |
|
|
|
|
|
|
|
|
normalized_kp = self._normalize_to_standard_resolution(kp_array[:, :2]) |
|
|
|
|
|
|
|
|
result = [] |
|
|
for i, (norm_kp, orig_kp) in enumerate(zip(normalized_kp, keypoints)): |
|
|
original_conf = orig_kp[2] if len(orig_kp) > 2 else 0.0 |
|
|
result.append([float(norm_kp[0]), float(norm_kp[1]), original_conf]) |
|
|
|
|
|
print(f"[DEBUG] 🎯 Normalized {len(result)} hand/face keypoints") |
|
|
return result |
|
|
|
|
|
def _extract_face_keypoints_raw(self, keypoints: List[List[float]]) -> List[List[float]]: |
|
|
"""顔キーポイントの生データを抽出(座標変換なし)""" |
|
|
if len(keypoints) >= 91: |
|
|
return keypoints[23:91] |
|
|
else: |
|
|
return [] |
|
|
|
|
|
def _extract_hand_keypoints_raw(self, keypoints: List[List[float]], is_left: bool = True) -> List[List[float]]: |
|
|
"""手キーポイントの生データを抽出(座標変換なし)""" |
|
|
if len(keypoints) >= 133: |
|
|
if is_left: |
|
|
return keypoints[91:112] |
|
|
else: |
|
|
return keypoints[112:133] |
|
|
else: |
|
|
return [] |
|
|
|
|
|
def _align_face_to_body(self, face_keypoints_raw: List[List[float]], body_keypoints: List[List[float]]) -> List[List[float]]: |
|
|
"""顔キーポイントを棒人間の鼻基準で座標系に合わせる""" |
|
|
if not face_keypoints_raw or not body_keypoints or len(body_keypoints) == 0: |
|
|
return [] |
|
|
|
|
|
|
|
|
body_nose = body_keypoints[0] |
|
|
if not body_nose or len(body_nose) < 2: |
|
|
return [] |
|
|
|
|
|
|
|
|
valid_face_points = [kp for kp in face_keypoints_raw if kp and len(kp) >= 2 and kp[2] > 0.3] |
|
|
if not valid_face_points: |
|
|
return [] |
|
|
|
|
|
face_center_x = np.mean([kp[0] for kp in valid_face_points]) |
|
|
face_center_y = np.mean([kp[1] for kp in valid_face_points]) |
|
|
|
|
|
|
|
|
offset_x = body_nose[0] - face_center_x |
|
|
offset_y = body_nose[1] - face_center_y |
|
|
|
|
|
print(f"[DEBUG] 😊 Face alignment: center=({face_center_x:.1f}, {face_center_y:.1f}) → nose=({body_nose[0]:.1f}, {body_nose[1]:.1f}), offset=({offset_x:.1f}, {offset_y:.1f})") |
|
|
|
|
|
|
|
|
aligned_face = [] |
|
|
for kp in face_keypoints_raw: |
|
|
if kp and len(kp) >= 2: |
|
|
new_x = kp[0] + offset_x |
|
|
new_y = kp[1] + offset_y |
|
|
conf = kp[2] if len(kp) > 2 else 0.0 |
|
|
aligned_face.append([new_x, new_y, conf]) |
|
|
else: |
|
|
aligned_face.append([0.0, 0.0, 0.0]) |
|
|
|
|
|
return aligned_face |
|
|
|
|
|
def _align_hand_to_body(self, hand_keypoints_raw: List[List[float]], body_keypoints: List[List[float]], is_left: bool = True) -> List[List[float]]: |
|
|
"""手キーポイントを棒人間の手首基準で座標系に合わせる""" |
|
|
if not hand_keypoints_raw or not body_keypoints: |
|
|
return [] |
|
|
|
|
|
|
|
|
wrist_index = 7 if is_left else 4 |
|
|
if len(body_keypoints) <= wrist_index: |
|
|
return [] |
|
|
|
|
|
body_wrist = body_keypoints[wrist_index] |
|
|
if not body_wrist or len(body_wrist) < 2: |
|
|
return [] |
|
|
|
|
|
|
|
|
if not hand_keypoints_raw or len(hand_keypoints_raw) == 0: |
|
|
return [] |
|
|
|
|
|
hand_wrist = hand_keypoints_raw[0] |
|
|
if not hand_wrist or len(hand_wrist) < 2: |
|
|
return [] |
|
|
|
|
|
|
|
|
offset_x = body_wrist[0] - hand_wrist[0] |
|
|
offset_y = body_wrist[1] - hand_wrist[1] |
|
|
|
|
|
hand_side = "左" if is_left else "右" |
|
|
print(f"[DEBUG] 👋 {hand_side}手 alignment: hand_wrist=({hand_wrist[0]:.1f}, {hand_wrist[1]:.1f}) → body_wrist=({body_wrist[0]:.1f}, {body_wrist[1]:.1f}), offset=({offset_x:.1f}, {offset_y:.1f})") |
|
|
|
|
|
|
|
|
aligned_hand = [] |
|
|
for kp in hand_keypoints_raw: |
|
|
if kp and len(kp) >= 2: |
|
|
new_x = kp[0] + offset_x |
|
|
new_y = kp[1] + offset_y |
|
|
conf = kp[2] if len(kp) > 2 else 0.0 |
|
|
aligned_hand.append([new_x, new_y, conf]) |
|
|
else: |
|
|
aligned_hand.append([0.0, 0.0, 0.0]) |
|
|
|
|
|
return aligned_hand |
|
|
|
|
|
def _extract_face_keypoints(self, keypoints: List[List[float]]) -> List[List[float]]: |
|
|
"""refs互換の顔キーポイント抽出""" |
|
|
if len(keypoints) >= 91: |
|
|
face_kps = keypoints[23:91] |
|
|
|
|
|
|
|
|
face_kps = self._apply_dwpose_coordinate_transform(face_kps) |
|
|
return face_kps |
|
|
else: |
|
|
return [] |
|
|
|
|
|
def _extract_hand_keypoints(self, keypoints: List[List[float]], is_left: bool = True) -> List[List[float]]: |
|
|
"""refs互換の手キーポイント抽出""" |
|
|
if len(keypoints) >= 133: |
|
|
if is_left: |
|
|
hand_kps = keypoints[91:112] |
|
|
else: |
|
|
hand_kps = keypoints[112:133] |
|
|
|
|
|
|
|
|
hand_kps = self._apply_dwpose_coordinate_transform(hand_kps) |
|
|
return hand_kps |
|
|
else: |
|
|
return [] |
|
|
|
|
|
def _apply_resolution_normalization_to_keypoints(self, keypoints: List[List[float]]) -> List[List[float]]: |
|
|
"""リスト形式のキーポイントに座標正規化を適用""" |
|
|
if not keypoints or len(keypoints) == 0: |
|
|
return keypoints |
|
|
|
|
|
|
|
|
kp_array = np.array(keypoints) |
|
|
|
|
|
|
|
|
normalized_array = self._normalize_to_standard_resolution(kp_array) |
|
|
|
|
|
|
|
|
return normalized_array.tolist() |
|
|
|
|
|
    def _normalize_to_standard_resolution(self, keypoints: np.ndarray, target_resolution: Tuple[int, int] = (512, 512)) -> np.ndarray:
        """Rescale keypoint coordinates from the source image size to *target_resolution*.

        The source size comes from ``self._original_image_size`` when it was
        recorded (by ``_estimate_pose_refs``); otherwise it is guessed from the
        coordinate extents via hard-coded heuristics. Returns the input
        unchanged for empty/degenerate arrays.
        """
        print(f"[DEBUG] 🔍 Keypoints shape: {keypoints.shape}, type: {type(keypoints)}")

        if keypoints.size == 0:
            print("[DEBUG] ⚠️ Empty keypoints, returning as-is")
            return keypoints

        # Flat [x0, y0, x1, y1, ...] arrays are reshaped to (K, 2).
        if len(keypoints.shape) == 1:
            if len(keypoints) >= 2:
                keypoints = keypoints.reshape(-1, 2)
                print(f"[DEBUG] 🔄 Reshaped 1D to 2D: {keypoints.shape}")
            else:
                print("[DEBUG] ⚠️ Too few elements in 1D array")
                return keypoints

        if hasattr(self, '_original_image_size') and self._original_image_size:
            orig_w, orig_h = self._original_image_size
            print(f"[DEBUG] 🎯 Using recorded image size: {orig_w}x{orig_h}")
        else:
            # No recorded size: guess it from the keypoint extents.
            try:
                if len(keypoints.shape) == 2 and keypoints.shape[1] >= 2:
                    max_x = np.max(keypoints[:, 0])
                    max_y = np.max(keypoints[:, 1])
                elif len(keypoints.shape) == 1 and len(keypoints) >= 2:
                    # NOTE(review): unreachable — 1-D input was reshaped above.
                    max_x = np.max(keypoints[0::2])
                    max_y = np.max(keypoints[1::2])
                else:
                    print(f"[DEBUG] ⚠️ Unexpected keypoints shape: {keypoints.shape}")
                    return keypoints

                # Assume points reach ~83% of the frame, then snap to one of a
                # few common square resolutions (heuristic; TODO confirm).
                orig_w = max_x * 1.2
                orig_h = max_y * 1.2

                if orig_w > 1000:
                    if orig_w > 1070:
                        orig_w, orig_h = 1080, 1080
                    else:
                        orig_w, orig_h = 1024, 1024
                else:
                    orig_w, orig_h = 640, 640

                print(f"[DEBUG] 📊 Estimated from keypoints: {orig_w:.0f}x{orig_h:.0f}")

            except Exception as e:
                # Best-effort: on any failure return the coordinates unscaled.
                print(f"[DEBUG] ❌ Error getting max values: {e}")
                return keypoints

        print(f"[DEBUG] 🎯 Resolution normalize: orig_size=({orig_w:.0f}x{orig_h:.0f}) → target={target_resolution}")

        scale_x = target_resolution[0] / orig_w
        scale_y = target_resolution[1] / orig_h

        # Scale a copy so the caller's array is left untouched; only the first
        # two columns (x, y) are modified — extra columns pass through.
        normalized_keypoints = keypoints.copy()
        if len(keypoints.shape) == 2 and keypoints.shape[1] >= 2:
            normalized_keypoints[:, 0] *= scale_x
            normalized_keypoints[:, 1] *= scale_y
        elif len(keypoints.shape) == 1:
            # NOTE(review): unreachable — 1-D input was reshaped above.
            normalized_keypoints[0::2] *= scale_x
            normalized_keypoints[1::2] *= scale_y

        print(f"[DEBUG] 🔄 Keypoint scaling: scale=({scale_x:.3f}, {scale_y:.3f})")

        return normalized_keypoints
|
|
|
|
|
def _flatten_keypoints(self, keypoints: List[List[float]]) -> List[float]: |
|
|
"""refs互換のキーポイント平坦化""" |
|
|
flattened = [] |
|
|
for kp in keypoints: |
|
|
flattened.extend(kp) |
|
|
return flattened |