| import os |
| import cv2 |
| import json |
| import time |
| import pytest |
| import numpy as np |
| import torch |
| from pathlib import Path |
| from datetime import datetime |
|
|
| |
| import sys |
| sys.path.append(str(Path(__file__).resolve().parents[2])) |
| from app.services.processing.emotion_analyzer import process_video_frames, EmotionCNN, load_emotion_model |
|
|
| |
def ensure_dir(directory):
    """Create *directory* (and any missing parents) if it doesn't exist."""
    Path(directory).mkdir(parents=True, exist_ok=True)
|
|
class TestEmotionGPU:
    """Test class for GPU-based emotion analysis."""

    @pytest.fixture
    def test_video_path(self):
        """Fixture providing a path to a test video.

        Searches a few known repository locations for a real sample video;
        if none exists, synthesizes a short dummy clip containing a crude
        cartoon face so the processing pipeline always has input.
        """
        base_dir = Path(__file__).resolve().parents[2]

        # Prefer a real sample video if one is present in the repo.
        potential_videos = [
            base_dir / "videos" / "sample.mp4",
            base_dir / "videos" / "test.mp4",
            base_dir / "static" / "samples" / "sample.mp4",
            base_dir / "tests" / "data" / "sample.mp4",
        ]
        for video_path in potential_videos:
            if video_path.exists():
                return str(video_path)

        # No sample found: fall back to a generated 3-second clip.
        fallback_path = base_dir / "tests" / "output" / "dummy_test_video.mp4"
        ensure_dir(fallback_path.parent)

        if not fallback_path.exists():
            width, height = 640, 480
            fps = 30
            duration = 3  # seconds

            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(str(fallback_path), fourcc, fps, (width, height))
            try:
                for _ in range(fps * duration):
                    frame = np.zeros((height, width, 3), dtype=np.uint8)
                    # Draw a crude face: head, two eyes, and a smile, so a
                    # face detector has a chance of finding something.
                    cv2.circle(frame, (width // 2, height // 2), 100, (200, 200, 200), -1)
                    cv2.circle(frame, (width // 2 - 30, height // 2 - 30), 15, (0, 0, 0), -1)
                    cv2.circle(frame, (width // 2 + 30, height // 2 - 30), 15, (0, 0, 0), -1)
                    cv2.ellipse(frame, (width // 2, height // 2 + 30), (50, 20), 0, 0, 180, (0, 0, 0), -1)
                    out.write(frame)
            finally:
                # Always release the writer so the file is flushed and closed,
                # even if a write fails (the original leaked it on error).
                out.release()

        return str(fallback_path)

    @pytest.fixture
    def output_dir(self):
        """Fixture to provide an output directory for test results."""
        base_dir = Path(__file__).resolve().parents[2]
        output_path = str(base_dir / "tests" / "output" / "emotions_gpu")
        ensure_dir(output_path)
        return output_path

    def test_gpu_detection(self):
        """Test GPU detection and a basic CUDA computation.

        Skips (rather than silently passing) when CUDA is unavailable, and
        asserts on the computation result instead of swallowing failures in
        a broad try/except as the original did.
        """
        print(f"CUDA available: {torch.cuda.is_available()}")
        if not torch.cuda.is_available():
            pytest.skip("CUDA not available, skipping GPU detection test")

        print(f"CUDA device count: {torch.cuda.device_count()}")
        print(f"CUDA device name: {torch.cuda.get_device_name(0)}")

        # Trivial round-trip computation on the GPU; any CUDA error or wrong
        # result now fails the test instead of just printing a message.
        x = torch.tensor([1.0, 2.0, 3.0], device='cuda')
        y = x * 2
        result = y.cpu().numpy()
        assert np.array_equal(result, np.array([2.0, 4.0, 6.0])), "CUDA computation failed"
        print("CUDA test passed!")

    def test_model_to_gpu(self):
        """Test moving the emotion model to GPU and running a forward pass.

        Any exception propagates directly so pytest reports the real
        traceback (the original caught Exception and re-raised via
        ``assert False``, which destroyed it).
        """
        if not torch.cuda.is_available():
            pytest.skip("CUDA not available, skipping GPU model test")

        model = load_emotion_model()
        model.to('cuda')

        # Single 48x48 grayscale input, matching the model's expected shape.
        sample_input = torch.rand(1, 1, 48, 48, device='cuda')

        with torch.no_grad():
            output = model(sample_input)

        # Expect logits for the 7 basic emotion classes.
        assert output.shape == (1, 7), f"Expected output shape (1, 7), got {output.shape}"

        probs = torch.nn.functional.softmax(output, dim=1)
        assert torch.abs(torch.sum(probs) - 1.0) < 1e-5, "Probabilities don't sum to 1"

        print("Model GPU test passed!")

    def test_process_video_frames_gpu(self, test_video_path, output_dir):
        """Test processing video frames with GPU acceleration.

        Runs the full pipeline on the fixture video, saves the raw results
        as JSON, and sanity-checks structure, GPU usage, and emotion values.
        """
        results_path = os.path.join(
            output_dir,
            f"gpu_emotion_results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
        )

        use_gpu = torch.cuda.is_available()
        if not use_gpu:
            print("CUDA not available, test will run on CPU")

        start_time = time.time()
        results = process_video_frames(
            test_video_path,
            sample_rate=5,
            use_gpu=use_gpu,
        )
        end_time = time.time()

        # Persist raw results for offline inspection.
        with open(results_path, 'w') as f:
            json.dump(results, f, indent=2)

        processing_time = end_time - start_time
        print(f"Processing time: {processing_time:.2f} seconds")
        print(f"Saved results to {results_path}")

        assert isinstance(results, list), "Results should be a list"

        frames_with_faces = sum(1 for frame in results if frame.get('faces'))
        total_frames = len(results)
        detection_rate = (frames_with_faces / total_frames * 100) if total_frames > 0 else 0

        print(f"Frames processed: {total_frames}")
        print(f"Frames with faces: {frames_with_faces} ({detection_rate:.1f}%)")

        if use_gpu:
            gpu_frames = sum(1 for frame in results if frame.get('gpu_used', False))
            gpu_usage_rate = (gpu_frames / total_frames * 100) if total_frames > 0 else 0
            print(f"Frames processed with GPU: {gpu_frames} ({gpu_usage_rate:.1f}%)")

            assert gpu_frames > 0, "GPU was available but not used for any frames"

        # NOTE: counts faces carrying emotion data (the original's
        # ``frames_with_emotions`` name was misleading — it incremented
        # once per face, not per frame).
        faces_with_emotions = 0
        emotion_values = []
        for frame in results:
            for face in frame.get('faces', []):
                if 'emotions' in face:
                    faces_with_emotions += 1
                    emotion_values.extend(face['emotions'].values())

        if frames_with_faces > 0:
            assert faces_with_emotions > 0, "No frames with emotion data found"
            assert len(emotion_values) > 0, "No emotion values recorded"

            # Emotion scores are expected to be normalized probabilities.
            for value in emotion_values:
                assert 0 <= value <= 1, f"Emotion value {value} outside valid range [0, 1]"

        print(f"Test completed successfully - processed {total_frames} frames with {frames_with_faces} faces detected")
|
|
|
|
if __name__ == "__main__":
    # The original script path instantiated TestEmotionGPU and called its
    # @pytest.fixture-decorated methods (test_video_path, output_dir)
    # directly, which pytest explicitly forbids and raises
    # "Fixtures are not meant to be called directly" at runtime.
    # Delegate to pytest instead so fixtures are resolved properly;
    # -s keeps the informational print output visible.
    raise SystemExit(pytest.main([__file__, "-v", "-s"]))