Spaces:
Running
Running
| from PIL import Image | |
| import gradio as gr | |
| from A8.pose_estimator import MoveNetPoseEstimator | |
| from A12.pose_interpolator import smooth_pose_sequence | |
# Local dev URL: http://127.0.0.1:7860
# from A12.service.ui import run_a12_tab
| from A12.service.ui import run_a12_video_tab | |
| from exercise_pipeline import ExercisePipeline | |
| import json | |
| import csv | |
| import os | |
| from pathlib import Path | |
| from datetime import datetime | |
| from typing import Dict, List, Any, Optional | |
| import numpy as np | |
| import cv2 | |
| import tempfile | |
| import time | |
# Initialize the MoveNet pose estimator once, at import time. This single
# module-level instance is shared by the image and video handlers below.
pose_estimator = MoveNetPoseEstimator(model_name='lightning')
# COCO keypoint names (17 joints). The order of this list fixes the order of
# the per-joint records produced by extract_joint_positions_from_movenet.
KEYPOINT_NAMES = [
    'nose',
    'left_eye', 'right_eye',
    'left_ear', 'right_ear',
    'left_shoulder', 'right_shoulder',
    'left_elbow', 'right_elbow',
    'left_wrist', 'right_wrist',
    'left_hip', 'right_hip',
    'left_knee', 'right_knee',
    'left_ankle', 'right_ankle',
]
def extract_joint_positions_from_movenet(pose_result: Dict[str, Any]) -> Dict[str, Any]:
    """Reshape a MoveNet pose result into the app's joint-data dictionary.

    Args:
        pose_result: Mapping read via ``pose_result['keypoints'][name]`` with
            ``'x'``, ``'y'`` and ``'confidence'`` entries, plus an optional
            ``'inference_time_ms'`` value.

    Returns:
        A dict with a single-entry ``"poses"`` list (one record per name in
        ``KEYPOINT_NAMES``, in that order), the current time as an ISO
        ``"timestamp"``, the ``"joint_names"`` list and ``"inference_time_ms"``
        (0 when absent). Joints missing from the input keep ``None`` values.
    """
    detected = pose_result.get('keypoints', {})

    def _record(joint_name):
        # A joint that is absent from the result yields None for x/y/score.
        entry = detected.get(joint_name, {})
        return {
            "x": entry.get('x'),
            "y": entry.get('y'),
            "score": entry.get('confidence'),
            "name": joint_name,
        }

    records = [_record(joint_name) for joint_name in KEYPOINT_NAMES]
    # "total_parts" counts only the joints that actually have an x coordinate.
    located = sum(1 for record in records if record['x'] is not None)

    single_pose = {
        "pose_id": 0,
        "total_score": 0.0,
        "total_parts": located,
        "keypoints": records,
    }
    return {
        "poses": [single_pose],
        "timestamp": datetime.now().isoformat(),
        "joint_names": KEYPOINT_NAMES,
        "inference_time_ms": pose_result.get('inference_time_ms', 0),
    }
def save_to_csv(joint_data: Dict[str, Any], filename: str = None) -> str:
    """Write the joint positions in *joint_data* to a CSV under ``pose_outputs/``.

    Args:
        joint_data: Dict with a ``"poses"`` list (each pose holding a
            ``"keypoints"`` list) plus optional ``"timestamp"`` and
            ``"inference_time_ms"`` entries.
        filename: Name of the file inside ``pose_outputs``. When ``None`` a
            ``pose_data_<YYYYmmdd_HHMMSS>.csv`` name is generated.

    Returns:
        The relative path of the written file.
    """
    def _fmt(value, spec):
        # Missing values are written as the literal "N/A".
        return format(value, spec) if value is not None else "N/A"

    if filename is None:
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"pose_data_{stamp}.csv"

    os.makedirs("pose_outputs", exist_ok=True)
    target = os.path.join("pose_outputs", filename)

    with open(target, 'w', newline='') as handle:
        out = csv.writer(handle)
        out.writerow(["Pose_ID", "Joint", "X", "Y", "Confidence", "Visible"])

        for pose in joint_data.get("poses", []):
            pid = pose.get("pose_id", 0)
            for joint in pose.get("keypoints", []):
                jx = joint.get("x")
                jy = joint.get("y")
                # A joint counts as visible only when both coordinates exist.
                seen = jx is not None and jy is not None
                out.writerow([
                    pid,
                    joint.get("name", "Unknown"),
                    _fmt(jx, ".2f"),
                    _fmt(jy, ".2f"),
                    _fmt(joint.get("score"), ".3f"),
                    "Yes" if seen else "No",
                ])

        # Trailer: a blank separator row followed by run metadata.
        out.writerows([
            [],
            ["Timestamp", joint_data.get("timestamp", "")],
            ["Inference_Time_ms", joint_data.get("inference_time_ms", 0)],
        ])

    return target
