Spaces:
Runtime error
Runtime error
| """ | |
| Core Analysis Engine | |
| Main orchestration engine for DeepVision Prompt Builder. | |
| Manages image/video processing, plugin execution, and result generation. | |
| """ | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Dict, List, Any, Optional, Union | |
| from loguru import logger | |
| from core.config import config | |
| from core.image_processor import ImageProcessor | |
| from core.video_processor import VideoProcessor | |
| from core.result_manager import ResultManager | |
| from core.exceptions import DeepVisionError | |
class AnalysisEngine:
    """
    Main analysis engine for processing images and videos.

    Orchestrates the complete analysis pipeline:
    1. File validation and preprocessing
    2. Plugin execution
    3. Result aggregation
    4. JSON output generation

    Plugins are any objects exposing ``analyze(media, media_path)``; they are
    executed in the order in which they were first registered.
    """

    def __init__(self):
        """Initialize AnalysisEngine with its processors and an empty plugin registry."""
        self.image_processor = ImageProcessor()
        self.video_processor = VideoProcessor()
        self.result_manager = ResultManager()
        # name -> plugin instance
        self.plugins: Dict[str, Any] = {}
        # registration order; drives execution order in _execute_plugins
        self.plugin_order: List[str] = []
        logger.info(f"AnalysisEngine initialized - {config.APP_NAME} v{config.APP_VERSION}")

    def register_plugin(self, plugin_name: str, plugin_instance: Any) -> None:
        """
        Register a plugin for analysis.

        Re-registering an existing name replaces the instance (with a
        warning) but keeps the plugin's original position in the order.

        Args:
            plugin_name: Unique name for the plugin
            plugin_instance: Instance of the plugin class
        """
        if plugin_name in self.plugins:
            logger.warning(f"Plugin '{plugin_name}' already registered, replacing")
        self.plugins[plugin_name] = plugin_instance
        # Maintain execution order
        if plugin_name not in self.plugin_order:
            self.plugin_order.append(plugin_name)
        logger.info(f"Registered plugin: {plugin_name}")

    def unregister_plugin(self, plugin_name: str) -> None:
        """
        Unregister a plugin.

        Removes the plugin from both the registry and the execution order.
        Unknown names are ignored apart from a warning.

        Args:
            plugin_name: Name of plugin to remove
        """
        removed = self.plugins.pop(plugin_name, None) is not None
        if plugin_name in self.plugin_order:
            self.plugin_order.remove(plugin_name)
            removed = True
        if removed:
            logger.info(f"Unregistered plugin: {plugin_name}")
        else:
            logger.warning(f"Plugin '{plugin_name}' not found, skipping")

    def get_registered_plugins(self) -> List[str]:
        """
        Get list of registered plugins.

        Returns:
            List of plugin names
        """
        return list(self.plugins.keys())

    def analyze_image(
        self,
        image_path: Union[str, Path],
        plugins: Optional[List[str]] = None,
        **kwargs
    ) -> Dict[str, Any]:
        """
        Analyze a single image.

        Args:
            image_path: Path to image file
            plugins: List of plugin names to use (None for all)
            **kwargs: Additional arguments for processing; ``resize``
                (default True) and ``normalize`` (default False) are
                forwarded to the image processor

        Returns:
            Analysis results dictionary

        Raises:
            DeepVisionError: If any step of the pipeline fails; the original
                exception is attached as ``__cause__``.
        """
        start_time = datetime.now()
        image_path = Path(image_path)
        logger.info(f"Starting image analysis: {image_path.name}")

        try:
            # Clear previous results
            self.result_manager.clear()

            # Process image
            image = self.image_processor.process(
                image_path,
                resize=kwargs.get("resize", True),
                normalize=kwargs.get("normalize", False),
            )

            # Get image info and record it as file metadata
            image_info = self.image_processor.get_image_info(image_path)
            self.result_manager.set_file_info(
                filename=image_info["filename"],
                file_type="image",
                file_size=image_info["file_size"],
                width=image_info["width"],
                height=image_info["height"],
                format=image_info["format"],
                hash=image_info["hash"],
            )

            # Execute plugins
            plugins_used = self._execute_plugins(
                image,
                image_path,
                plugins,
                media_type="image",
            )

            # Set processing metadata
            end_time = datetime.now()
            self.result_manager.set_processing_info(
                start_time=start_time,
                end_time=end_time,
                plugins_used=plugins_used,
            )

            # Get final results
            results = self.result_manager.to_dict(
                include_metadata=config.INCLUDE_METADATA
            )
            logger.info(f"Image analysis completed: {image_path.name} "
                        f"({len(plugins_used)} plugins)")
            return results
        except Exception as e:
            logger.error(f"Image analysis failed: {e}")
            # Chain explicitly so the root cause stays in the traceback.
            raise DeepVisionError(
                f"Analysis failed for {image_path.name}: {str(e)}",
                {"path": str(image_path), "error": str(e)}
            ) from e

    def analyze_video(
        self,
        video_path: Union[str, Path],
        plugins: Optional[List[str]] = None,
        extract_method: str = "keyframes",
        num_frames: int = 5,
        **kwargs
    ) -> Dict[str, Any]:
        """
        Analyze a video by extracting and analyzing frames.

        Args:
            video_path: Path to video file
            plugins: List of plugin names to use (None for all)
            extract_method: Frame extraction method. ``"keyframes"`` uses
                ``extract_key_frames``; any other value (e.g. ``"fps"``)
                uses ``extract_frames``
            num_frames: Number of frames to extract
            **kwargs: Additional arguments, forwarded to ``extract_frames``
                when ``extract_method`` is not ``"keyframes"``

        Returns:
            Analysis results dictionary

        Raises:
            DeepVisionError: If any step of the pipeline fails; the original
                exception is attached as ``__cause__``.
        """
        start_time = datetime.now()
        video_path = Path(video_path)
        logger.info(f"Starting video analysis: {video_path.name}")

        try:
            # Clear previous results
            self.result_manager.clear()

            # Get video info and record it as file metadata
            video_info = self.video_processor.get_video_info(video_path)
            self.result_manager.set_file_info(
                filename=video_info["filename"],
                file_type="video",
                file_size=video_info["file_size"],
                width=video_info["width"],
                height=video_info["height"],
                fps=video_info["fps"],
                duration=video_info["duration"],
                frame_count=video_info["frame_count"],
            )

            # Extract frames
            if extract_method == "keyframes":
                frame_paths = self.video_processor.extract_key_frames(
                    video_path,
                    num_frames=num_frames,
                )
            else:
                frame_paths = self.video_processor.extract_frames(
                    video_path,
                    max_frames=num_frames,
                    **kwargs
                )
            logger.info(f"Extracted {len(frame_paths)} frames from video")

            # Analyze each frame. plugins_used is initialised up front so a
            # video yielding no frames does not hit an unbound local, and it
            # accumulates every plugin that succeeded on at least one frame
            # (not just those of the last frame).
            plugins_used: List[str] = []
            frame_results = []
            for idx, frame_path in enumerate(frame_paths):
                logger.info(f"Analyzing frame {idx + 1}/{len(frame_paths)}")
                # Tolerate extractors that return plain string paths.
                frame_path = Path(frame_path)

                # Process frame
                image = self.image_processor.process(frame_path, resize=True)

                # Execute plugins on frame
                executed = self._execute_plugins(
                    image,
                    frame_path,
                    plugins,
                    media_type="video_frame",
                )
                for name in executed:
                    if name not in plugins_used:
                        plugins_used.append(name)

                # Snapshot this frame's results before clearing them
                frame_results.append({
                    "frame_index": idx,
                    "frame_path": str(frame_path.name),
                    "results": dict(self.result_manager.results),
                })

                # Clear for next frame
                self.result_manager.results.clear()

            # Aggregate frame results and install them as the final results
            aggregated = self._aggregate_video_results(frame_results)
            self.result_manager.results = aggregated

            # Set processing metadata
            end_time = datetime.now()
            self.result_manager.set_processing_info(
                start_time=start_time,
                end_time=end_time,
                plugins_used=plugins_used,
            )

            # Add video-specific metadata
            self.result_manager.add_metadata({
                "frames_analyzed": len(frame_paths),
                "extraction_method": extract_method,
            })

            # Get final results
            results = self.result_manager.to_dict(
                include_metadata=config.INCLUDE_METADATA
            )
            logger.info(f"Video analysis completed: {video_path.name} "
                        f"({len(frame_paths)} frames, {len(plugins_used)} plugins)")
            return results
        except Exception as e:
            logger.error(f"Video analysis failed: {e}")
            # Chain explicitly so the root cause stays in the traceback.
            raise DeepVisionError(
                f"Analysis failed for {video_path.name}: {str(e)}",
                {"path": str(video_path), "error": str(e)}
            ) from e

    def _execute_plugins(
        self,
        media,
        media_path: Path,
        plugin_names: Optional[List[str]] = None,
        media_type: str = "image"
    ) -> List[str]:
        """
        Execute registered plugins on media.

        Plugins run in registration order. A plugin that raises does not
        abort the run: its failure is logged and recorded in the result
        manager as ``{"error": ..., "status": "failed"}``.

        Args:
            media: Processed media (image or frame)
            media_path: Path to media file
            plugin_names: List of plugins to execute (None for all)
            media_type: Type of media being processed (accepted for callers;
                not used by this method)

        Returns:
            List of plugin names that executed successfully
        """
        # Determine which plugins to execute. Iterate over a snapshot so a
        # plugin (un)registering others mid-run cannot disturb the loop.
        if plugin_names is None:
            plugins_to_run = list(self.plugin_order)
        else:
            # Surface requested names that were never registered instead of
            # dropping them silently.
            for requested in plugin_names:
                if requested not in self.plugins:
                    logger.warning(f"Plugin '{requested}' not found, skipping")
            plugins_to_run = [
                p for p in self.plugin_order if p in plugin_names
            ]

        executed = []
        for plugin_name in plugins_to_run:
            if plugin_name not in self.plugins:
                logger.warning(f"Plugin '{plugin_name}' not found, skipping")
                continue
            try:
                logger.debug(f"Executing plugin: {plugin_name}")
                plugin = self.plugins[plugin_name]
                # Execute plugin
                result = plugin.analyze(media, media_path)
                # Add result
                self.result_manager.add_result(plugin_name, result)
                executed.append(plugin_name)
                logger.debug(f"Plugin '{plugin_name}' completed successfully")
            except Exception as e:
                logger.error(f"Plugin '{plugin_name}' failed: {e}")
                # Add error to results
                self.result_manager.add_result(
                    plugin_name,
                    {
                        "error": str(e),
                        "status": "failed"
                    }
                )
        return executed

    def _aggregate_video_results(
        self,
        frame_results: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """
        Aggregate results from multiple video frames.

        Args:
            frame_results: List of per-frame dicts, each carrying a
                ``"results"`` mapping of plugin name -> plugin result

        Returns:
            Dict with ``"frames"`` (the input list, unchanged) and
            ``"summary"`` (plugin name -> aggregated result)
        """
        aggregated = {
            "frames": frame_results,
            "summary": {}
        }
        if not frame_results:
            return aggregated

        # Collect plugin names across ALL frames (first-seen order), so a
        # plugin missing from the first frame is still summarised.
        plugin_names: List[str] = []
        for frame in frame_results:
            for plugin_name in frame.get("results", {}):
                if plugin_name not in plugin_names:
                    plugin_names.append(plugin_name)

        # For each plugin, aggregate results across frames
        for plugin_name in plugin_names:
            aggregated["summary"][plugin_name] = self._aggregate_plugin_results(
                plugin_name,
                [f.get("results", {}).get(plugin_name, {}) for f in frame_results]
            )
        return aggregated

    @staticmethod
    def _count_names(items: List[Any]) -> Dict[str, int]:
        """Count occurrences of each item's ``"name"``, skipping items without one."""
        counts: Dict[str, int] = {}
        for item in items:
            if isinstance(item, dict) and "name" in item:
                name = item["name"]
                counts[name] = counts.get(name, 0) + 1
        return counts

    def _aggregate_plugin_results(
        self,
        plugin_name: str,
        results: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """
        Aggregate results for a specific plugin across frames.

        Only ``object_detector``, ``caption_generator`` and
        ``color_analyzer`` get plugin-specific summaries; every other plugin
        receives just ``frames_processed``. Non-dict per-frame results and
        entries lacking a ``"name"`` are skipped rather than raising.

        Args:
            plugin_name: Name of the plugin
            results: List of results from each frame

        Returns:
            Aggregated result for the plugin
        """
        aggregated: Dict[str, Any] = {
            "frames_processed": len(results),
        }
        # Ignore frames whose result is not a mapping (e.g. None).
        usable = [r for r in results if isinstance(r, dict)]

        # Plugin-specific aggregation logic
        if plugin_name == "object_detector":
            all_objects = []
            for result in usable:
                all_objects.extend(result.get("objects") or [])
            # Count object occurrences
            object_counts = self._count_names(all_objects)
            aggregated["total_objects"] = len(all_objects)
            aggregated["unique_objects"] = len(object_counts)
            aggregated["object_frequency"] = object_counts
        elif plugin_name == "caption_generator":
            # Keep only non-empty captions
            captions = [r["caption"] for r in usable if r.get("caption")]
            aggregated["captions"] = captions
            aggregated["caption_count"] = len(captions)
        elif plugin_name == "color_analyzer":
            all_colors = []
            for result in usable:
                all_colors.extend(result.get("dominant_colors") or [])
            # Count how often each named color was dominant
            aggregated["color_frequency"] = self._count_names(all_colors)
        return aggregated

    def analyze(
        self,
        file_path: Union[str, Path],
        **kwargs
    ) -> Dict[str, Any]:
        """
        Automatically detect file type and analyze.

        Dispatch is by lower-cased file suffix against
        ``config.ALLOWED_IMAGE_FORMATS`` and ``config.ALLOWED_VIDEO_FORMATS``.

        Args:
            file_path: Path to image or video file
            **kwargs: Additional arguments forwarded to ``analyze_image`` or
                ``analyze_video``

        Returns:
            Analysis results

        Raises:
            ValueError: If the suffix is in neither allowed-format list
        """
        file_path = Path(file_path)
        # Detect file type
        ext = file_path.suffix.lower()
        if ext in config.ALLOWED_IMAGE_FORMATS:
            return self.analyze_image(file_path, **kwargs)
        elif ext in config.ALLOWED_VIDEO_FORMATS:
            return self.analyze_video(file_path, **kwargs)
        else:
            raise ValueError(f"Unsupported file format: {ext}")

    def __repr__(self) -> str:
        """Object representation."""
        return (f"AnalysisEngine(plugins={len(self.plugins)}, "
                f"registered={self.get_registered_plugins()})")