"""Gradio front end for MoveNet pose estimation.

Exposes four tabs: single-image pose estimation, video pose estimation with
keypoint smoothing, the A12 video pipeline, and the A14 exercise-analysis
pipeline. Annotated media and keypoint exports are written to
``pose_outputs/`` relative to the current working directory.
"""

import csv
import json
import os
import shutil
import tempfile
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import cv2
import gradio as gr
import numpy as np
from PIL import Image

from A8.pose_estimator import MoveNetPoseEstimator
from A12.pose_interpolator import smooth_pose_sequence
# Local URL while the app is running: http://127.0.0.1:7860
# NOTE(review): this import was already disabled in the original (it was
# fused into the URL comment above); nothing in this file uses the name.
# from A12.service.ui import run_a12_tab
from A12.service.ui import run_a12_video_tab
from exercise_pipeline import ExercisePipeline

# Directory (relative to the working directory) that receives all exports.
OUTPUT_DIR = "pose_outputs"

# Initialize MoveNet pose estimator
pose_estimator = MoveNetPoseEstimator(model_name='lightning')

# COCO Keypoint definitions (17 keypoints)
KEYPOINT_NAMES = [
    'nose', 'left_eye', 'right_eye', 'left_ear', 'right_ear',
    'left_shoulder', 'right_shoulder', 'left_elbow', 'right_elbow',
    'left_wrist', 'right_wrist', 'left_hip', 'right_hip',
    'left_knee', 'right_knee', 'left_ankle', 'right_ankle'
]


def _format_optional(value: Optional[float], spec: str) -> str:
    """Format *value* with *spec*, or return ``"N/A"`` when it is None."""
    return format(value, spec) if value is not None else "N/A"


def _keypoint_csv_row(row_id: Any, kp: Dict[str, Any]) -> List[Any]:
    """Build one CSV row (id, joint, x, y, confidence, visible) for a keypoint."""
    x = kp.get("x")
    y = kp.get("y")
    # A joint counts as visible only when both coordinates are present.
    visible = "Yes" if x is not None and y is not None else "No"
    return [
        row_id,
        kp.get("name", "Unknown"),
        _format_optional(x, ".2f"),
        _format_optional(y, ".2f"),
        _format_optional(kp.get("score"), ".3f"),
        visible,
    ]


def _to_bgr(frame: np.ndarray) -> np.ndarray:
    """Return *frame* as a 3-channel BGR image.

    3-channel input is assumed to already be BGR (OpenCV's video order),
    4-channel input is treated as BGRA, and any non-3-D input is treated as
    grayscale. Other channel counts are passed through unchanged.
    """
    if frame.ndim != 3:
        return cv2.cvtColor(frame, cv2.COLOR_GRAY2BGR)
    if frame.shape[2] == 4:
        return cv2.cvtColor(frame, cv2.COLOR_BGRA2BGR)
    return frame


def extract_joint_positions_from_movenet(pose_result: Dict[str, Any]) -> Dict[str, Any]:
    """Convert a MoveNet pose result into the app's joint-data dictionary.

    Args:
        pose_result: Estimator output. ``pose_result['keypoints']`` is read as
            a mapping from joint name to a dict with ``x``, ``y`` and
            ``confidence``; ``inference_time_ms`` is copied through.

    Returns:
        A dict with a single-entry ``poses`` list (one keypoint per name in
        ``KEYPOINT_NAMES``; joints missing from the input get ``None``
        values), an ISO ``timestamp``, ``joint_names`` and
        ``inference_time_ms``.
    """
    keypoints = pose_result.get('keypoints', {})

    all_keypoints = []
    for joint_name in KEYPOINT_NAMES:
        kp = keypoints.get(joint_name, {})
        all_keypoints.append({
            "x": kp.get('x'),
            "y": kp.get('y'),
            "score": kp.get('confidence'),
            "name": joint_name,
        })

    return {
        "poses": [{
            "pose_id": 0,
            "total_score": 0.0,
            # Number of joints for which an x coordinate was reported.
            "total_parts": sum(1 for k in all_keypoints if k['x'] is not None),
            "keypoints": all_keypoints,
        }],
        "timestamp": datetime.now().isoformat(),
        "joint_names": KEYPOINT_NAMES,
        "inference_time_ms": pose_result.get('inference_time_ms', 0),
    }


