|
|
import os |
|
|
import sys |
|
|
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
|
|
from utils import suppress_warnings |
|
|
|
|
|
import time |
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
import torch |
|
|
from PIL import Image |
|
|
from torchvision.transforms import Compose, Resize, Pad, ToTensor, Normalize, Lambda |
|
|
import cv2 |
|
|
import logging |
|
|
from sklearn.linear_model import LogisticRegression |
|
|
from scipy.stats import mode |
|
|
from numpy.lib.stride_tricks import sliding_window_view |
|
|
|
|
|
from .model_loader import ModelLoader |
|
|
from utils.mediapipe_extractor import MediaPipeExtractor |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
class VideoProcessor: |
|
|
def __init__(self, weights_dir="models/weights"):
    """Set up configuration, models, and transforms for video inference.

    Args:
        weights_dir: Directory holding the model weight files passed to
            the ModelLoader.
    """
    # Windowing and frame-size configuration shared by all branches.
    self.SEQ_LEN, self.OVERLAP_TEST, self.IMG_SIZE = 32, 0, 112

    # Class labels in training order; capitalization matches the dataset
    # labels, so it must not be normalized.
    self.ACTIONS = [
        "barbell biceps curl","lateral raise","push-up","bench press",
        "chest fly machine","deadlift","decline bench press","hammer curl",
        "hip thrust","incline bench press","lat pulldown","leg extension",
        "leg raises","plank","pull Up","romanian deadlift","russian twist",
        "shoulder press","squat","t bar row","tricep Pushdown","tricep dips"
    ]

    logger.info("Initializing VideoProcessor...")

    # Heavyweight resources: model weights and the MediaPipe pose extractor.
    self.model_loader = ModelLoader(weights_dir)
    self.mediapipe_extractor = MediaPipeExtractor()

    # Per-frame preprocessing pipeline used by the Swin3D branch.
    self.video_transforms = self._make_video_transforms()

    # Reserved for a stacking/meta model; unused so far.
    self.meta_learner = None

    logger.info("VideoProcessor initialized successfully")
|
|
|
|
|
def _make_video_transforms(self):
    """Build the per-frame preprocessing pipeline for Swin3D input.

    Each frame is letterboxed to a square, resized to IMG_SIZE,
    converted to a tensor, and normalized with ImageNet statistics.
    """
    def _square_pad(img: Image.Image) -> Image.Image:
        # Center the frame on a square canvas of side max(w, h); any odd
        # leftover pixel goes to the right/bottom edge.
        w, h = img.size
        side = max(w, h)
        left = (side - w) // 2
        top = (side - h) // 2
        right = side - w - left
        bottom = side - h - top
        return Pad((left, top, right, bottom), fill=0)(img)

    steps = [
        Lambda(_square_pad),
        Resize((self.IMG_SIZE, self.IMG_SIZE)),
        ToTensor(),
        Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
    ]
    return Compose(steps)
