File size: 6,239 Bytes
a913145
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import os
import subprocess
import time

import cv2
import mediapipe as mp
import numpy as np
from ultralytics import YOLO

# Load the YOLOv8-nano pose-estimation checkpoint (downloaded on first use).
model = YOLO("yolov8n-pose.pt", verbose=False)  # Adjust for accuracy if needed

# Initialize MediaPipe Face Mesh in video (tracking) mode for a single face;
# its lip landmarks (indices 13/14) feed the "Talking" check in detect_activity.
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(static_image_mode=False, max_num_faces=1, min_detection_confidence=0.5)

def detect_activity(keypoints, face_landmarks, previous_keypoints=None):
    """
    Classify a person's activity from pose keypoints and facial landmarks.

    :param keypoints: Array-like of shape (17, 2) with (x, y) COCO pose
        keypoints as produced by YOLOv8-pose (0 = nose ... 16 = right ankle).
        Note: 17 points are required, not 16 — the right-ankle index is 16.
    :param face_landmarks: Sequence of (x, y) pixel coordinates from MediaPipe
        Face Mesh; indices 13/14 are the inner upper/lower lip.
    :param previous_keypoints: Keypoints from the previous frame. Currently
        unused; kept in the signature for future temporal analysis.
    :return: One of "Talking", "Dancing", "Running", "Standing", "Walking",
        "Other Activity", or "None" when no usable pose is available.
    """
    def distance(pt1, pt2):
        return np.linalg.norm(np.array(pt1) - np.array(pt2))

    # COCO keypoint indices (the highest index used is 16 → need >= 17 points)
    NOSE, L_SHOULDER, R_SHOULDER, L_HIP, R_HIP, L_KNEE, R_KNEE, L_ANKLE, R_ANKLE, L_WRIST, R_WRIST = (
        0, 5, 6, 11, 12, 13, 14, 15, 16, 9, 10
    )

    # Guard against missing or partial detections: a pose with fewer than 17
    # points would raise IndexError on the extractions below.
    if keypoints is None or len(keypoints) < 17:
        return "None"

    keypoints = np.asarray(keypoints, dtype=float)

    # Extract important keypoints
    nose = keypoints[NOSE]
    left_shoulder = keypoints[L_SHOULDER]
    right_shoulder = keypoints[R_SHOULDER]
    left_hip = keypoints[L_HIP]
    right_hip = keypoints[R_HIP]
    left_knee = keypoints[L_KNEE]
    right_knee = keypoints[R_KNEE]
    left_ankle = keypoints[L_ANKLE]
    right_ankle = keypoints[R_ANKLE]
    left_wrist = keypoints[L_WRIST]
    right_wrist = keypoints[R_WRIST]

    # Movement proxies, all in pixels; thresholds below are scaled by torso
    # length so they are roughly invariant to the person's size in frame.
    torso_length = distance(nose, (left_hip + right_hip) / 2)
    arm_movement = distance(left_wrist, left_shoulder) + distance(right_wrist, right_shoulder)
    leg_movement = distance(left_knee, left_ankle) + distance(right_knee, right_ankle)
    total_movement = arm_movement + leg_movement

    # Detect mouth movement for talking. Indices 13/14 require at least 15
    # landmarks — guard so a short landmark list cannot raise IndexError.
    is_talking = False
    if face_landmarks and len(face_landmarks) > 14:
        upper_lip = face_landmarks[13]  # MediaPipe index for upper lip
        lower_lip = face_landmarks[14]  # MediaPipe index for lower lip
        lip_distance = distance(upper_lip, lower_lip)
        is_talking = lip_distance > 5  # pixel threshold; tune per resolution

    # Classify, most specific first. Talking wins outright; the remaining
    # branches partition by how large the limb motion is relative to the torso.
    if is_talking:
        return "Talking"
    elif total_movement > torso_length * 1.2:  # large synchronized movement
        return "Dancing"
    elif leg_movement > torso_length * 0.3:
        return "Running"
    elif arm_movement < torso_length * 0.2 and leg_movement < torso_length * 0.1:
        return "Standing"
    elif leg_movement > 0.1 and leg_movement < torso_length * 0.3:
        return "Walking"
    else:
        return "Other Activity"