def save_to_csv(joint_data: Dict[str, Any], filename: Optional[str] = None) -> str:
    """Save joint positions to a CSV file under ``pose_outputs/``.

    Args:
        joint_data: Dict as produced by ``extract_joint_positions_from_movenet``.
        filename: Output file name; defaults to ``pose_data_<timestamp>.csv``.

    Returns:
        Path of the written file.
    """
    if filename is None:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"pose_data_{timestamp}.csv"

    os.makedirs(OUTPUT_DIR, exist_ok=True)
    filepath = os.path.join(OUTPUT_DIR, filename)

    with open(filepath, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(["Pose_ID", "Joint", "X", "Y", "Confidence", "Visible"])

        for pose in joint_data.get("poses", []):
            pose_id = pose.get("pose_id", 0)
            for kp in pose.get("keypoints", []):
                writer.writerow(_keypoint_csv_row(pose_id, kp))

        # Trailer: a blank separator row followed by run metadata.
        writer.writerow([])
        writer.writerow(["Timestamp", joint_data.get("timestamp", "")])
        writer.writerow(["Inference_Time_ms", joint_data.get("inference_time_ms", 0)])

    return filepath


def save_to_json(joint_data: Dict[str, Any], filename: Optional[str] = None) -> str:
    """Save joint positions to a JSON file under ``pose_outputs/``.

    Args:
        joint_data: JSON-serializable joint-data dict.
        filename: Output file name; defaults to ``pose_data_<timestamp>.json``.

    Returns:
        Path of the written file.
    """
    if filename is None:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"pose_data_{timestamp}.json"

    os.makedirs(OUTPUT_DIR, exist_ok=True)
    filepath = os.path.join(OUTPUT_DIR, filename)

    with open(filepath, 'w', encoding='utf-8') as jsonfile:
        json.dump(joint_data, jsonfile, indent=2)

    return filepath


def process_single_image(image: Image.Image, confidence_threshold: float = 0.3) -> tuple:
    """Run pose estimation on one image and export the keypoints.

    Args:
        image: Input PIL image (converted to RGB before processing).
        confidence_threshold: Passed to the estimator's ``draw_keypoints``.

    Returns:
        ``(annotated_image, joint_data)`` where ``joint_data`` additionally
        carries ``csv_path`` and ``json_path`` of the exported files.
    """
    img_array = np.array(image.convert("RGB"))
    # The estimator is fed BGR images throughout this file.
    img_bgr = cv2.cvtColor(img_array, cv2.COLOR_RGB2BGR)

    pose_result = pose_estimator.detect_pose(img_bgr)
    joint_data = extract_joint_positions_from_movenet(pose_result)

    result_bgr = pose_estimator.draw_keypoints(
        img_bgr, pose_result, confidence_threshold=confidence_threshold
    )
    result_image = Image.fromarray(cv2.cvtColor(result_bgr, cv2.COLOR_BGR2RGB))

    # Export first, then attach the paths, so the paths themselves are not
    # part of the exported JSON.
    csv_path = save_to_csv(joint_data)
    json_path = save_to_json(joint_data)
    joint_data["csv_path"] = csv_path
    joint_data["json_path"] = json_path

    return result_image, joint_data


def process_video_frame(frame: np.ndarray, confidence_threshold: float = 0.3) -> np.ndarray:
    """Process a single video frame and return the annotated BGR frame.

    Args:
        frame: Frame as read by OpenCV (BGR, BGRA or grayscale).
        confidence_threshold: Passed to the estimator's ``draw_keypoints``.
    """
    img_bgr = _to_bgr(frame)
    pose_result = pose_estimator.detect_pose(img_bgr)
    return pose_estimator.draw_keypoints(
        img_bgr, pose_result, confidence_threshold=confidence_threshold
    )


def format_pose_output(joint_data: Dict[str, Any]) -> str:
    """Format pose data as a Markdown-style report for display in Gradio.

    Args:
        joint_data: Dict as returned by ``process_single_image``.

    Returns:
        Report text with a header, one keypoint table per pose, and the
        CSV/JSON export paths.
    """
    parts = [
        "### Detected Poses\n\n",
        f"**Timestamp:** {joint_data.get('timestamp', 'N/A')}\n",
        f"**Inference Time:** {joint_data.get('inference_time_ms', 0):.2f} ms\n\n",
    ]

    poses = joint_data.get("poses", [])
    if not poses:
        parts.append("No pose data available.\n\n")

    for pose in poses:
        parts.append(f"#### Pose #{pose.get('pose_id', 0)}\n")
        parts.append(f"- **Total Parts:** {pose.get('total_parts', 0)}\n\n")
        parts.append("| Joint | X | Y | Confidence | Visible |\n")
        parts.append("|-------|---|---|------------|---------|\n")

        for kp in pose.get("keypoints", []):
            name = kp.get("name", "Unknown")
            x = kp.get("x")
            y = kp.get("y")
            x_str = _format_optional(x, ".1f")
            y_str = _format_optional(y, ".1f")
            score_str = _format_optional(kp.get("score"), ".3f")
            visible = "Yes" if x is not None and y is not None else "No"
            parts.append(f"| {name} | {x_str} | {y_str} | {score_str} | {visible} |\n")

        parts.append("\n")

    parts.append(f"**CSV File:** `{joint_data.get('csv_path', 'N/A')}`\n")
    parts.append(f"**JSON File:** `{joint_data.get('json_path', 'N/A')}`\n")

    return "".join(parts)


def run_a14_pipeline(video_path, quality_threshold):
    """Run the A14 exercise pipeline on an uploaded video.

    Args:
        video_path: Path of the uploaded video, or None when nothing was
            uploaded.
        quality_threshold: Forwarded to ``ExercisePipeline``.

    Returns:
        ``(skeleton_video_path_or_None, recording_status, exercise_quality,
        metadata_dict)``, matching the four outputs wired to the A14 tab.
    """
    if video_path is None:
        return None, "No video uploaded", "N/A", {}

    pipeline = ExercisePipeline(quality_threshold=quality_threshold)
    try:
        results = pipeline.process_video(video_path)
    finally:
        pipeline.close()

    # Handle UGLY case
    if results is None or results.get("pipeline_stopped"):
        # `results` may be None here; normalise before reading from it so the
        # rejection message can still be built.
        metadata = results or {}
        return (
            None,
            f"REJECTED — Poor recording quality "
            f"(conf: {metadata.get('recording_confidence', 0):.2f})",
            "N/A",
            metadata,
        )

    # Handle SUCCESS case
    out_dir = Path(__file__).parent / "outputs"
    video_3d_path = out_dir / f"{Path(video_path).stem}_skeleton.mp4"

    video_3d = None
    if video_3d_path.exists():
        # Reserve a unique temp name and close the handle before copying:
        # leaving it open leaks a descriptor, and on Windows an open
        # NamedTemporaryFile cannot be reopened for writing.
        with tempfile.NamedTemporaryFile(suffix='.mp4', delete=False) as tmp:
            video_3d = tmp.name
        shutil.copy(str(video_3d_path), video_3d)
        print(f" Copied to temp: {video_3d}")

    status_text = (f"ACCEPTED — Recording OK "
                   f"(conf: {results.get('recording_confidence', 0):.2f})")
    quality_text = (f"{results.get('quality_label', 'N/A')} "
                    f"({results.get('quality_confidence', 0):.1%})")

    return (
        video_3d,      # 1. a14_3d_output
        status_text,   # 2. a14_rec_status
        quality_text,  # 3. a14_exercise_quality
        results,       # 4. a14_json_output
    )


def process_and_display(image: Image.Image, confidence_threshold: float = 0.3) -> tuple:
    """Process an image and return ``(annotated_image, formatted_pose_text)``."""
    result, joint_data = process_single_image(image, confidence_threshold)
    return result, format_pose_output(joint_data)


def process_webcam_video(
    video_path: str,
    confidence_threshold: float = 0.3,
    smoothing_strategy: str = "exponential",
    smoothing_method: str = "zscore",
    progress=gr.Progress()
) -> tuple:
    """Process an uploaded video with pose estimation.

    Writes an annotated MP4 and a CSV of smoothed keypoints to
    ``pose_outputs/``.

    Args:
        video_path: Path of the uploaded video, or None.
        confidence_threshold: Passed to the estimator's ``draw_keypoints``.
        smoothing_strategy: ``strategy`` argument for ``smooth_pose_sequence``.
        smoothing_method: ``outlier_method`` argument for
            ``smooth_pose_sequence``.
        progress: Gradio progress tracker (callable).

    Returns:
        ``(output_video_path, summary_text)``, or ``(None, error_message)``
        when the input cannot be read or the output cannot be created.
    """
    if video_path is None:
        return None, "No video uploaded."

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        cap.release()
        return None, "Could not open video."

    # Get video properties
    fps = cap.get(cv2.CAP_PROP_FPS)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    print(f"Video properties: FPS={fps}, Width={width}, Height={height}, TotalFrames={total_frames}")

    # Validate FPS - anything outside (0, 240] is treated as invalid. Written
    # as a negated range check so that NaN is rejected as well.
    if not (0 < fps <= 240):
        print(f"Invalid FPS ({fps}), using default 30 FPS")
        fps = 30
    else:
        print(f"Using FPS: {fps}")

    # Create output video
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    output_path = os.path.join(OUTPUT_DIR, f"annotated_video_{timestamp}.mp4")

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    # Verify video writer opened successfully
    if not out.isOpened():
        print(f"Error: Video writer failed to open. Output path: {output_path}")
        # Release both handles; the capture was previously leaked here.
        cap.release()
        out.release()
        return None, "Failed to create output video. Please check the video format and try again."

    all_keypoints = []
    frame_count = 0

    progress(0, desc="Processing video...")

    # try/finally guarantees the capture and writer are released even when a
    # frame fails mid-way.
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                print(f"Frame read failed at frame {frame_count}")
                break

            # Debug: Check frame properties
            print(f"Frame {frame_count}: shape={frame.shape}")

            # Detect once per frame, on the clean image, and reuse the result
            # for both the overlay and the keypoint export. Detecting a
            # second time after drawing doubles the inference cost and may
            # see the overlay if draw_keypoints draws in place.
            img_bgr = _to_bgr(frame)
            pose_result = pose_estimator.detect_pose(img_bgr)
            annotated_frame = pose_estimator.draw_keypoints(
                img_bgr, pose_result, confidence_threshold=confidence_threshold
            )

            # Verify frame dimensions match video writer
            if annotated_frame.shape[1] != width or annotated_frame.shape[0] != height:
                print(f"Resizing frame from {annotated_frame.shape[1]}x{annotated_frame.shape[0]} to {width}x{height}")
                annotated_frame = cv2.resize(annotated_frame, (width, height))

            out.write(annotated_frame)

            # Extract keypoints for this frame
            joint_data = extract_joint_positions_from_movenet(pose_result)
            joint_data['frame_id'] = frame_count
            # Replace the wall-clock timestamp with the position in the
            # video, in seconds (fps is guaranteed positive by now).
            joint_data['timestamp'] = frame_count / fps
            all_keypoints.append(joint_data)

            frame_count += 1

            # Update progress
            if frame_count % 30 == 0:
                # Container metadata can under-report the frame count, so
                # clamp the fraction to 1.0.
                fraction = min(frame_count / total_frames, 1.0) if total_frames > 0 else 0
                progress(
                    fraction,
                    desc=f"Processing frame {frame_count}/{total_frames if total_frames > 0 else '?'}..."
                )
    finally:
        cap.release()
        out.release()

    print(f"Total frames processed: {frame_count}")

    # Apply smoothing to the keypoints
    try:
        smoothed_keypoints = smooth_pose_sequence(
            all_keypoints,
            strategy=smoothing_strategy,
            outlier_method=smoothing_method,
            outlier_threshold=3.0,
            window_size=7,
            min_confidence=0.2,
        )
    except Exception as e:
        print(f"Error applying smoothing: {e}")
        # Fallback to original keypoints if smoothing fails
        smoothed_keypoints = all_keypoints

    # Save smoothed keypoints to CSV
    csv_path = os.path.join(OUTPUT_DIR, f"video_keypoints_{timestamp}.csv")
    with open(csv_path, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(["Frame_ID", "Joint", "X", "Y", "Confidence", "Visible"])

        for frame_data in smoothed_keypoints:
            frame_id = frame_data.get('frame_id', 0)
            for kp in frame_data['poses'][0]['keypoints']:
                writer.writerow(_keypoint_csv_row(frame_id, kp))

    avg_inference = np.mean([k.get('inference_time_ms', 0) for k in all_keypoints]) if all_keypoints else 0

    result_text = (
        "### Video Processing Complete\n"
        "\n"
        f"- **Frames processed:** {frame_count}\n"
        f"- **Average inference time:** {avg_inference:.2f} ms/frame\n"
        f"- **Output video:** `{output_path}`\n"
        f"- **Keypoints CSV:** `{csv_path}`\n"
    )

    return output_path, result_text


# Choices shared by the smoothing dropdowns on the video tabs.
_SMOOTHING_STRATEGIES = [
    "exponential", "moving_average", "gaussian", "median",
    "savitzky_golay", "kalman", "spline", "hybrid"
]
_OUTLIER_METHODS = ["zscore", "velocity", "none"]

# Gradio UI with Tabs
with gr.Blocks(title="MoveNet Pose Estimation") as demo:
    gr.Markdown("# 🏃 MoveNet Pose Estimation")
    gr.Markdown("Estimate human poses using Google's MoveNet model. Supports single images and video files.")

    with gr.Tabs():
        # Image Processing Tab
        with gr.TabItem("📸 Image Processing"):
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Upload Image")
                    image_input = gr.Image(type="pil", label="Input Image")
                    confidence_slider = gr.Slider(
                        minimum=0.0, maximum=1.0, value=0.3, step=0.05,
                        label="Confidence Threshold"
                    )
                    process_btn = gr.Button("🚀 Process Image", variant="primary")

                with gr.Column():
                    gr.Markdown("### Results")
                    image_output = gr.Image(type="pil", label="Annotated Output")
                    pose_text = gr.Textbox(label="Pose Data", lines=15)

            process_btn.click(
                fn=process_and_display,
                inputs=[image_input, confidence_slider],
                outputs=[image_output, pose_text]
            )

        # Video Processing Tab
        with gr.TabItem("🎥 Video Processing"):
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Upload Video")
                    video_input = gr.Video(label="Input Video")
                    video_confidence = gr.Slider(
                        minimum=0.0, maximum=1.0, value=0.3, step=0.05,
                        label="Confidence Threshold"
                    )
                    smoothing_strategy = gr.Dropdown(
                        choices=list(_SMOOTHING_STRATEGIES),
                        value="exponential",
                        label="Smoothing Strategy"
                    )
                    smoothing_method = gr.Dropdown(
                        choices=list(_OUTLIER_METHODS),
                        value="zscore",
                        label="Outlier Detection Method"
                    )
                    process_video_btn = gr.Button("🎬 Process Video", variant="primary")

                with gr.Column():
                    gr.Markdown("### Results")
                    video_output = gr.Video(label="Annotated Video")
                    video_result = gr.Textbox(label="Processing Results", lines=15)

            process_video_btn.click(
                fn=process_webcam_video,
                inputs=[video_input, video_confidence, smoothing_strategy, smoothing_method],
                outputs=[video_output, video_result]
            )

        # A12 Video Pipeline Tab
        with gr.TabItem("🧪 Video Pipeline"):
            gr.Markdown(
                """
                ### Issue #12: App development and pipeline integration

                Endpoint alternative chosen: **Gradio tab inside the existing app.py**.

                **Input:** one video file.

                **Output:** annotated cut 2D video, 3D skeleton animation video, keypoints CSV, and good/bad classification JSON.
                """
            )

            with gr.Row():
                with gr.Column():
                    a12_video_input = gr.Video(label="Input exercise video")
                    a12_confidence = gr.Slider(
                        minimum=0.0, maximum=1.0, value=0.3, step=0.05,
                        label="Confidence threshold"
                    )
                    a12_smoothing_strategy = gr.Dropdown(
                        choices=list(_SMOOTHING_STRATEGIES),
                        value="exponential",
                        label="Smoothing strategy",
                    )
                    a12_smoothing_method = gr.Dropdown(
                        choices=list(_OUTLIER_METHODS),
                        value="zscore",
                        label="Outlier detection method",
                    )
                    a12_run_btn = gr.Button("Run A12 pipeline", variant="primary")

                with gr.Column():
                    # The annotated 2D video output is currently disabled:
                    # a12_video_output = gr.Video(label="Annotated cut 2D video")
                    a12_animation_output = gr.Video(label="3D Skeleton Animation")
                    a12_keypoints_file = gr.File(label="3D joint CSV")
                    a12_json_output = gr.JSON(label="Structured output")
                    a12_summary = gr.Markdown()

            a12_run_btn.click(
                fn=run_a12_video_tab,
                inputs=[
                    a12_video_input,
                    a12_confidence,
                    a12_smoothing_strategy,
                    a12_smoothing_method
                ],
                outputs=[
                    a12_animation_output,
                    a12_keypoints_file,
                    a12_json_output,
                    a12_summary
                ],
            )

        # Exercise pipeline A14
        with gr.TabItem("Exercise Analysis (A14)"):
            gr.Markdown(
                """
                ## A14: Advanced Exercise Pipeline

                **Features:** Automated 'Ugly' recording rejection + 'Good/Bad' form classification.
                """
            )

            with gr.Row():
                with gr.Column():
                    a14_input_video = gr.Video(label="Upload Exercise Video")
                    a14_threshold = gr.Slider(
                        minimum=0.1, maximum=0.9, value=0.6, step=0.05,
                        label="Recording Quality Threshold"
                    )
                    a14_run_btn = gr.Button("Run Full Analysis", variant="primary")

                with gr.Column():
                    # High-visibility results
                    with gr.Row():
                        a14_rec_status = gr.Textbox(label="Recording Status", interactive=False)
                        a14_exercise_quality = gr.Label(label="Exercise quality")

                    a14_3d_output = gr.Video(label="3D Skeleton Animation")
                    a14_json_output = gr.JSON(label="Full Metadata")

            # Link the button to the bridge function; the output order must
            # match the tuple returned by run_a14_pipeline.
            a14_run_btn.click(
                fn=run_a14_pipeline,
                inputs=[a14_input_video, a14_threshold],
                outputs=[
                    a14_3d_output,
                    a14_rec_status,
                    a14_exercise_quality,
                    a14_json_output
                ]
            )

    # Example section
    with gr.Accordion("ℹ️ Information", open=False):
        gr.Markdown("""
        ### Features
        - **Single Image Processing**: Upload and process static images
        - **Video Processing**: Upload video files for pose estimation
        - **17 COCO Keypoints**: Detects nose, eyes, ears, shoulders, elbows, wrists, hips, knees, and ankles
        - **Confidence Threshold**: Adjust detection sensitivity
        - **CSV/JSON Export**: Download pose data for further analysis

        ### Model Details
        - Model: MoveNet SinglePose (Lightning)
        - Input size: 192x192 pixels
        - Fast and efficient real-time pose estimation
        """)


if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860)