def save_to_json(joint_data: Dict[str, Any], filename: str = None) -> str:
    """Dump *joint_data* as indented JSON under ``pose_outputs/``.

    Args:
        joint_data: JSON-serializable dict to store.
        filename: Name of the file inside ``pose_outputs``. When ``None`` a
            ``pose_data_<YYYYmmdd_HHMMSS>.json`` name is generated.

    Returns:
        The relative path of the written file.
    """
    if filename is None:
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"pose_data_{stamp}.json"

    os.makedirs("pose_outputs", exist_ok=True)
    target = os.path.join("pose_outputs", filename)

    with open(target, 'w') as handle:
        json.dump(joint_data, handle, indent=2)

    return target
def process_single_image(image: Image.Image, confidence_threshold: float = 0.3) -> tuple:
    """Run pose estimation on one PIL image and persist the detected joints.

    Args:
        image: Input image; it is converted to RGB before processing.
        confidence_threshold: Forwarded to ``pose_estimator.draw_keypoints``.

    Returns:
        ``(annotated_image, joint_data)`` where ``annotated_image`` is a PIL
        image with the keypoints drawn and ``joint_data`` additionally carries
        the ``"csv_path"`` and ``"json_path"`` of the files just written.
    """
    # The estimator is fed BGR arrays, so convert PIL RGB -> BGR first.
    rgb_pixels = np.array(image.convert("RGB"))
    bgr_pixels = cv2.cvtColor(rgb_pixels, cv2.COLOR_RGB2BGR)

    detection = pose_estimator.detect_pose(bgr_pixels)
    joint_data = extract_joint_positions_from_movenet(detection)

    overlay_bgr = pose_estimator.draw_keypoints(
        bgr_pixels, detection, confidence_threshold=confidence_threshold
    )
    annotated = Image.fromarray(cv2.cvtColor(overlay_bgr, cv2.COLOR_BGR2RGB))

    # Both files are written before the paths are attached, so neither file
    # contains the path entries themselves.
    joint_data.update(
        csv_path=save_to_csv(joint_data),
        json_path=save_to_json(joint_data),
    )
    return annotated, joint_data
def process_video_frame(frame: np.ndarray, confidence_threshold: float = 0.3) -> np.ndarray:
    """Detect the pose in one video frame and return the frame with keypoints drawn.

    Args:
        frame: Frame array. Three-channel input is treated as BGR,
            four-channel input as BGRA, and non-3-D input as grayscale.
        confidence_threshold: Forwarded to ``pose_estimator.draw_keypoints``.

    Returns:
        The annotated frame as returned by ``pose_estimator.draw_keypoints``.
    """
    if frame.ndim != 3:
        # No channel axis: treat as grayscale.
        bgr = cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)
    elif frame.shape[2] == 4:
        # Drop the alpha channel.
        bgr = cv2.cvtColor(frame, cv2.COLOR_BGRA2BGR)
    else:
        # Three channels (already BGR) or any other channel count: use as-is.
        bgr = frame

    detection = pose_estimator.detect_pose(bgr)
    return pose_estimator.draw_keypoints(
        bgr, detection, confidence_threshold=confidence_threshold
    )