def process_gif(gif_path, confidence_score):
    """
    Detect keypoints frame-by-frame in a GIF, classify activities, annotate,
    and re-encode the annotated output to H.264 with ffmpeg.

    :param gif_path: Path to the input GIF (or any OpenCV-readable video).
    :param confidence_score: Minimum detection confidence passed to YOLO.
    :return: Path to the ffmpeg-re-encoded annotated video file.
    """
    cap = cv2.VideoCapture(gif_path)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    # GIFs frequently report FPS 0, which produces a broken VideoWriter;
    # fall back to a sane default in that case.
    fps = int(cap.get(cv2.CAP_PROP_FPS)) or 10
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    # Prefix only the file name, not the whole path, so inputs that live in a
    # directory don't produce a non-existent "annotated_<dir>/..." path.
    gif_dir, gif_name = os.path.dirname(gif_path), os.path.basename(gif_path)
    output_path = os.path.join(gif_dir, f"annotated_{gif_name}")
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    previous_keypoints = None
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Perform pose detection (was hard-coded to 0.5; now honours the
        # confidence_score parameter, which previously went unused).
        results = model.predict(source=frame, conf=confidence_score, save=False, verbose=False)
        if results is None or len(results) == 0 or not hasattr(results[0], 'keypoints') or results[0].keypoints is None:
            continue
        for result in results:
            for pose in result.keypoints.xy:  # Loop through detected people
                keypoints = np.array(pose)

                # Detect facial landmarks using MediaPipe (expects RGB input)
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                face_results = face_mesh.process(rgb_frame)

                # Convert normalized landmark coordinates to pixel positions
                face_landmarks = []
                if face_results.multi_face_landmarks:
                    for face_landmark in face_results.multi_face_landmarks:
                        for landmark in face_landmark.landmark:
                            x = int(landmark.x * frame.shape[1])
                            y = int(landmark.y * frame.shape[0])
                            face_landmarks.append((x, y))

                activity = detect_activity(keypoints, face_landmarks, previous_keypoints)

                # Annotate the activity on the frame
                cv2.putText(frame, activity, (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2, cv2.LINE_AA)

                # Draw keypoints
                for x, y in keypoints:
                    cv2.circle(frame, (int(x), int(y)), 5, (255, 0, 0), -1)

                # Draw facial landmarks
                for x, y in face_landmarks:
                    cv2.circle(frame, (x, y), 2, (0, 255, 0), -1)

                previous_keypoints = keypoints  # Update for temporal analysis

        out.write(frame)
        # cv2.imshow("Pose Activity Detection", frame)
        # NOTE: the waitKey 'q'-to-quit poll was removed along with the
        # disabled imshow above — without a window it only added latency.

    cap.release()  # release the capture as well as the writer (was leaked)
    out.release()
    # Same basename-only prefixing as output_path, for the same reason.
    repaired_path = os.path.join(gif_dir, f"repaired_{os.path.basename(output_path)}")

    # Re-encode with ffmpeg so the container/codec is broadly playable.
    command = [
        'ffmpeg', '-y',
        '-i', output_path,
        '-c:v', 'libx264',
        '-c:a', 'aac',
        repaired_path
    ]

    # Wait until the freshly written file is actually readable before handing
    # it to ffmpeg (the writer may not have flushed immediately).
    if retry_file_access(output_path):
        try:
            subprocess.run(command, check=True)
            print("Video processed successfully")
        except subprocess.CalledProcessError as e:
            print(f"Error occurred: {e}")

    return repaired_path

def retry_file_access(file_path, retries=3, delay=2):
    """
    Probe whether *file_path* can be opened for reading, retrying on failure.

    :param file_path: Path of the file to probe.
    :param retries: Maximum number of open attempts.
    :param delay: Seconds to sleep between attempts.
    :return: True as soon as the file opens; False after all attempts fail.
    """
    for attempt in range(1, retries + 1):
        try:
            # Opening (and immediately closing) proves the file is readable.
            with open(file_path, 'rb'):
                return True
        except OSError:  # IOError is a legacy alias of OSError in Python 3
            print(f"File is not ready yet. Retrying... {attempt}/{retries}")
            # Don't waste `delay` seconds sleeping after the final attempt.
            if attempt < retries:
                time.sleep(delay)
    print("File is not accessible after multiple retries.")
    return False