vio / services /video_data_extraction /video_preprocessor.py
harmesh95's picture
Add YOLOv8 model weights with LFS tracking
e2af51e
import os
import cv2
import torch
import pandas as pd
from feature_extraction.extractor import VideoFeatureExtractor
from utils.csv_utils import _create_interaction_row
class VideoDataExtractor:
def __init__(self):
self.extractor = VideoFeatureExtractor()
def extract_video_data(
self,
video_path,
output_csv_path=None,
output_folder=None,
show_video=False,
save_video=False,
):
"""
Extract interaction data from a video file and return a DataFrame.
Args:
video_path: Path to input video
output_csv_path: Optional path to save CSV output
output_folder: Optional folder to save annotated video
show_video: Whether to display video during processing
save_video: Whether to save output video
Returns:
pandas.DataFrame containing extracted interactions
"""
cap = None
video_writer = None
csv_data = []
seen_interactions = set()
try:
if not os.path.exists(video_path):
raise FileNotFoundError(f"Video file not found: {video_path}")
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
raise ValueError("Error: Could not open video file")
fps = cap.get(cv2.CAP_PROP_FPS)
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
video_name = os.path.splitext(os.path.basename(video_path))[0]
# Configure resolution-based settings
batch_size, frame_skip = self.extractor.preprocessor.set_resolution_config(
frame_width, frame_height
)
self.extractor.preprocessor.frame_skip = frame_skip
# Initialize video writer if required
if output_folder and save_video:
os.makedirs(output_folder, exist_ok=True)
output_video_path = os.path.join(
output_folder, f"{video_name}_detections.mp4"
)
video_writer = cv2.VideoWriter(
output_video_path,
cv2.VideoWriter_fourcc(*"mp4v"),
fps / frame_skip,
(frame_width, frame_height),
)
# Reset extractor for a fresh start
self.extractor.reset()
# Process frames
for frame_idx in range(0, total_frames, frame_skip):
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
ret, frame = cap.read()
if not ret:
break
frame_data, annotated_frame = self.extractor.extract_features(
frame, frame_idx
)
if frame_data is not None:
for interaction in frame_data["interactions"]:
interaction_id = (
interaction["person1_id"],
interaction["person2_id"],
frame_idx,
)
if interaction_id not in seen_interactions:
seen_interactions.add(interaction_id)
row = _create_interaction_row(
video_name,
frame_data,
interaction,
frame_width,
frame_height,
)
csv_data.append(row)
if video_writer is not None and annotated_frame is not None:
video_writer.write(annotated_frame)
if show_video and annotated_frame is not None:
cv2.imshow("Video Data Extraction", annotated_frame)
if cv2.waitKey(1) & 0xFF == ord("q"):
break
if frame_idx % 100 == 0:
torch.cuda.empty_cache()
# βœ… Return only the DataFrame
if csv_data:
df = pd.DataFrame(csv_data)
if output_csv_path:
df.to_csv(
output_csv_path,
mode="a" if os.path.exists(output_csv_path) else "w",
header=not os.path.exists(output_csv_path),
index=False,
)
return df
else:
return pd.DataFrame() # empty DataFrame if nothing found
finally:
if cap is not None:
cap.release()
if video_writer is not None:
video_writer.release()
cv2.destroyAllWindows()
torch.cuda.empty_cache()