""" Core Analysis Engine Main orchestration engine for DeepVision Prompt Builder. Manages image/video processing, plugin execution, and result generation. """ from datetime import datetime from pathlib import Path from typing import Dict, List, Any, Optional, Union from loguru import logger from core.config import config from core.image_processor import ImageProcessor from core.video_processor import VideoProcessor from core.result_manager import ResultManager from core.exceptions import DeepVisionError class AnalysisEngine: """ Main analysis engine for processing images and videos. Orchestrates the complete analysis pipeline: 1. File validation and preprocessing 2. Plugin execution 3. Result aggregation 4. JSON output generation """ def __init__(self): """Initialize AnalysisEngine.""" self.image_processor = ImageProcessor() self.video_processor = VideoProcessor() self.result_manager = ResultManager() self.plugins: Dict[str, Any] = {} self.plugin_order: List[str] = [] logger.info(f"AnalysisEngine initialized - {config.APP_NAME} v{config.APP_VERSION}") def register_plugin(self, plugin_name: str, plugin_instance: Any) -> None: """ Register a plugin for analysis. Args: plugin_name: Unique name for the plugin plugin_instance: Instance of the plugin class """ if plugin_name in self.plugins: logger.warning(f"Plugin '{plugin_name}' already registered, replacing") self.plugins[plugin_name] = plugin_instance # Maintain execution order if plugin_name not in self.plugin_order: self.plugin_order.append(plugin_name) logger.info(f"Registered plugin: {plugin_name}") def unregister_plugin(self, plugin_name: str) -> None: """ Unregister a plugin. Args: plugin_name: Name of plugin to remove """ if plugin_name in self.plugins: del self.plugins[plugin_name] if plugin_name in self.plugin_order: self.plugin_order.remove(plugin_name) logger.info(f"Unregistered plugin: {plugin_name}") def get_registered_plugins(self) -> List[str]: """ Get list of registered plugins. Returns: List of plugin names """ return list(self.plugins.keys()) def analyze_image( self, image_path: Union[str, Path], plugins: Optional[List[str]] = None, **kwargs ) -> Dict[str, Any]: """ Analyze a single image. Args: image_path: Path to image file plugins: List of plugin names to use (None for all) **kwargs: Additional arguments for processing Returns: Analysis results dictionary """ start_time = datetime.now() image_path = Path(image_path) logger.info(f"Starting image analysis: {image_path.name}") try: # Clear previous results self.result_manager.clear() # Process image image = self.image_processor.process( image_path, resize=kwargs.get("resize", True), normalize=kwargs.get("normalize", False) ) # Get image info image_info = self.image_processor.get_image_info(image_path) # Set file metadata self.result_manager.set_file_info( filename=image_info["filename"], file_type="image", file_size=image_info["file_size"], width=image_info["width"], height=image_info["height"], format=image_info["format"], hash=image_info["hash"], ) # Execute plugins plugins_used = self._execute_plugins( image, image_path, plugins, media_type="image" ) # Set processing metadata end_time = datetime.now() self.result_manager.set_processing_info( start_time=start_time, end_time=end_time, plugins_used=plugins_used ) # Get final results results = self.result_manager.to_dict( include_metadata=config.INCLUDE_METADATA ) logger.info(f"Image analysis completed: {image_path.name} " f"({len(plugins_used)} plugins)") return results except Exception as e: logger.error(f"Image analysis failed: {e}") raise DeepVisionError( f"Analysis failed for {image_path.name}: {str(e)}", {"path": str(image_path), "error": str(e)} ) def analyze_video( self, video_path: Union[str, Path], plugins: Optional[List[str]] = None, extract_method: str = "keyframes", num_frames: int = 5, **kwargs ) -> Dict[str, Any]: """ Analyze a video by extracting and analyzing frames. Args: video_path: Path to video file plugins: List of plugin names to use extract_method: Frame extraction method ("fps" or "keyframes") num_frames: Number of frames to extract **kwargs: Additional arguments Returns: Analysis results dictionary """ start_time = datetime.now() video_path = Path(video_path) logger.info(f"Starting video analysis: {video_path.name}") try: # Clear previous results self.result_manager.clear() # Get video info video_info = self.video_processor.get_video_info(video_path) # Set file metadata self.result_manager.set_file_info( filename=video_info["filename"], file_type="video", file_size=video_info["file_size"], width=video_info["width"], height=video_info["height"], fps=video_info["fps"], duration=video_info["duration"], frame_count=video_info["frame_count"], ) # Extract frames if extract_method == "keyframes": frame_paths = self.video_processor.extract_key_frames( video_path, num_frames=num_frames ) else: frame_paths = self.video_processor.extract_frames( video_path, max_frames=num_frames, **kwargs ) logger.info(f"Extracted {len(frame_paths)} frames from video") # Analyze each frame frame_results = [] for idx, frame_path in enumerate(frame_paths): logger.info(f"Analyzing frame {idx + 1}/{len(frame_paths)}") # Process frame image = self.image_processor.process(frame_path, resize=True) # Execute plugins on frame plugins_used = self._execute_plugins( image, frame_path, plugins, media_type="video_frame" ) # Get frame results frame_result = { "frame_index": idx, "frame_path": str(frame_path.name), "results": dict(self.result_manager.results) } frame_results.append(frame_result) # Clear for next frame self.result_manager.results.clear() # Aggregate frame results aggregated = self._aggregate_video_results(frame_results) # Set aggregated results self.result_manager.results = aggregated # Set processing metadata end_time = datetime.now() self.result_manager.set_processing_info( start_time=start_time, end_time=end_time, plugins_used=plugins_used ) # Add video-specific metadata self.result_manager.add_metadata({ "frames_analyzed": len(frame_paths), "extraction_method": extract_method, }) # Get final results results = self.result_manager.to_dict( include_metadata=config.INCLUDE_METADATA ) logger.info(f"Video analysis completed: {video_path.name} " f"({len(frame_paths)} frames, {len(plugins_used)} plugins)") return results except Exception as e: logger.error(f"Video analysis failed: {e}") raise DeepVisionError( f"Analysis failed for {video_path.name}: {str(e)}", {"path": str(video_path), "error": str(e)} ) def _execute_plugins( self, media, media_path: Path, plugin_names: Optional[List[str]] = None, media_type: str = "image" ) -> List[str]: """ Execute registered plugins on media. Args: media: Processed media (image or frame) media_path: Path to media file plugin_names: List of plugins to execute (None for all) media_type: Type of media being processed Returns: List of executed plugin names """ # Determine which plugins to execute if plugin_names is None: plugins_to_run = self.plugin_order else: plugins_to_run = [ p for p in self.plugin_order if p in plugin_names ] executed = [] for plugin_name in plugins_to_run: if plugin_name not in self.plugins: logger.warning(f"Plugin '{plugin_name}' not found, skipping") continue try: logger.debug(f"Executing plugin: {plugin_name}") plugin = self.plugins[plugin_name] # Execute plugin result = plugin.analyze(media, media_path) # Add result self.result_manager.add_result(plugin_name, result) executed.append(plugin_name) logger.debug(f"Plugin '{plugin_name}' completed successfully") except Exception as e: logger.error(f"Plugin '{plugin_name}' failed: {e}") # Add error to results self.result_manager.add_result( plugin_name, { "error": str(e), "status": "failed" } ) return executed def _aggregate_video_results( self, frame_results: List[Dict[str, Any]] ) -> Dict[str, Any]: """ Aggregate results from multiple video frames. Args: frame_results: List of results from each frame Returns: Aggregated results dictionary """ aggregated = { "frames": frame_results, "summary": {} } # For each plugin, aggregate results across frames if not frame_results: return aggregated # Get plugin names from first frame first_frame = frame_results[0]["results"] for plugin_name in first_frame.keys(): plugin_summary = self._aggregate_plugin_results( plugin_name, [f["results"].get(plugin_name, {}) for f in frame_results] ) aggregated["summary"][plugin_name] = plugin_summary return aggregated def _aggregate_plugin_results( self, plugin_name: str, results: List[Dict[str, Any]] ) -> Dict[str, Any]: """ Aggregate results for a specific plugin across frames. Args: plugin_name: Name of the plugin results: List of results from each frame Returns: Aggregated result for the plugin """ # Default aggregation: collect all unique values aggregated = { "frames_processed": len(results), } # Plugin-specific aggregation logic if plugin_name == "object_detector": all_objects = [] for result in results: all_objects.extend(result.get("objects", [])) # Count object occurrences object_counts = {} for obj in all_objects: name = obj["name"] object_counts[name] = object_counts.get(name, 0) + 1 aggregated["total_objects"] = len(all_objects) aggregated["unique_objects"] = len(object_counts) aggregated["object_frequency"] = object_counts elif plugin_name == "caption_generator": captions = [r.get("caption", "") for r in results if r.get("caption")] aggregated["captions"] = captions aggregated["caption_count"] = len(captions) elif plugin_name == "color_analyzer": all_colors = [] for result in results: all_colors.extend(result.get("dominant_colors", [])) # Get most frequent colors color_counts = {} for color in all_colors: name = color["name"] color_counts[name] = color_counts.get(name, 0) + 1 aggregated["color_frequency"] = color_counts return aggregated def analyze( self, file_path: Union[str, Path], **kwargs ) -> Dict[str, Any]: """ Automatically detect file type and analyze. Args: file_path: Path to image or video file **kwargs: Additional arguments Returns: Analysis results """ file_path = Path(file_path) # Detect file type ext = file_path.suffix.lower() if ext in config.ALLOWED_IMAGE_FORMATS: return self.analyze_image(file_path, **kwargs) elif ext in config.ALLOWED_VIDEO_FORMATS: return self.analyze_video(file_path, **kwargs) else: raise ValueError(f"Unsupported file format: {ext}") def __repr__(self) -> str: """Object representation.""" return (f"AnalysisEngine(plugins={len(self.plugins)}, " f"registered={self.get_registered_plugins()})")