def format_pose_output(joint_data: Dict[str, Any]) -> str:
    """Render *joint_data* as a Markdown report for the Gradio UI.

    Args:
        joint_data: Dict with optional ``"timestamp"``, ``"inference_time_ms"``,
            ``"poses"``, ``"csv_path"`` and ``"json_path"`` entries.

    Returns:
        Markdown text: a header, one keypoint table per pose (or a
        "no pose data" notice), then the CSV/JSON file paths.
    """
    def _num(value, spec):
        # Missing values are shown as the literal "N/A".
        return format(value, spec) if value is not None else "N/A"

    parts = [
        "### Detected Poses\n\n",
        f"**Timestamp:** {joint_data.get('timestamp', 'N/A')}\n",
        f"**Inference Time:** {joint_data.get('inference_time_ms', 0):.2f} ms\n\n",
    ]

    poses = joint_data.get("poses", [])
    if poses:
        for pose in poses:
            parts.append(f"#### Pose #{pose.get('pose_id', 0)}\n")
            parts.append(f"- **Total Parts:** {pose.get('total_parts', 0)}\n\n")
            parts.append("| Joint | X | Y | Confidence | Visible |\n")
            parts.append("|-------|---|---|------------|---------|\n")
            for joint in pose.get("keypoints", []):
                jx = joint.get("x")
                jy = joint.get("y")
                label = joint.get("name", "Unknown")
                seen = "Yes" if jx is not None and jy is not None else "No"
                parts.append(
                    f"| {label} | {_num(jx, '.1f')} | {_num(jy, '.1f')} "
                    f"| {_num(joint.get('score'), '.3f')} | {seen} |\n"
                )
            # Blank line closes the table for this pose.
            parts.append("\n")
    else:
        parts.append("No pose data available.\n\n")

    parts.append(f"**CSV File:** `{joint_data.get('csv_path', 'N/A')}`\n")
    parts.append(f"**JSON File:** `{joint_data.get('json_path', 'N/A')}`\n")
    return "".join(parts)
def run_a14_pipeline(video_path, quality_threshold):
    """Run the A14 exercise pipeline on an uploaded video (Gradio callback).

    Args:
        video_path: Path of the uploaded video, or ``None`` when nothing was
            uploaded.
        quality_threshold: Passed to ``ExercisePipeline`` as
            ``quality_threshold``.

    Returns:
        A 4-tuple matching the A14 tab outputs:
        ``(skeleton_video_path_or_None, recording_status, exercise_quality,
        results_dict)``.

    Raises:
        Whatever ``ExercisePipeline.process_video`` raises; the pipeline is
        closed before the exception propagates.
    """
    if video_path is None:
        return None, "No video uploaded", "N/A", {}

    pipeline = ExercisePipeline(quality_threshold=quality_threshold)
    try:
        results = pipeline.process_video(video_path)
    finally:
        pipeline.close()

    # Rejected ("ugly") recording. `results` may be None here, so every lookup
    # goes through a dict fallback instead of calling .get() on None.
    if results is None or results.get("pipeline_stopped"):
        rejected = results or {}
        return (
            None,
            f"REJECTED — Poor recording quality "
            f"(conf: {rejected.get('recording_confidence', 0):.2f})",
            "N/A",
            rejected,
        )

    # Accepted recording: look for the rendered skeleton animation next to
    # this module, under outputs/<video stem>_skeleton.mp4.
    stem = Path(video_path).stem
    video_3d_path = Path(__file__).parent / "outputs" / f"{stem}_skeleton.mp4"

    video_3d = None
    if video_3d_path.exists():
        import shutil
        import tempfile

        # Reserve a temp file name, close the handle, then copy. Copying onto
        # a still-open handle leaks the descriptor and fails on Windows.
        with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as tmp:
            video_3d = tmp.name
        shutil.copy(str(video_3d_path), video_3d)
        print(f" Copied to temp: {video_3d}")

    status_text = (f"ACCEPTED — Recording OK "
                   f"(conf: {results.get('recording_confidence', 0):.2f})")
    quality_text = (f"{results.get('quality_label', 'N/A')} "
                    f"({results.get('quality_confidence', 0):.1%})")

    return (
        video_3d,      # 1. a14_3d_output
        status_text,   # 2. a14_rec_status
        quality_text,  # 3. a14_exercise_quality
        results,       # 4. a14_json_output
    )
def process_and_display(image: "Optional[Image.Image]", confidence_threshold: float = 0.3) -> tuple:
    """Gradio callback: annotate an image and return it with a Markdown report.

    Args:
        image: Uploaded PIL image, or ``None`` when the button is clicked
            without an upload.
        confidence_threshold: Forwarded to ``process_single_image``.

    Returns:
        ``(annotated_image, report_text)``. When no image was supplied the
        result is ``(None, "No image uploaded.")``.
    """
    # Guard the empty-upload case like the video handlers do, instead of
    # failing inside image.convert().
    if image is None:
        return None, "No image uploaded."

    result, joint_data = process_single_image(image, confidence_threshold)
    pose_info = format_pose_output(joint_data)
    return result, pose_info
