import os
import subprocess
import time

import cv2
import mediapipe as mp
import numpy as np
from ultralytics import YOLO

# Load the YOLOv8 nano pose-estimation model (weights download on first use).
model = YOLO("yolov8n-pose.pt", verbose=False)

# MediaPipe Face Mesh supplies the lip landmarks used for talking detection.
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(static_image_mode=False, max_num_faces=1, min_detection_confidence=0.5)
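
# COCO keypoint order produced by yolov8n-pose (17 points per person):
# 0 nose, 1-2 eyes, 3-4 ears, 5-6 shoulders, 7-8 elbows, 9-10 wrists,
# 11-12 hips, 13-14 knees, 15-16 ankles.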


def detect_activity(keypoints, face_landmarks, previous_keypoints=None):
    """
    Detect activity based on pose keypoints and facial landmarks.

    :param keypoints: A numpy array of shape (17, 2) of (x, y) COCO keypoints.
    :param face_landmarks: A list of (x, y) pixel coordinates of facial landmarks,
        used for detecting lip movement.
    :param previous_keypoints: Keypoints from the previous frame, shape (17, 2);
        accepted but not yet used by the heuristics below.
    :return: Detected activity as a string.
    """
    def distance(pt1, pt2):
        return np.linalg.norm(np.array(pt1) - np.array(pt2))

    NOSE, L_SHOULDER, R_SHOULDER, L_HIP, R_HIP, L_KNEE, R_KNEE, L_ANKLE, R_ANKLE, L_WRIST, R_WRIST = (
        0, 5, 6, 11, 12, 13, 14, 15, 16, 9, 10
    )

    # The indices above run up to 16, so a full 17-point skeleton is required.
    if keypoints is None or len(keypoints) < 17:
        return "None"

    nose = keypoints[NOSE]
    left_shoulder = keypoints[L_SHOULDER]
    right_shoulder = keypoints[R_SHOULDER]
    left_hip = keypoints[L_HIP]
    right_hip = keypoints[R_HIP]
    left_knee = keypoints[L_KNEE]
    right_knee = keypoints[R_KNEE]
    left_ankle = keypoints[L_ANKLE]
    right_ankle = keypoints[R_ANKLE]
    left_wrist = keypoints[L_WRIST]
    right_wrist = keypoints[R_WRIST]

    # Single-frame pose extents; torso length serves as a scale reference so
    # the thresholds below are roughly resolution-independent.
    torso_length = distance(nose, (left_hip + right_hip) / 2)
    arm_movement = distance(left_wrist, left_shoulder) + distance(right_wrist, right_shoulder)
    leg_movement = distance(left_knee, left_ankle) + distance(right_knee, right_ankle)
    total_movement = arm_movement + leg_movement

    # Face Mesh landmarks 13 and 14 are the inner upper and lower lip; note
    # that the 5-pixel gap threshold is resolution-dependent.
    is_talking = False
    if face_landmarks:
        upper_lip = face_landmarks[13]
        lower_lip = face_landmarks[14]
        lip_distance = distance(upper_lip, lower_lip)
        is_talking = lip_distance > 5

    if is_talking:
        return "Talking"
    elif total_movement > torso_length * 1.2:
        return "Dancing"
    elif leg_movement > torso_length * 0.3:
        return "Running"
    elif arm_movement < torso_length * 0.2 and leg_movement < torso_length * 0.1:
        return "Standing"
    # Lower bound normalised by torso length, consistent with the thresholds above.
    elif torso_length * 0.1 < leg_movement < torso_length * 0.3:
        return "Walking"
    else:
        return "Other Activity"


def process_gif(gif_path, confidence_score):
    """
    Detect keypoints in a GIF, classify activities, and write an annotated video.

    :param gif_path: Path to the input GIF.
    :param confidence_score: Minimum detection confidence passed to the YOLO model.
    :return: Path to the re-encoded output video.
    """
    cap = cv2.VideoCapture(gif_path)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    # GIFs often report an FPS of 0; fall back to 10 so VideoWriter still works.
    fps = int(cap.get(cv2.CAP_PROP_FPS)) or 10
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    # Write to an .mp4 name so the container matches the mp4v codec.
    base = os.path.splitext(os.path.basename(gif_path))[0]
    output_path = f"annotated_{base}.mp4"
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    previous_keypoints = None
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        results = model.predict(source=frame, conf=confidence_score, save=False, verbose=False)
        # Skip frames with no pose detections; `keypoints` would otherwise be
        # undefined further down.
        if not results or results[0].keypoints is None or len(results[0].keypoints.xy) == 0:
            continue
        # Classify the first detected person only.
        keypoints = results[0].keypoints.xy[0].cpu().numpy()

        # MediaPipe expects RGB input; OpenCV decodes frames as BGR.
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        face_results = face_mesh.process(rgb_frame)

        # Convert normalised landmark coordinates to pixel positions.
        face_landmarks = []
        if face_results.multi_face_landmarks:
            for face_landmark in face_results.multi_face_landmarks:
                for landmark in face_landmark.landmark:
                    x = int(landmark.x * frame.shape[1])
                    y = int(landmark.y * frame.shape[0])
                    face_landmarks.append((x, y))

        activity = detect_activity(keypoints, face_landmarks, previous_keypoints)

        # Overlay the activity label, pose keypoints (blue) and face
        # landmarks (green).
        cv2.putText(frame, activity, (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2, cv2.LINE_AA)
        for x, y in keypoints:
            cv2.circle(frame, (int(x), int(y)), 5, (255, 0, 0), -1)
        for x, y in face_landmarks:
            cv2.circle(frame, (x, y), 2, (0, 255, 0), -1)

        previous_keypoints = keypoints

        out.write(frame)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    out.release()
    repaired_path = f"repaired_{output_path}"

    # Re-encode with ffmpeg (H.264 video, AAC audio) so the result plays in
    # browsers and most media players.
    command = [
        'ffmpeg', '-y',
        '-i', output_path,
        '-c:v', 'libx264',
        '-c:a', 'aac',
        repaired_path
    ]
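
    # For reference, the equivalent shell invocation is:
    #   ffmpeg -y -i <output_path> -c:v libx264 -c:a aac <repaired_path>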

    # The writer may not have flushed to disk yet; wait until the file can be
    # opened before handing it to ffmpeg.
    if retry_file_access(output_path):
        try:
            subprocess.run(command, check=True)
            print("Video processed successfully")
        except subprocess.CalledProcessError as e:
            print(f"Error occurred: {e}")

    return repaired_path


def retry_file_access(file_path, retries=3, delay=2):
    """Return True once file_path can be opened for reading, retrying on IOError."""
    for i in range(retries):
        try:
            with open(file_path, 'rb'):
                return True
        except IOError:
            print(f"File is not ready yet. Retrying... {i+1}/{retries}")
            time.sleep(delay)
    print("File is not accessible after multiple retries.")
    return False
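

# Minimal usage sketch ("input.gif" is a hypothetical path; substitute any
# GIF and a confidence threshold suited to your footage).
if __name__ == "__main__":
    annotated = process_gif("input.gif", confidence_score=0.5)  # hypothetical input
    print(f"Annotated video written to {annotated}")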