# test/behavior_backend/tests/services/01_test_eye_contact.py
# (from commit 8ae78b0: "Add backend application and Dockerfile", by hibatorrahmen)
import cv2
import time
import json
import sys
import os
# Add the project root to sys.path
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(os.path.dirname(current_dir))
sys.path.insert(0, parent_dir)
from app.services.processing.eye_contact_analyzer import EyeContactAnalyzer, analyze_eye_contact, DEVICE
def _print_report(stats, assessment):
    """Print the key eye-contact statistics and interview assessment to stdout."""
    print("\n--- Eye Contact Statistics ---")
    print(f"Total frames analyzed: {stats['total_frames']}")
    print(f"Eye contact percentage: {stats['eye_contact_percentage']:.2f}%")
    print(f"Total duration: {stats['total_duration_seconds']:.2f} seconds")
    print(f"Eye contact duration: {stats['eye_contact_duration_seconds']:.2f} seconds")
    print(f"Longest eye contact: {stats['longest_eye_contact_seconds']:.2f} seconds")
    print(f"Average contact duration: {stats['average_contact_duration_seconds']:.2f} seconds")
    print(f"Contact episodes: {stats['contact_episodes']}")
    print("\n--- Assessment ---")
    print(f"Score: {assessment['score']}/10")
    print(f"Assessment: {assessment['assessment']}")
    print("\nPatterns detected:")
    for pattern in assessment['patterns']:
        print(f"- {pattern}")
    print("\nRecommendations:")
    for recommendation in assessment['recommendations']:
        print(f"- {recommendation}")


def analyze_video_file(video_path, display_video=True, save_results=True):
    """
    Analyze eye contact in a video file and get statistics.

    Args:
        video_path: Path to the video file.
        display_video: Whether to display annotated frames during analysis
            (press 'q' in the window to stop early).
        save_results: Whether to save results to a JSON file in the current
            working directory.

    Returns:
        dict: Eye contact statistics and assessment, or None if the video
        could not be opened.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Could not open video file {video_path}")
        return None

    # Video properties. fps can be 0 for some containers, so guard divisions.
    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    duration = frame_count / fps if fps > 0 else 0
    print(f"Analyzing video: {video_path}")
    print(f"Video properties: {frame_count} frames, {fps:.2f} FPS, {duration:.2f} seconds")

    analyzer = EyeContactAnalyzer()
    frame_number = 0
    # Rolling processing-FPS estimate, refreshed once per wall-clock second.
    prev_time = time.time()
    fps_counter = 0
    processing_fps = 0

    try:
        # Process each frame until the stream ends or the user aborts.
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            metrics, analyzer, annotated_frame = analyze_eye_contact(frame, analyzer, display_video)

            # Update the processing-FPS estimate.
            fps_counter += 1
            current_time = time.time()
            if current_time - prev_time >= 1.0:
                processing_fps = fps_counter / (current_time - prev_time)
                fps_counter = 0
                prev_time = current_time

            # Display progress on the console.
            frame_number += 1
            progress = (frame_number / frame_count) * 100 if frame_count > 0 else 0
            print(f"\rProgress: {progress:.1f}% (Frame {frame_number}/{frame_count})", end="")

            # Current position in the video, for the on-screen overlay.
            current_video_time = frame_number / fps if fps > 0 else 0
            minutes = int(current_video_time // 60)
            seconds = int(current_video_time % 60)

            if display_video:
                # Overlay progress / FPS / device / timestamp onto the frame.
                cv2.putText(annotated_frame, f"Progress: {progress:.1f}%",
                            (20, 140), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
                cv2.putText(annotated_frame, f"Processing FPS: {processing_fps:.1f}",
                            (20, 170), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
                cv2.putText(annotated_frame, f"Device: {DEVICE}",
                            (20, 200), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
                cv2.putText(annotated_frame, f"Time: {minutes:02d}:{seconds:02d}",
                            (20, 230), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 1)
                cv2.imshow("Eye Contact Analysis", annotated_frame)
                # Allow the user to abort the analysis with 'q'.
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
    finally:
        # Always release the capture and any windows, even if analysis raised.
        cap.release()
        if display_video:
            cv2.destroyAllWindows()

    print("\nAnalysis complete!")

    # Get statistics and assessment, then combine into one result payload.
    stats = analyzer.get_stats()
    assessment = analyzer.get_interview_assessment()
    results = {
        "video_info": {
            "path": video_path,
            "frames": frame_count,
            "fps": fps,
            "duration_seconds": duration,
            "device_used": DEVICE
        },
        "eye_contact_stats": stats,
        "assessment": assessment
    }

    if save_results:
        # os.path handles both '/' and '\\' separators; the original
        # str.split('/') broke on Windows paths and on dots in directory names.
        base_name = os.path.splitext(os.path.basename(video_path))[0]
        output_file = f"{base_name}_eye_contact_analysis.json"
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=4)
        print(f"Results saved to {output_file}")

    _print_report(stats, assessment)
    return results
if __name__ == "__main__":
    # Resolve the sample video relative to the project root (three levels up
    # from this test file).
    project_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
    sample_video = os.path.join(project_root, "static", "uploads",
                                "ce1fbeb0-6bf2-492f-baa4-7b50d0dc3981.mp4")
    # Run the analysis with the on-screen preview and JSON export enabled.
    analyze_video_file(sample_video, display_video=True, save_results=True)