|
|
import os |
|
|
import cv2 |
|
|
import torch |
|
|
import pandas as pd |
|
|
from feature_extraction.extractor import VideoFeatureExtractor |
|
|
from utils.csv_utils import _create_interaction_row |
|
|
|
|
|
|
|
|
class VideoDataExtractor: |
|
|
def __init__(self): |
|
|
self.extractor = VideoFeatureExtractor() |
|
|
|
|
|
def extract_video_data( |
|
|
self, |
|
|
video_path, |
|
|
output_csv_path=None, |
|
|
output_folder=None, |
|
|
show_video=False, |
|
|
save_video=False, |
|
|
): |
|
|
""" |
|
|
Extract interaction data from a video file and return a DataFrame. |
|
|
|
|
|
Args: |
|
|
video_path: Path to input video |
|
|
output_csv_path: Optional path to save CSV output |
|
|
output_folder: Optional folder to save annotated video |
|
|
show_video: Whether to display video during processing |
|
|
save_video: Whether to save output video |
|
|
|
|
|
Returns: |
|
|
pandas.DataFrame containing extracted interactions |
|
|
""" |
|
|
cap = None |
|
|
video_writer = None |
|
|
csv_data = [] |
|
|
seen_interactions = set() |
|
|
|
|
|
try: |
|
|
if not os.path.exists(video_path): |
|
|
raise FileNotFoundError(f"Video file not found: {video_path}") |
|
|
|
|
|
cap = cv2.VideoCapture(video_path) |
|
|
if not cap.isOpened(): |
|
|
raise ValueError("Error: Could not open video file") |
|
|
|
|
|
fps = cap.get(cv2.CAP_PROP_FPS) |
|
|
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) |
|
|
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) |
|
|
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) |
|
|
|
|
|
video_name = os.path.splitext(os.path.basename(video_path))[0] |
|
|
|
|
|
|
|
|
batch_size, frame_skip = self.extractor.preprocessor.set_resolution_config( |
|
|
frame_width, frame_height |
|
|
) |
|
|
self.extractor.preprocessor.frame_skip = frame_skip |
|
|
|
|
|
|
|
|
if output_folder and save_video: |
|
|
os.makedirs(output_folder, exist_ok=True) |
|
|
output_video_path = os.path.join( |
|
|
output_folder, f"{video_name}_detections.mp4" |
|
|
) |
|
|
video_writer = cv2.VideoWriter( |
|
|
output_video_path, |
|
|
cv2.VideoWriter_fourcc(*"mp4v"), |
|
|
fps / frame_skip, |
|
|
(frame_width, frame_height), |
|
|
) |
|
|
|
|
|
|
|
|
self.extractor.reset() |
|
|
|
|
|
|
|
|
for frame_idx in range(0, total_frames, frame_skip): |
|
|
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx) |
|
|
ret, frame = cap.read() |
|
|
if not ret: |
|
|
break |
|
|
|
|
|
frame_data, annotated_frame = self.extractor.extract_features( |
|
|
frame, frame_idx |
|
|
) |
|
|
|
|
|
if frame_data is not None: |
|
|
for interaction in frame_data["interactions"]: |
|
|
interaction_id = ( |
|
|
interaction["person1_id"], |
|
|
interaction["person2_id"], |
|
|
frame_idx, |
|
|
) |
|
|
|
|
|
if interaction_id not in seen_interactions: |
|
|
seen_interactions.add(interaction_id) |
|
|
row = _create_interaction_row( |
|
|
video_name, |
|
|
frame_data, |
|
|
interaction, |
|
|
frame_width, |
|
|
frame_height, |
|
|
) |
|
|
csv_data.append(row) |
|
|
|
|
|
if video_writer is not None and annotated_frame is not None: |
|
|
video_writer.write(annotated_frame) |
|
|
|
|
|
if show_video and annotated_frame is not None: |
|
|
cv2.imshow("Video Data Extraction", annotated_frame) |
|
|
if cv2.waitKey(1) & 0xFF == ord("q"): |
|
|
break |
|
|
|
|
|
if frame_idx % 100 == 0: |
|
|
torch.cuda.empty_cache() |
|
|
|
|
|
|
|
|
if csv_data: |
|
|
df = pd.DataFrame(csv_data) |
|
|
if output_csv_path: |
|
|
df.to_csv( |
|
|
output_csv_path, |
|
|
mode="a" if os.path.exists(output_csv_path) else "w", |
|
|
header=not os.path.exists(output_csv_path), |
|
|
index=False, |
|
|
) |
|
|
return df |
|
|
else: |
|
|
return pd.DataFrame() |
|
|
|
|
|
finally: |
|
|
if cap is not None: |
|
|
cap.release() |
|
|
if video_writer is not None: |
|
|
video_writer.release() |
|
|
cv2.destroyAllWindows() |
|
|
torch.cuda.empty_cache() |
|
|
|