def _frame_to_bgr(frame: np.ndarray) -> np.ndarray:
    """Return *frame* as a 3-channel BGR image (grayscale and BGRA are converted)."""
    if frame.ndim == 2:
        return cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)
    if frame.ndim == 3 and frame.shape[2] == 4:
        return cv2.cvtColor(frame, cv2.COLOR_BGRA2BGR)
    return frame


def process_webcam_video(
    video_path: str,
    confidence_threshold: float = 0.3,
    smoothing_strategy: str = "exponential",
    smoothing_method: str = "zscore",
    progress=gr.Progress()
) -> tuple:
    """Annotate an uploaded video with poses and export smoothed keypoints.

    Each frame is run through the pose estimator once; the same detection is
    used both for the drawn overlay and for the exported keypoints. The
    per-frame keypoints are smoothed with ``smooth_pose_sequence`` (falling
    back to the raw values if smoothing raises) and written to a CSV.

    Args:
        video_path: Path of the uploaded video, or ``None``.
        confidence_threshold: Forwarded to ``pose_estimator.draw_keypoints``.
        smoothing_strategy: ``strategy`` argument of ``smooth_pose_sequence``.
        smoothing_method: ``outlier_method`` argument of ``smooth_pose_sequence``.
        progress: Gradio progress tracker.

    Returns:
        ``(output_video_path, markdown_summary)`` on success, or
        ``(None, error_message)`` when the input is missing, cannot be opened,
        or the output writer cannot be created.
    """
    if video_path is None:
        return None, "No video uploaded."

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        return None, "Could not open video."

    out = None
    all_keypoints = []
    frame_count = 0
    try:
        # Get video properties
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        print(f"Video properties: FPS={fps}, Width={width}, Height={height}, TotalFrames={total_frames}")

        # Validate FPS. The chained comparison is also False for NaN, so a
        # NaN frame rate falls back to the default as well.
        if not 0 < fps <= 240:  # 240 FPS is unrealistically high for normal videos
            print(f"Invalid FPS ({fps}), using default 30 FPS")
            fps = 30
        else:
            print(f"Using FPS: {fps}")

        # Create output video
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        os.makedirs("pose_outputs", exist_ok=True)
        output_path = os.path.join("pose_outputs", f"annotated_video_{timestamp}.mp4")
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

        # Verify video writer opened successfully (the finally block releases
        # the capture and the writer on this early return too).
        if not out.isOpened():
            print(f"Error: Video writer failed to open. Output path: {output_path}")
            return None, "Failed to create output video. Please check the video format and try again."

        progress(0, desc="Processing video...")
        while True:
            ret, frame = cap.read()
            if not ret:
                print(f"Frame read failed at frame {frame_count}")
                break

            # Debug: Check frame properties
            print(f"Frame {frame_count}: shape={frame.shape}")

            # Normalize once, detect once. Keypoints are extracted before the
            # overlay is drawn so they never come from an annotated frame.
            img_bgr = _frame_to_bgr(frame)
            pose_result = pose_estimator.detect_pose(img_bgr)
            joint_data = extract_joint_positions_from_movenet(pose_result)
            joint_data['frame_id'] = frame_count
            # Replace the wall-clock timestamp with the position in the video (seconds).
            joint_data['timestamp'] = frame_count / fps
            all_keypoints.append(joint_data)

            annotated_frame = pose_estimator.draw_keypoints(
                img_bgr, pose_result, confidence_threshold=confidence_threshold
            )

            # Verify frame dimensions match video writer
            if annotated_frame.shape[1] != width or annotated_frame.shape[0] != height:
                print(f"Resizing frame from {annotated_frame.shape[1]}x{annotated_frame.shape[0]} to {width}x{height}")
                annotated_frame = cv2.resize(annotated_frame, (width, height))
            out.write(annotated_frame)

            frame_count += 1

            # Update progress; clamp because the reported frame count can be
            # lower than the number of frames actually read.
            if frame_count % 30 == 0:
                fraction = min(frame_count / total_frames, 1.0) if total_frames > 0 else 0
                progress(fraction, desc=f"Processing frame {frame_count}/{total_frames if total_frames > 0 else '?'}...")
    finally:
        # Always release the capture and writer, including on errors.
        cap.release()
        if out is not None:
            out.release()

    print(f"Total frames processed: {frame_count}")

    # Apply smoothing to the keypoints
    try:
        smoothed_keypoints = smooth_pose_sequence(
            all_keypoints,
            strategy=smoothing_strategy,
            outlier_method=smoothing_method,
            outlier_threshold=3.0,
            window_size=7,
            min_confidence=0.2,
        )
    except Exception as e:
        print(f"Error applying smoothing: {e}")
        # Fallback to original keypoints if smoothing fails
        smoothed_keypoints = all_keypoints

    # Save smoothed keypoints to CSV
    csv_path = os.path.join("pose_outputs", f"video_keypoints_{timestamp}.csv")
    with open(csv_path, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(["Frame_ID", "Joint", "X", "Y", "Confidence", "Visible"])
        for frame_data in smoothed_keypoints:
            frame_id = frame_data.get('frame_id', 0)
            # Tolerate frames that come back from smoothing without a pose.
            poses = frame_data.get('poses') or [{}]
            for kp in poses[0].get('keypoints', []):
                x = kp.get('x')
                y = kp.get('y')
                score = kp.get('score')
                name = kp.get('name', 'Unknown')
                visible = "Yes" if x is not None and y is not None else "No"
                writer.writerow([
                    frame_id,
                    name,
                    f"{x:.2f}" if x is not None else "N/A",
                    f"{y:.2f}" if y is not None else "N/A",
                    f"{score:.3f}" if score is not None else "N/A",
                    visible
                ])

    avg_inference = np.mean([k.get('inference_time_ms', 0) for k in all_keypoints]) if all_keypoints else 0
    result_text = (
        "### Video Processing Complete\n"
        f"- **Frames processed:** {frame_count}\n"
        f"- **Average inference time:** {avg_inference:.2f} ms/frame\n"
        f"- **Output video:** `{output_path}`\n"
        f"- **Keypoints CSV:** `{csv_path}`\n"
    )
    return output_path, result_text
# Gradio UI with Tabs
# Option lists shared by the two video tabs so their dropdowns stay in sync.
_SMOOTHING_STRATEGIES = [
    "exponential",
    "moving_average",
    "gaussian",
    "median",
    "savitzky_golay",
    "kalman",
    "spline",
    "hybrid",
]
_OUTLIER_METHODS = ["zscore", "velocity", "none"]

with gr.Blocks(title="MoveNet Pose Estimation") as demo:
    gr.Markdown("# 🏃 MoveNet Pose Estimation")
    gr.Markdown("Estimate human poses using Google's MoveNet model. Supports single images and video files.")

    with gr.Tabs():
        # ---- Image Processing tab -> process_and_display ----
        with gr.TabItem("📸 Image Processing"):
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Upload Image")
                    image_input = gr.Image(type="pil", label="Input Image")
                    confidence_slider = gr.Slider(
                        minimum=0.0, maximum=1.0, value=0.3, step=0.05,
                        label="Confidence Threshold",
                    )
                    process_btn = gr.Button("🚀 Process Image", variant="primary")
                with gr.Column():
                    gr.Markdown("### Results")
                    image_output = gr.Image(type="pil", label="Annotated Output")
                    pose_text = gr.Textbox(label="Pose Data", lines=15)

            process_btn.click(
                fn=process_and_display,
                inputs=[image_input, confidence_slider],
                outputs=[image_output, pose_text],
            )

        # ---- Video Processing tab -> process_webcam_video ----
        with gr.TabItem("🎥 Video Processing"):
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Upload Video")
                    video_input = gr.Video(label="Input Video")
                    video_confidence = gr.Slider(
                        minimum=0.0, maximum=1.0, value=0.3, step=0.05,
                        label="Confidence Threshold",
                    )
                    smoothing_strategy = gr.Dropdown(
                        choices=list(_SMOOTHING_STRATEGIES),
                        value="exponential",
                        label="Smoothing Strategy",
                    )
                    smoothing_method = gr.Dropdown(
                        choices=list(_OUTLIER_METHODS),
                        value="zscore",
                        label="Outlier Detection Method",
                    )
                    process_video_btn = gr.Button("🎬 Process Video", variant="primary")
                with gr.Column():
                    gr.Markdown("### Results")
                    video_output = gr.Video(label="Annotated Video")
                    video_result = gr.Textbox(label="Processing Results", lines=15)

            process_video_btn.click(
                fn=process_webcam_video,
                inputs=[video_input, video_confidence, smoothing_strategy, smoothing_method],
                outputs=[video_output, video_result],
            )

        # ---- A12 Video Pipeline tab -> run_a12_video_tab ----
        with gr.TabItem("🧪 Video Pipeline"):
            gr.Markdown(
                """
                ### Issue #12: App development and pipeline integration
                Endpoint alternative chosen: **Gradio tab inside the existing app.py**.
                **Input:** one video file.
                **Output:** annotated cut 2D video, 3D skeleton animation video, keypoints CSV,
                and good/bad classification JSON.
                """
            )
            with gr.Row():
                with gr.Column():
                    a12_video_input = gr.Video(label="Input exercise video")
                    a12_confidence = gr.Slider(
                        minimum=0.0, maximum=1.0, value=0.3, step=0.05,
                        label="Confidence threshold",
                    )
                    a12_smoothing_strategy = gr.Dropdown(
                        choices=list(_SMOOTHING_STRATEGIES),
                        value="exponential",
                        label="Smoothing strategy",
                    )
                    a12_smoothing_method = gr.Dropdown(
                        choices=list(_OUTLIER_METHODS),
                        value="zscore",
                        label="Outlier detection method",
                    )
                    a12_run_btn = gr.Button("Run A12 pipeline", variant="primary")
                with gr.Column():
                    # The annotated cut 2D video output is currently disabled:
                    # a12_video_output = gr.Video(label="Annotated cut 2D video")
                    a12_animation_output = gr.Video(label="3D Skeleton Animation")
                    a12_keypoints_file = gr.File(label="3D joint CSV")
                    a12_json_output = gr.JSON(label="Structured output")
                    a12_summary = gr.Markdown()

            a12_run_btn.click(
                fn=run_a12_video_tab,
                inputs=[
                    a12_video_input,
                    a12_confidence,
                    a12_smoothing_strategy,
                    a12_smoothing_method,
                ],
                outputs=[
                    a12_animation_output,
                    a12_keypoints_file,
                    a12_json_output,
                    a12_summary,
                ],
            )

        # ---- Exercise pipeline A14 tab -> run_a14_pipeline ----
        with gr.TabItem("Exercise Analysis (A14)"):
            gr.Markdown(
                """
                ## A14: Advanced Exercise Pipeline
                **Features:** Automated 'Ugly' recording rejection + 'Good/Bad' form classification.
                """
            )
            with gr.Row():
                with gr.Column():
                    a14_input_video = gr.Video(label="Upload Exercise Video")
                    a14_threshold = gr.Slider(
                        minimum=0.1, maximum=0.9, value=0.6, step=0.05,
                        label="Recording Quality Threshold",
                    )
                    a14_run_btn = gr.Button("Run Full Analysis", variant="primary")
                with gr.Column():
                    # High-visibility results
                    with gr.Row():
                        a14_rec_status = gr.Textbox(label="Recording Status", interactive=False)
                        a14_exercise_quality = gr.Label(label="Exercise quality")
                    a14_3d_output = gr.Video(label="3D Skeleton Animation")
                    a14_json_output = gr.JSON(label="Full Metadata")

            # Output order must match the 4-tuple returned by run_a14_pipeline.
            a14_run_btn.click(
                fn=run_a14_pipeline,
                inputs=[a14_input_video, a14_threshold],
                outputs=[
                    a14_3d_output,
                    a14_rec_status,
                    a14_exercise_quality,
                    a14_json_output,
                ],
            )

    # Collapsible information section
    with gr.Accordion("ℹ️ Information", open=False):
        gr.Markdown("""
        ### Features
        - **Single Image Processing**: Upload and process static images
        - **Video Processing**: Upload video files for pose estimation
        - **17 COCO Keypoints**: Detects nose, eyes, ears, shoulders, elbows, wrists, hips, knees, and ankles
        - **Confidence Threshold**: Adjust detection sensitivity
        - **CSV/JSON Export**: Download pose data for further analysis
        ### Model Details
        - Model: MoveNet SinglePose (Lightning)
        - Input size: 192x192 pixels
        - Fast and efficient real-time pose estimation
        """)
# Start the Gradio server only when this file is run as a script. It listens
# on all interfaces (0.0.0.0) at port 7860.
if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860)