# Gym_Demo / models / video_processor.py
# Cuong2004 - first commit (f21948d)
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from utils import suppress_warnings
import time
import numpy as np
import pandas as pd
import torch
from PIL import Image
from torchvision.transforms import Compose, Resize, Pad, ToTensor, Normalize, Lambda
import cv2
import logging
from sklearn.linear_model import LogisticRegression
from scipy.stats import mode
from numpy.lib.stride_tricks import sliding_window_view
from .model_loader import ModelLoader
from utils.mediapipe_extractor import MediaPipeExtractor
logger = logging.getLogger(__name__)
class VideoProcessor:
def __init__(self, weights_dir="models/weights"):
"""Initialize video processor with models"""
self.SEQ_LEN = 32
self.OVERLAP_TEST = 0
self.IMG_SIZE = 112
self.ACTIONS = [
"barbell biceps curl","lateral raise","push-up","bench press",
"chest fly machine","deadlift","decline bench press","hammer curl",
"hip thrust","incline bench press","lat pulldown","leg extension",
"leg raises","plank","pull Up","romanian deadlift","russian twist",
"shoulder press","squat","t bar row","tricep Pushdown","tricep dips"
]
# Initialize components
logger.info("Initializing VideoProcessor...")
self.model_loader = ModelLoader(weights_dir)
self.mediapipe_extractor = MediaPipeExtractor()
# Initialize video transforms for Swin3D
self.video_transforms = self._make_video_transforms()
# Meta-learner for ensemble (will be simple voting for now)
self.meta_learner = None
logger.info("VideoProcessor initialized successfully")
def _make_video_transforms(self):
"""Create video transforms for Swin3D"""
def pad_to_square(img: Image.Image) -> Image.Image:
w, h = img.size
m = max(w, h)
pad = ((m-w)//2, (m-h)//2, m-w-(m-w)//2, m-h-(m-h)//2)
return Pad(pad, fill=0)(img)
return Compose([
Lambda(pad_to_square),
Resize((self.IMG_SIZE, self.IMG_SIZE)),
ToTensor(),
Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
])
def _sliding_windows(self, seq, seq_len, overlap):
"""Generate overlapping windows with padding"""
step = seq_len - overlap
windows = []
for start in range(0, len(seq), step):
end = start + seq_len
if end <= len(seq):
windows.append(seq[start:end])
else:
# Pad last window
part = seq[start:]
if len(part) > 0:
pad = np.repeat(part[-1:], seq_len - len(part), axis=0)
windows.append(np.vstack([part, pad]))
return windows
def _get_windows_numpy(self, data: np.ndarray, seq_len: int, overlap: int):
"""Generate windows using sliding_window_view for angle branch"""
step = seq_len - overlap
L, D = data.shape
n_w = int(np.ceil((L-overlap)/step))
pad_len = n_w*step + overlap - L
if pad_len > 0:
pad = np.repeat(data[-1:], pad_len, axis=0)
data = np.vstack([data, pad])
win = sliding_window_view(data, window_shape=seq_len, axis=0)
return win[::step]
def _process_video_for_swin3d(self, video_path):
"""Process video for Swin3D model"""
import av
logger.info("Processing video for Swin3D...")
container = av.open(video_path)
stream = container.streams.video[0]
# Extract all frames
frames = []
for frame in container.decode(video=0):
frames.append(frame.to_image())
container.close()
# Create windows from frames
Xs = []
step = self.SEQ_LEN
for start_f in range(0, len(frames), step):
window_frames = frames[start_f:start_f + self.SEQ_LEN]
# Pad if necessary
if len(window_frames) < self.SEQ_LEN:
if window_frames:
last = window_frames[-1]
window_frames += [last.copy() for _ in range(self.SEQ_LEN - len(window_frames))]
else:
blank = Image.new("RGB", (self.IMG_SIZE, self.IMG_SIZE))
window_frames = [blank] * self.SEQ_LEN
# Transform and stack
proc = [self.video_transforms(img) for img in window_frames]
video = torch.stack(proc, dim=1) # [C, T, H, W]
Xs.append(video)
return Xs
def _get_top_predictions(self, probabilities, top_k=3):
"""Get top-k predictions with probabilities"""
top_indices = np.argsort(probabilities)[-top_k:][::-1]
top_predictions = []
for idx in top_indices:
top_predictions.append({
'exercise': self.ACTIONS[idx],
'probability': float(probabilities[idx])
})
return top_predictions
def process_video(self, video_path):
"""
Main function to process video and return ensemble results
"""
logger.info(f"Processing video: {video_path}")
start_time = time.time()
# Track processing times
processing_times = {}
try:
# 1. Extract pose landmarks using MediaPipe
logger.info("Extracting pose landmarks...")
mediapipe_start = time.time()
df = self.mediapipe_extractor.extract_from_video(video_path)
processing_times['mediapipe'] = time.time() - mediapipe_start
if len(df) == 0:
raise ValueError("No frames could be processed from video")
# 2. Process for each model
results = {
'individual_predictions': {},
'ensemble_predictions': None,
'processing_times': processing_times,
'windows_processed': 0
}
available_models = self.model_loader.get_available_models()
all_probabilities = []
# ST-GCN
if 'stgcn' in available_models:
logger.info("Processing with ST-GCN...")
stgcn_start = time.time()
# Process landmarks for ST-GCN
stgcn_data = self.mediapipe_extractor.process_landmarks_for_stgcn(df)
# Create windows
windows = self._sliding_windows(stgcn_data, self.SEQ_LEN, self.OVERLAP_TEST)
if windows:
X_stgcn = np.stack(windows, axis=0)
# Predict
proba_stgcn = self.model_loader.predict_stgcn(X_stgcn)
if proba_stgcn is not None:
# Average probabilities across ALL windows in video (not per window)
avg_proba = np.mean(proba_stgcn, axis=0)
results['individual_predictions']['stgcn'] = self._get_top_predictions(avg_proba)
all_probabilities.append(avg_proba)
results['windows_processed'] = len(windows)
else:
logger.warning("ST-GCN prediction failed - using demo predictions")
demo_proba = np.random.dirichlet(np.ones(len(self.ACTIONS)), size=1)[0]
results['individual_predictions']['stgcn'] = self._get_top_predictions(demo_proba)
processing_times['stgcn'] = time.time() - stgcn_start
# Transformer 12rel
if 'transformer_12rel' in available_models:
logger.info("Processing with Transformer 12rel...")
transformer_start = time.time()
# Process landmarks for Transformer
transformer_data = self.mediapipe_extractor.process_landmarks_for_transformer_12rel(df)
# Create windows
windows = self._sliding_windows(transformer_data, self.SEQ_LEN, self.OVERLAP_TEST)
if windows:
X_transformer = np.stack(windows, axis=0)
# Predict
proba_transformer = self.model_loader.predict_transformer_12rel(X_transformer)
if proba_transformer is not None:
# Average probabilities across ALL windows in video
avg_proba = np.mean(proba_transformer, axis=0)
results['individual_predictions']['transformer_12rel'] = self._get_top_predictions(avg_proba)
all_probabilities.append(avg_proba)
else:
logger.warning("Transformer 12rel prediction failed - using demo predictions")
demo_proba = np.random.dirichlet(np.ones(len(self.ACTIONS)), size=1)[0]
results['individual_predictions']['transformer_12rel'] = self._get_top_predictions(demo_proba)
processing_times['transformer_12rel'] = time.time() - transformer_start
# Transformer angle branch
if 'transformer_angle' in available_models:
logger.info("Processing with Transformer angle branch...")
angle_start = time.time()
# Process landmarks for angle branch
rel_data, angle_data = self.mediapipe_extractor.process_landmarks_for_transformer_angle(df)
# Create windows for both branches
rel_windows = self._get_windows_numpy(rel_data, self.SEQ_LEN, self.OVERLAP_TEST)
angle_windows = self._get_windows_numpy(angle_data, self.SEQ_LEN, self.OVERLAP_TEST)
if len(rel_windows) > 0 and len(angle_windows) > 0:
# Ensure same number of windows
min_windows = min(len(rel_windows), len(angle_windows))
X_rel = rel_windows[:min_windows]
X_ang = angle_windows[:min_windows]
# Check and transpose if needed
if X_rel.ndim == 3 and X_rel.shape[1] != self.SEQ_LEN:
X_rel = X_rel.transpose(0, 2, 1)
if X_ang.ndim == 3 and X_ang.shape[1] != self.SEQ_LEN:
X_ang = X_ang.transpose(0, 2, 1)
# Predict
proba_angle = self.model_loader.predict_transformer_angle(X_rel, X_ang)
if proba_angle is not None:
# Average probabilities across ALL windows in video
avg_proba = np.mean(proba_angle, axis=0)
results['individual_predictions']['transformer_angle'] = self._get_top_predictions(avg_proba)
all_probabilities.append(avg_proba)
else:
logger.warning("Transformer angle prediction failed - using demo predictions")
demo_proba = np.random.dirichlet(np.ones(len(self.ACTIONS)), size=1)[0]
results['individual_predictions']['transformer_angle'] = self._get_top_predictions(demo_proba)
processing_times['transformer_angle'] = time.time() - angle_start
# Swin3D
if 'swin3d' in available_models:
logger.info("Processing with Swin3D...")
swin3d_start = time.time()
try:
# Process video for Swin3D
video_windows = self._process_video_for_swin3d(video_path)
if video_windows:
# Predict
proba_swin3d = self.model_loader.predict_swin3d(video_windows)
if proba_swin3d is not None:
# Average probabilities across ALL windows in video
avg_proba = np.mean(proba_swin3d, axis=0)
results['individual_predictions']['swin3d'] = self._get_top_predictions(avg_proba)
all_probabilities.append(avg_proba)
else:
logger.warning("Swin3D prediction failed - using demo predictions")
demo_proba = np.random.dirichlet(np.ones(len(self.ACTIONS)), size=1)[0]
results['individual_predictions']['swin3d'] = self._get_top_predictions(demo_proba)
except Exception as e:
logger.warning(f"Swin3D processing failed: {str(e)}")
results['individual_predictions']['swin3d'] = None
processing_times['swin3d'] = time.time() - swin3d_start
# 3. Ensemble predictions
if all_probabilities:
logger.info("Computing ensemble predictions...")
# Simple averaging ensemble
ensemble_proba = np.mean(all_probabilities, axis=0)
results['ensemble_predictions'] = self._get_top_predictions(ensemble_proba)
logger.info(f"Ensemble top prediction: {results['ensemble_predictions'][0]['exercise']} "
f"({results['ensemble_predictions'][0]['probability']:.3f})")
else:
logger.warning("No model predictions available - generating demo predictions")
# Generate demo predictions for testing when no models loaded
demo_proba = np.random.dirichlet(np.ones(len(self.ACTIONS)), size=1)[0]
results['ensemble_predictions'] = self._get_top_predictions(demo_proba)
logger.info("Demo mode: Generated random predictions for testing")
# Set default values for missing models with demo predictions
for model_name in ['stgcn', 'transformer_12rel', 'transformer_angle', 'swin3d']:
if model_name not in results['individual_predictions']:
# Generate demo predictions instead of N/A
demo_proba = np.random.dirichlet(np.ones(len(self.ACTIONS)), size=1)[0]
results['individual_predictions'][model_name] = self._get_top_predictions(demo_proba)
logger.info(f"Generated demo predictions for {model_name}")
if model_name not in processing_times:
processing_times[model_name] = 0.0
total_time = time.time() - start_time
logger.info(f"Video processing completed in {total_time:.2f}s")
return results
except Exception as e:
logger.error(f"Error processing video: {str(e)}")
raise
finally:
# Don't cleanup MediaPipe extractor - reuse for next videos
# Only cleanup when VideoProcessor is destroyed
pass
def cleanup(self):
"""Clean up resources"""
logger.info("Cleaning up VideoProcessor...")
if hasattr(self, 'model_loader'):
self.model_loader.cleanup()
if hasattr(self, 'mediapipe_extractor'):
self.mediapipe_extractor.cleanup()