Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import cv2 | |
| import mediapipe as mp | |
| import torch | |
| import numpy as np | |
| import tempfile | |
| from transformers import pipeline | |
| from PIL import Image | |
| # Initialize MediaPipe Pose | |
| mp_pose = mp.solutions.pose | |
| # Hugging Face pretrained model for action recognition | |
| action_model = pipeline( | |
| "image-classification", | |
| model="rvv-karma/Human-Action-Recognition-VIT-Base-patch16-224" | |
| ) | |
| def detect_pose_and_activity(video_file): | |
| """ | |
| Process the uploaded video to detect human poses and classify activity. | |
| Video is limited to 10 seconds. Returns annotated video and predicted action. | |
| """ | |
| try: | |
| # Save uploaded video temporarily | |
| temp_video = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") | |
| temp_video.write(open(video_file, "rb").read()) | |
| temp_video.close() | |
| cap = cv2.VideoCapture(temp_video.name) | |
| if not cap.isOpened(): | |
| return None, "Error: Could not open video." | |
| fps = cap.get(cv2.CAP_PROP_FPS) | |
| if fps == 0: | |
| fps = 30 # fallback | |
| total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| max_frames = int(min(total_frames/fps, 10) * fps) # limit 10s | |
| output_frames = [] | |
| action_predictions = [] | |
| # Process frames | |
| with mp_pose.Pose(static_image_mode=False, min_detection_confidence=0.5) as pose: | |
| for _ in range(max_frames): | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| # Pose detection | |
| image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| results = pose.process(image_rgb) | |
| if results.pose_landmarks: | |
| mp.solutions.drawing_utils.draw_landmarks(frame, results.pose_landmarks, mp_pose.POSE_CONNECTIONS) | |
| output_frames.append(frame) | |
| # Convert frame to PIL image for Hugging Face model | |
| pil_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)) | |
| pred = action_model(pil_image) | |
| action_predictions.append(pred[0]['label']) | |
| cap.release() | |
| if len(output_frames) == 0: | |
| return None, "Error: No frames to process." | |
| # Take the most frequent predicted action | |
| action_label = max(set(action_predictions), key=action_predictions.count) | |
| # Save annotated video | |
| output_file = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4").name | |
| height, width, _ = output_frames[0].shape | |
| out = cv2.VideoWriter(output_file, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height)) | |
| for f in output_frames: | |
| out.write(f) | |
| out.release() | |
| return output_file, f"Predicted Action: {action_label}" | |
| except Exception as e: | |
| return None, f"Runtime Error: {str(e)}" | |
| # Gradio Interface | |
| iface = gr.Interface( | |
| fn=detect_pose_and_activity, | |
| inputs=gr.Video(label="Upload a Video (max 10s)"), | |
| outputs=[gr.Video(label="Pose Detection Output"), gr.Textbox(label="Detected Action")], | |
| title="Human Pose & Activity Recognition", | |
| description="Upload a short video (max 10s). The app detects human poses and predicts the activity (e.g., dancing, cycling, running)." | |
| ) | |
| iface.launch() |