import gradio as gr
import cv2
import mediapipe as mp
import tempfile
from transformers import pipeline
from PIL import Image

# Initialize MediaPipe Pose
mp_pose = mp.solutions.pose

# Hugging Face pretrained model for action recognition
action_model = pipeline(
    "image-classification",
    model="rvv-karma/Human-Action-Recognition-VIT-Base-patch16-224"
)
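
# Optional: move inference to GPU when one is available (a sketch; requires
# `import torch`). `device` is a standard pipeline() argument:
#
# action_model = pipeline(
#     "image-classification",
#     model="rvv-karma/Human-Action-Recognition-VIT-Base-patch16-224",
#     device=0 if torch.cuda.is_available() else -1,
# )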

def detect_pose_and_activity(video_file):
    """
    Process the uploaded video: detect human poses and classify the activity.
    Optimizations:
      - Skip frames for the action classifier
      - Resize frames to the model's input size
      - Batch the action predictions
    Returns the annotated video path and the predicted action label.
    """
    try:
        # Copy the uploaded video to a temporary file
        temp_video = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4")
        with open(video_file, "rb") as src:
            temp_video.write(src.read())
        temp_video.close()

        cap = cv2.VideoCapture(temp_video.name)
        if not cap.isOpened():
            return None, "Error: Could not open video."

        fps = cap.get(cv2.CAP_PROP_FPS)
        if fps == 0:
            fps = 30  # some files report 0 FPS; fall back to a sane default

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        max_frames = int(min(total_frames / fps, 10) * fps)  # cap at 10 seconds
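        # e.g., a 30 FPS clip longer than 10 s is capped at 300 frames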

        output_frames = []
        pil_frames_for_model = []

        frame_skip = 2  # send every 2nd frame to the action classifier
        target_size = (224, 224)  # the ViT's 224x224 input; also speeds inference

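        # Streaming-mode pose tracker; the 0.5 confidence thresholds are
        # MediaPipe's defaults made explicit.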
        with mp_pose.Pose(
            static_image_mode=False,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5,
        ) as pose:
            frame_index = 0
            while frame_index < max_frames:
                ret, frame = cap.read()
                if not ret:
                    break

                # Resize a copy for the action classifier
                frame_small = cv2.resize(frame, target_size)
                image_rgb = cv2.cvtColor(frame_small, cv2.COLOR_BGR2RGB)

                # Pose detection runs on the full-resolution frame
                results = pose.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
                if results.pose_landmarks:
                    mp.solutions.drawing_utils.draw_landmarks(
                        frame, results.pose_landmarks, mp_pose.POSE_CONNECTIONS
                    )

                output_frames.append(frame)

                # Only process every Nth frame for action prediction
                if frame_index % frame_skip == 0:
                    pil_image = Image.fromarray(image_rgb)
                    pil_frames_for_model.append(pil_image)

                frame_index += 1

        cap.release()

        if len(output_frames) == 0:
            return None, "Error: No frames to process."

        # Batch prediction. For a list of images the pipeline returns a list
        # of top-k results per image; pass batch_size=... to batch on GPU.
        preds = action_model(pil_frames_for_model)
        action_labels = [pred[0]['label'] for pred in preds]

        # Take the most frequent predicted action
        final_action = max(set(action_labels), key=action_labels.count)
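        # Equivalent majority vote via the stdlib, for reference:
        # from collections import Counter
        # final_action = Counter(action_labels).most_common(1)[0][0]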

        # Save annotated video
        output_file = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4").name
        height, width, _ = output_frames[0].shape
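        # Note: 'mp4v' may not play inline in every browser; 'avc1' (H.264)
        # is an alternative FOURCC where the local OpenCV/FFmpeg build
        # supports it.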
        out = cv2.VideoWriter(output_file, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))
        for f in output_frames:
            out.write(f)
        out.release()

        return output_file, f"Predicted Action: {final_action}"

    except Exception as e:
        return None, f"Runtime Error: {str(e)}"

# Gradio Interface
iface = gr.Interface(
    fn=detect_pose_and_activity,
    inputs=gr.Video(label="Upload a Video (max 10s)"),
    outputs=[gr.Video(label="Pose Detection Output"), gr.Textbox(label="Detected Action")],
    title="Human Pose & Activity Recognition (Optimized)",
    description="Upload a short video (max 10s). The app detects human poses and predicts the activity quickly using frame skipping, resizing, and batch predictions."
)

iface.launch()
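# To expose a temporary public URL when running locally, Gradio supports:
# iface.launch(share=True)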