| import cv2 |
| import tempfile |
| import shutil |
| import mediapipe as mp |
| import numpy as np |
| from push_up.engine import PoseEngine |
|
|
|
|
class VideoProcessor:
    """Run ``PoseEngine`` kinematics extraction over the frames of a video.

    Before extraction the video is sampled to decide whether the frames must
    be mirrored horizontally so that the athlete's head is on the right-hand
    side of the image.
    """

    # MediaPipe Pose landmark indices used for the orientation heuristic.
    _LEFT_WRIST = 15
    _RIGHT_WRIST = 16
    _LEFT_HIP = 23
    _RIGHT_HIP = 24

    # Number of successful pose detections collected to decide orientation.
    _ORIENTATION_SAMPLES = 10
    # Only every N-th frame is passed to the pose engine.
    _FRAME_STEP = 2
    # Results whose "shoulder_heel_y_dist" is at or above this are dropped.
    # NOTE(review): presumably this rejects frames where the body is not
    # close to horizontal -- confirm against PoseEngine.extract_kinematics.
    _MAX_SHOULDER_HEEL_Y_DIST = 0.4

    def __init__(self):
        self.engine = PoseEngine()

    @staticmethod
    def _needs_flip(wrist_hip_diffs):
        """Turn sampled ``wrist_x - hip_x`` differences into a flip decision.

        Returns a plain ``bool`` (not ``numpy.bool_``): ``True`` when the mean
        difference is negative, ``False`` otherwise or when no samples exist.
        """
        if not wrist_hip_diffs:
            return False
        return bool(np.mean(wrist_hip_diffs) < 0)

    def _detect_orientation(self, video_path):
        """Detect which way the athlete is facing in the video.

        Returns ``True`` if the frames need a horizontal flip (head on the
        left) and ``False`` if they do not (head on the right), or if no pose
        could be detected at all.

        Logic: compare the mean X position of the wrists with that of the
        hips. In the reference push-up pose (head on the right)
        ``wrist.x > hip.x``; when the averaged difference is negative the
        video has to be mirrored.
        """
        wrist_hip_diffs = []
        cap = cv2.VideoCapture(video_path)
        try:
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            # Spread the samples over the clip; never let the step drop to 0
            # when the reported frame count is small or missing.
            sample_step = max(1, total_frames // self._ORIENTATION_SAMPLES)

            # The context manager closes the detector even if a frame fails.
            with mp.solutions.pose.Pose(
                static_image_mode=True,
                model_complexity=0,
                min_detection_confidence=0.5,
            ) as detector:
                frame_idx = 0
                while (
                    cap.isOpened()
                    and len(wrist_hip_diffs) < self._ORIENTATION_SAMPLES
                ):
                    ret, frame = cap.read()
                    if not ret:
                        break
                    if frame_idx % sample_step == 0:
                        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                        results = detector.process(rgb)
                        if results.pose_landmarks:
                            lms = results.pose_landmarks.landmark
                            wrist_x = (
                                lms[self._LEFT_WRIST].x
                                + lms[self._RIGHT_WRIST].x
                            ) / 2
                            hip_x = (
                                lms[self._LEFT_HIP].x + lms[self._RIGHT_HIP].x
                            ) / 2
                            wrist_hip_diffs.append(wrist_x - hip_x)
                    frame_idx += 1
        finally:
            cap.release()

        return self._needs_flip(wrist_hip_diffs)

    def _process_video_path(self, video_path):
        """Extract per-frame kinematics from the video at ``video_path``.

        Every ``_FRAME_STEP``-th frame is (optionally mirrored and) passed to
        ``self.engine.extract_kinematics``. A truthy result whose
        ``"shoulder_heel_y_dist"`` is below ``_MAX_SHOULDER_HEEL_Y_DIST`` is
        annotated with ``"frame_idx"`` and ``"flipped"`` and collected.

        Returns:
            A ``(data_list, video_path)`` tuple: the collected results and the
            path that was processed.
        """
        needs_flip = self._detect_orientation(video_path)

        data_list = []
        cap = cv2.VideoCapture(video_path)
        try:
            curr_frame_idx = 0
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                if curr_frame_idx % self._FRAME_STEP == 0:
                    # Mirror only frames that are actually analysed.
                    if needs_flip:
                        frame = cv2.flip(frame, 1)
                    res, _ = self.engine.extract_kinematics(frame)
                    if (
                        res
                        and res["shoulder_heel_y_dist"]
                        < self._MAX_SHOULDER_HEEL_Y_DIST
                    ):
                        res["frame_idx"] = curr_frame_idx
                        res["flipped"] = needs_flip
                        data_list.append(res)
                curr_frame_idx += 1
        finally:
            cap.release()
        return data_list, video_path

    def process_video_lightweight(self, file_buffer):
        """Process a video supplied as a seekable binary file-like object.

        The buffer is rewound and copied to a temporary ``.mp4`` file, which
        is then processed by ``_process_video_path``. The temporary file is
        created with ``delete=False`` and is NOT removed here; its path is
        returned as the second tuple element (e.g. for later ``get_frame``
        calls), so the caller is responsible for deleting it.
        """
        # Rewind first so a non-seekable buffer fails before a file is made.
        file_buffer.seek(0)
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tfile:
            shutil.copyfileobj(file_buffer, tfile)
            video_path = tfile.name
        return self._process_video_path(video_path)

    def process_video_from_path(self, video_path):
        """Process a video that already exists on disk at ``video_path``."""
        return self._process_video_path(video_path)

    def get_frame(self, video_path, frame_idx, flip=False):
        """Return the frame at ``frame_idx``, or ``None`` if it cannot be read.

        When ``flip`` is true the frame is mirrored horizontally, matching
        the ``"flipped"`` flag stored by ``_process_video_path``.
        """
        cap = cv2.VideoCapture(video_path)
        try:
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
            ret, frame = cap.read()
        finally:
            cap.release()
        if not ret:
            return None
        return cv2.flip(frame, 1) if flip else frame
|
|