|
|
|
|
|
def _sliding_windows(self, seq, seq_len, overlap): |
|
|
"""Generate overlapping windows with padding""" |
|
|
step = seq_len - overlap |
|
|
windows = [] |
|
|
for start in range(0, len(seq), step): |
|
|
end = start + seq_len |
|
|
if end <= len(seq): |
|
|
windows.append(seq[start:end]) |
|
|
else: |
|
|
|
|
|
part = seq[start:] |
|
|
if len(part) > 0: |
|
|
pad = np.repeat(part[-1:], seq_len - len(part), axis=0) |
|
|
windows.append(np.vstack([part, pad])) |
|
|
return windows |
|
|
|
|
|
def _get_windows_numpy(self, data: np.ndarray, seq_len: int, overlap: int): |
|
|
"""Generate windows using sliding_window_view for angle branch""" |
|
|
step = seq_len - overlap |
|
|
L, D = data.shape |
|
|
n_w = int(np.ceil((L-overlap)/step)) |
|
|
pad_len = n_w*step + overlap - L |
|
|
if pad_len > 0: |
|
|
pad = np.repeat(data[-1:], pad_len, axis=0) |
|
|
data = np.vstack([data, pad]) |
|
|
win = sliding_window_view(data, window_shape=seq_len, axis=0) |
|
|
return win[::step] |
|
|
|
|
|
def _process_video_for_swin3d(self, video_path):
    """Decode a video and build Swin3D input clips.

    Returns a list of tensors, one per non-overlapping SEQ_LEN-frame
    window, each stacked along dim 1 (channels-first, time second).
    A short final window is padded by repeating its last frame.
    """
    import av

    logger.info("Processing video for Swin3D...")

    container = av.open(video_path)
    stream = container.streams.video[0]

    # Decode every frame of the first video stream as a PIL image.
    frames = [frame.to_image() for frame in container.decode(video=0)]
    container.close()

    clips = []
    step = self.SEQ_LEN
    for start_f in range(0, len(frames), step):
        chunk = frames[start_f:start_f + self.SEQ_LEN]

        missing = self.SEQ_LEN - len(chunk)
        if missing > 0:
            if chunk:
                # Repeat copies of the last decoded frame to fill the clip.
                last = chunk[-1]
                chunk = chunk + [last.copy() for _ in range(missing)]
            else:
                # No frames available at all: use blank images.
                blank = Image.new("RGB", (self.IMG_SIZE, self.IMG_SIZE))
                chunk = [blank] * self.SEQ_LEN

        tensors = [self.video_transforms(img) for img in chunk]
        # Stack along dim 1 so time becomes the second axis.
        clips.append(torch.stack(tensors, dim=1))

    return clips
