Spaces:
Sleeping
Sleeping
fix: keyframe images, video clips, evidence images, live stream webcam+URL, remove demo mode
fd50325 verified | """ | |
| Event Clip Generator | |
| Generates video clips from events for viewing, playing, and downloading. | |
| Extracts clips from the original or compressed video based on event timestamps. | |
| Supports annotation with face bounding boxes for person search results. | |
| """ | |
| import os | |
| import cv2 | |
| import subprocess | |
| import logging | |
| import uuid | |
| from typing import Optional, Dict, Any, List, Tuple | |
| from pathlib import Path | |
| from datetime import datetime | |
| logger = logging.getLogger(__name__) | |
class EventClipGenerator:
    """Generate video clips from events.

    Clips are cut with ffmpeg when it is available and with OpenCV otherwise.
    OpenCV output (``mp4v``) is re-encoded to H.264 on a best-effort basis so
    that browsers can play it. All public methods report failure through their
    return value (``None`` / ``{}``) rather than by raising.
    """

    def __init__(self, output_dir: str = "video_processing_outputs/clips"):
        """Remember the output directory and create it if it does not exist.

        Args:
            output_dir: Directory under which all clips are written.
        """
        self.output_dir = output_dir
        os.makedirs(self.output_dir, exist_ok=True)

    # ------------------------------------------------------------------
    # Small private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _remove_quietly(path: str) -> None:
        """Delete *path* if it exists; a missing or undeletable file is ignored."""
        try:
            os.remove(path)
        except OSError:
            # Best-effort cleanup only: the caller has nothing useful to do
            # if a temporary/partial file cannot be removed.
            pass

    @staticmethod
    def _h264_temp_path(clip_path: str) -> str:
        """Return the temporary path used while re-encoding *clip_path*.

        Only the file extension is touched. A plain ``str.replace('.mp4', ...)``
        would also rewrite a ``.mp4`` occurring in a directory name or id.
        """
        root, ext = os.path.splitext(clip_path)
        return f"{root}_h264{ext}"

    def _convert_to_h264(self, clip_path: str) -> Tuple[bool, str]:
        """Re-encode *clip_path* in place to H.264/AAC using ffmpeg.

        Returns:
            ``(converted, stderr)``. ``converted`` is True when the file at
            *clip_path* has been replaced by the H.264 version; ``stderr`` is
            ffmpeg's captured error output.

        Raises:
            OSError, subprocess.SubprocessError: ffmpeg is missing, timed out
                or the final rename failed. The temporary file is removed in
                every case.
        """
        temp_path = self._h264_temp_path(clip_path)
        cmd = [
            'ffmpeg',
            '-i', clip_path,
            '-c:v', 'libx264',          # H.264 codec for browser compatibility
            '-preset', 'fast',
            '-crf', '23',
            '-c:a', 'aac',              # AAC audio codec
            '-movflags', '+faststart',  # Enable streaming
            '-y',
            temp_path
        ]
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
            if result.returncode == 0 and os.path.exists(temp_path):
                # os.replace swaps the file in one step, so the clip never
                # disappears between a remove and a rename.
                os.replace(temp_path, clip_path)
                return True, result.stderr
            return False, result.stderr
        finally:
            # No-op after a successful replace; removes leftovers otherwise.
            self._remove_quietly(temp_path)

    @staticmethod
    def _detection_seconds(detection: Dict[str, Any]) -> Optional[float]:
        """Return the detection time in seconds.

        ``timestamp`` is preferred over ``detected_at``. Objects exposing a
        ``timestamp()`` method (e.g. ``datetime``) are converted through it.
        NOTE(review): for datetimes this yields epoch seconds, not an offset
        into the video - confirm what callers actually store in these fields.

        Returns:
            The time in seconds; ``0.0`` when the record carries no time
            field at all (historical behaviour); ``None`` when a time field is
            present but cannot be interpreted as a finite number.
        """
        saw_value = False
        for key in ('timestamp', 'detected_at'):
            raw = detection.get(key)
            # "is None" rather than truthiness: a timestamp of 0 is valid.
            if raw is None or raw == "":
                continue
            saw_value = True
            try:
                if hasattr(raw, 'timestamp'):
                    raw = raw.timestamp()
                seconds = float(raw)
            except (TypeError, ValueError, OverflowError, OSError):
                continue
            # Reject NaN / infinity, which cannot be turned into a frame index.
            if seconds == seconds and abs(seconds) != float("inf"):
                return seconds
        return None if saw_value else 0.0

    @staticmethod
    def _parse_bbox(bbox: Any) -> Optional[Tuple[int, int, int, int]]:
        """Normalise a bounding box to ``(x1, y1, x2, y2)`` integers.

        Accepts a list/tuple ``[x1, y1, x2, y2, ...]`` or a dict keyed by
        ``x1``/``y1``/``x2``/``y2`` (falling back to integer keys 0-3).

        Returns:
            The box, or ``None`` when it is missing, malformed, has negative
            origin coordinates or has no positive area.
        """
        if not bbox:
            return None
        try:
            if isinstance(bbox, dict):
                coords = tuple(
                    int(bbox.get(name, bbox.get(index, 0)))
                    for index, name in enumerate(('x1', 'y1', 'x2', 'y2'))
                )
            elif isinstance(bbox, (list, tuple)) and len(bbox) >= 4:
                coords = tuple(int(value) for value in bbox[:4])
            else:
                return None
        except (ValueError, TypeError) as e:
            logger.warning(f"Invalid bounding box format: {bbox}, error: {e}")
            return None

        x1, y1, x2, y2 = coords
        if x1 >= 0 and y1 >= 0 and x2 > x1 and y2 > y1:
            return coords
        return None

    def _build_frame_bbox_map(self, face_detections: List[Dict[str, Any]],
                              face_id: str,
                              fps: float) -> Dict[int, Tuple[int, int, int, int]]:
        """Map source-video frame numbers to the bounding box of *face_id*.

        Each detection is stored for the two frames before and after its own
        frame to tolerate small timestamp inaccuracies.
        """
        frame_bbox_map: Dict[int, Tuple[int, int, int, int]] = {}
        for detection in face_detections or []:
            if detection.get('face_id') != face_id:
                continue

            # Try multiple bbox field names
            bbox = self._parse_bbox(
                detection.get('bounding_box') or detection.get('bounding_boxes')
            )
            if bbox is None:
                continue

            seconds = self._detection_seconds(detection)
            if seconds is None:
                logger.warning(f"Skipping detection with unusable timestamp: {detection}")
                continue

            frame_num = int(seconds * fps) if seconds > 0 else 0
            for offset in range(-2, 3):  # +/-2 frames tolerance
                frame_bbox_map[frame_num + offset] = bbox
        return frame_bbox_map

    @staticmethod
    def _draw_annotation(frame: Any, bbox: Tuple[int, int, int, int], label: str) -> None:
        """Draw a green box and a filled label tag onto *frame* in place."""
        x1, y1, x2, y2 = bbox
        color = (0, 255, 0)  # Green in BGR
        cv2.rectangle(frame, (x1, y1), (x2, y2), color, 3)

        label_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.7, 2)[0]
        tag_height = label_size[1] + 10
        # Normally the tag sits on top of the box. If that would fall above
        # the frame, put it just inside the box so it stays visible.
        tag_bottom = y1 if y1 - tag_height >= 0 else y1 + tag_height

        cv2.rectangle(frame, (x1, tag_bottom - tag_height),
                      (x1 + label_size[0] + 10, tag_bottom), color, -1)
        cv2.putText(frame, label, (x1 + 5, tag_bottom - 5),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def extract_clip(self, video_path: str, start_time: float, end_time: float,
                     event_id: str, video_id: Optional[str] = None) -> Optional[str]:
        """
        Extract a video clip from a video file

        Args:
            video_path: Path to source video
            start_time: Start timestamp in seconds (negative values are clamped to 0)
            end_time: End timestamp in seconds; must be greater than start_time
            event_id: Event identifier
            video_id: Optional video identifier for organizing clips

        Returns:
            Path to extracted clip file, or None if extraction failed
        """
        if not os.path.exists(video_path):
            logger.error(f"Video file not found: {video_path}")
            return None

        try:
            start_time = max(0, start_time)
            duration = end_time - start_time
            if duration <= 0:
                # ffmpeg would fail or emit an empty file for such a range.
                logger.error(
                    f"Invalid clip range: start={start_time}, end={end_time}"
                )
                return None

            # Create clip filename
            clip_id = f"{event_id}_{uuid.uuid4().hex[:8]}"
            clip_filename = f"{clip_id}.mp4"

            # Create output directory for this video if video_id provided
            # NOTE(review): event_id / video_id are joined into the path as-is;
            # confirm callers never pass user-controlled values containing
            # path separators.
            if video_id:
                clip_dir = os.path.join(self.output_dir, video_id)
                os.makedirs(clip_dir, exist_ok=True)
            else:
                clip_dir = self.output_dir
            clip_path = os.path.join(clip_dir, clip_filename)

            # Try ffmpeg first (faster and more reliable than OpenCV).
            # Use re-encoding with H.264 for browser compatibility and reliable
            # short-clip extraction (codec copy can miss keyframes and produce
            # audio-only output for clips shorter than ~2 seconds).
            cmd = [
                'ffmpeg',
                '-ss', str(start_time),     # Seek BEFORE -i for speed
                '-i', video_path,
                '-t', str(duration),
                '-c:v', 'libx264',          # Re-encode video with H.264
                '-preset', 'fast',
                '-crf', '23',
                '-c:a', 'aac',              # AAC audio for browser compat
                '-movflags', '+faststart',  # Enable progressive download
                '-avoid_negative_ts', 'make_zero',
                '-y',                       # Overwrite output file
                clip_path
            ]
            try:
                result = subprocess.run(
                    cmd,
                    capture_output=True,
                    text=True,
                    timeout=60  # 60 second timeout
                )
            except (OSError, subprocess.SubprocessError) as e:
                logger.warning(f"FFmpeg not available or failed: {e}, using OpenCV fallback")
            else:
                if (result.returncode == 0 and os.path.exists(clip_path)
                        and os.path.getsize(clip_path) > 0):
                    logger.info(f"✅ Extracted clip: {clip_path} ({duration:.2f}s)")
                    return clip_path
                logger.warning(f"FFmpeg extraction failed, trying OpenCV fallback: {result.stderr}")

            # Drop any partial ffmpeg output so it can never be mistaken for
            # the result of the fallback below.
            self._remove_quietly(clip_path)
            return self._extract_clip_opencv(video_path, start_time, end_time, clip_path)

        except Exception as e:
            logger.error(f"Error extracting clip: {e}")
            return None

    def _extract_clip_opencv(self, video_path: str, start_time: float,
                             end_time: float, output_path: str) -> Optional[str]:
        """Extract clip using OpenCV (fallback method).

        Frames ``int(start_time * fps)`` through ``int(end_time * fps)`` are
        copied to *output_path* as ``mp4v`` and then, if ffmpeg is usable,
        re-encoded to H.264 in place.

        Returns:
            *output_path* on success, otherwise None.
        """
        try:
            cap = cv2.VideoCapture(video_path)
            out = None
            frames_written = 0
            try:
                if not cap.isOpened():
                    logger.error(f"Could not open video: {video_path}")
                    return None

                fps = cap.get(cv2.CAP_PROP_FPS)
                if not fps > 0:
                    # Without a frame rate neither the frame range nor the
                    # writer can be set up (also catches NaN).
                    logger.error(f"Could not determine FPS for video: {video_path}")
                    return None
                width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

                # Calculate frame numbers
                start_frame = max(0, int(start_time * fps))
                end_frame = int(end_time * fps)

                # Set starting position
                cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

                # Create video writer
                fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

                for _ in range(start_frame, end_frame + 1):
                    ret, frame = cap.read()
                    if not ret:
                        break
                    out.write(frame)
                    frames_written += 1
            finally:
                # Always release: the writer must be closed before the file
                # is inspected or re-encoded, and handles must not leak.
                cap.release()
                if out is not None:
                    out.release()

            if (frames_written == 0 or not os.path.exists(output_path)
                    or os.path.getsize(output_path) == 0):
                self._remove_quietly(output_path)
                logger.error(f"OpenCV extraction failed: output file is empty or missing")
                return None

            # Convert to browser-compatible format using ffmpeg
            try:
                converted, stderr = self._convert_to_h264(output_path)
            except (OSError, subprocess.SubprocessError, ValueError) as e:
                logger.warning(f"FFmpeg not available for conversion: {e}")
                logger.info(f"✅ Extracted clip using OpenCV: {output_path}")
                return output_path

            if converted:
                logger.info(f"✅ Extracted clip using OpenCV (H.264): {output_path}")
            else:
                logger.warning(f"FFmpeg conversion failed: {stderr}")
                logger.info(f"✅ Extracted clip using OpenCV (mp4v): {output_path}")
            return output_path

        except Exception as e:
            logger.error(f"OpenCV clip extraction error: {e}")
            return None

    def extract_annotated_clip(self, video_path: str, start_time: float, end_time: float,
                               face_id: str, face_detections: List[Dict[str, Any]],
                               video_id: Optional[str] = None,
                               person_name: Optional[str] = None) -> Optional[str]:
        """
        Extract and annotate a video clip with bounding boxes for a specific person

        Args:
            video_path: Path to source video
            start_time: Start timestamp in seconds
            end_time: End timestamp in seconds
            face_id: Face identifier to highlight
            face_detections: List of face detection records with bounding boxes and timestamps
            video_id: Optional video identifier
            person_name: Optional person name to display on annotations

        Returns:
            Path to annotated clip file, or None if extraction failed
        """
        if not os.path.exists(video_path):
            logger.error(f"Video file not found: {video_path}")
            return None

        try:
            # Create annotated clip filename
            clip_id = f"annotated_{face_id}_{uuid.uuid4().hex[:8]}"
            clip_filename = f"{clip_id}.mp4"

            # Create output directory
            if video_id:
                clip_dir = os.path.join(self.output_dir, video_id, "annotated")
            else:
                clip_dir = os.path.join(self.output_dir, "annotated")
            os.makedirs(clip_dir, exist_ok=True)
            clip_path = os.path.join(clip_dir, clip_filename)

            label = person_name if person_name else "Detected Person"
            frames_written = 0
            frames_annotated = 0

            # Open video
            cap = cv2.VideoCapture(video_path)
            out = None
            try:
                if not cap.isOpened():
                    logger.error(f"Could not open video: {video_path}")
                    return None

                fps = cap.get(cv2.CAP_PROP_FPS)
                if not fps > 0:
                    logger.error(f"Could not determine FPS for video: {video_path}")
                    return None
                width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
                total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

                # Calculate frame numbers
                start_frame = max(0, int(start_time * fps))
                end_frame = int(end_time * fps)
                if total_frames > 0:
                    # Some containers report no frame count; only clamp when
                    # it is known, otherwise rely on read() failing at EOF.
                    end_frame = min(end_frame, total_frames - 1)

                # Create a map of frame_number -> bounding box for quick lookup
                frame_bbox_map = self._build_frame_bbox_map(face_detections, face_id, fps)

                # Set starting position
                cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame)

                # Create video writer
                fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                out = cv2.VideoWriter(clip_path, fourcc, fps, (width, height))

                for frame_number in range(start_frame, end_frame + 1):
                    ret, frame = cap.read()
                    if not ret:
                        break

                    # Check if this frame has a bounding box for this face
                    bbox = frame_bbox_map.get(frame_number)
                    if bbox is not None:
                        self._draw_annotation(frame, bbox, label)
                        frames_annotated += 1

                    out.write(frame)
                    frames_written += 1
            finally:
                cap.release()
                if out is not None:
                    out.release()

            if (frames_written == 0 or not os.path.exists(clip_path)
                    or os.path.getsize(clip_path) == 0):
                self._remove_quietly(clip_path)
                logger.error(f"Annotated clip creation failed: output file is empty or missing")
                return None

            # Convert to browser-compatible format using ffmpeg
            try:
                converted, stderr = self._convert_to_h264(clip_path)
            except (OSError, subprocess.SubprocessError, ValueError) as e:
                logger.warning(f"FFmpeg not available for conversion: {e}")
                logger.info(f"✅ Created annotated clip (mp4v): {clip_path} ({frames_annotated} frames annotated)")
                return clip_path

            if converted:
                logger.info(f"✅ Created annotated clip: {clip_path} ({frames_annotated} frames annotated)")
            else:
                logger.warning(f"FFmpeg conversion failed, returning OpenCV output: {stderr}")
                logger.info(f"✅ Created annotated clip (mp4v): {clip_path} ({frames_annotated} frames annotated)")
            return clip_path

        except Exception as e:
            logger.error(f"Error creating annotated clip: {e}")
            return None

    def get_clip_info(self, clip_path: str) -> Dict[str, Any]:
        """Get information about a clip file.

        Returns:
            A dict with ``duration`` (seconds), ``fps``, ``frame_count``,
            ``resolution`` (``"WxH"``), ``file_size`` (bytes) and
            ``file_size_mb``; an empty dict when the file is missing, cannot
            be opened or an error occurs.
        """
        if not os.path.exists(clip_path):
            return {}

        try:
            cap = cv2.VideoCapture(clip_path)
            try:
                if not cap.isOpened():
                    return {}
                fps = cap.get(cv2.CAP_PROP_FPS)
                frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            finally:
                # Release on every path, including errors and early return.
                cap.release()

            duration = frame_count / fps if fps > 0 else 0
            file_size = os.path.getsize(clip_path)
            return {
                'duration': duration,
                'fps': fps,
                'frame_count': frame_count,
                'resolution': f"{width}x{height}",
                'file_size': file_size,
                'file_size_mb': round(file_size / (1024 * 1024), 2)
            }
        except Exception as e:
            logger.error(f"Error getting clip info: {e}")
            return {}