gridsense-gemma4 / brain /video_processor.py
Nestroy2003's picture
Sync from GitHub via hub-sync
91bf1ce verified
import os
import tempfile
def process_video(video_path: str, max_frames: int = 5) -> dict:
"""Process video file — extracts frames and audio for Gemma 4 analysis."""
try:
import cv2
import numpy as np
return _process_with_cv2(video_path, max_frames)
except ImportError:
# cv2 not available on this deployment — return basic metadata
return {
"selected_frames": [],
"flicker_score": 0,
"motion_score": 0,
"audio_transcript": "",
"duration_seconds": 0,
"frame_summary": "Video processing not available on this deployment. Submit text description for analysis.",
"temp_dir": tempfile.mkdtemp()
}
except Exception as e:
return {
"selected_frames": [],
"flicker_score": 0,
"motion_score": 0,
"audio_transcript": "",
"duration_seconds": 0,
"frame_summary": f"Video processing error: {str(e)}",
"temp_dir": tempfile.mkdtemp()
}
def _process_with_cv2(video_path: str, max_frames: int) -> dict:
import cv2
import numpy as np
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
raise ValueError(f"Cannot open video: {video_path}")
fps = cap.get(cv2.CAP_PROP_FPS) or 30
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
duration = total_frames / fps
max_analysis = int(min(30, duration) * fps)
frames, brightness_values = [], []
temp_dir = tempfile.mkdtemp()
sample_interval = max(1, int(fps))
frame_idx = 0
while cap.isOpened() and frame_idx < max_analysis:
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
ret, frame = cap.read()
if not ret:
break
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
brightness = float(np.mean(gray))
brightness_values.append(brightness)
frames.append((frame_idx, brightness, frame.copy()))
frame_idx += sample_interval
cap.release()
if not frames:
return {"selected_frames": [], "flicker_score": 0, "motion_score": 0,
"audio_transcript": "", "duration_seconds": 0,
"frame_summary": "No frames extracted.", "temp_dir": temp_dir}
brightness_array = np.array(brightness_values)
flicker_score = min(1.0, float(np.std(brightness_array) / 30.0))
motion_scores = []
for i in range(1, len(frames)):
diff = cv2.absdiff(
cv2.cvtColor(frames[i-1][2], cv2.COLOR_BGR2GRAY),
cv2.cvtColor(frames[i][2], cv2.COLOR_BGR2GRAY)
)
motion_scores.append(float(np.mean(diff)))
motion_score = min(1.0, float(np.mean(motion_scores)) / 20.0) if motion_scores else 0.0
selected = {0, len(frames)-1,
int(np.argmax(brightness_values)),
int(np.argmin(brightness_values))}
frame_paths = []
for idx in sorted(selected)[:max_frames]:
if idx < len(frames):
path = os.path.join(temp_dir, f"frame_{idx:04d}.jpg")
cv2.imwrite(path, frames[idx][2], [cv2.IMWRITE_JPEG_QUALITY, 85])
frame_paths.append(path)
parts = [f"Video: {duration:.1f}s."]
if flicker_score > 0.3:
parts.append(f"Significant flickering (variance {flicker_score:.2f}). Brightness {min(brightness_values):.0f}{max(brightness_values):.0f}.")
elif flicker_score > 0.1:
parts.append(f"Mild brightness variation (variance {flicker_score:.2f}).")
else:
parts.append("Stable brightness throughout.")
if motion_score > 0.4:
parts.append(f"High motion ({motion_score:.2f}).")
return {
"selected_frames": frame_paths,
"flicker_score": round(flicker_score, 3),
"motion_score": round(motion_score, 3),
"audio_transcript": "",
"duration_seconds": round(duration, 1),
"frame_summary": " ".join(parts),
"temp_dir": temp_dir
}
def cleanup_temp_files(video_result: dict):
import shutil
temp_dir = video_result.get("temp_dir")
if temp_dir and os.path.exists(temp_dir):
shutil.rmtree(temp_dir, ignore_errors=True)