|
|
|
|
|
def _get_top_predictions(self, probabilities, top_k=3): |
|
|
"""Get top-k predictions with probabilities""" |
|
|
top_indices = np.argsort(probabilities)[-top_k:][::-1] |
|
|
top_predictions = [] |
|
|
|
|
|
for idx in top_indices: |
|
|
top_predictions.append({ |
|
|
'exercise': self.ACTIONS[idx], |
|
|
'probability': float(probabilities[idx]) |
|
|
}) |
|
|
|
|
|
return top_predictions |
|
|
|
|
|
def process_video(self, video_path):
    """
    Main function to process video and return ensemble results

    Runs each available model branch (ST-GCN, two transformer variants,
    Swin3D) over the video, averages their class-probability vectors into
    an ensemble, and substitutes random "demo" predictions for any branch
    that is unavailable or fails.

    Args:
        video_path: Path to the input video file.

    Returns:
        dict with keys:
            'individual_predictions': per-model top-k prediction lists
            'ensemble_predictions': top-k list from averaged probabilities
            'processing_times': per-stage wall-clock seconds
            'windows_processed': number of ST-GCN windows evaluated

    Raises:
        ValueError: If no frames could be extracted from the video.
    """
    logger.info(f"Processing video: {video_path}")
    start_time = time.time()

    # Wall-clock timing per pipeline stage / model.
    processing_times = {}

    try:
        # Pose landmarks are extracted once and reused by all
        # skeleton-based branches below.
        logger.info("Extracting pose landmarks...")
        mediapipe_start = time.time()
        df = self.mediapipe_extractor.extract_from_video(video_path)
        processing_times['mediapipe'] = time.time() - mediapipe_start

        if len(df) == 0:
            raise ValueError("No frames could be processed from video")

        results = {
            'individual_predictions': {},
            'ensemble_predictions': None,
            'processing_times': processing_times,
            'windows_processed': 0
        }

        available_models = self.model_loader.get_available_models()
        # One averaged probability vector per successful model; the
        # ensemble is the mean of these.
        all_probabilities = []

        # ---- ST-GCN branch ----
        if 'stgcn' in available_models:
            logger.info("Processing with ST-GCN...")
            stgcn_start = time.time()

            stgcn_data = self.mediapipe_extractor.process_landmarks_for_stgcn(df)

            windows = self._sliding_windows(stgcn_data, self.SEQ_LEN, self.OVERLAP_TEST)
            if windows:
                X_stgcn = np.stack(windows, axis=0)

                proba_stgcn = self.model_loader.predict_stgcn(X_stgcn)
                if proba_stgcn is not None:
                    # Average per-window probabilities into one vector.
                    avg_proba = np.mean(proba_stgcn, axis=0)
                    results['individual_predictions']['stgcn'] = self._get_top_predictions(avg_proba)
                    all_probabilities.append(avg_proba)
                    results['windows_processed'] = len(windows)
                else:
                    # NOTE(review): demo fallback is random output presented
                    # like a real prediction; it is excluded from the
                    # ensemble (not appended to all_probabilities).
                    logger.warning("ST-GCN prediction failed - using demo predictions")
                    demo_proba = np.random.dirichlet(np.ones(len(self.ACTIONS)), size=1)[0]
                    results['individual_predictions']['stgcn'] = self._get_top_predictions(demo_proba)

            processing_times['stgcn'] = time.time() - stgcn_start

        # ---- Transformer (12 relative landmarks) branch ----
        if 'transformer_12rel' in available_models:
            logger.info("Processing with Transformer 12rel...")
            transformer_start = time.time()

            transformer_data = self.mediapipe_extractor.process_landmarks_for_transformer_12rel(df)

            windows = self._sliding_windows(transformer_data, self.SEQ_LEN, self.OVERLAP_TEST)
            if windows:
                X_transformer = np.stack(windows, axis=0)

                proba_transformer = self.model_loader.predict_transformer_12rel(X_transformer)
                if proba_transformer is not None:
                    # Average per-window probabilities into one vector.
                    avg_proba = np.mean(proba_transformer, axis=0)
                    results['individual_predictions']['transformer_12rel'] = self._get_top_predictions(avg_proba)
                    all_probabilities.append(avg_proba)
                else:
                    logger.warning("Transformer 12rel prediction failed - using demo predictions")
                    demo_proba = np.random.dirichlet(np.ones(len(self.ACTIONS)), size=1)[0]
                    results['individual_predictions']['transformer_12rel'] = self._get_top_predictions(demo_proba)

            processing_times['transformer_12rel'] = time.time() - transformer_start

        # ---- Transformer (angle) branch: two inputs, relative + angles ----
        if 'transformer_angle' in available_models:
            logger.info("Processing with Transformer angle branch...")
            angle_start = time.time()

            rel_data, angle_data = self.mediapipe_extractor.process_landmarks_for_transformer_angle(df)

            rel_windows = self._get_windows_numpy(rel_data, self.SEQ_LEN, self.OVERLAP_TEST)
            angle_windows = self._get_windows_numpy(angle_data, self.SEQ_LEN, self.OVERLAP_TEST)

            if len(rel_windows) > 0 and len(angle_windows) > 0:
                # Both inputs must have the same window count; truncate to
                # the shorter of the two.
                min_windows = min(len(rel_windows), len(angle_windows))
                X_rel = rel_windows[:min_windows]
                X_ang = angle_windows[:min_windows]

                # _get_windows_numpy puts the window axis last (n, D, seq_len);
                # move seq_len to axis 1 when it is not already there.
                if X_rel.ndim == 3 and X_rel.shape[1] != self.SEQ_LEN:
                    X_rel = X_rel.transpose(0, 2, 1)
                if X_ang.ndim == 3 and X_ang.shape[1] != self.SEQ_LEN:
                    X_ang = X_ang.transpose(0, 2, 1)

                proba_angle = self.model_loader.predict_transformer_angle(X_rel, X_ang)
                if proba_angle is not None:
                    # Average per-window probabilities into one vector.
                    avg_proba = np.mean(proba_angle, axis=0)
                    results['individual_predictions']['transformer_angle'] = self._get_top_predictions(avg_proba)
                    all_probabilities.append(avg_proba)
                else:
                    logger.warning("Transformer angle prediction failed - using demo predictions")
                    demo_proba = np.random.dirichlet(np.ones(len(self.ACTIONS)), size=1)[0]
                    results['individual_predictions']['transformer_angle'] = self._get_top_predictions(demo_proba)

            processing_times['transformer_angle'] = time.time() - angle_start

        # ---- Swin3D (raw video) branch ----
        if 'swin3d' in available_models:
            logger.info("Processing with Swin3D...")
            swin3d_start = time.time()

            # Video decoding can fail independently of landmark extraction,
            # so this branch gets its own try/except.
            try:
                video_windows = self._process_video_for_swin3d(video_path)

                if video_windows:
                    proba_swin3d = self.model_loader.predict_swin3d(video_windows)
                    if proba_swin3d is not None:
                        # Average per-window probabilities into one vector.
                        avg_proba = np.mean(proba_swin3d, axis=0)
                        results['individual_predictions']['swin3d'] = self._get_top_predictions(avg_proba)
                        all_probabilities.append(avg_proba)
                    else:
                        logger.warning("Swin3D prediction failed - using demo predictions")
                        demo_proba = np.random.dirichlet(np.ones(len(self.ACTIONS)), size=1)[0]
                        results['individual_predictions']['swin3d'] = self._get_top_predictions(demo_proba)

            except Exception as e:
                logger.warning(f"Swin3D processing failed: {str(e)}")
                # Explicit None marks a hard failure; the demo-fill loop
                # below does not overwrite it (key exists).
                results['individual_predictions']['swin3d'] = None

            processing_times['swin3d'] = time.time() - swin3d_start

        # ---- Ensemble: unweighted mean of the successful models ----
        if all_probabilities:
            logger.info("Computing ensemble predictions...")

            ensemble_proba = np.mean(all_probabilities, axis=0)
            results['ensemble_predictions'] = self._get_top_predictions(ensemble_proba)

            logger.info(f"Ensemble top prediction: {results['ensemble_predictions'][0]['exercise']} "
                        f"({results['ensemble_predictions'][0]['probability']:.3f})")
        else:
            # No model produced output at all: random demo ensemble.
            logger.warning("No model predictions available - generating demo predictions")

            demo_proba = np.random.dirichlet(np.ones(len(self.ACTIONS)), size=1)[0]
            results['ensemble_predictions'] = self._get_top_predictions(demo_proba)
            logger.info("Demo mode: Generated random predictions for testing")

        # Fill every model slot with demo predictions / zero timings so
        # consumers always see a uniform result shape.
        for model_name in ['stgcn', 'transformer_12rel', 'transformer_angle', 'swin3d']:
            if model_name not in results['individual_predictions']:
                demo_proba = np.random.dirichlet(np.ones(len(self.ACTIONS)), size=1)[0]
                results['individual_predictions'][model_name] = self._get_top_predictions(demo_proba)
                logger.info(f"Generated demo predictions for {model_name}")
            if model_name not in processing_times:
                processing_times[model_name] = 0.0

        total_time = time.time() - start_time
        logger.info(f"Video processing completed in {total_time:.2f}s")

        return results

    except Exception as e:
        logger.error(f"Error processing video: {str(e)}")
        raise

    finally:
        # No per-call cleanup currently required; see cleanup().
        pass
|
|
|
|
|
def cleanup(self):
    """Release the model loader and MediaPipe extractor resources."""
    logger.info("Cleaning up VideoProcessor...")
    # hasattr guards: __init__ may have failed before setting these.
    for attr in ('model_loader', 'mediapipe_extractor'):
        if hasattr(self, attr):
            getattr(self, attr).cleanup()