AI-Coach / push_up /analysis_service.py
Hoang Duc Hung
admin-user
435b60d
from __future__ import annotations
from collections import Counter
from datetime import datetime
from pathlib import Path
import json
import pickle
import uuid
import cv2
import mediapipe as mp
import numpy as np
from push_up.evaluator import PushUpEvaluator
from push_up.feedback_graph import generate_rep_visual_feedback
from push_up.processor import VideoProcessor
from push_up.template_registry import (
DEFAULT_TEMPLATE_ID,
get_template,
resolve_template_video_path,
safe_template_id,
)
BASE_DIR = Path(__file__).resolve().parent.parent
TEMPLATE_SOURCE = BASE_DIR / "data" / "templates" / "push_up_template.mp4"
TEMPLATE_CACHE = BASE_DIR / "data" / "processed" / "pushup_template.pkl"
DEBUG_ARTIFACTS_ROOT = BASE_DIR / "analysis_artifacts"
VIDEO_TEST_LOG_FILE = BASE_DIR / ".ai-log" / "video_test_runs.jsonl"
ANALYSIS_DIR = Path("analysis")
ERROR_GUIDANCE = {
"not_deep_enough": "Hạ ngực thấp hơn ở pha xuống và giữ nhịp kiểm soát thay vì rơi tự do.",
"hip_sag": "Siết bụng và mông để giữ thân người thành một đường thẳng từ vai đến gót chân.",
"body_not_straight": "Ổn định vai, hông và gót trên cùng một trục trước khi bắt đầu rep tiếp theo.",
"head_misaligned": "Giữ cổ trung tính, mắt nhìn xuống sàn phía trước tay một khoảng ngắn.",
"hip_pike": "Tránh đẩy hông lên cao, hãy giữ vai và hông di chuyển cùng nhau.",
}
ERROR_LABELS = {
"not_deep_enough": "Chưa hạ người đủ sâu",
"hip_sag": "Võng lưng",
"body_not_straight": "Cơ thể chưa giữ thẳng",
"head_misaligned": "Gập cổ hoặc cúi đầu quá mức",
"hip_pike": "Nhô mông quá cao",
}
ARROW_LABELS = {
"not_deep_enough": "Ngực chưa đủ thấp",
"hip_sag": "Hông bị võng xuống",
"body_not_straight": "Hông lệch khỏi trục",
"head_misaligned": "Cổ cúi quá mức",
"hip_pike": "Hông quá cao",
}
SEVERITY_RANK = {"high": 3, "medium": 2, "low": 1}
def template_cache_exists(template_id: str | None = None) -> bool:
return _template_cache_path(template_id).exists()
def prepare_template_cache(template_id: str | None = None, force: bool = False) -> dict:
template = get_template(template_id)
template_source = resolve_template_video_path(template)
template_cache = _template_cache_path(template["id"])
template_cache.parent.mkdir(parents=True, exist_ok=True)
if template_cache.exists() and not force:
with template_cache.open("rb") as file:
cache_payload = pickle.load(file)
if Path(cache_payload.get("source", "")).exists():
return cache_payload
# The cache can be committed or moved across machines. Keep the
# expensive landmark cache, but refresh the source video path so later
# frame extraction does not point at another developer's absolute path.
cache_payload["source"] = str(template_source)
cache_payload["template_id"] = template["id"]
cache_payload["template_title"] = template["title"]
with template_cache.open("wb") as file:
pickle.dump(cache_payload, file)
return cache_payload
processor = VideoProcessor()
template_data, _ = processor.process_video_from_path(str(template_source))
evaluator = PushUpEvaluator()
evaluation = evaluator.evaluate(template_data, template_data)
cache_payload = {
"source": str(template_source),
"template_id": template["id"],
"template_title": template["title"],
"prepared_at": datetime.utcnow().isoformat(),
"template_data": template_data,
"template_rep_count": evaluation["ex_reps_count"] if not evaluation.get("error") else 0,
}
with template_cache.open("wb") as file:
pickle.dump(cache_payload, file)
return cache_payload
def analyze_pushup(
student_video_path: str | Path,
upload_root: str | Path | None = None,
*,
save_artifacts: bool = False,
template_id: str | None = None,
) -> dict:
"""Analyze one student video.
By default this does not persist student videos, per-run JSON, or frame images.
Set save_artifacts=True only for debugging/offline inspection.
"""
upload_root = Path(upload_root) if upload_root is not None else DEBUG_ARTIFACTS_ROOT
student_video_path = Path(student_video_path)
cache_payload = prepare_template_cache(template_id)
processor = VideoProcessor()
evaluator = PushUpEvaluator()
expert_data = cache_payload["template_data"]
template_video_path = Path(cache_payload.get("source") or TEMPLATE_SOURCE)
if not template_video_path.exists():
template_video_path = TEMPLATE_SOURCE
selected_template_id = str(cache_payload.get("template_id") or DEFAULT_TEMPLATE_ID)
selected_template_title = str(cache_payload.get("template_title") or "Hít đất chuẩn")
run_id = ""
run_dir: Path | None = None
if save_artifacts:
run_id = datetime.now().strftime("%Y%m%d-%H%M%S") + "-" + uuid.uuid4().hex[:8]
run_dir = upload_root / ANALYSIS_DIR / run_id
run_dir.mkdir(parents=True, exist_ok=True)
student_data, _ = processor.process_video_from_path(str(student_video_path))
if not student_data:
payload = {
"error": "Không nhận diện được tư thế hít đất trong video. Vui lòng quay rõ toàn thân, đủ sáng và thử lại.",
"exercise": "pushup",
"exercise_label": "Hít đất / Push-up",
"template_id": selected_template_id,
"template_title": selected_template_title,
"student_video_path": _safe_relative_path(student_video_path, upload_root),
}
if save_artifacts and run_dir is not None:
with (run_dir / "result.json").open("w", encoding="utf-8") as file:
json.dump(payload, file, ensure_ascii=False, indent=2)
_write_video_test_run_log(
payload=payload,
run_id=run_id,
run_dir=run_dir,
student_video_path=student_video_path,
template_video_path=template_video_path,
)
return payload
result = evaluator.evaluate(expert_data, student_data)
if result.get("error"):
payload = {
"error": result["error"],
"exercise": "pushup",
"exercise_label": "Hít đất / Push-up",
"template_id": selected_template_id,
"template_title": selected_template_title,
"student_video_path": _safe_relative_path(student_video_path, upload_root),
}
if save_artifacts and run_dir is not None:
with (run_dir / "result.json").open("w", encoding="utf-8") as file:
json.dump(payload, file, ensure_ascii=False, indent=2)
_write_video_test_run_log(
payload=payload,
run_id=run_id,
run_dir=run_dir,
student_video_path=student_video_path,
template_video_path=template_video_path,
)
return payload
rep_cards = []
error_counter: Counter[str] = Counter()
high_severity_reps = 0
good_reps = 0
for rep in result["rep_results"]:
rep_errors = [_serialize_error(error) for error in rep["errors"]]
student_frame = _select_student_frame_for_rep(rep, student_data)
expert_frame = expert_data[rep["w_pair"][1]]
student_rel = ""
expert_rel = ""
llm_visual = _empty_rep_visual_feedback()
if save_artifacts:
student_artifact = ANALYSIS_DIR / run_id / f"student_rep_{rep['rep_num']}.jpg"
expert_artifact = ANALYSIS_DIR / run_id / f"expert_rep_{rep['rep_num']}.jpg"
student_abs = upload_root / student_artifact
expert_abs = upload_root / expert_artifact
student_landmarks = _save_pose_frame(
processor=processor,
video_path=student_video_path,
frame_idx=student_frame["frame_idx"],
destination=student_abs,
flip=student_frame.get("flipped", False),
)
_save_pose_frame(
processor=processor,
video_path=template_video_path,
frame_idx=expert_frame["frame_idx"],
destination=expert_abs,
flip=expert_frame.get("flipped", False),
)
if _should_request_rep_visual_feedback(rep, rep_errors):
llm_visual = generate_rep_visual_feedback(
student_image_path=str(student_abs),
expert_image_path=str(expert_abs),
rep_context=_build_rep_visual_context(rep, rep_errors),
)
rule_arrow = _build_rule_arrow(rep_errors, student_landmarks)
if rule_arrow:
llm_visual["arrow"] = rule_arrow
_draw_llm_arrow_on_image(student_abs, rule_arrow)
elif llm_visual.get("arrow"):
_draw_llm_arrow_on_image(student_abs, llm_visual["arrow"])
student_rel = str(student_artifact)
expert_rel = str(expert_artifact)
if rep["score"] >= 0.88 and not rep_errors:
good_reps += 1
if any(error["severity"] == "high" for error in rep_errors):
high_severity_reps += 1
error_counter.update(error["type"] for error in rep_errors)
rep_cards.append(
{
"rep_num": int(rep["rep_num"]),
"score": round(float(rep["score"]), 4),
"score_pct": round(float(rep["score"]) * 100, 1),
"status": _rep_status_label(float(rep["score"]), rep_errors),
"status_tone": _rep_status_tone(float(rep["score"]), rep_errors),
"errors": rep_errors,
"error_labels": [error["label"] for error in rep_errors],
"primary_error": rep_errors[0]["label"] if rep_errors else "Không có lỗi nghiêm trọng",
"feedback": _rep_feedback(rep_errors),
"rule_feedback": _rep_feedback(rep_errors),
"llm_feedback": llm_visual.get("feedback", ""),
"llm_feedback_source": llm_visual.get("source", ""),
"llm_feedback_error": llm_visual.get("error", ""),
"llm_visual_error_label": llm_visual.get("visual_error_label", ""),
"llm_arrow": llm_visual.get("arrow"),
"student_frame_path": student_rel,
"expert_frame_path": expert_rel,
"rule_score_pct": round(float(rep["rule_score"]) * 100, 1),
"dtw_score_pct": round(float(rep["dtw_score"]) * 100, 1),
}
)
summary = _build_summary(
overall_score=float(result["overall_score"]),
rep_results=rep_cards,
total_reps=int(result["st_reps_count"]),
target_reps=int(result["ex_reps_count"]),
main_errors=error_counter,
)
main_errors = [
{
"type": error_type,
"label": ERROR_LABELS.get(error_type, error_type),
"count": count,
"severity": _highest_severity(rep_cards, error_type),
"guidance": ERROR_GUIDANCE.get(error_type, "Ưu tiên sửa lỗi này trước trong các rep tiếp theo."),
}
for error_type, count in error_counter.most_common()
]
payload = {
"error": None,
"exercise": "pushup",
"exercise_label": "Hít đất / Push-up",
"template_id": selected_template_id,
"template_title": selected_template_title,
"overall_score": round(float(result["overall_score"]), 4),
"overall_score_pct": round(float(result["overall_score"]) * 100, 1),
"student_reps": int(result["st_reps_count"]),
"expert_reps": int(result["ex_reps_count"]),
"good_reps": good_reps,
"serious_reps": high_severity_reps,
"summary": summary,
"main_errors": main_errors,
"rep_results": rep_cards,
"student_video_path": "",
}
payload["coach_feedback"] = ""
if save_artifacts and run_dir is not None:
payload["student_video_path"] = _safe_relative_path(student_video_path, upload_root)
with (run_dir / "result.json").open("w", encoding="utf-8") as file:
json.dump(payload, file, ensure_ascii=False, indent=2)
_write_video_test_run_log(
payload=payload,
run_id=run_id,
run_dir=run_dir,
student_video_path=student_video_path,
template_video_path=template_video_path,
)
return payload
def _template_cache_path(template_id: str | None = None) -> Path:
normalized_id = safe_template_id(template_id or DEFAULT_TEMPLATE_ID)
if normalized_id == DEFAULT_TEMPLATE_ID:
return TEMPLATE_CACHE
return BASE_DIR / "data" / "processed" / f"pushup_template_{normalized_id}.pkl"
def _write_video_test_run_log(
*,
payload: dict,
run_id: str,
run_dir: Path,
student_video_path: Path,
template_video_path: Path,
) -> None:
"""Append a lightweight per-video test log without copying image artifacts."""
try:
index_entry = _build_video_test_index_entry(
payload=payload,
run_id=run_id,
run_dir=run_dir,
student_video_path=student_video_path,
template_video_path=template_video_path,
)
VIDEO_TEST_LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
with VIDEO_TEST_LOG_FILE.open("a", encoding="utf-8") as file:
file.write(json.dumps(index_entry, ensure_ascii=False) + "\n")
except OSError as exc:
print(f"Video test logging failed: {exc}")
def _build_video_test_index_entry(
*,
payload: dict,
run_id: str,
run_dir: Path,
student_video_path: Path,
template_video_path: Path,
) -> dict:
rep_results = payload.get("rep_results", [])
created_at = datetime.now().isoformat(timespec="seconds")
return {
"ts": created_at,
"run_id": run_id,
"exercise": payload.get("exercise", "pushup"),
"student_video_name": student_video_path.name,
"student_video_path": student_video_path.as_posix(),
"template_video_path": template_video_path.as_posix(),
"overall_score_pct": payload.get("overall_score_pct"),
"student_reps": payload.get("student_reps"),
"expert_reps": payload.get("expert_reps"),
"error": payload.get("error"),
"summary": payload.get("summary", ""),
"main_errors": payload.get("main_errors", []),
"result_json": _project_relative_path(run_dir / "result.json"),
"artifact_dir": _project_relative_path(run_dir),
"rep_logs": [
{
"rep_num": rep.get("rep_num"),
"score_pct": rep.get("score_pct"),
"primary_error": rep.get("primary_error"),
"rule_feedback": rep.get("rule_feedback"),
"llm_feedback": rep.get("llm_feedback"),
"llm_feedback_source": rep.get("llm_feedback_source"),
"llm_feedback_error": rep.get("llm_feedback_error"),
"student_frame_path": rep.get("student_frame_path", ""),
"expert_frame_path": rep.get("expert_frame_path", ""),
}
for rep in rep_results
],
}
def _save_pose_frame(
processor: VideoProcessor,
video_path: Path,
frame_idx: int,
destination: Path,
flip: bool,
) -> list[dict]:
destination.parent.mkdir(parents=True, exist_ok=True)
frame = processor.get_frame(str(video_path), frame_idx, flip=flip)
if frame is None:
return []
frame = _remove_source_status_badge(frame)
_, landmarks = processor.engine.extract_kinematics(frame.copy(), is_static=True)
landmark_points = _landmarks_to_points(landmarks) if landmarks else []
if landmarks:
mp.solutions.drawing_utils.draw_landmarks(
frame,
landmarks,
mp.solutions.pose.POSE_CONNECTIONS,
)
cv2.imwrite(str(destination), frame)
return landmark_points
def _remove_source_status_badge(frame):
"""Remove pre-existing check/cross badges baked into some test videos.
Some source clips already contain a large white square with a green check or
red cross near the top of the frame. Those source labels can contradict the
current rule-based arrow, so strip them before drawing our own annotations.
"""
height, width = frame.shape[:2]
search_y_max = int(height * 0.45)
if search_y_max <= 0:
return frame
search = frame[:search_y_max]
white_mask = cv2.inRange(search, np.array([235, 235, 235]), np.array([255, 255, 255]))
contours, _ = cv2.findContours(white_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if not contours:
return frame
hsv = cv2.cvtColor(search, cv2.COLOR_BGR2HSV)
green_mask = cv2.inRange(hsv, np.array([35, 60, 60]), np.array([90, 255, 255]))
red_mask_low = cv2.inRange(hsv, np.array([0, 70, 60]), np.array([12, 255, 255]))
red_mask_high = cv2.inRange(hsv, np.array([170, 70, 60]), np.array([180, 255, 255]))
symbol_mask = cv2.bitwise_or(green_mask, cv2.bitwise_or(red_mask_low, red_mask_high))
cleanup_mask = np.zeros((height, width), dtype=np.uint8)
min_area = max(900, int(width * height * 0.0004))
max_area = int(width * height * 0.04)
for contour in contours:
x, y, box_w, box_h = cv2.boundingRect(contour)
area = box_w * box_h
if area < min_area or area > max_area:
continue
aspect = box_w / box_h if box_h else 0
if not 0.55 <= aspect <= 1.8:
continue
pad_x = max(12, int(box_w * 0.12))
pad_y = max(12, int(box_h * 0.12))
x1 = _clamp(x - pad_x, 0, width - 1)
y1 = _clamp(y - pad_y, 0, height - 1)
x2 = _clamp(x + box_w + pad_x, 0, width)
y2 = _clamp(y + box_h + pad_y, 0, search_y_max)
if x2 <= x1 or y2 <= y1:
continue
symbol_pixels = cv2.countNonZero(symbol_mask[y1:y2, x1:x2])
if symbol_pixels < max(30, int(area * 0.01)):
continue
cleanup_mask[y1:y2, x1:x2] = 255
if not cv2.countNonZero(cleanup_mask):
return frame
return cv2.inpaint(frame, cleanup_mask, 5, cv2.INPAINT_TELEA)
def _landmarks_to_points(landmarks) -> list[dict]:
return [
{
"x": float(landmark.x),
"y": float(landmark.y),
"visibility": float(getattr(landmark, "visibility", 1.0)),
}
for landmark in landmarks.landmark
]
def _select_student_frame_for_rep(rep: dict, student_data: list[dict]) -> dict:
if not student_data:
raise ValueError("No student pose frames available for rep frame selection")
error_frame_indices = []
for error in rep.get("errors", []):
error_frame_indices.extend(error.get("frames", []))
if not error_frame_indices:
fallback_index = _safe_frame_index(rep.get("w_pair", (0, 0))[0], len(student_data))
return student_data[fallback_index]
target_frame = error_frame_indices[0]
start, end = rep.get("range", (0, len(student_data)))
candidates = student_data[start:end] or student_data
return min(candidates, key=lambda frame: abs(frame["frame_idx"] - target_frame))
def _safe_frame_index(index: object, length: int) -> int:
try:
parsed = int(index)
except (TypeError, ValueError):
parsed = 0
return _clamp(parsed, 0, length - 1)
def _should_request_rep_visual_feedback(rep: dict, rep_errors: list[dict]) -> bool:
return bool(rep_errors)
def _empty_rep_visual_feedback() -> dict:
return {
"source": "",
"is_error": False,
"visual_error_label": "",
"feedback": "",
"arrow": None,
}
def _build_rep_visual_context(rep: dict, rep_errors: list[dict]) -> dict:
return {
"rep_num": int(rep["rep_num"]),
"score_pct": round(float(rep["score"]) * 100, 1),
"rule_score_pct": round(float(rep["rule_score"]) * 100, 1),
"dtw_score_pct": round(float(rep["dtw_score"]) * 100, 1),
"rule_errors": [
{
"type": error["type"],
"label": error["label"],
"severity": error["severity"],
"guidance": error["guidance"],
}
for error in rep_errors
],
"rule_feedback": _rep_feedback(rep_errors),
}
def _build_rule_arrow(rep_errors: list[dict], landmark_points: list[dict]) -> dict | None:
if not rep_errors or not landmark_points:
return None
for error in rep_errors:
error_type = error.get("type", "")
target = _arrow_target_for_error(error_type, landmark_points)
if target:
return {
"x": target["x"],
"y": target["y"],
"label": ARROW_LABELS.get(error_type, error.get("label") or "Vị trí cần sửa"),
"source": "rule_landmark",
"error_type": error_type,
}
return None
def _arrow_target_for_error(error_type: str, landmark_points: list[dict]) -> dict | None:
if error_type in {"hip_sag", "hip_pike", "body_not_straight"}:
return (
_average_pose_points(landmark_points, [23, 24])
or _average_pose_points(landmark_points, [11, 12, 23, 24])
)
if error_type == "not_deep_enough":
shoulder = _average_pose_points(landmark_points, [11, 12])
hip = _average_pose_points(landmark_points, [23, 24])
if shoulder and hip:
return _interpolate_pose_points(shoulder, hip, 0.30)
return shoulder or _average_pose_points(landmark_points, [13, 14])
if error_type == "head_misaligned":
return (
_average_pose_points(landmark_points, [0, 7, 8])
or _average_pose_points(landmark_points, [0, 11, 12])
)
return _average_pose_points(landmark_points, [11, 12, 23, 24])
def _average_pose_points(landmark_points: list[dict], indices: list[int]) -> dict | None:
valid_points = [
point
for index in indices
if (point := _pose_point(landmark_points, index)) is not None
and point["visibility"] >= 0.10
]
if not valid_points:
valid_points = [
point
for index in indices
if (point := _pose_point(landmark_points, index)) is not None
]
if not valid_points:
return None
return {
"x": _clamp_float(sum(point["x"] for point in valid_points) / len(valid_points), 0.02, 0.98),
"y": _clamp_float(sum(point["y"] for point in valid_points) / len(valid_points), 0.02, 0.98),
}
def _pose_point(landmark_points: list[dict], index: int) -> dict | None:
if index >= len(landmark_points):
return None
point = landmark_points[index]
x = float(point.get("x", -1.0))
y = float(point.get("y", -1.0))
if not np.isfinite(x) or not np.isfinite(y) or x < -0.05 or x > 1.05 or y < -0.05 or y > 1.05:
return None
return {
"x": _clamp_float(x, 0.02, 0.98),
"y": _clamp_float(y, 0.02, 0.98),
"visibility": float(point.get("visibility", 1.0)),
}
def _interpolate_pose_points(start: dict, end: dict, ratio: float) -> dict:
ratio = _clamp_float(ratio, 0.0, 1.0)
return {
"x": _clamp_float(start["x"] + (end["x"] - start["x"]) * ratio, 0.02, 0.98),
"y": _clamp_float(start["y"] + (end["y"] - start["y"]) * ratio, 0.02, 0.98),
}
def _clamp_float(value: float, lower: float, upper: float) -> float:
return max(lower, min(value, upper))
def _draw_llm_arrow_on_image(image_path: Path, arrow: dict) -> None:
frame = cv2.imread(str(image_path))
if frame is None:
return
height, width = frame.shape[:2]
target_x = int(float(arrow["x"]) * width)
target_y = int(float(arrow["y"]) * height)
target_x = _clamp(target_x, 0, width - 1)
target_y = _clamp(target_y, 0, height - 1)
start_x, start_y = _arrow_start_point(target_x, target_y, width, height)
label = str(arrow.get("label") or "Vị trí cần sửa")
cv2.arrowedLine(
frame,
(start_x, start_y),
(target_x, target_y),
(0, 0, 255),
5,
tipLength=0.22,
)
cv2.circle(frame, (target_x, target_y), 22, (0, 0, 255), 5)
_draw_label(frame, label, start_x, start_y - 42)
cv2.imwrite(str(image_path), frame)
def _arrow_start_point(target_x: int, target_y: int, width: int, height: int) -> tuple[int, int]:
x_offset = _clamp(int(width * 0.24), 150, 240)
y_offset = _clamp(int(height * 0.20), 90, 150)
start_x = target_x - x_offset
if start_x < 20:
start_x = target_x + x_offset
start_y = target_y - y_offset
if start_y < 48:
start_y = target_y + y_offset
return (
_clamp(start_x, 20, width - 40),
_clamp(start_y, 48, height - 32),
)
def _draw_label(frame, label: str, x: int, y: int) -> None:
height, width = frame.shape[:2]
try:
from PIL import Image, ImageDraw, ImageFont
except ImportError:
return
font = _load_label_font(ImageFont, _clamp(width // 44, 18, 26))
image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
draw = ImageDraw.Draw(image)
text_bbox = draw.textbbox((0, 0), label, font=font)
text_w = text_bbox[2] - text_bbox[0]
text_h = text_bbox[3] - text_bbox[1]
padding_x = 9
padding_y = 7
x = _clamp(x, 8, width - text_w - padding_x * 2 - 8)
y = _clamp(y, 8, height - text_h - padding_y * 2 - 8)
draw.rounded_rectangle(
(x, y, x + text_w + padding_x * 2, y + text_h + padding_y * 2),
radius=4,
fill=(220, 0, 0),
)
draw.text((x + padding_x, y + padding_y), label, font=font, fill=(255, 255, 255))
frame[:, :] = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
def _load_label_font(image_font, size: int):
font_candidates = [
Path("C:/Windows/Fonts/arial.ttf"),
Path("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf"),
Path("/System/Library/Fonts/Supplemental/Arial Unicode.ttf"),
]
for font_path in font_candidates:
if font_path.exists():
return image_font.truetype(str(font_path), size=size)
return image_font.load_default()
def _clamp(value: int, lower: int, upper: int) -> int:
if upper < lower:
return lower
return max(lower, min(value, upper))
def _serialize_error(error: dict) -> dict:
return {
"type": error["type"],
"label": ERROR_LABELS.get(error["type"], error["message"]),
"message": error["message"],
"severity": error["severity"],
"guidance": ERROR_GUIDANCE.get(error["type"], "Giữ nhịp ổn định và kiểm soát tốt hơn."),
}
def _rep_status_label(score: float, errors: list[dict]) -> str:
if not errors and score >= 0.88:
return "Tốt"
if score >= 0.75:
return "Cần chú ý"
return "Có lỗi"
def _rep_status_tone(score: float, errors: list[dict]) -> str:
if not errors and score >= 0.88:
return "good"
if score >= 0.75:
return "warning"
return "danger"
def _rep_feedback(errors: list[dict]) -> str:
if not errors:
return "Rep ổn định, thân người được giữ khá tốt và không có lỗi nghiêm trọng."
suggestions = []
for error in errors:
suggestion = error["guidance"]
if suggestion not in suggestions:
suggestions.append(suggestion)
return " ".join(suggestions[:2])
def _highest_severity(rep_results: list[dict], error_type: str) -> str:
highest = "low"
for rep in rep_results:
for error in rep["errors"]:
if error["type"] == error_type and SEVERITY_RANK[error["severity"]] > SEVERITY_RANK[highest]:
highest = error["severity"]
return highest
def _build_summary(
overall_score: float,
rep_results: list[dict],
total_reps: int,
target_reps: int,
main_errors: Counter[str],
) -> str:
if not rep_results:
return "Chưa có rep hợp lệ để phân tích."
trailing_drop = rep_results[-1]["score"] < rep_results[0]["score"] - 0.1
strongest_rep = max(rep_results, key=lambda rep: rep["score"])
segments = [f"Bạn hoàn thành {total_reps} rep"]
if total_reps < target_reps:
segments[0] += f", thấp hơn mục tiêu {target_reps} rep."
else:
segments[0] += "."
if overall_score >= 0.88:
segments.append("Kỹ thuật tổng thể ổn định và khá gần với mẫu chuẩn.")
elif overall_score >= 0.75:
segments.append("Kỹ thuật tương đối tốt nhưng vẫn có vài rep cần chỉnh lại form.")
else:
segments.append("Form chưa ổn định, nên ưu tiên sửa các lỗi chính trước khi tăng số rep.")
if main_errors:
top_error, count = main_errors.most_common(1)[0]
segments.append(f"Lỗi xuất hiện nhiều nhất là {ERROR_LABELS.get(top_error, top_error).lower()}{count} rep.")
if trailing_drop:
segments.append("Các rep cuối có dấu hiệu giảm chất lượng, khả năng cao do mất ổn định hoặc xuống sức.")
else:
segments.append(f"Rep tốt nhất là rep {strongest_rep['rep_num']} với điểm {strongest_rep['score_pct']:.1f}%.")
return " ".join(segments)
def _safe_relative_path(path: Path, base: Path) -> str:
try:
return path.relative_to(base).as_posix()
except ValueError:
return path.as_posix()
def _project_relative_path(path: Path) -> str:
try:
return path.relative_to(BASE_DIR).as_posix()
except ValueError:
return path.as_posix()