Spaces:
Sleeping
Sleeping
#!/usr/bin/env python3
"""
Evaluate a single video file with KSI metrics and natural language coaching.
Supports both file paths and real-time webcam input.
"""
# --- DETERMINISM FIXES (MUST BE BEFORE TF IMPORT) --- |
import os
import sys

# Check for GPU flag early (before TF imports): TensorFlow reads
# CUDA_VISIBLE_DEVICES at import time, so the environment must be set
# before `import tensorflow` below.
_use_gpu = '--gpu' in sys.argv
if not _use_gpu:
    # Force CPU mode for deterministic predictions
    os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
    os.environ['MEDIAPIPE_DISABLE_GPU'] = '1'
    print("π Running in CPU mode for deterministic predictions (use --gpu to enable GPU)")
# Deterministic TF ops regardless of the device choice.
os.environ['TF_DETERMINISTIC_OPS'] = '1'
os.environ['TF_CUDNN_DETERMINISTIC'] = '1'
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'

import argparse
import yaml
import cv2
import numpy as np
from collections import deque
import tensorflow as tf
from tensorflow.keras.models import load_model
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input

# Disable GPU visibility in TF if CPU mode (double check)
if not _use_gpu:
    tf.config.set_visible_devices([], 'GPU')

import mediapipe as mp
from ksi_v2 import EnhancedKSI, ShotPhase
from features import HybridFeatureExtractor
from utils import normalize_pose, resolve_crop_config_for_video, should_skip_crop

# Natural-language coaching is optional; degrade gracefully when the
# module is absent so the core KSI evaluation still runs.
try:
    from natural_language_coach import generate_coaching_report
    NLP_AVAILABLE = True
except ImportError:
    NLP_AVAILABLE = False
def load_params():
    """Read and return the project configuration from ``params.yaml``."""
    with open("params.yaml") as fh:
        config = yaml.safe_load(fh)
    return config
def load_expert_templates(params):
    """Load the expert reference templates archive (.npz).

    Args:
        params: Configuration dict with ``expert_pipeline.output_path``.

    Returns:
        An ``NpzFile`` of expert landmark templates.

    Raises:
        FileNotFoundError: If the archive does not exist at the configured path.
    """
    path = params['expert_pipeline']['output_path']
    if os.path.exists(path):
        return np.load(path, allow_pickle=True)
    raise FileNotFoundError(f"Templates not found at {path}")
| def _smooth_signal(signal, window_size=5): | |
| """Apply exponential moving average for smoothing noisy signals.""" | |
| if len(signal) == 0: | |
| return signal | |
| alpha = 2.0 / (window_size + 1) | |
| smoothed = [signal[0]] | |
| for val in signal[1:]: | |
| smoothed.append(alpha * val + (1 - alpha) * smoothed[-1]) | |
| return np.array(smoothed) | |
def find_contact_moment(all_landmarks, seq_len):
    """
    Identify the contact moment using multi-joint acceleration (real-time optimized).

    Contact detection:
    - Combines wrist (16), elbow (14), shoulder (12) joints
    - Calculates composite "arm acceleration" (rate of velocity change)
    - Finds peak acceleration with temporal smoothing for robustness
    - Returns the window with highest acceleration (predictive of contact)

    Args:
        all_landmarks: Iterable of (T, 33, 3) landmark arrays, one per window.
        seq_len: Nominal window length (currently unused by the computation).

    Returns:
        contact_window_idx: Index of window containing contact
        contact_frame_in_window: Frame within that window where contact occurs
    """
    peak_accel = 0
    best_window = 0
    best_frame = 0
    for idx, lm_win in enumerate(all_landmarks):
        # lm_win: (T, 33, 3) — right arm joints: shoulder 12, elbow 14, wrist 16.
        shoulder_xy = lm_win[:, 12, :2]  # (T, 2)
        elbow_xy = lm_win[:, 14, :2]
        wrist_xy = lm_win[:, 16, :2]
        # Per-frame speed of each joint: magnitude of frame-to-frame motion.
        wrist_speed, elbow_speed, shoulder_speed = (
            np.linalg.norm(np.diff(xy, axis=0), axis=1)
            for xy in (wrist_xy, elbow_xy, shoulder_xy)
        )
        # Composite arm velocity: distal joints move fastest, so they get
        # the larger weights (wrist primary, elbow secondary, shoulder last).
        arm_speed = 0.5 * wrist_speed + 0.3 * elbow_speed + 0.2 * shoulder_speed
        # Smooth for real-time robustness before differentiating again.
        arm_speed = _smooth_signal(arm_speed, window_size=3)
        if len(arm_speed) > 1:
            # Scalar "acceleration" score: norm of all velocity changes.
            accel = np.linalg.norm(np.diff(arm_speed))
            if accel > peak_accel:
                peak_accel = accel
                best_window = idx
                # Frame of maximum composite speed within the winning window.
                best_frame = np.argmax(arm_speed)
    return best_window, best_frame
