from __future__ import annotations from collections import Counter from datetime import datetime from pathlib import Path import json import pickle import uuid import cv2 import mediapipe as mp import numpy as np from push_up.evaluator import PushUpEvaluator from push_up.feedback_graph import generate_rep_visual_feedback from push_up.processor import VideoProcessor from push_up.template_registry import ( DEFAULT_TEMPLATE_ID, get_template, resolve_template_video_path, safe_template_id, ) BASE_DIR = Path(__file__).resolve().parent.parent TEMPLATE_SOURCE = BASE_DIR / "data" / "templates" / "push_up_template.mp4" TEMPLATE_CACHE = BASE_DIR / "data" / "processed" / "pushup_template.pkl" DEBUG_ARTIFACTS_ROOT = BASE_DIR / "analysis_artifacts" VIDEO_TEST_LOG_FILE = BASE_DIR / ".ai-log" / "video_test_runs.jsonl" ANALYSIS_DIR = Path("analysis") ERROR_GUIDANCE = { "not_deep_enough": "Hạ ngực thấp hơn ở pha xuống và giữ nhịp kiểm soát thay vì rơi tự do.", "hip_sag": "Siết bụng và mông để giữ thân người thành một đường thẳng từ vai đến gót chân.", "body_not_straight": "Ổn định vai, hông và gót trên cùng một trục trước khi bắt đầu rep tiếp theo.", "head_misaligned": "Giữ cổ trung tính, mắt nhìn xuống sàn phía trước tay một khoảng ngắn.", "hip_pike": "Tránh đẩy hông lên cao, hãy giữ vai và hông di chuyển cùng nhau.", } ERROR_LABELS = { "not_deep_enough": "Chưa hạ người đủ sâu", "hip_sag": "Võng lưng", "body_not_straight": "Cơ thể chưa giữ thẳng", "head_misaligned": "Gập cổ hoặc cúi đầu quá mức", "hip_pike": "Nhô mông quá cao", } ARROW_LABELS = { "not_deep_enough": "Ngực chưa đủ thấp", "hip_sag": "Hông bị võng xuống", "body_not_straight": "Hông lệch khỏi trục", "head_misaligned": "Cổ cúi quá mức", "hip_pike": "Hông quá cao", } SEVERITY_RANK = {"high": 3, "medium": 2, "low": 1} def template_cache_exists(template_id: str | None = None) -> bool: return _template_cache_path(template_id).exists() def prepare_template_cache(template_id: str | None = None, force: bool = False) -> dict: template = get_template(template_id) template_source = resolve_template_video_path(template) template_cache = _template_cache_path(template["id"]) template_cache.parent.mkdir(parents=True, exist_ok=True) if template_cache.exists() and not force: with template_cache.open("rb") as file: cache_payload = pickle.load(file) if Path(cache_payload.get("source", "")).exists(): return cache_payload # The cache can be committed or moved across machines. Keep the # expensive landmark cache, but refresh the source video path so later # frame extraction does not point at another developer's absolute path. cache_payload["source"] = str(template_source) cache_payload["template_id"] = template["id"] cache_payload["template_title"] = template["title"] with template_cache.open("wb") as file: pickle.dump(cache_payload, file) return cache_payload processor = VideoProcessor() template_data, _ = processor.process_video_from_path(str(template_source)) evaluator = PushUpEvaluator() evaluation = evaluator.evaluate(template_data, template_data) cache_payload = { "source": str(template_source), "template_id": template["id"], "template_title": template["title"], "prepared_at": datetime.utcnow().isoformat(), "template_data": template_data, "template_rep_count": evaluation["ex_reps_count"] if not evaluation.get("error") else 0, } with template_cache.open("wb") as file: pickle.dump(cache_payload, file) return cache_payload def analyze_pushup( student_video_path: str | Path, upload_root: str | Path | None = None, *, save_artifacts: bool = False, template_id: str | None = None, ) -> dict: """Analyze one student video. By default this does not persist student videos, per-run JSON, or frame images. Set save_artifacts=True only for debugging/offline inspection. """ upload_root = Path(upload_root) if upload_root is not None else DEBUG_ARTIFACTS_ROOT student_video_path = Path(student_video_path) cache_payload = prepare_template_cache(template_id) processor = VideoProcessor() evaluator = PushUpEvaluator() expert_data = cache_payload["template_data"] template_video_path = Path(cache_payload.get("source") or TEMPLATE_SOURCE) if not template_video_path.exists(): template_video_path = TEMPLATE_SOURCE selected_template_id = str(cache_payload.get("template_id") or DEFAULT_TEMPLATE_ID) selected_template_title = str(cache_payload.get("template_title") or "Hít đất chuẩn") run_id = "" run_dir: Path | None = None if save_artifacts: run_id = datetime.now().strftime("%Y%m%d-%H%M%S") + "-" + uuid.uuid4().hex[:8] run_dir = upload_root / ANALYSIS_DIR / run_id run_dir.mkdir(parents=True, exist_ok=True) student_data, _ = processor.process_video_from_path(str(student_video_path)) if not student_data: payload = { "error": "Không nhận diện được tư thế hít đất trong video. Vui lòng quay rõ toàn thân, đủ sáng và thử lại.", "exercise": "pushup", "exercise_label": "Hít đất / Push-up", "template_id": selected_template_id, "template_title": selected_template_title, "student_video_path": _safe_relative_path(student_video_path, upload_root), } if save_artifacts and run_dir is not None: with (run_dir / "result.json").open("w", encoding="utf-8") as file: json.dump(payload, file, ensure_ascii=False, indent=2) _write_video_test_run_log( payload=payload, run_id=run_id, run_dir=run_dir, student_video_path=student_video_path, template_video_path=template_video_path, ) return payload result = evaluator.evaluate(expert_data, student_data) if result.get("error"): payload = { "error": result["error"], "exercise": "pushup", "exercise_label": "Hít đất / Push-up", "template_id": selected_template_id, "template_title": selected_template_title, "student_video_path": _safe_relative_path(student_video_path, upload_root), } if save_artifacts and run_dir is not None: with (run_dir / "result.json").open("w", encoding="utf-8") as file: json.dump(payload, file, ensure_ascii=False, indent=2) _write_video_test_run_log( payload=payload, run_id=run_id, run_dir=run_dir, student_video_path=student_video_path, template_video_path=template_video_path, ) return payload rep_cards = [] error_counter: Counter[str] = Counter() high_severity_reps = 0 good_reps = 0 for rep in result["rep_results"]: rep_errors = [_serialize_error(error) for error in rep["errors"]] student_frame = _select_student_frame_for_rep(rep, student_data) expert_frame = expert_data[rep["w_pair"][1]] student_rel = "" expert_rel = "" llm_visual = _empty_rep_visual_feedback() if save_artifacts: student_artifact = ANALYSIS_DIR / run_id / f"student_rep_{rep['rep_num']}.jpg" expert_artifact = ANALYSIS_DIR / run_id / f"expert_rep_{rep['rep_num']}.jpg" student_abs = upload_root / student_artifact expert_abs = upload_root / expert_artifact student_landmarks = _save_pose_frame( processor=processor, video_path=student_video_path, frame_idx=student_frame["frame_idx"], destination=student_abs, flip=student_frame.get("flipped", False), ) _save_pose_frame( processor=processor, video_path=template_video_path, frame_idx=expert_frame["frame_idx"], destination=expert_abs, flip=expert_frame.get("flipped", False), ) if _should_request_rep_visual_feedback(rep, rep_errors): llm_visual = generate_rep_visual_feedback( student_image_path=str(student_abs), expert_image_path=str(expert_abs), rep_context=_build_rep_visual_context(rep, rep_errors), ) rule_arrow = _build_rule_arrow(rep_errors, student_landmarks) if rule_arrow: llm_visual["arrow"] = rule_arrow _draw_llm_arrow_on_image(student_abs, rule_arrow) elif llm_visual.get("arrow"): _draw_llm_arrow_on_image(student_abs, llm_visual["arrow"]) student_rel = str(student_artifact) expert_rel = str(expert_artifact) if rep["score"] >= 0.88 and not rep_errors: good_reps += 1 if any(error["severity"] == "high" for error in rep_errors): high_severity_reps += 1 error_counter.update(error["type"] for error in rep_errors) rep_cards.append( { "rep_num": int(rep["rep_num"]), "score": round(float(rep["score"]), 4), "score_pct": round(float(rep["score"]) * 100, 1), "status": _rep_status_label(float(rep["score"]), rep_errors), "status_tone": _rep_status_tone(float(rep["score"]), rep_errors), "errors": rep_errors, "error_labels": [error["label"] for error in rep_errors], "primary_error": rep_errors[0]["label"] if rep_errors else "Không có lỗi nghiêm trọng", "feedback": _rep_feedback(rep_errors), "rule_feedback": _rep_feedback(rep_errors), "llm_feedback": llm_visual.get("feedback", ""), "llm_feedback_source": llm_visual.get("source", ""), "llm_feedback_error": llm_visual.get("error", ""), "llm_visual_error_label": llm_visual.get("visual_error_label", ""), "llm_arrow": llm_visual.get("arrow"), "student_frame_path": student_rel, "expert_frame_path": expert_rel, "rule_score_pct": round(float(rep["rule_score"]) * 100, 1), "dtw_score_pct": round(float(rep["dtw_score"]) * 100, 1), } ) summary = _build_summary( overall_score=float(result["overall_score"]), rep_results=rep_cards, total_reps=int(result["st_reps_count"]), target_reps=int(result["ex_reps_count"]), main_errors=error_counter, ) main_errors = [ { "type": error_type, "label": ERROR_LABELS.get(error_type, error_type), "count": count, "severity": _highest_severity(rep_cards, error_type), "guidance": ERROR_GUIDANCE.get(error_type, "Ưu tiên sửa lỗi này trước trong các rep tiếp theo."), } for error_type, count in error_counter.most_common() ] payload = { "error": None, "exercise": "pushup", "exercise_label": "Hít đất / Push-up", "template_id": selected_template_id, "template_title": selected_template_title, "overall_score": round(float(result["overall_score"]), 4), "overall_score_pct": round(float(result["overall_score"]) * 100, 1), "student_reps": int(result["st_reps_count"]), "expert_reps": int(result["ex_reps_count"]), "good_reps": good_reps, "serious_reps": high_severity_reps, "summary": summary, "main_errors": main_errors, "rep_results": rep_cards, "student_video_path": "", } payload["coach_feedback"] = "" if save_artifacts and run_dir is not None: payload["student_video_path"] = _safe_relative_path(student_video_path, upload_root) with (run_dir / "result.json").open("w", encoding="utf-8") as file: json.dump(payload, file, ensure_ascii=False, indent=2) _write_video_test_run_log( payload=payload, run_id=run_id, run_dir=run_dir, student_video_path=student_video_path, template_video_path=template_video_path, ) return payload def _template_cache_path(template_id: str | None = None) -> Path: normalized_id = safe_template_id(template_id or DEFAULT_TEMPLATE_ID) if normalized_id == DEFAULT_TEMPLATE_ID: return TEMPLATE_CACHE return BASE_DIR / "data" / "processed" / f"pushup_template_{normalized_id}.pkl" def _write_video_test_run_log( *, payload: dict, run_id: str, run_dir: Path, student_video_path: Path, template_video_path: Path, ) -> None: """Append a lightweight per-video test log without copying image artifacts.""" try: index_entry = _build_video_test_index_entry( payload=payload, run_id=run_id, run_dir=run_dir, student_video_path=student_video_path, template_video_path=template_video_path, ) VIDEO_TEST_LOG_FILE.parent.mkdir(parents=True, exist_ok=True) with VIDEO_TEST_LOG_FILE.open("a", encoding="utf-8") as file: file.write(json.dumps(index_entry, ensure_ascii=False) + "\n") except OSError as exc: print(f"Video test logging failed: {exc}") def _build_video_test_index_entry( *, payload: dict, run_id: str, run_dir: Path, student_video_path: Path, template_video_path: Path, ) -> dict: rep_results = payload.get("rep_results", []) created_at = datetime.now().isoformat(timespec="seconds") return { "ts": created_at, "run_id": run_id, "exercise": payload.get("exercise", "pushup"), "student_video_name": student_video_path.name, "student_video_path": student_video_path.as_posix(), "template_video_path": template_video_path.as_posix(), "overall_score_pct": payload.get("overall_score_pct"), "student_reps": payload.get("student_reps"), "expert_reps": payload.get("expert_reps"), "error": payload.get("error"), "summary": payload.get("summary", ""), "main_errors": payload.get("main_errors", []), "result_json": _project_relative_path(run_dir / "result.json"), "artifact_dir": _project_relative_path(run_dir), "rep_logs": [ { "rep_num": rep.get("rep_num"), "score_pct": rep.get("score_pct"), "primary_error": rep.get("primary_error"), "rule_feedback": rep.get("rule_feedback"), "llm_feedback": rep.get("llm_feedback"), "llm_feedback_source": rep.get("llm_feedback_source"), "llm_feedback_error": rep.get("llm_feedback_error"), "student_frame_path": rep.get("student_frame_path", ""), "expert_frame_path": rep.get("expert_frame_path", ""), } for rep in rep_results ], } def _save_pose_frame( processor: VideoProcessor, video_path: Path, frame_idx: int, destination: Path, flip: bool, ) -> list[dict]: destination.parent.mkdir(parents=True, exist_ok=True) frame = processor.get_frame(str(video_path), frame_idx, flip=flip) if frame is None: return [] frame = _remove_source_status_badge(frame) _, landmarks = processor.engine.extract_kinematics(frame.copy(), is_static=True) landmark_points = _landmarks_to_points(landmarks) if landmarks else [] if landmarks: mp.solutions.drawing_utils.draw_landmarks( frame, landmarks, mp.solutions.pose.POSE_CONNECTIONS, ) cv2.imwrite(str(destination), frame) return landmark_points def _remove_source_status_badge(frame): """Remove pre-existing check/cross badges baked into some test videos. Some source clips already contain a large white square with a green check or red cross near the top of the frame. Those source labels can contradict the current rule-based arrow, so strip them before drawing our own annotations. """ height, width = frame.shape[:2] search_y_max = int(height * 0.45) if search_y_max <= 0: return frame search = frame[:search_y_max] white_mask = cv2.inRange(search, np.array([235, 235, 235]), np.array([255, 255, 255])) contours, _ = cv2.findContours(white_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) if not contours: return frame hsv = cv2.cvtColor(search, cv2.COLOR_BGR2HSV) green_mask = cv2.inRange(hsv, np.array([35, 60, 60]), np.array([90, 255, 255])) red_mask_low = cv2.inRange(hsv, np.array([0, 70, 60]), np.array([12, 255, 255])) red_mask_high = cv2.inRange(hsv, np.array([170, 70, 60]), np.array([180, 255, 255])) symbol_mask = cv2.bitwise_or(green_mask, cv2.bitwise_or(red_mask_low, red_mask_high)) cleanup_mask = np.zeros((height, width), dtype=np.uint8) min_area = max(900, int(width * height * 0.0004)) max_area = int(width * height * 0.04) for contour in contours: x, y, box_w, box_h = cv2.boundingRect(contour) area = box_w * box_h if area < min_area or area > max_area: continue aspect = box_w / box_h if box_h else 0 if not 0.55 <= aspect <= 1.8: continue pad_x = max(12, int(box_w * 0.12)) pad_y = max(12, int(box_h * 0.12)) x1 = _clamp(x - pad_x, 0, width - 1) y1 = _clamp(y - pad_y, 0, height - 1) x2 = _clamp(x + box_w + pad_x, 0, width) y2 = _clamp(y + box_h + pad_y, 0, search_y_max) if x2 <= x1 or y2 <= y1: continue symbol_pixels = cv2.countNonZero(symbol_mask[y1:y2, x1:x2]) if symbol_pixels < max(30, int(area * 0.01)): continue cleanup_mask[y1:y2, x1:x2] = 255 if not cv2.countNonZero(cleanup_mask): return frame return cv2.inpaint(frame, cleanup_mask, 5, cv2.INPAINT_TELEA) def _landmarks_to_points(landmarks) -> list[dict]: return [ { "x": float(landmark.x), "y": float(landmark.y), "visibility": float(getattr(landmark, "visibility", 1.0)), } for landmark in landmarks.landmark ] def _select_student_frame_for_rep(rep: dict, student_data: list[dict]) -> dict: if not student_data: raise ValueError("No student pose frames available for rep frame selection") error_frame_indices = [] for error in rep.get("errors", []): error_frame_indices.extend(error.get("frames", [])) if not error_frame_indices: fallback_index = _safe_frame_index(rep.get("w_pair", (0, 0))[0], len(student_data)) return student_data[fallback_index] target_frame = error_frame_indices[0] start, end = rep.get("range", (0, len(student_data))) candidates = student_data[start:end] or student_data return min(candidates, key=lambda frame: abs(frame["frame_idx"] - target_frame)) def _safe_frame_index(index: object, length: int) -> int: try: parsed = int(index) except (TypeError, ValueError): parsed = 0 return _clamp(parsed, 0, length - 1) def _should_request_rep_visual_feedback(rep: dict, rep_errors: list[dict]) -> bool: return bool(rep_errors) def _empty_rep_visual_feedback() -> dict: return { "source": "", "is_error": False, "visual_error_label": "", "feedback": "", "arrow": None, } def _build_rep_visual_context(rep: dict, rep_errors: list[dict]) -> dict: return { "rep_num": int(rep["rep_num"]), "score_pct": round(float(rep["score"]) * 100, 1), "rule_score_pct": round(float(rep["rule_score"]) * 100, 1), "dtw_score_pct": round(float(rep["dtw_score"]) * 100, 1), "rule_errors": [ { "type": error["type"], "label": error["label"], "severity": error["severity"], "guidance": error["guidance"], } for error in rep_errors ], "rule_feedback": _rep_feedback(rep_errors), } def _build_rule_arrow(rep_errors: list[dict], landmark_points: list[dict]) -> dict | None: if not rep_errors or not landmark_points: return None for error in rep_errors: error_type = error.get("type", "") target = _arrow_target_for_error(error_type, landmark_points) if target: return { "x": target["x"], "y": target["y"], "label": ARROW_LABELS.get(error_type, error.get("label") or "Vị trí cần sửa"), "source": "rule_landmark", "error_type": error_type, } return None def _arrow_target_for_error(error_type: str, landmark_points: list[dict]) -> dict | None: if error_type in {"hip_sag", "hip_pike", "body_not_straight"}: return ( _average_pose_points(landmark_points, [23, 24]) or _average_pose_points(landmark_points, [11, 12, 23, 24]) ) if error_type == "not_deep_enough": shoulder = _average_pose_points(landmark_points, [11, 12]) hip = _average_pose_points(landmark_points, [23, 24]) if shoulder and hip: return _interpolate_pose_points(shoulder, hip, 0.30) return shoulder or _average_pose_points(landmark_points, [13, 14]) if error_type == "head_misaligned": return ( _average_pose_points(landmark_points, [0, 7, 8]) or _average_pose_points(landmark_points, [0, 11, 12]) ) return _average_pose_points(landmark_points, [11, 12, 23, 24]) def _average_pose_points(landmark_points: list[dict], indices: list[int]) -> dict | None: valid_points = [ point for index in indices if (point := _pose_point(landmark_points, index)) is not None and point["visibility"] >= 0.10 ] if not valid_points: valid_points = [ point for index in indices if (point := _pose_point(landmark_points, index)) is not None ] if not valid_points: return None return { "x": _clamp_float(sum(point["x"] for point in valid_points) / len(valid_points), 0.02, 0.98), "y": _clamp_float(sum(point["y"] for point in valid_points) / len(valid_points), 0.02, 0.98), } def _pose_point(landmark_points: list[dict], index: int) -> dict | None: if index >= len(landmark_points): return None point = landmark_points[index] x = float(point.get("x", -1.0)) y = float(point.get("y", -1.0)) if not np.isfinite(x) or not np.isfinite(y) or x < -0.05 or x > 1.05 or y < -0.05 or y > 1.05: return None return { "x": _clamp_float(x, 0.02, 0.98), "y": _clamp_float(y, 0.02, 0.98), "visibility": float(point.get("visibility", 1.0)), } def _interpolate_pose_points(start: dict, end: dict, ratio: float) -> dict: ratio = _clamp_float(ratio, 0.0, 1.0) return { "x": _clamp_float(start["x"] + (end["x"] - start["x"]) * ratio, 0.02, 0.98), "y": _clamp_float(start["y"] + (end["y"] - start["y"]) * ratio, 0.02, 0.98), } def _clamp_float(value: float, lower: float, upper: float) -> float: return max(lower, min(value, upper)) def _draw_llm_arrow_on_image(image_path: Path, arrow: dict) -> None: frame = cv2.imread(str(image_path)) if frame is None: return height, width = frame.shape[:2] target_x = int(float(arrow["x"]) * width) target_y = int(float(arrow["y"]) * height) target_x = _clamp(target_x, 0, width - 1) target_y = _clamp(target_y, 0, height - 1) start_x, start_y = _arrow_start_point(target_x, target_y, width, height) label = str(arrow.get("label") or "Vị trí cần sửa") cv2.arrowedLine( frame, (start_x, start_y), (target_x, target_y), (0, 0, 255), 5, tipLength=0.22, ) cv2.circle(frame, (target_x, target_y), 22, (0, 0, 255), 5) _draw_label(frame, label, start_x, start_y - 42) cv2.imwrite(str(image_path), frame) def _arrow_start_point(target_x: int, target_y: int, width: int, height: int) -> tuple[int, int]: x_offset = _clamp(int(width * 0.24), 150, 240) y_offset = _clamp(int(height * 0.20), 90, 150) start_x = target_x - x_offset if start_x < 20: start_x = target_x + x_offset start_y = target_y - y_offset if start_y < 48: start_y = target_y + y_offset return ( _clamp(start_x, 20, width - 40), _clamp(start_y, 48, height - 32), ) def _draw_label(frame, label: str, x: int, y: int) -> None: height, width = frame.shape[:2] try: from PIL import Image, ImageDraw, ImageFont except ImportError: return font = _load_label_font(ImageFont, _clamp(width // 44, 18, 26)) image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) draw = ImageDraw.Draw(image) text_bbox = draw.textbbox((0, 0), label, font=font) text_w = text_bbox[2] - text_bbox[0] text_h = text_bbox[3] - text_bbox[1] padding_x = 9 padding_y = 7 x = _clamp(x, 8, width - text_w - padding_x * 2 - 8) y = _clamp(y, 8, height - text_h - padding_y * 2 - 8) draw.rounded_rectangle( (x, y, x + text_w + padding_x * 2, y + text_h + padding_y * 2), radius=4, fill=(220, 0, 0), ) draw.text((x + padding_x, y + padding_y), label, font=font, fill=(255, 255, 255)) frame[:, :] = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR) def _load_label_font(image_font, size: int): font_candidates = [ Path("C:/Windows/Fonts/arial.ttf"), Path("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf"), Path("/System/Library/Fonts/Supplemental/Arial Unicode.ttf"), ] for font_path in font_candidates: if font_path.exists(): return image_font.truetype(str(font_path), size=size) return image_font.load_default() def _clamp(value: int, lower: int, upper: int) -> int: if upper < lower: return lower return max(lower, min(value, upper)) def _serialize_error(error: dict) -> dict: return { "type": error["type"], "label": ERROR_LABELS.get(error["type"], error["message"]), "message": error["message"], "severity": error["severity"], "guidance": ERROR_GUIDANCE.get(error["type"], "Giữ nhịp ổn định và kiểm soát tốt hơn."), } def _rep_status_label(score: float, errors: list[dict]) -> str: if not errors and score >= 0.88: return "Tốt" if score >= 0.75: return "Cần chú ý" return "Có lỗi" def _rep_status_tone(score: float, errors: list[dict]) -> str: if not errors and score >= 0.88: return "good" if score >= 0.75: return "warning" return "danger" def _rep_feedback(errors: list[dict]) -> str: if not errors: return "Rep ổn định, thân người được giữ khá tốt và không có lỗi nghiêm trọng." suggestions = [] for error in errors: suggestion = error["guidance"] if suggestion not in suggestions: suggestions.append(suggestion) return " ".join(suggestions[:2]) def _highest_severity(rep_results: list[dict], error_type: str) -> str: highest = "low" for rep in rep_results: for error in rep["errors"]: if error["type"] == error_type and SEVERITY_RANK[error["severity"]] > SEVERITY_RANK[highest]: highest = error["severity"] return highest def _build_summary( overall_score: float, rep_results: list[dict], total_reps: int, target_reps: int, main_errors: Counter[str], ) -> str: if not rep_results: return "Chưa có rep hợp lệ để phân tích." trailing_drop = rep_results[-1]["score"] < rep_results[0]["score"] - 0.1 strongest_rep = max(rep_results, key=lambda rep: rep["score"]) segments = [f"Bạn hoàn thành {total_reps} rep"] if total_reps < target_reps: segments[0] += f", thấp hơn mục tiêu {target_reps} rep." else: segments[0] += "." if overall_score >= 0.88: segments.append("Kỹ thuật tổng thể ổn định và khá gần với mẫu chuẩn.") elif overall_score >= 0.75: segments.append("Kỹ thuật tương đối tốt nhưng vẫn có vài rep cần chỉnh lại form.") else: segments.append("Form chưa ổn định, nên ưu tiên sửa các lỗi chính trước khi tăng số rep.") if main_errors: top_error, count = main_errors.most_common(1)[0] segments.append(f"Lỗi xuất hiện nhiều nhất là {ERROR_LABELS.get(top_error, top_error).lower()} ở {count} rep.") if trailing_drop: segments.append("Các rep cuối có dấu hiệu giảm chất lượng, khả năng cao do mất ổn định hoặc xuống sức.") else: segments.append(f"Rep tốt nhất là rep {strongest_rep['rep_num']} với điểm {strongest_rep['score_pct']:.1f}%.") return " ".join(segments) def _safe_relative_path(path: Path, base: Path) -> str: try: return path.relative_to(base).as_posix() except ValueError: return path.as_posix() def _project_relative_path(path: Path) -> str: try: return path.relative_to(BASE_DIR).as_posix() except ValueError: return path.as_posix()