Spaces:
Sleeping
Sleeping
fix: keyframe images, video clips, evidence images, live stream webcam+URL, remove demo mode
fd50325 verified | """ | |
| DetectifAI Complete Video Processing Pipeline | |
| This is the main pipeline that orchestrates all DetectifAI components: | |
| - Optimized video processing with selective frame enhancement | |
| - DetectifAI event detection and security analysis | |
| - Object detection with fire/weapon recognition | |
| - Event aggregation and threat assessment | |
| - Highlight reel generation for security incidents | |
| - Compression and comprehensive reporting | |
| - API integration for real-time frontend updates | |
| """ | |
| import os | |
| import time | |
| import logging | |
| from typing import Dict, List, Any, Tuple, Optional | |
| from datetime import datetime | |
| import json | |
| # Import all components | |
| from config import VideoProcessingConfig, get_security_focused_config, get_high_recall_config | |
| from core.video_processing import OptimizedVideoProcessor | |
| from event_aggregation import EventDetector, EventDeduplicationEngine | |
| from video_segmentation import VideoSegmentationEngine | |
| from highlight_reel import HighlightReelGenerator | |
| from video_compression import VideoCompressor | |
| from json_reports import ReportGenerator | |
| from object_detection import ObjectDetectionIntegrator | |
| from behavior_analysis_integrator import BehaviorAnalysisIntegrator | |
| from video_captioning_integrator import VideoCaptioningIntegrator | |
| from detectifai_events import DetectifAIEventType, ThreatLevel | |
| # Set up logging | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
| handlers=[ | |
| logging.StreamHandler(), | |
| logging.FileHandler('logs/detectifai_pipeline.log') | |
| ] | |
| ) | |
| logger = logging.getLogger(__name__) | |
| class CompleteVideoProcessingPipeline: | |
| """Complete video processing pipeline orchestrating all components""" | |
| def __init__(self, config: VideoProcessingConfig = None, db_manager=None): | |
| """ | |
| Initialize the complete processing pipeline | |
| Args: | |
| config: VideoProcessingConfig object, uses default if None | |
| db_manager: Optional DatabaseManager for MongoDB integration | |
| """ | |
| self.config = config or VideoProcessingConfig() | |
| self.db_manager = db_manager | |
| self.processing_stats = { | |
| 'start_time': None, | |
| 'end_time': None, | |
| 'total_processing_time': 0, | |
| 'component_times': {}, | |
| 'memory_usage': {}, | |
| 'errors': [] | |
| } | |
| # Initialize components | |
| logger.info("Initializing video processing pipeline components") | |
| try: | |
| self.video_processor = OptimizedVideoProcessor(self.config) | |
| self.event_detector = EventDetector(self.config) | |
| self.deduplication_engine = EventDeduplicationEngine(self.config) | |
| self.segmentation_engine = VideoSegmentationEngine(self.config) | |
| self.highlight_generator = HighlightReelGenerator(self.config) | |
| self.compressor = VideoCompressor(self.config) | |
| self.report_generator = ReportGenerator(self.config) | |
| self.object_detector = ObjectDetectionIntegrator(self.config) | |
| # Initialize behavior analyzer if enabled | |
| self.behavior_analyzer = None | |
| if getattr(self.config, 'enable_behavior_analysis', False): | |
| try: | |
| self.behavior_analyzer = BehaviorAnalysisIntegrator(self.config) | |
| logger.info("β Behavior analysis enabled") | |
| except Exception as e: | |
| logger.warning(f"β οΈ Behavior analysis initialization failed: {e}") | |
| self.config.enable_behavior_analysis = False | |
| # Initialize video captioning if enabled | |
| self.video_captioning = None | |
| if getattr(self.config, 'enable_video_captioning', False): | |
| try: | |
| self.video_captioning = VideoCaptioningIntegrator(self.config, db_manager=db_manager) | |
| logger.info("β Video captioning enabled (MongoDB + FAISS)") | |
| except Exception as e: | |
| logger.warning(f"β οΈ Video captioning initialization failed: {e}") | |
| self.config.enable_video_captioning = False | |
| logger.info("β All pipeline components initialized successfully") | |
| except Exception as e: | |
| logger.error(f"β Failed to initialize pipeline components: {e}") | |
| raise | |
| def process_video_complete(self, video_path: str, output_name: str = None) -> Dict[str, Any]: | |
| """ | |
| Process video through complete pipeline | |
| Args: | |
| video_path: Path to input video file | |
| output_name: Optional custom output name (uses video filename if None) | |
| Returns: | |
| Dictionary containing all processing results and output paths | |
| """ | |
| logger.info(f"π Starting complete video processing pipeline") | |
| logger.info(f"π Input video: {video_path}") | |
| if not os.path.exists(video_path): | |
| raise FileNotFoundError(f"Video file not found: {video_path}") | |
| # Initialize processing stats | |
| self.processing_stats['start_time'] = time.time() | |
| # Prepare output naming | |
| if output_name is None: | |
| output_name = os.path.splitext(os.path.basename(video_path))[0] | |
| results = { | |
| 'input_video': video_path, | |
| 'output_name': output_name, | |
| 'config_used': self.config.__dict__.copy(), | |
| 'processing_stats': self.processing_stats, | |
| 'outputs': {} | |
| } | |
| # Ensure there is a concrete output directory for this run so downstream | |
| # steps (annotated video creation, reports, etc.) can write reliably. | |
| output_dir = os.path.join(self.config.output_base_dir, output_name) | |
| os.makedirs(output_dir, exist_ok=True) | |
| results['outputs']['output_directory'] = output_dir | |
| try: | |
| # Step 1: Extract keyframes with adaptive enhancement | |
| logger.info("π¬ Step 1: Extracting keyframes with adaptive enhancement...") | |
| step_start = time.time() | |
| keyframes = self.video_processor.extract_keyframes(video_path) | |
| self.processing_stats['component_times']['keyframe_extraction'] = time.time() - step_start | |
| results['outputs']['total_keyframes'] = len(keyframes) | |
| logger.info(f"β Extracted {len(keyframes)} keyframes") | |
| # Step 2: Create video segments | |
| logger.info("π Step 2: Creating video segments...") | |
| step_start = time.time() | |
| segments = self.segmentation_engine.create_video_segments(video_path, keyframes) | |
| self.processing_stats['component_times']['segmentation'] = time.time() - step_start | |
| results['outputs']['total_segments'] = len(segments) | |
| logger.info(f"β Created {len(segments)} video segments") | |
| # Step 3: Object Detection (if enabled) | |
| detection_results = [] | |
| object_events = [] | |
| if self.config.enable_object_detection: | |
| logger.info("π― Step 3a: Running object detection...") | |
| step_start = time.time() | |
| detection_results, object_events = self.object_detector.process_keyframes_with_object_detection(keyframes) | |
| self.processing_stats['component_times']['object_detection'] = time.time() - step_start | |
| results['outputs']['total_object_detections'] = len(detection_results) | |
| results['outputs']['total_object_events'] = len(object_events) | |
| logger.info(f"β Object detection complete: {len(object_events)} object-based events created") | |
| # Step 3b: Behavior Analysis (if enabled) | |
| behavior_results = [] | |
| behavior_events = [] | |
| if self.config.enable_behavior_analysis and self.behavior_analyzer: | |
| logger.info("π Step 3b: Running behavior analysis...") | |
| step_start = time.time() | |
| behavior_results, behavior_events = self.behavior_analyzer.process_keyframes_with_behavior_analysis(keyframes) | |
| self.processing_stats['component_times']['behavior_analysis'] = time.time() - step_start | |
| results['outputs']['total_behavior_detections'] = len(behavior_results) | |
| results['outputs']['total_behavior_events'] = len(behavior_events) | |
| logger.info(f"β Behavior analysis complete: {len(behavior_events)} behavior-based events created") | |
| # Step 3c: Video Captioning (if enabled) | |
| captioning_results = {} | |
| if self.config.enable_video_captioning and self.video_captioning: | |
| logger.info("π¬ Step 3c: Running video captioning...") | |
| step_start = time.time() | |
| captioning_results = self.video_captioning.process_keyframes_with_captioning( | |
| keyframes, | |
| video_id=output_name | |
| ) | |
| self.processing_stats['component_times']['video_captioning'] = time.time() - step_start | |
| results['outputs']['total_captions'] = captioning_results.get('total_captions', 0) | |
| results['outputs']['captioning_enabled'] = captioning_results.get('enabled', False) | |
| logger.info(f"β Video captioning complete: {captioning_results.get('total_captions', 0)} captions generated") | |
| logger.info(f"πΎ Captions saved to MongoDB, embeddings saved to FAISS") | |
| # Step 4: Detect motion-based events | |
| logger.info("π― Step 4: Detecting motion-based events...") | |
| step_start = time.time() | |
| motion_events = self.event_detector.detect_events(keyframes) | |
| self.processing_stats['component_times']['event_detection'] = time.time() - step_start | |
| # Convert object events to standard format and combine | |
| standard_object_events = [] | |
| if object_events: | |
| standard_object_events = self.event_detector.convert_object_events_to_standard_format(object_events) | |
| # Convert behavior events to standard format | |
| standard_behavior_events = [] | |
| if behavior_events: | |
| standard_behavior_events = self.event_detector.convert_behavior_events_to_standard_format(behavior_events) | |
| all_events = motion_events + standard_object_events + standard_behavior_events | |
| results['outputs']['total_motion_events'] = len(motion_events) | |
| results['outputs']['total_events'] = len(all_events) | |
| logger.info(f"β Detected {len(motion_events)} motion events + {len(object_events)} object events + {len(behavior_events)} behavior events = {len(all_events)} total events") | |
| # Step 4.5: DetectifAI Security Event Processing (includes facial recognition) | |
| logger.info("π Step 4.5: DetectifAI Security Event Processing...") | |
| step_start = time.time() | |
| detectifai_events = [] | |
| facial_recognition_stats = {} | |
| try: | |
| from detectifai_events import DetectifAIEventProcessor | |
| detectifai_processor = DetectifAIEventProcessor(self.config) | |
| detectifai_events = detectifai_processor.process_security_events( | |
| keyframes=keyframes, | |
| motion_events=motion_events, | |
| object_events=object_events | |
| ) | |
| # Additional facial recognition processing if available | |
| try: | |
| from facial_recognition import FacialRecognitionIntegrated | |
| if hasattr(self.config, 'enable_facial_recognition') and self.config.enable_facial_recognition: | |
| logger.info("π€ Processing facial recognition for suspicious activity frames...") | |
| face_detector = FacialRecognitionIntegrated(self.config) | |
| # Apply facial recognition ONLY to frames with suspicious activity (object detections) | |
| face_results = [] | |
| suspicious_frames = [] | |
| # Find frames with object detections (suspicious activity) | |
| suspicious_frames = [] | |
| if detection_results: | |
| suspicious_frames.extend([result for result in detection_results if result.total_detections > 0]) | |
| # Also find frames with behavior detections (suspicious activity) | |
| if behavior_results and self.behavior_analyzer: | |
| behavior_suspicious = self.behavior_analyzer.get_suspicious_frames(behavior_results) | |
| suspicious_frames.extend(behavior_suspicious) | |
| logger.info(f"π Found {len(behavior_suspicious)} suspicious frames from behavior analysis") | |
| # Remove duplicates based on frame_path | |
| seen_paths = set() | |
| unique_suspicious_frames = [] | |
| for frame in suspicious_frames: | |
| frame_path = frame.frame_path if hasattr(frame, 'frame_path') else getattr(frame, 'frame_path', None) | |
| if frame_path and frame_path not in seen_paths: | |
| seen_paths.add(frame_path) | |
| unique_suspicious_frames.append(frame) | |
| logger.info(f"π€ Applying facial recognition to {len(unique_suspicious_frames)} suspicious frames (from object detection + behavior analysis)") | |
| # Run face detection on suspicious frames only | |
| for suspicious_frame in unique_suspicious_frames: | |
| frame_path = suspicious_frame.frame_path if hasattr(suspicious_frame, 'frame_path') else getattr(suspicious_frame, 'frame_path', None) | |
| timestamp = suspicious_frame.timestamp if hasattr(suspicious_frame, 'timestamp') else getattr(suspicious_frame, 'timestamp', 0.0) | |
| if frame_path and os.path.exists(frame_path): | |
| face_result = face_detector.detect_faces_in_frame( | |
| frame_path, | |
| timestamp | |
| ) | |
| if face_result.faces_detected > 0: | |
| face_results.append(face_result) | |
| # Track suspicious persons and detect re-occurrences | |
| if face_results: | |
| reoccurrence_events = face_detector.track_suspicious_persons(face_results, detectifai_events) | |
| # Convert re-occurrence events to DetectifAI format | |
| for reoccurrence in reoccurrence_events: | |
| # Create DetectifAI event from reoccurrence | |
| from detectifai_events import DetectifAIEvent, DetectifAIEventType, ThreatLevel | |
| detectifai_reoccurrence = DetectifAIEvent( | |
| event_id=reoccurrence['event_id'], | |
| event_type=DetectifAIEventType.SUSPICIOUS_PERSON_REOCCURRENCE, | |
| threat_level=ThreatLevel.HIGH, | |
| start_timestamp=reoccurrence['start_timestamp'], | |
| end_timestamp=reoccurrence['end_timestamp'], | |
| confidence=reoccurrence.get('max_confidence', reoccurrence['confidence']), | |
| keyframes=reoccurrence['keyframes'], | |
| importance_score=reoccurrence.get('importance_score', 4.0), | |
| detection_data={ | |
| 'person_details': reoccurrence.get('detection_details', {}), | |
| 'placeholder': True | |
| }, | |
| requires_immediate_response=True, | |
| investigation_priority=5.0, | |
| description=reoccurrence.get('description', 'Suspicious person re-occurrence detected'), | |
| processing_timestamp=time.time(), | |
| detection_model_used='facial_recognition_placeholder' | |
| ) | |
| detectifai_events.append(detectifai_reoccurrence) | |
| logger.info(f"Added facial recognition event: {detectifai_reoccurrence.event_id}") | |
| facial_recognition_stats = face_detector.get_detection_stats() | |
| except ImportError: | |
| logger.info("Facial recognition module not available - skipping") | |
| except Exception as e: | |
| logger.error(f"Error in facial recognition processing: {e}") | |
| facial_recognition_stats = {'error': str(e)} | |
| except ImportError: | |
| logger.info("DetectifAI events module not available - using standard event processing") | |
| self.processing_stats['component_times']['detectifai_processing'] = time.time() - step_start | |
| results['outputs']['detectifai_events'] = len(detectifai_events) | |
| results['outputs']['facial_recognition_stats'] = facial_recognition_stats | |
| logger.info(f"β DetectifAI processing complete: {len(detectifai_events)} security events created") | |
| # Step 5: Deduplicate events and create canonical events | |
| logger.info("π Step 5: Deduplicating events...") | |
| step_start = time.time() | |
| canonical_events, dedup_stats = self.deduplication_engine.deduplicate_events(all_events) | |
| self.processing_stats['component_times']['deduplication'] = time.time() - step_start | |
| results['outputs']['canonical_events'] = len(canonical_events) | |
| results['outputs']['deduplication_stats'] = dedup_stats | |
| logger.info(f"β Created {len(canonical_events)} canonical events") | |
| # Step 5: Generate highlight reels (optional) | |
| highlight_paths = {} | |
| if self.config.generate_highlight_reels: | |
| logger.info("π₯ Step 5: Generating highlight reels...") | |
| step_start = time.time() | |
| highlight_paths = self._generate_all_highlight_reels(segments, canonical_events) | |
| self.processing_stats['component_times']['highlight_generation'] = time.time() - step_start | |
| logger.info(f"β Generated {len(highlight_paths)} highlight reels") | |
| else: | |
| logger.info("βοΈ Step 5: Skipping highlight reel generation (disabled in config)") | |
| results['outputs']['highlight_reels'] = highlight_paths | |
| # Step 5.5: Create annotated video with bounding boxes (if detections exist) | |
| annotated_video_path = None | |
| if self.config.enable_object_detection and detection_results: | |
| logger.info("π¨ Step 5.5: Creating annotated video with bounding boxes...") | |
| step_start = time.time() | |
| try: | |
| # Create annotated video with detection bounding boxes | |
| annotated_output_path = os.path.join( | |
| results['outputs']['output_directory'], | |
| f"{output_name}_annotated.mp4" | |
| ) | |
| annotated_video_path = self.object_detector.create_annotated_video( | |
| video_path, | |
| detection_results, | |
| annotated_output_path | |
| ) | |
| self.processing_stats['component_times']['video_annotation'] = time.time() - step_start | |
| if annotated_video_path: | |
| logger.info(f"β Annotated video created: {annotated_video_path}") | |
| results['outputs']['annotated_video'] = annotated_video_path | |
| else: | |
| logger.warning("β οΈ Annotated video creation failed") | |
| except Exception as e: | |
| logger.error(f"Error creating annotated video: {str(e)}") | |
| # Step 6: Compress video | |
| if self.config.generate_compressed_video: | |
| logger.info("ποΈ Step 6: Compressing video...") | |
| step_start = time.time() | |
| # Compress the annotated video if it exists, otherwise compress original | |
| video_to_compress = annotated_video_path if annotated_video_path else video_path | |
| compressed_path = self.compressor.compress_video( | |
| video_to_compress, | |
| f"{output_name}_compressed.{self.config.video_output_format}" | |
| ) | |
| results['outputs']['compressed_video'] = compressed_path | |
| self.processing_stats['component_times']['compression'] = time.time() - step_start | |
| if compressed_path: | |
| logger.info(f"β Video compressed: {compressed_path}") | |
| else: | |
| logger.warning("β οΈ Video compression failed") | |
| # Step 7: Generate reports | |
| logger.info("π Step 7: Generating reports...") | |
| step_start = time.time() | |
| report_paths = self._generate_all_reports(keyframes, all_events, canonical_events, segments, detection_results, behavior_results) | |
| results['outputs']['reports'] = report_paths | |
| self.processing_stats['component_times']['report_generation'] = time.time() - step_start | |
| logger.info(f"β Generated {len(report_paths)} reports") | |
| # Step 8: Save segment files | |
| if self.config.generate_segments: | |
| logger.info("πΎ Step 8: Saving segment files...") | |
| segments_report_path = self.segmentation_engine.save_segments_metadata( | |
| segments, | |
| os.path.join(self.config.output_base_dir, "reports", "video_segments.json") | |
| ) | |
| individual_segments_saved = self.segmentation_engine.save_individual_segment_files(segments) | |
| results['outputs']['segments_saved'] = individual_segments_saved | |
| logger.info("β Segment files saved") | |
| # Finalize processing stats | |
| self.processing_stats['end_time'] = time.time() | |
| self.processing_stats['total_processing_time'] = ( | |
| self.processing_stats['end_time'] - self.processing_stats['start_time'] | |
| ) | |
| logger.info(f"π PIPELINE COMPLETE!") | |
| logger.info(f"β±οΈ Total processing time: {self.processing_stats['total_processing_time']:.2f} seconds") | |
| return results | |
| except Exception as e: | |
| logger.error(f"β Pipeline processing failed: {e}") | |
| self.processing_stats['errors'].append(str(e)) | |
| raise | |
| def _generate_all_highlight_reels(self, segments: List, canonical_events: List) -> Dict[str, str]: | |
| """Generate all types of highlight reels""" | |
| highlight_paths = {} | |
| try: | |
| # Event-aware highlight reel | |
| event_aware_path = self.highlight_generator.create_event_aware_highlight_reel( | |
| segments, canonical_events | |
| ) | |
| if event_aware_path: | |
| highlight_paths['event_aware'] = event_aware_path | |
| # Ultra-comprehensive highlight reel | |
| comprehensive_path = self.highlight_generator.create_ultra_comprehensive_highlight_reel(segments) | |
| if comprehensive_path: | |
| highlight_paths['ultra_comprehensive'] = comprehensive_path | |
| # Quality-focused highlight reel | |
| quality_path = self.highlight_generator.create_quality_focused_highlight_reel(segments) | |
| if quality_path: | |
| highlight_paths['quality_focused'] = quality_path | |
| except Exception as e: | |
| logger.error(f"Error generating highlight reels: {e}") | |
| self.processing_stats['errors'].append(f"Highlight generation error: {e}") | |
| return highlight_paths | |
| def _generate_all_reports(self, keyframes: List, events: List, | |
| canonical_events: List, segments: List, | |
| detection_results: List = None, behavior_results: List = None) -> Dict[str, str]: | |
| """Generate all types of reports""" | |
| report_paths = {} | |
| try: | |
| # Processing results report (enhanced with object detection and behavior analysis) | |
| processing_report = self.report_generator.generate_processing_results_report( | |
| keyframes, events, canonical_events, segments, self.processing_stats, detection_results, behavior_results | |
| ) | |
| if processing_report: | |
| report_paths['processing_results'] = processing_report | |
| # Canonical events report | |
| canonical_report = self.report_generator.generate_canonical_events_report(canonical_events) | |
| if canonical_report: | |
| report_paths['canonical_events'] = canonical_report | |
| # Segments report | |
| segments_report = self.report_generator.generate_segments_report(segments) | |
| if segments_report: | |
| report_paths['video_segments'] = segments_report | |
| # Object detection report (if enabled) | |
| if self.config.enable_object_detection and detection_results: | |
| object_detection_report = self.report_generator.generate_object_detection_report( | |
| detection_results, self.object_detector.get_object_detection_summary() | |
| ) | |
| if object_detection_report: | |
| report_paths['object_detection'] = object_detection_report | |
| # Behavior analysis report (if enabled) | |
| if self.config.enable_behavior_analysis and behavior_results and self.behavior_analyzer: | |
| behavior_analysis_report = self.report_generator.generate_behavior_analysis_report( | |
| behavior_results, self.behavior_analyzer.get_behavior_analysis_summary() | |
| ) | |
| if behavior_analysis_report: | |
| report_paths['behavior_analysis'] = behavior_analysis_report | |
| # HTML gallery (enhanced with object detection and behavior analysis) | |
| if self.config.generate_html_gallery: | |
| html_gallery = self.report_generator.generate_html_gallery( | |
| keyframes, canonical_events, segments, detection_results, behavior_results | |
| ) | |
| if html_gallery: | |
| report_paths['html_gallery'] = html_gallery | |
| except Exception as e: | |
| logger.error(f"Error generating reports: {e}") | |
| self.processing_stats['errors'].append(f"Report generation error: {e}") | |
| return report_paths | |
| def get_processing_summary(self) -> Dict[str, Any]: | |
| """Get summary of processing results""" | |
| return { | |
| 'total_processing_time': self.processing_stats.get('total_processing_time', 0), | |
| 'component_times': self.processing_stats.get('component_times', {}), | |
| 'errors_encountered': len(self.processing_stats.get('errors', [])), | |
| 'processing_config': { | |
| 'base_quality_threshold': self.config.base_quality_threshold, | |
| 'motion_threshold': self.config.motion_threshold, | |
| 'max_summary_frames': self.config.max_summary_frames, | |
| 'output_resolution': self.config.output_resolution | |
| } | |
| } | |
| def process_multiple_videos(self, video_directory: str) -> Dict[str, Dict[str, Any]]: | |
| """ | |
| Process multiple videos in a directory | |
| Args: | |
| video_directory: Directory containing video files | |
| Returns: | |
| Dictionary mapping video paths to processing results | |
| """ | |
| logger.info(f"π¬ Processing multiple videos from: {video_directory}") | |
| if not os.path.exists(video_directory): | |
| raise FileNotFoundError(f"Video directory not found: {video_directory}") | |
| # Find video files | |
| video_extensions = {'.mp4', '.avi', '.mov', '.mkv', '.wmv', '.flv'} | |
| video_files = [] | |
| for filename in os.listdir(video_directory): | |
| if any(filename.lower().endswith(ext) for ext in video_extensions): | |
| video_files.append(os.path.join(video_directory, filename)) | |
| logger.info(f"Found {len(video_files)} video files to process") | |
| batch_results = {} | |
| successful_count = 0 | |
| for i, video_path in enumerate(video_files, 1): | |
| try: | |
| logger.info(f"πΉ Processing video {i}/{len(video_files)}: {os.path.basename(video_path)}") | |
| results = self.process_video_complete(video_path) | |
| batch_results[video_path] = results | |
| successful_count += 1 | |
| logger.info(f"β Successfully processed {os.path.basename(video_path)}") | |
| except Exception as e: | |
| logger.error(f"β Failed to process {os.path.basename(video_path)}: {e}") | |
| batch_results[video_path] = {'error': str(e)} | |
| logger.info(f"π Batch processing complete: {successful_count}/{len(video_files)} successful") | |
| return batch_results | |
| def main(): | |
| """Main function demonstrating pipeline usage""" | |
| # Example usage with different configurations | |
| print("π¬ Video Processing Pipeline Demo") | |
| print("=" * 50) | |
| # For security detection - use specialized config | |
| security_config = get_security_focused_config() | |
| pipeline_security = CompleteVideoProcessingPipeline(security_config) | |
| # For high recall (more keyframes) - use high recall config | |
| high_recall_config = get_high_recall_config() | |
| pipeline_high_recall = CompleteVideoProcessingPipeline(high_recall_config) | |
| # Example video processing | |
| video_file = "rob.mp4" # Replace with your video file | |
| if os.path.exists(video_file): | |
| print(f"\nπ― Processing with security detection config...") | |
| results = pipeline_security.process_video_complete(video_file) | |
| print(f"\nπ Processing Summary:") | |
| summary = pipeline_security.get_processing_summary() | |
| for key, value in summary.items(): | |
| print(f" {key}: {value}") | |
| print(f"\\nπ Output files created:") | |
| for category, outputs in results['outputs'].items(): | |
| if isinstance(outputs, dict): | |
| print(f" {category}:") | |
| for name, path in outputs.items(): | |
| print(f" - {name}: {path}") | |
| else: | |
| print(f" {category}: {outputs}") | |
| else: | |
| print(f"β Video file not found: {video_file}") | |
| print("\\nπ‘ Available configuration presets:") | |
| print(" - get_security_focused_config() - Optimized for security/threat detection") | |
| print(" - get_high_recall_config() - More keyframes, sensitive detection") | |
| print(" - get_high_precision_config() - Fewer but higher quality keyframes") | |
| print(" - get_balanced_config() - General purpose settings") | |
| if __name__ == "__main__": | |
| main() |