| from __future__ import annotations |
|
|
| from collections import Counter |
| from datetime import datetime |
| from pathlib import Path |
| import json |
| import pickle |
| import uuid |
|
|
| import cv2 |
| import mediapipe as mp |
| import numpy as np |
|
|
| from push_up.evaluator import PushUpEvaluator |
| from push_up.feedback_graph import generate_rep_visual_feedback |
| from push_up.processor import VideoProcessor |
| from push_up.template_registry import ( |
| DEFAULT_TEMPLATE_ID, |
| get_template, |
| resolve_template_video_path, |
| safe_template_id, |
| ) |
|
|
|
|
| BASE_DIR = Path(__file__).resolve().parent.parent |
| TEMPLATE_SOURCE = BASE_DIR / "data" / "templates" / "push_up_template.mp4" |
| TEMPLATE_CACHE = BASE_DIR / "data" / "processed" / "pushup_template.pkl" |
| DEBUG_ARTIFACTS_ROOT = BASE_DIR / "analysis_artifacts" |
| VIDEO_TEST_LOG_FILE = BASE_DIR / ".ai-log" / "video_test_runs.jsonl" |
| ANALYSIS_DIR = Path("analysis") |
|
|
| ERROR_GUIDANCE = { |
| "not_deep_enough": "Hạ ngực thấp hơn ở pha xuống và giữ nhịp kiểm soát thay vì rơi tự do.", |
| "hip_sag": "Siết bụng và mông để giữ thân người thành một đường thẳng từ vai đến gót chân.", |
| "body_not_straight": "Ổn định vai, hông và gót trên cùng một trục trước khi bắt đầu rep tiếp theo.", |
| "head_misaligned": "Giữ cổ trung tính, mắt nhìn xuống sàn phía trước tay một khoảng ngắn.", |
| "hip_pike": "Tránh đẩy hông lên cao, hãy giữ vai và hông di chuyển cùng nhau.", |
| } |
|
|
| ERROR_LABELS = { |
| "not_deep_enough": "Chưa hạ người đủ sâu", |
| "hip_sag": "Võng lưng", |
| "body_not_straight": "Cơ thể chưa giữ thẳng", |
| "head_misaligned": "Gập cổ hoặc cúi đầu quá mức", |
| "hip_pike": "Nhô mông quá cao", |
| } |
|
|
| ARROW_LABELS = { |
| "not_deep_enough": "Ngực chưa đủ thấp", |
| "hip_sag": "Hông bị võng xuống", |
| "body_not_straight": "Hông lệch khỏi trục", |
| "head_misaligned": "Cổ cúi quá mức", |
| "hip_pike": "Hông quá cao", |
| } |
|
|
| SEVERITY_RANK = {"high": 3, "medium": 2, "low": 1} |
|
|
|
|
| def template_cache_exists(template_id: str | None = None) -> bool: |
| return _template_cache_path(template_id).exists() |
|
|
|
|
| def prepare_template_cache(template_id: str | None = None, force: bool = False) -> dict: |
| template = get_template(template_id) |
| template_source = resolve_template_video_path(template) |
| template_cache = _template_cache_path(template["id"]) |
| template_cache.parent.mkdir(parents=True, exist_ok=True) |
| if template_cache.exists() and not force: |
| with template_cache.open("rb") as file: |
| cache_payload = pickle.load(file) |
| if Path(cache_payload.get("source", "")).exists(): |
| return cache_payload |
|
|
| |
| |
| |
| cache_payload["source"] = str(template_source) |
| cache_payload["template_id"] = template["id"] |
| cache_payload["template_title"] = template["title"] |
| with template_cache.open("wb") as file: |
| pickle.dump(cache_payload, file) |
| return cache_payload |
|
|
| processor = VideoProcessor() |
| template_data, _ = processor.process_video_from_path(str(template_source)) |
| evaluator = PushUpEvaluator() |
| evaluation = evaluator.evaluate(template_data, template_data) |
| cache_payload = { |
| "source": str(template_source), |
| "template_id": template["id"], |
| "template_title": template["title"], |
| "prepared_at": datetime.utcnow().isoformat(), |
| "template_data": template_data, |
| "template_rep_count": evaluation["ex_reps_count"] if not evaluation.get("error") else 0, |
| } |
| with template_cache.open("wb") as file: |
| pickle.dump(cache_payload, file) |
| return cache_payload |
|
|
|
|
| def analyze_pushup( |
| student_video_path: str | Path, |
| upload_root: str | Path | None = None, |
| *, |
| save_artifacts: bool = False, |
| template_id: str | None = None, |
| ) -> dict: |
| """Analyze one student video. |
| |
| By default this does not persist student videos, per-run JSON, or frame images. |
| Set save_artifacts=True only for debugging/offline inspection. |
| """ |
| upload_root = Path(upload_root) if upload_root is not None else DEBUG_ARTIFACTS_ROOT |
| student_video_path = Path(student_video_path) |
| cache_payload = prepare_template_cache(template_id) |
| processor = VideoProcessor() |
| evaluator = PushUpEvaluator() |
|
|
| expert_data = cache_payload["template_data"] |
| template_video_path = Path(cache_payload.get("source") or TEMPLATE_SOURCE) |
| if not template_video_path.exists(): |
| template_video_path = TEMPLATE_SOURCE |
| selected_template_id = str(cache_payload.get("template_id") or DEFAULT_TEMPLATE_ID) |
| selected_template_title = str(cache_payload.get("template_title") or "Hít đất chuẩn") |
|
|
| run_id = "" |
| run_dir: Path | None = None |
| if save_artifacts: |
| run_id = datetime.now().strftime("%Y%m%d-%H%M%S") + "-" + uuid.uuid4().hex[:8] |
| run_dir = upload_root / ANALYSIS_DIR / run_id |
| run_dir.mkdir(parents=True, exist_ok=True) |
|
|
| student_data, _ = processor.process_video_from_path(str(student_video_path)) |
| if not student_data: |
| payload = { |
| "error": "Không nhận diện được tư thế hít đất trong video. Vui lòng quay rõ toàn thân, đủ sáng và thử lại.", |
| "exercise": "pushup", |
| "exercise_label": "Hít đất / Push-up", |
| "template_id": selected_template_id, |
| "template_title": selected_template_title, |
| "student_video_path": _safe_relative_path(student_video_path, upload_root), |
| } |
| if save_artifacts and run_dir is not None: |
| with (run_dir / "result.json").open("w", encoding="utf-8") as file: |
| json.dump(payload, file, ensure_ascii=False, indent=2) |
| _write_video_test_run_log( |
| payload=payload, |
| run_id=run_id, |
| run_dir=run_dir, |
| student_video_path=student_video_path, |
| template_video_path=template_video_path, |
| ) |
| return payload |
|
|
| result = evaluator.evaluate(expert_data, student_data) |
| if result.get("error"): |
| payload = { |
| "error": result["error"], |
| "exercise": "pushup", |
| "exercise_label": "Hít đất / Push-up", |
| "template_id": selected_template_id, |
| "template_title": selected_template_title, |
| "student_video_path": _safe_relative_path(student_video_path, upload_root), |
| } |
| if save_artifacts and run_dir is not None: |
| with (run_dir / "result.json").open("w", encoding="utf-8") as file: |
| json.dump(payload, file, ensure_ascii=False, indent=2) |
| _write_video_test_run_log( |
| payload=payload, |
| run_id=run_id, |
| run_dir=run_dir, |
| student_video_path=student_video_path, |
| template_video_path=template_video_path, |
| ) |
| return payload |
|
|
| rep_cards = [] |
| error_counter: Counter[str] = Counter() |
| high_severity_reps = 0 |
| good_reps = 0 |
|
|
| for rep in result["rep_results"]: |
| rep_errors = [_serialize_error(error) for error in rep["errors"]] |
| student_frame = _select_student_frame_for_rep(rep, student_data) |
| expert_frame = expert_data[rep["w_pair"][1]] |
|
|
| student_rel = "" |
| expert_rel = "" |
| llm_visual = _empty_rep_visual_feedback() |
|
|
| if save_artifacts: |
| student_artifact = ANALYSIS_DIR / run_id / f"student_rep_{rep['rep_num']}.jpg" |
| expert_artifact = ANALYSIS_DIR / run_id / f"expert_rep_{rep['rep_num']}.jpg" |
| student_abs = upload_root / student_artifact |
| expert_abs = upload_root / expert_artifact |
| student_landmarks = _save_pose_frame( |
| processor=processor, |
| video_path=student_video_path, |
| frame_idx=student_frame["frame_idx"], |
| destination=student_abs, |
| flip=student_frame.get("flipped", False), |
| ) |
| _save_pose_frame( |
| processor=processor, |
| video_path=template_video_path, |
| frame_idx=expert_frame["frame_idx"], |
| destination=expert_abs, |
| flip=expert_frame.get("flipped", False), |
| ) |
| if _should_request_rep_visual_feedback(rep, rep_errors): |
| llm_visual = generate_rep_visual_feedback( |
| student_image_path=str(student_abs), |
| expert_image_path=str(expert_abs), |
| rep_context=_build_rep_visual_context(rep, rep_errors), |
| ) |
| rule_arrow = _build_rule_arrow(rep_errors, student_landmarks) |
| if rule_arrow: |
| llm_visual["arrow"] = rule_arrow |
| _draw_llm_arrow_on_image(student_abs, rule_arrow) |
| elif llm_visual.get("arrow"): |
| _draw_llm_arrow_on_image(student_abs, llm_visual["arrow"]) |
| student_rel = str(student_artifact) |
| expert_rel = str(expert_artifact) |
|
|
| if rep["score"] >= 0.88 and not rep_errors: |
| good_reps += 1 |
| if any(error["severity"] == "high" for error in rep_errors): |
| high_severity_reps += 1 |
| error_counter.update(error["type"] for error in rep_errors) |
|
|
| rep_cards.append( |
| { |
| "rep_num": int(rep["rep_num"]), |
| "score": round(float(rep["score"]), 4), |
| "score_pct": round(float(rep["score"]) * 100, 1), |
| "status": _rep_status_label(float(rep["score"]), rep_errors), |
| "status_tone": _rep_status_tone(float(rep["score"]), rep_errors), |
| "errors": rep_errors, |
| "error_labels": [error["label"] for error in rep_errors], |
| "primary_error": rep_errors[0]["label"] if rep_errors else "Không có lỗi nghiêm trọng", |
| "feedback": _rep_feedback(rep_errors), |
| "rule_feedback": _rep_feedback(rep_errors), |
| "llm_feedback": llm_visual.get("feedback", ""), |
| "llm_feedback_source": llm_visual.get("source", ""), |
| "llm_feedback_error": llm_visual.get("error", ""), |
| "llm_visual_error_label": llm_visual.get("visual_error_label", ""), |
| "llm_arrow": llm_visual.get("arrow"), |
| "student_frame_path": student_rel, |
| "expert_frame_path": expert_rel, |
| "rule_score_pct": round(float(rep["rule_score"]) * 100, 1), |
| "dtw_score_pct": round(float(rep["dtw_score"]) * 100, 1), |
| } |
| ) |
|
|
| summary = _build_summary( |
| overall_score=float(result["overall_score"]), |
| rep_results=rep_cards, |
| total_reps=int(result["st_reps_count"]), |
| target_reps=int(result["ex_reps_count"]), |
| main_errors=error_counter, |
| ) |
|
|
| main_errors = [ |
| { |
| "type": error_type, |
| "label": ERROR_LABELS.get(error_type, error_type), |
| "count": count, |
| "severity": _highest_severity(rep_cards, error_type), |
| "guidance": ERROR_GUIDANCE.get(error_type, "Ưu tiên sửa lỗi này trước trong các rep tiếp theo."), |
| } |
| for error_type, count in error_counter.most_common() |
| ] |
|
|
| payload = { |
| "error": None, |
| "exercise": "pushup", |
| "exercise_label": "Hít đất / Push-up", |
| "template_id": selected_template_id, |
| "template_title": selected_template_title, |
| "overall_score": round(float(result["overall_score"]), 4), |
| "overall_score_pct": round(float(result["overall_score"]) * 100, 1), |
| "student_reps": int(result["st_reps_count"]), |
| "expert_reps": int(result["ex_reps_count"]), |
| "good_reps": good_reps, |
| "serious_reps": high_severity_reps, |
| "summary": summary, |
| "main_errors": main_errors, |
| "rep_results": rep_cards, |
| "student_video_path": "", |
| } |
| payload["coach_feedback"] = "" |
|
|
| if save_artifacts and run_dir is not None: |
| payload["student_video_path"] = _safe_relative_path(student_video_path, upload_root) |
| with (run_dir / "result.json").open("w", encoding="utf-8") as file: |
| json.dump(payload, file, ensure_ascii=False, indent=2) |
| _write_video_test_run_log( |
| payload=payload, |
| run_id=run_id, |
| run_dir=run_dir, |
| student_video_path=student_video_path, |
| template_video_path=template_video_path, |
| ) |
|
|
| return payload |
|
|
|
|
| def _template_cache_path(template_id: str | None = None) -> Path: |
| normalized_id = safe_template_id(template_id or DEFAULT_TEMPLATE_ID) |
| if normalized_id == DEFAULT_TEMPLATE_ID: |
| return TEMPLATE_CACHE |
| return BASE_DIR / "data" / "processed" / f"pushup_template_{normalized_id}.pkl" |
|
|
|
|
| def _write_video_test_run_log( |
| *, |
| payload: dict, |
| run_id: str, |
| run_dir: Path, |
| student_video_path: Path, |
| template_video_path: Path, |
| ) -> None: |
| """Append a lightweight per-video test log without copying image artifacts.""" |
| try: |
| index_entry = _build_video_test_index_entry( |
| payload=payload, |
| run_id=run_id, |
| run_dir=run_dir, |
| student_video_path=student_video_path, |
| template_video_path=template_video_path, |
| ) |
| VIDEO_TEST_LOG_FILE.parent.mkdir(parents=True, exist_ok=True) |
| with VIDEO_TEST_LOG_FILE.open("a", encoding="utf-8") as file: |
| file.write(json.dumps(index_entry, ensure_ascii=False) + "\n") |
| except OSError as exc: |
| print(f"Video test logging failed: {exc}") |
|
|
|
|
| def _build_video_test_index_entry( |
| *, |
| payload: dict, |
| run_id: str, |
| run_dir: Path, |
| student_video_path: Path, |
| template_video_path: Path, |
| ) -> dict: |
| rep_results = payload.get("rep_results", []) |
| created_at = datetime.now().isoformat(timespec="seconds") |
| return { |
| "ts": created_at, |
| "run_id": run_id, |
| "exercise": payload.get("exercise", "pushup"), |
| "student_video_name": student_video_path.name, |
| "student_video_path": student_video_path.as_posix(), |
| "template_video_path": template_video_path.as_posix(), |
| "overall_score_pct": payload.get("overall_score_pct"), |
| "student_reps": payload.get("student_reps"), |
| "expert_reps": payload.get("expert_reps"), |
| "error": payload.get("error"), |
| "summary": payload.get("summary", ""), |
| "main_errors": payload.get("main_errors", []), |
| "result_json": _project_relative_path(run_dir / "result.json"), |
| "artifact_dir": _project_relative_path(run_dir), |
| "rep_logs": [ |
| { |
| "rep_num": rep.get("rep_num"), |
| "score_pct": rep.get("score_pct"), |
| "primary_error": rep.get("primary_error"), |
| "rule_feedback": rep.get("rule_feedback"), |
| "llm_feedback": rep.get("llm_feedback"), |
| "llm_feedback_source": rep.get("llm_feedback_source"), |
| "llm_feedback_error": rep.get("llm_feedback_error"), |
| "student_frame_path": rep.get("student_frame_path", ""), |
| "expert_frame_path": rep.get("expert_frame_path", ""), |
| } |
| for rep in rep_results |
| ], |
| } |
|
|
|
|
| def _save_pose_frame( |
| processor: VideoProcessor, |
| video_path: Path, |
| frame_idx: int, |
| destination: Path, |
| flip: bool, |
| ) -> list[dict]: |
| destination.parent.mkdir(parents=True, exist_ok=True) |
| frame = processor.get_frame(str(video_path), frame_idx, flip=flip) |
| if frame is None: |
| return [] |
|
|
| frame = _remove_source_status_badge(frame) |
| _, landmarks = processor.engine.extract_kinematics(frame.copy(), is_static=True) |
| landmark_points = _landmarks_to_points(landmarks) if landmarks else [] |
| if landmarks: |
| mp.solutions.drawing_utils.draw_landmarks( |
| frame, |
| landmarks, |
| mp.solutions.pose.POSE_CONNECTIONS, |
| ) |
| cv2.imwrite(str(destination), frame) |
| return landmark_points |
|
|
|
|
| def _remove_source_status_badge(frame): |
| """Remove pre-existing check/cross badges baked into some test videos. |
| |
| Some source clips already contain a large white square with a green check or |
| red cross near the top of the frame. Those source labels can contradict the |
| current rule-based arrow, so strip them before drawing our own annotations. |
| """ |
| height, width = frame.shape[:2] |
| search_y_max = int(height * 0.45) |
| if search_y_max <= 0: |
| return frame |
|
|
| search = frame[:search_y_max] |
| white_mask = cv2.inRange(search, np.array([235, 235, 235]), np.array([255, 255, 255])) |
| contours, _ = cv2.findContours(white_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) |
| if not contours: |
| return frame |
|
|
| hsv = cv2.cvtColor(search, cv2.COLOR_BGR2HSV) |
| green_mask = cv2.inRange(hsv, np.array([35, 60, 60]), np.array([90, 255, 255])) |
| red_mask_low = cv2.inRange(hsv, np.array([0, 70, 60]), np.array([12, 255, 255])) |
| red_mask_high = cv2.inRange(hsv, np.array([170, 70, 60]), np.array([180, 255, 255])) |
| symbol_mask = cv2.bitwise_or(green_mask, cv2.bitwise_or(red_mask_low, red_mask_high)) |
|
|
| cleanup_mask = np.zeros((height, width), dtype=np.uint8) |
| min_area = max(900, int(width * height * 0.0004)) |
| max_area = int(width * height * 0.04) |
| for contour in contours: |
| x, y, box_w, box_h = cv2.boundingRect(contour) |
| area = box_w * box_h |
| if area < min_area or area > max_area: |
| continue |
| aspect = box_w / box_h if box_h else 0 |
| if not 0.55 <= aspect <= 1.8: |
| continue |
|
|
| pad_x = max(12, int(box_w * 0.12)) |
| pad_y = max(12, int(box_h * 0.12)) |
| x1 = _clamp(x - pad_x, 0, width - 1) |
| y1 = _clamp(y - pad_y, 0, height - 1) |
| x2 = _clamp(x + box_w + pad_x, 0, width) |
| y2 = _clamp(y + box_h + pad_y, 0, search_y_max) |
| if x2 <= x1 or y2 <= y1: |
| continue |
|
|
| symbol_pixels = cv2.countNonZero(symbol_mask[y1:y2, x1:x2]) |
| if symbol_pixels < max(30, int(area * 0.01)): |
| continue |
| cleanup_mask[y1:y2, x1:x2] = 255 |
|
|
| if not cv2.countNonZero(cleanup_mask): |
| return frame |
|
|
| return cv2.inpaint(frame, cleanup_mask, 5, cv2.INPAINT_TELEA) |
|
|
|
|
| def _landmarks_to_points(landmarks) -> list[dict]: |
| return [ |
| { |
| "x": float(landmark.x), |
| "y": float(landmark.y), |
| "visibility": float(getattr(landmark, "visibility", 1.0)), |
| } |
| for landmark in landmarks.landmark |
| ] |
|
|
|
|
| def _select_student_frame_for_rep(rep: dict, student_data: list[dict]) -> dict: |
| if not student_data: |
| raise ValueError("No student pose frames available for rep frame selection") |
|
|
| error_frame_indices = [] |
| for error in rep.get("errors", []): |
| error_frame_indices.extend(error.get("frames", [])) |
|
|
| if not error_frame_indices: |
| fallback_index = _safe_frame_index(rep.get("w_pair", (0, 0))[0], len(student_data)) |
| return student_data[fallback_index] |
|
|
| target_frame = error_frame_indices[0] |
| start, end = rep.get("range", (0, len(student_data))) |
| candidates = student_data[start:end] or student_data |
| return min(candidates, key=lambda frame: abs(frame["frame_idx"] - target_frame)) |
|
|
|
|
| def _safe_frame_index(index: object, length: int) -> int: |
| try: |
| parsed = int(index) |
| except (TypeError, ValueError): |
| parsed = 0 |
| return _clamp(parsed, 0, length - 1) |
|
|
|
|
| def _should_request_rep_visual_feedback(rep: dict, rep_errors: list[dict]) -> bool: |
| return bool(rep_errors) |
|
|
|
|
| def _empty_rep_visual_feedback() -> dict: |
| return { |
| "source": "", |
| "is_error": False, |
| "visual_error_label": "", |
| "feedback": "", |
| "arrow": None, |
| } |
|
|
|
|
| def _build_rep_visual_context(rep: dict, rep_errors: list[dict]) -> dict: |
| return { |
| "rep_num": int(rep["rep_num"]), |
| "score_pct": round(float(rep["score"]) * 100, 1), |
| "rule_score_pct": round(float(rep["rule_score"]) * 100, 1), |
| "dtw_score_pct": round(float(rep["dtw_score"]) * 100, 1), |
| "rule_errors": [ |
| { |
| "type": error["type"], |
| "label": error["label"], |
| "severity": error["severity"], |
| "guidance": error["guidance"], |
| } |
| for error in rep_errors |
| ], |
| "rule_feedback": _rep_feedback(rep_errors), |
| } |
|
|
|
|
| def _build_rule_arrow(rep_errors: list[dict], landmark_points: list[dict]) -> dict | None: |
| if not rep_errors or not landmark_points: |
| return None |
|
|
| for error in rep_errors: |
| error_type = error.get("type", "") |
| target = _arrow_target_for_error(error_type, landmark_points) |
| if target: |
| return { |
| "x": target["x"], |
| "y": target["y"], |
| "label": ARROW_LABELS.get(error_type, error.get("label") or "Vị trí cần sửa"), |
| "source": "rule_landmark", |
| "error_type": error_type, |
| } |
| return None |
|
|
|
|
| def _arrow_target_for_error(error_type: str, landmark_points: list[dict]) -> dict | None: |
| if error_type in {"hip_sag", "hip_pike", "body_not_straight"}: |
| return ( |
| _average_pose_points(landmark_points, [23, 24]) |
| or _average_pose_points(landmark_points, [11, 12, 23, 24]) |
| ) |
|
|
| if error_type == "not_deep_enough": |
| shoulder = _average_pose_points(landmark_points, [11, 12]) |
| hip = _average_pose_points(landmark_points, [23, 24]) |
| if shoulder and hip: |
| return _interpolate_pose_points(shoulder, hip, 0.30) |
| return shoulder or _average_pose_points(landmark_points, [13, 14]) |
|
|
| if error_type == "head_misaligned": |
| return ( |
| _average_pose_points(landmark_points, [0, 7, 8]) |
| or _average_pose_points(landmark_points, [0, 11, 12]) |
| ) |
|
|
| return _average_pose_points(landmark_points, [11, 12, 23, 24]) |
|
|
|
|
| def _average_pose_points(landmark_points: list[dict], indices: list[int]) -> dict | None: |
| valid_points = [ |
| point |
| for index in indices |
| if (point := _pose_point(landmark_points, index)) is not None |
| and point["visibility"] >= 0.10 |
| ] |
| if not valid_points: |
| valid_points = [ |
| point |
| for index in indices |
| if (point := _pose_point(landmark_points, index)) is not None |
| ] |
| if not valid_points: |
| return None |
|
|
| return { |
| "x": _clamp_float(sum(point["x"] for point in valid_points) / len(valid_points), 0.02, 0.98), |
| "y": _clamp_float(sum(point["y"] for point in valid_points) / len(valid_points), 0.02, 0.98), |
| } |
|
|
|
|
| def _pose_point(landmark_points: list[dict], index: int) -> dict | None: |
| if index >= len(landmark_points): |
| return None |
|
|
| point = landmark_points[index] |
| x = float(point.get("x", -1.0)) |
| y = float(point.get("y", -1.0)) |
| if not np.isfinite(x) or not np.isfinite(y) or x < -0.05 or x > 1.05 or y < -0.05 or y > 1.05: |
| return None |
| return { |
| "x": _clamp_float(x, 0.02, 0.98), |
| "y": _clamp_float(y, 0.02, 0.98), |
| "visibility": float(point.get("visibility", 1.0)), |
| } |
|
|
|
|
| def _interpolate_pose_points(start: dict, end: dict, ratio: float) -> dict: |
| ratio = _clamp_float(ratio, 0.0, 1.0) |
| return { |
| "x": _clamp_float(start["x"] + (end["x"] - start["x"]) * ratio, 0.02, 0.98), |
| "y": _clamp_float(start["y"] + (end["y"] - start["y"]) * ratio, 0.02, 0.98), |
| } |
|
|
|
|
| def _clamp_float(value: float, lower: float, upper: float) -> float: |
| return max(lower, min(value, upper)) |
|
|
|
|
| def _draw_llm_arrow_on_image(image_path: Path, arrow: dict) -> None: |
| frame = cv2.imread(str(image_path)) |
| if frame is None: |
| return |
|
|
| height, width = frame.shape[:2] |
| target_x = int(float(arrow["x"]) * width) |
| target_y = int(float(arrow["y"]) * height) |
| target_x = _clamp(target_x, 0, width - 1) |
| target_y = _clamp(target_y, 0, height - 1) |
| start_x, start_y = _arrow_start_point(target_x, target_y, width, height) |
| label = str(arrow.get("label") or "Vị trí cần sửa") |
|
|
| cv2.arrowedLine( |
| frame, |
| (start_x, start_y), |
| (target_x, target_y), |
| (0, 0, 255), |
| 5, |
| tipLength=0.22, |
| ) |
| cv2.circle(frame, (target_x, target_y), 22, (0, 0, 255), 5) |
| _draw_label(frame, label, start_x, start_y - 42) |
| cv2.imwrite(str(image_path), frame) |
|
|
|
|
| def _arrow_start_point(target_x: int, target_y: int, width: int, height: int) -> tuple[int, int]: |
| x_offset = _clamp(int(width * 0.24), 150, 240) |
| y_offset = _clamp(int(height * 0.20), 90, 150) |
|
|
| start_x = target_x - x_offset |
| if start_x < 20: |
| start_x = target_x + x_offset |
|
|
| start_y = target_y - y_offset |
| if start_y < 48: |
| start_y = target_y + y_offset |
|
|
| return ( |
| _clamp(start_x, 20, width - 40), |
| _clamp(start_y, 48, height - 32), |
| ) |
|
|
|
|
| def _draw_label(frame, label: str, x: int, y: int) -> None: |
| height, width = frame.shape[:2] |
| try: |
| from PIL import Image, ImageDraw, ImageFont |
| except ImportError: |
| return |
|
|
| font = _load_label_font(ImageFont, _clamp(width // 44, 18, 26)) |
| image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) |
| draw = ImageDraw.Draw(image) |
| text_bbox = draw.textbbox((0, 0), label, font=font) |
| text_w = text_bbox[2] - text_bbox[0] |
| text_h = text_bbox[3] - text_bbox[1] |
| padding_x = 9 |
| padding_y = 7 |
| x = _clamp(x, 8, width - text_w - padding_x * 2 - 8) |
| y = _clamp(y, 8, height - text_h - padding_y * 2 - 8) |
|
|
| draw.rounded_rectangle( |
| (x, y, x + text_w + padding_x * 2, y + text_h + padding_y * 2), |
| radius=4, |
| fill=(220, 0, 0), |
| ) |
| draw.text((x + padding_x, y + padding_y), label, font=font, fill=(255, 255, 255)) |
| frame[:, :] = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR) |
|
|
|
|
| def _load_label_font(image_font, size: int): |
| font_candidates = [ |
| Path("C:/Windows/Fonts/arial.ttf"), |
| Path("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf"), |
| Path("/System/Library/Fonts/Supplemental/Arial Unicode.ttf"), |
| ] |
| for font_path in font_candidates: |
| if font_path.exists(): |
| return image_font.truetype(str(font_path), size=size) |
| return image_font.load_default() |
|
|
|
|
| def _clamp(value: int, lower: int, upper: int) -> int: |
| if upper < lower: |
| return lower |
| return max(lower, min(value, upper)) |
|
|
|
|
| def _serialize_error(error: dict) -> dict: |
| return { |
| "type": error["type"], |
| "label": ERROR_LABELS.get(error["type"], error["message"]), |
| "message": error["message"], |
| "severity": error["severity"], |
| "guidance": ERROR_GUIDANCE.get(error["type"], "Giữ nhịp ổn định và kiểm soát tốt hơn."), |
| } |
|
|
|
|
| def _rep_status_label(score: float, errors: list[dict]) -> str: |
| if not errors and score >= 0.88: |
| return "Tốt" |
| if score >= 0.75: |
| return "Cần chú ý" |
| return "Có lỗi" |
|
|
|
|
| def _rep_status_tone(score: float, errors: list[dict]) -> str: |
| if not errors and score >= 0.88: |
| return "good" |
| if score >= 0.75: |
| return "warning" |
| return "danger" |
|
|
|
|
| def _rep_feedback(errors: list[dict]) -> str: |
| if not errors: |
| return "Rep ổn định, thân người được giữ khá tốt và không có lỗi nghiêm trọng." |
| suggestions = [] |
| for error in errors: |
| suggestion = error["guidance"] |
| if suggestion not in suggestions: |
| suggestions.append(suggestion) |
| return " ".join(suggestions[:2]) |
|
|
|
|
| def _highest_severity(rep_results: list[dict], error_type: str) -> str: |
| highest = "low" |
| for rep in rep_results: |
| for error in rep["errors"]: |
| if error["type"] == error_type and SEVERITY_RANK[error["severity"]] > SEVERITY_RANK[highest]: |
| highest = error["severity"] |
| return highest |
|
|
|
|
| def _build_summary( |
| overall_score: float, |
| rep_results: list[dict], |
| total_reps: int, |
| target_reps: int, |
| main_errors: Counter[str], |
| ) -> str: |
| if not rep_results: |
| return "Chưa có rep hợp lệ để phân tích." |
|
|
| trailing_drop = rep_results[-1]["score"] < rep_results[0]["score"] - 0.1 |
| strongest_rep = max(rep_results, key=lambda rep: rep["score"]) |
|
|
| segments = [f"Bạn hoàn thành {total_reps} rep"] |
| if total_reps < target_reps: |
| segments[0] += f", thấp hơn mục tiêu {target_reps} rep." |
| else: |
| segments[0] += "." |
|
|
| if overall_score >= 0.88: |
| segments.append("Kỹ thuật tổng thể ổn định và khá gần với mẫu chuẩn.") |
| elif overall_score >= 0.75: |
| segments.append("Kỹ thuật tương đối tốt nhưng vẫn có vài rep cần chỉnh lại form.") |
| else: |
| segments.append("Form chưa ổn định, nên ưu tiên sửa các lỗi chính trước khi tăng số rep.") |
|
|
| if main_errors: |
| top_error, count = main_errors.most_common(1)[0] |
| segments.append(f"Lỗi xuất hiện nhiều nhất là {ERROR_LABELS.get(top_error, top_error).lower()} ở {count} rep.") |
|
|
| if trailing_drop: |
| segments.append("Các rep cuối có dấu hiệu giảm chất lượng, khả năng cao do mất ổn định hoặc xuống sức.") |
| else: |
| segments.append(f"Rep tốt nhất là rep {strongest_rep['rep_num']} với điểm {strongest_rep['score_pct']:.1f}%.") |
|
|
| return " ".join(segments) |
|
|
|
|
| def _safe_relative_path(path: Path, base: Path) -> str: |
| try: |
| return path.relative_to(base).as_posix() |
| except ValueError: |
| return path.as_posix() |
|
|
|
|
| def _project_relative_path(path: Path) -> str: |
| try: |
| return path.relative_to(BASE_DIR).as_posix() |
| except ValueError: |
| return path.as_posix() |
|
|