def predict_shot_type_at_contact(all_windows, all_landmarks, model, classes, cnn_dim, pipeline_type='hybrid'):
    """
    Instead of consensus across all windows, predict based on the window
    containing the contact moment (highest acceleration).

    Args:
        all_windows: List of fused feature windows, each (T, D).
        all_landmarks: List of (T, 33, 3) landmark arrays parallel to windows.
        model: Trained Keras model.
        classes: Ordered class names matching the model's output units.
        cnn_dim: Width of the CNN slice inside the fused feature vector.
        pipeline_type: Unused here; kept for signature compatibility.

    Returns:
        predictions: All predictions for reference
        best_prediction: The prediction at contact moment
        best_class: Shot class at contact
        contact_info: Dict with contact window and frame info
    """
    # Find contact moment (on the original, un-downsampled landmarks)
    seq_len = all_windows[0].shape[0] if all_windows else 40
    contact_window_idx, contact_frame = find_contact_moment(all_landmarks, seq_len)
    # Get all predictions first
    features = np.array(all_windows)  # (N, T, D)
    # Determine the model's expected sequence length. A dynamic time axis
    # (shape[1] is None) accepts any length, so keep the current one.
    # BUG FIX: the original called int(shape[1]) before its None check,
    # which raised TypeError for variable-length models.
    model_seq = model.inputs[0].shape[1] if model.inputs else None
    expected_seq_len = int(model_seq) if model_seq is not None else seq_len
    if expected_seq_len != seq_len:
        # Downsample using stride
        stride = max(1, seq_len // expected_seq_len)
        features = features[:, ::stride, :][:, :expected_seq_len, :]  # Take every stride-th frame
        # Also downsample landmarks for contact detection
        all_landmarks = [lm[::stride][:expected_seq_len] for lm in all_landmarks]
        seq_len = expected_seq_len
    model_inputs = _prepare_model_inputs(model, x_fused=features, cnn_dim=cnn_dim)
    all_probs = model.predict(model_inputs, verbose=0)
    all_predictions = []
    for i, probs in enumerate(all_probs):
        pred_idx = np.argmax(probs)
        predicted_class = classes[pred_idx]
        confidence = float(probs[pred_idx])
        all_predictions.append({
            'window': i,
            'class': predicted_class,
            'confidence': confidence,
            'all_scores': {classes[j]: float(probs[j]) for j in range(len(classes))}
        })
    # Get prediction at contact
    best_prediction = all_predictions[contact_window_idx]
    best_class = best_prediction['class']
    contact_info = {
        'contact_window': contact_window_idx,
        'contact_frame': contact_frame,
        'total_windows': len(all_windows),
        'seq_len': seq_len
    }
    return all_predictions, best_prediction, best_class, contact_info
def extract_features_from_video(video_source, extractor, params, pipeline_type='hybrid'):
    """
    Extract features and landmarks from video using sliding window (like realtime_hybrid).
    Uses image-space landmarks (pose_landmarks) so units match expert templates.
    Skips low-quality windows to avoid zeroed KSI.

    Args:
        video_source: File path or webcam index (0, 1, etc)
        extractor: Feature extractor (HybridFeatureExtractor)
        params: Configuration dict
        pipeline_type: 'hybrid' or 'pose' (currently unused; the hybrid
            pipeline config is always read — TODO confirm intended)

    Returns:
        all_windows: List of fused feature windows
        all_landmarks: List of (T, 33, 3) landmarks per window
        frame_count: Total frames processed

    Raises:
        RuntimeError: If the source cannot be opened, no windows are produced,
            or every window is rejected by the quality filter.
    """
    # Open video or webcam (a purely numeric string selects a camera device)
    if isinstance(video_source, str) and video_source.isdigit():
        cap = cv2.VideoCapture(int(video_source))
        is_webcam = True
    else:
        cap = cv2.VideoCapture(video_source)
        is_webcam = False
    if not cap.isOpened():
        raise RuntimeError(f"Cannot open video source: {video_source}")
    cfg = params['hybrid_pipeline']
    seq_len = cfg['sequence_length']
    cnn_dim = cfg['cnn_feature_dim']
    # Correctly resolve crop config (per-video overrides win over base crop)
    base_crop = cfg.get('crop_config', {})
    overrides = params.get('crop_overrides', {})
    crop_cfg = resolve_crop_config_for_video(video_source, base_crop, overrides)
    roi_cfg = cfg.get("cnn_roi") or {}
    # Sliding windows over fused features / landmarks / pose-validity flags
    window = deque(maxlen=seq_len)
    landmark_window = deque(maxlen=seq_len)
    valid_mask_window = deque(maxlen=seq_len)
    all_windows = []
    all_landmarks = []
    all_valid_ratios = []
    last_pose = None
    last_box = None
    frame_count = 0
    print(f"πΉ Processing video from: {video_source}")
    print(f" Sequence length: {seq_len} | CNN features: {cnn_dim}")
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        # Crop if needed
        if crop_cfg:
            frame = _apply_crop(frame, crop_cfg)
        # Pose detection using extractor's MediaPipe (expects RGB input)
        res = extractor.pose.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        frame_count += 1
        # Extract pose landmarks (image-space) so scale matches templates
        if res.pose_landmarks:
            lm = np.array(
                [[l.x, l.y, l.z] for l in res.pose_landmarks.landmark],
                dtype=np.float32,
            )
            pose_flat = normalize_pose(lm).astype(np.float32).flatten()
            last_pose = pose_flat
            landmark_window.append(lm)
            valid_mask_window.append(1)
        else:
            # Reuse last good pose; if none, mark invalid
            zeros_pose = np.zeros(99, dtype=np.float32)
            pose_flat = last_pose if last_pose is not None else zeros_pose
            landmark_window.append(np.zeros((33, 3), dtype=np.float32))
            valid_mask_window.append(0)
        # Extract CNN features from a pose-centered ROI of the frame
        h, w = frame.shape[:2]
        box = extractor._compute_pose_roi_box(
            getattr(res, "pose_landmarks", None),
            w,
            h,
            roi_cfg,
            last_box=last_box,
        )
        # Keep the previous ROI when detection fails this frame
        last_box = box if box is not None else last_box
        roi_frame = extractor._crop_with_box(frame, box)
        img_size = cfg.get("cnn_input_size", 224)
        img = cv2.resize(roi_frame, (img_size, img_size))
        # BGR -> RGB flip plus MobileNetV2 preprocessing
        img = preprocess_input(np.expand_dims(img[..., ::-1], axis=0))
        cnn_feat = extractor.rgb_model.predict(img, verbose=0)[0].astype(np.float32)
        # Fuse pose and CNN features
        fused = np.concatenate([pose_flat, cnn_feat], axis=0)
        window.append(fused)
        # When window is full, save it (deque maxlen gives stride-1 overlap)
        if len(window) == seq_len:
            valid_ratio = sum(valid_mask_window) / float(seq_len)
            all_windows.append(np.array(list(window)))
            all_landmarks.append(np.array(list(landmark_window)))
            all_valid_ratios.append(valid_ratio)
    cap.release()
    extractor.pose.close()
    if not all_windows:
        raise RuntimeError("No valid windows extracted from video")
    # Filter out low-quality windows (too many missing poses or NaNs)
    filtered_windows = []
    filtered_landmarks = []
    for win, lm, ratio in zip(all_windows, all_landmarks, all_valid_ratios):
        if ratio < 0.7:  # require at least 70% frames with pose
            continue
        if not np.isfinite(win).all() or not np.isfinite(lm).all():
            continue
        if np.allclose(lm, 0):  # avoid all-zero landmark windows
            continue
        filtered_windows.append(win)
        filtered_landmarks.append(lm)
    if not filtered_windows:
        raise RuntimeError("All windows were filtered out due to low pose quality; try a clearer video")
    print(f" Extracted {frame_count} frames into {len(filtered_windows)} valid windows (from {len(all_windows)} total)")
    print(f" Window shape: {filtered_windows[0].shape}")
    print(f" Landmarks shape: {filtered_landmarks[0].shape}")
    return filtered_windows, filtered_landmarks, frame_count
    # NOTE: removed unreachable dead code that followed this return and
    # referenced undefined names (`features`, `raw_landmarks`).
| def _apply_crop(frame, crop_cfg): | |
| """Apply crop to frame""" | |
| if crop_cfg is None: | |
| return frame | |
| h, w = frame.shape[:2] | |
| start_row = int(h * float(crop_cfg.get("top", 0.0))) | |
| end_row = h - int(h * float(crop_cfg.get("bottom", 0.0))) | |
| start_col = int(w * float(crop_cfg.get("left", 0.0))) | |
| end_col = w - int(w * float(crop_cfg.get("right", 0.0))) | |
| cropped = frame[start_row:end_row, start_col:end_col] | |
| return cropped if cropped.size else frame | |
| def _prepare_model_inputs(model, x_fused, cnn_dim): | |
| """Prepare inputs for model (handles different input signatures like realtime_hybrid).""" | |
| if x_fused.ndim != 3: | |
| raise ValueError(f"Expected x_fused shape (N, T, D), got {x_fused.shape}") | |
| fused_dim = int(x_fused.shape[-1]) | |
| x_cnn = x_fused[..., -cnn_dim:] if cnn_dim > 0 else x_fused[..., :0] | |
| x_pose = x_fused[..., :-cnn_dim] if cnn_dim > 0 else x_fused | |
| # For dual-input models (CNN + Pose), return both inputs in the correct order | |
| if len(model.inputs) == 2: | |
| # Typically: [cnn_input, pose_input] or [pose_input, cnn_input] | |
| # Check which input expects which features based on shape | |
| input_shapes = [int(inp.shape[-1]) for inp in model.inputs] | |
| result = [] | |
| for expected_dim in input_shapes: | |
| if expected_dim == cnn_dim: | |
| result.append(x_cnn) | |
| elif expected_dim == (fused_dim - cnn_dim): | |
| result.append(x_pose) | |
| else: | |
| raise ValueError( | |
| f"Model expects input dim {expected_dim}, but available are CNN({cnn_dim}) or Pose({fused_dim - cnn_dim}). " | |
| f"(fused_dim={fused_dim})" | |
| ) | |
| return result | |
| # Single input model: try to match the expected dimension | |
| if len(model.inputs) == 1: | |
| expected = int(model.inputs[0].shape[-1]) | |
| candidates = { | |
| int(x_cnn.shape[-1]): x_cnn, | |
| int(x_pose.shape[-1]): x_pose, | |
| int(x_fused.shape[-1]): x_fused, | |
| } | |
| if expected in candidates: | |
| return [candidates[expected]] | |
| return [x_fused] | |
| # Multiple inputs: try to match each dimension | |
| expected_dims = [] | |
| for inp in model.inputs: | |
| try: | |
| expected_dims.append(int(inp.shape[-1])) | |
| except Exception: | |
| expected_dims.append(None) | |
| prepared = [] | |
| for d in expected_dims: | |
| if d is None: | |
| prepared.append(x_fused) | |
| continue | |
| if d == cnn_dim: | |
| prepared.append(x_cnn) | |
| elif d == (fused_dim - cnn_dim): | |
| prepared.append(x_pose) | |
| else: | |
| raise ValueError( | |
| f"Model expects input dim {d}, but available are CNN({cnn_dim}) or Pose({fused_dim - cnn_dim}). " | |
| f"(fused_dim={fused_dim})" | |
| ) | |
| return prepared | |
def predict_shot_type(all_windows, model, classes, cnn_dim, pipeline_type='hybrid'):
    """
    Predict shot type for all windows using proper model input preparation.

    Returns:
        predictions: List of dicts with 'class', 'confidence', and 'all_scores'
        best_class: Most common predicted class (consensus across windows)
    """
    # Stack windows into a batch and route to the model's input signature.
    features = np.array(all_windows)  # (N, T, D)
    inputs = _prepare_model_inputs(model, x_fused=features, cnn_dim=cnn_dim)
    probs_per_window = model.predict(inputs, verbose=0)
    predictions = []
    pred_indices = []
    for probs in probs_per_window:
        top = np.argmax(probs)
        pred_indices.append(top)
        predictions.append({
            'class': classes[top],
            'confidence': float(probs[top]),
            'all_scores': {cls: float(probs[i]) for i, cls in enumerate(classes)}
        })
    # Consensus: the class predicted by the largest number of windows.
    from collections import Counter
    winner_idx = Counter(pred_indices).most_common(1)[0][0]
    return predictions, classes[winner_idx]
def evaluate_video(
    video_source,
    model_path,
    pipeline_type='hybrid',
    nlp_skill_level='intermediate',
    generate_report=True
):
    """
    Main evaluation function for single video.

    Pipeline: load config/model -> extract sliding windows from the video ->
    predict shot type at the contact moment -> score technique with KSI
    against expert templates -> optionally write a coaching report under
    ``coaching_reports/``. Returns early (None) if expert templates or the
    matching class template are missing.

    Args:
        video_source: File path or webcam index (0, 1, etc)
        model_path: Path to trained model
        pipeline_type: 'hybrid' or 'pose'
        nlp_skill_level: Skill level for coaching ('beginner', 'intermediate', 'advanced', 'expert')
        generate_report: Whether to generate coaching report
    """
    params = load_params()
    cfg = params[f'{pipeline_type}_pipeline']
    # Fall back to default KSI component weights when params.yaml omits them.
    ksi_cfg = params.get('ksi', {'weights': {'pose': 0.5, 'velocity': 0.3, 'acceleration': 0.2}})
    # Load model
    print(f"\nπ Loading model: {model_path}")
    model = load_model(model_path)
    # Load classes: sorted subdirectory names of the training data folder —
    # presumably the same ordering used at training time; verify against trainer.
    data_path = cfg['data_path']
    classes = sorted([d for d in os.listdir(data_path) if os.path.isdir(os.path.join(data_path, d))])
    print(f"π Classes: {classes}")
    # Feature extractor (MediaPipe pose + CNN backbone)
    mp_config = params['mediapipe']
    extractor = HybridFeatureExtractor(
        mp_config=mp_config,
        cnn_dim=cfg['cnn_feature_dim'],
        cnn_input_size=cfg['cnn_input_size'],
        rsn_weights_path=cfg.get('rsn_pretrained_weights'),
    )
    # Get sequence parameters
    seq_len = cfg['sequence_length']
    stride = cfg['stride']
    # Extract features from video
    print(f"\n{'='*70}")
    print(f"EXTRACTING FEATURES FROM VIDEO")
    print(f"{'='*70}")
    all_windows, all_landmarks, frame_count = extract_features_from_video(
        video_source, extractor, params, pipeline_type
    )
    # Predict shot type (contact-moment based, all windows scored for reference)
    print(f"\n{'='*70}")
    print(f"PREDICTING SHOT TYPE (ALL WINDOWS)")
    print(f"{'='*70}")
    all_predictions, best_prediction, best_class, contact_info = predict_shot_type_at_contact(
        all_windows, all_landmarks, model, classes, cfg['cnn_feature_dim'], pipeline_type
    )
    print(f"\nπ Total predictions: {len(all_predictions)}")
    print(f"{'β'*70}")
    # Group by class and show statistics
    from collections import Counter
    pred_classes = [p['class'] for p in all_predictions]
    class_counts = Counter(pred_classes)
    print(f"\nπ― PREDICTION SUMMARY (all windows):")
    for shot_class in sorted(class_counts.keys()):
        count = class_counts[shot_class]
        percentage = 100 * count / len(all_predictions)
        confs = [p['confidence'] for p in all_predictions if p['class'] == shot_class]
        avg_conf = np.mean(confs)
        print(f" {shot_class:20s}: {count:3d} predictions ({percentage:5.1f}%) | Avg confidence: {avg_conf:.2%}")
    # Show contact-based prediction
    print(f"\n{'β'*70}")
    print(f"β‘ CONTACT-BASED PREDICTION (MOST RELIABLE):")
    print(f"{'β'*70}")
    print(f" Contact occurs at: Window {contact_info['contact_window']} (frame {contact_info['contact_frame']}/{contact_info['seq_len']})")
    print(f"\n π― Predicted at contact: {best_prediction['class']}")
    print(f" Confidence: {best_prediction['confidence']:.2%}")
    print(f" All scores at contact:")
    for cls, score in sorted(best_prediction['all_scores'].items(), key=lambda x: x[1], reverse=True):
        print(f" {cls:20s}: {score:.2%}")
    # Show first 10 detailed predictions
    print(f"\n{'β'*70}")
    print(f"π DETAILED PREDICTIONS (first 10 windows):")
    print(f"{'β'*70}")
    for i, pred in enumerate(all_predictions[:10]):
        marker = " β‘ CONTACT" if i == contact_info['contact_window'] else ""
        print(f"\n Window {i+1:2d}: {pred['class']:20s} ({pred['confidence']:.2%}){marker}")
        sorted_scores = sorted(pred['all_scores'].items(), key=lambda x: x[1], reverse=True)
        # Only the top-3 scores per window to keep the output readable
        for cls, score in sorted_scores[:3]:
            print(f" {cls:20s}: {score:.2%}")
    if len(all_predictions) > 10:
        print(f"\n ... and {len(all_predictions) - 10} more predictions")
    # Calculate KSI
    print(f"\n{'='*70}")
    print(f"CALCULATING KSI METRICS")
    print(f"{'='*70}")
    try:
        templates = load_expert_templates(params)
    except FileNotFoundError:
        # Templates are a hard requirement for KSI; bail out gracefully.
        print("β οΈ Expert templates not found (data/expert_templates.npz).")
        print(" Skipping KSI metrics and coaching report.")
        print(" Run 'dvc repro generate_templates' to output templates.")
        return
    ksi_calc = EnhancedKSI()
    # Get expert template for consensus class; fall back to the first variant.
    template_key = best_class
    if template_key not in templates.files:
        template_key = f'{best_class}_variant1'
    if template_key not in templates.files:
        print(f"β οΈ Template not found for {best_class}")
        return
    expert_template = templates[template_key]
    # Templates may be stored flattened as (T, 99); reshape to (T, 33, 3).
    if expert_template.ndim == 2 and expert_template.shape[1] == 99:
        expert_lm = expert_template.reshape(-1, 33, 3)
    else:
        expert_lm = expert_template
    # Calculate KSI for each window and average
    ksi_scores = []
    for i, user_lm in enumerate(all_landmarks):
        result = ksi_calc.calculate(
            expert_landmarks=expert_lm,
            user_landmarks=user_lm,
            weights=ksi_cfg['weights'],
        )
        ksi_scores.append(result)
    # Use average KSI result
    avg_ksi_total = np.mean([r.ksi_total for r in ksi_scores])
    avg_ksi_weighted = np.mean([r.ksi_weighted for r in ksi_scores])
    # Prefer contact window if valid, otherwise highest KSI
    contact_idx = contact_info['contact_window'] if ksi_scores else 0
    if ksi_scores and np.isfinite(ksi_scores[contact_idx].ksi_total) and ksi_scores[contact_idx].ksi_total > 0:
        result = ksi_scores[contact_idx]
        chosen_idx = contact_idx
        chosen_reason = "contact window"
    else:
        # Contact-window score was invalid (non-finite or zero): pick the best.
        best_idx = int(np.argmax([r.ksi_total for r in ksi_scores])) if ksi_scores else 0
        result = ksi_scores[best_idx]
        chosen_idx = best_idx
        chosen_reason = "highest KSI"
    print(f"π KSI Analysis ({len(ksi_scores)} windows):")
    print(f" Average KSI Total: {avg_ksi_total:.3f}")
    print(f" Average KSI Weighted: {avg_ksi_weighted:.3f}")
    print(f" Using window #{chosen_idx + 1} ({chosen_reason}) for report")
    print(f"\n Selected KSI Score: {result.ksi_total:.3f}")
    print(f" KSI Weighted: {result.ksi_weighted:.3f}")
    print(f" Phase scores: {result.phase_scores}")
    print(f" Component scores: {result.components}")
    # Generate coaching report (only when requested and the NLP module loaded)
    if generate_report and NLP_AVAILABLE:
        print(f"\n{'='*70}")
        print(f"GENERATING COACHING REPORT")
        print(f"{'='*70}")
        os.makedirs("coaching_reports", exist_ok=True)
        report = generate_coaching_report(
            ksi_result=result,
            shot_type_str=best_class,
            skill_level_str=nlp_skill_level,
            output_format='text',
            simplified=True
        )
        report_filename = f"coaching_reports/{best_class}_video_ksi{result.ksi_total:.3f}_report.txt"
        with open(report_filename, 'w') as f:
            f.write(report)
        print(f"β Report saved: {report_filename}")
        print(f"\n{'='*70}")
        print("π COACHING REPORT PREVIEW")
        print(f"{'='*70}")
        print(report)
    print(f"\n{'='*70}")
    print("β¨ EVALUATION COMPLETE")
    print(f"{'='*70}")
| if __name__ == "__main__": | |
| parser = argparse.ArgumentParser(description="Evaluate a single video with KSI metrics and coaching") | |
| parser.add_argument("video", type=str, help="Video file path or webcam index (0, 1, etc)") | |
| parser.add_argument("--type", choices=['pose', 'hybrid'], default='hybrid', help="Pipeline type (default: hybrid)") | |
| parser.add_argument("--model", type=str, default="models/tcn_hybrid_tuned.h5", help="Model path") | |
| parser.add_argument("--skill", type=str, default='intermediate', | |
| choices=['beginner', 'intermediate', 'advanced', 'expert'], | |
| help="Skill level for coaching (default: intermediate)") | |
| parser.add_argument("--no-report", action='store_true', help="Skip coaching report generation") | |
| parser.add_argument("--gpu", action='store_true', help="Use GPU for inference (faster but less deterministic)") | |
| args = parser.parse_args() | |
| evaluate_video( | |
| video_source=args.video, | |
| model_path=args.model, | |
| pipeline_type=args.type, | |
| nlp_skill_level=args.skill, | |
| generate_report=not args.no_report | |
| ) | |