|
|
""" |
|
|
Core Analysis Engine |
|
|
|
|
|
Main orchestration engine for DeepVision Prompt Builder. |
|
|
Manages image/video processing, plugin execution, and result generation. |
|
|
""" |
|
|
|
|
|
from datetime import datetime |
|
|
from pathlib import Path |
|
|
from typing import Dict, List, Any, Optional, Union |
|
|
from loguru import logger |
|
|
|
|
|
from core.config import config |
|
|
from core.image_processor import ImageProcessor |
|
|
from core.video_processor import VideoProcessor |
|
|
from core.result_manager import ResultManager |
|
|
from core.exceptions import DeepVisionError |
|
|
|
|
|
|
|
|
class AnalysisEngine:
    """
    Main analysis engine for processing images and videos.

    Orchestrates the complete analysis pipeline:
    1. Preprocessing of the input through ImageProcessor / VideoProcessor
    2. Plugin execution (in registration order)
    3. Result aggregation (per-frame results are summarised for videos)
    4. Result dictionary generation through ResultManager.to_dict()

    A plugin is any object exposing ``analyze(media, media_path)``; whatever
    it returns is stored under the plugin's registered name.
    """

    def __init__(self):
        """Initialize AnalysisEngine with its processors and an empty plugin registry."""
        self.image_processor = ImageProcessor()
        self.video_processor = VideoProcessor()
        self.result_manager = ResultManager()
        # Registered plugin instances, keyed by plugin name.
        self.plugins: Dict[str, Any] = {}
        # Plugin names in first-registration order; drives execution order.
        self.plugin_order: List[str] = []

        logger.info(f"AnalysisEngine initialized - {config.APP_NAME} v{config.APP_VERSION}")

    def register_plugin(self, plugin_name: str, plugin_instance: Any) -> None:
        """
        Register a plugin for analysis.

        Re-registering an existing name replaces the instance (a warning is
        logged) but keeps the plugin's original position in the run order.

        Args:
            plugin_name: Unique name for the plugin
            plugin_instance: Instance of the plugin class
        """
        if plugin_name in self.plugins:
            logger.warning(f"Plugin '{plugin_name}' already registered, replacing")

        self.plugins[plugin_name] = plugin_instance

        # Only append on first registration so a replacement keeps its slot.
        if plugin_name not in self.plugin_order:
            self.plugin_order.append(plugin_name)

        logger.info(f"Registered plugin: {plugin_name}")

    def unregister_plugin(self, plugin_name: str) -> None:
        """
        Unregister a plugin.

        Removing a name that is not registered is a no-op apart from a
        warning in the log.

        Args:
            plugin_name: Name of plugin to remove
        """
        was_registered = (
            plugin_name in self.plugins or plugin_name in self.plugin_order
        )

        self.plugins.pop(plugin_name, None)
        if plugin_name in self.plugin_order:
            self.plugin_order.remove(plugin_name)

        if was_registered:
            logger.info(f"Unregistered plugin: {plugin_name}")
        else:
            logger.warning(
                f"Plugin '{plugin_name}' is not registered, nothing to unregister"
            )

    def get_registered_plugins(self) -> List[str]:
        """
        Get list of registered plugins.

        Returns:
            List of plugin names (a new list; mutating it does not affect
            the engine)
        """
        return list(self.plugins)

    def analyze_image(
        self,
        image_path: Union[str, Path],
        plugins: Optional[List[str]] = None,
        **kwargs
    ) -> Dict[str, Any]:
        """
        Analyze a single image.

        Args:
            image_path: Path to image file
            plugins: List of plugin names to use (None for all)
            **kwargs: Additional arguments for processing. Only ``resize``
                (default True) and ``normalize`` (default False) are read;
                both are forwarded to ``ImageProcessor.process``.

        Returns:
            Analysis results dictionary produced by ``ResultManager.to_dict``

        Raises:
            DeepVisionError: If any step of the analysis fails; the original
                exception is chained as ``__cause__``.
        """
        start_time = datetime.now()
        image_path = Path(image_path)

        logger.info(f"Starting image analysis: {image_path.name}")

        try:
            # Drop anything left over from a previous analysis run.
            self.result_manager.clear()

            image = self.image_processor.process(
                image_path,
                resize=kwargs.get("resize", True),
                normalize=kwargs.get("normalize", False),
            )

            image_info = self.image_processor.get_image_info(image_path)

            self.result_manager.set_file_info(
                filename=image_info["filename"],
                file_type="image",
                file_size=image_info["file_size"],
                width=image_info["width"],
                height=image_info["height"],
                format=image_info["format"],
                hash=image_info["hash"],
            )

            plugins_used = self._execute_plugins(
                image,
                image_path,
                plugins,
                media_type="image",
            )

            end_time = datetime.now()
            self.result_manager.set_processing_info(
                start_time=start_time,
                end_time=end_time,
                plugins_used=plugins_used,
            )

            results = self.result_manager.to_dict(
                include_metadata=config.INCLUDE_METADATA
            )

            logger.info(f"Image analysis completed: {image_path.name} "
                        f"({len(plugins_used)} plugins)")

            return results

        except Exception as e:
            logger.error(f"Image analysis failed: {e}")
            # Chain the cause so the original traceback is not lost.
            raise DeepVisionError(
                f"Analysis failed for {image_path.name}: {str(e)}",
                {"path": str(image_path), "error": str(e)}
            ) from e

    def analyze_video(
        self,
        video_path: Union[str, Path],
        plugins: Optional[List[str]] = None,
        extract_method: str = "keyframes",
        num_frames: int = 5,
        **kwargs
    ) -> Dict[str, Any]:
        """
        Analyze a video by extracting and analyzing frames.

        Args:
            video_path: Path to video file
            plugins: List of plugin names to use (None for all)
            extract_method: Frame extraction method. ``"keyframes"`` uses
                ``VideoProcessor.extract_key_frames``; any other value
                (e.g. ``"fps"``) uses ``VideoProcessor.extract_frames``.
            num_frames: Number of frames to extract
            **kwargs: Additional arguments; forwarded to
                ``VideoProcessor.extract_frames`` only (they are not used
                for key-frame extraction).

        Returns:
            Analysis results dictionary produced by ``ResultManager.to_dict``.
            The stored results hold the per-frame entries under ``"frames"``
            and the per-plugin aggregates under ``"summary"``.

        Raises:
            DeepVisionError: If any step of the analysis fails; the original
                exception is chained as ``__cause__``.
        """
        start_time = datetime.now()
        video_path = Path(video_path)

        logger.info(f"Starting video analysis: {video_path.name}")

        try:
            # Drop anything left over from a previous analysis run.
            self.result_manager.clear()

            video_info = self.video_processor.get_video_info(video_path)

            self.result_manager.set_file_info(
                filename=video_info["filename"],
                file_type="video",
                file_size=video_info["file_size"],
                width=video_info["width"],
                height=video_info["height"],
                fps=video_info["fps"],
                duration=video_info["duration"],
                frame_count=video_info["frame_count"],
            )

            if extract_method == "keyframes":
                frame_paths = self.video_processor.extract_key_frames(
                    video_path,
                    num_frames=num_frames,
                )
            else:
                frame_paths = self.video_processor.extract_frames(
                    video_path,
                    max_frames=num_frames,
                    **kwargs
                )

            logger.info(f"Extracted {len(frame_paths)} frames from video")

            frame_results = []
            # Ordered union of plugins that ran on at least one frame.
            # Initialised up front so a video yielding zero frames still
            # produces a result instead of failing on an unbound name.
            plugins_used: List[str] = []

            for idx, frame_path in enumerate(frame_paths):
                logger.info(f"Analyzing frame {idx + 1}/{len(frame_paths)}")

                image = self.image_processor.process(frame_path, resize=True)

                executed = self._execute_plugins(
                    image,
                    frame_path,
                    plugins,
                    media_type="video_frame",
                )
                for name in executed:
                    if name not in plugins_used:
                        plugins_used.append(name)

                frame_results.append({
                    "frame_index": idx,
                    # Path() also accepts frame paths returned as plain strings.
                    "frame_path": Path(frame_path).name,
                    # Copy: the manager's dict is cleared right below.
                    "results": dict(self.result_manager.results),
                })

                # Reset per-frame results so the next frame starts empty.
                self.result_manager.results.clear()

            # Replace the (now empty) per-frame store with the video-level view.
            self.result_manager.results = self._aggregate_video_results(
                frame_results
            )

            end_time = datetime.now()
            self.result_manager.set_processing_info(
                start_time=start_time,
                end_time=end_time,
                plugins_used=plugins_used,
            )

            self.result_manager.add_metadata({
                "frames_analyzed": len(frame_paths),
                "extraction_method": extract_method,
            })

            results = self.result_manager.to_dict(
                include_metadata=config.INCLUDE_METADATA
            )

            logger.info(f"Video analysis completed: {video_path.name} "
                        f"({len(frame_paths)} frames, {len(plugins_used)} plugins)")

            return results

        except Exception as e:
            logger.error(f"Video analysis failed: {e}")
            # Chain the cause so the original traceback is not lost.
            raise DeepVisionError(
                f"Analysis failed for {video_path.name}: {str(e)}",
                {"path": str(video_path), "error": str(e)}
            ) from e

    def _execute_plugins(
        self,
        media,
        media_path: Path,
        plugin_names: Optional[List[str]] = None,
        media_type: str = "image"
    ) -> List[str]:
        """
        Execute registered plugins on media.

        Plugins always run in registration order, regardless of the order of
        ``plugin_names``. A plugin that raises does not abort the run: the
        error is logged and a ``{"error": ..., "status": "failed"}`` entry is
        stored as that plugin's result.

        Args:
            media: Processed media (image or frame)
            media_path: Path to media file
            plugin_names: List of plugins to execute (None for all). Names
                that are not registered are skipped with a warning.
            media_type: Type of media being processed (currently not used
                by this method)

        Returns:
            List of plugin names that completed successfully
        """
        if plugin_names is None:
            # Snapshot so registry changes made by a plugin cannot disturb
            # the iteration below.
            plugins_to_run = list(self.plugin_order)
        else:
            # A bare string would otherwise be treated as an iterable of
            # characters; interpret it as a single plugin name.
            if isinstance(plugin_names, str):
                plugin_names = [plugin_names]

            for requested in dict.fromkeys(plugin_names):
                if requested not in self.plugins:
                    logger.warning(f"Plugin '{requested}' not found, skipping")

            wanted = set(plugin_names)
            plugins_to_run = [p for p in self.plugin_order if p in wanted]

        executed = []

        for plugin_name in plugins_to_run:
            # Guards against plugin_order and plugins drifting out of sync.
            if plugin_name not in self.plugins:
                logger.warning(f"Plugin '{plugin_name}' not found, skipping")
                continue

            try:
                logger.debug(f"Executing plugin: {plugin_name}")

                plugin = self.plugins[plugin_name]
                result = plugin.analyze(media, media_path)
                self.result_manager.add_result(plugin_name, result)

                executed.append(plugin_name)

                logger.debug(f"Plugin '{plugin_name}' completed successfully")

            except Exception as e:
                # Deliberate best-effort: one failing plugin must not stop
                # the others, so record the failure and carry on.
                logger.error(f"Plugin '{plugin_name}' failed: {e}")

                self.result_manager.add_result(
                    plugin_name,
                    {
                        "error": str(e),
                        "status": "failed"
                    }
                )

        return executed

    def _aggregate_video_results(
        self,
        frame_results: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """
        Aggregate results from multiple video frames.

        Args:
            frame_results: List of per-frame entries; each entry's
                ``"results"`` value maps plugin name to that plugin's result

        Returns:
            Dictionary with ``"frames"`` (the input list) and ``"summary"``
            (plugin name -> aggregate across all frames). The summary covers
            every plugin that appears in at least one frame, in order of
            first appearance.
        """
        aggregated: Dict[str, Any] = {
            "frames": frame_results,
            "summary": {}
        }

        if not frame_results:
            return aggregated

        # Ordered union of plugin names over all frames, so a plugin missing
        # from the first frame is still summarised.
        plugin_names: Dict[str, None] = {}
        for frame in frame_results:
            for plugin_name in frame["results"]:
                plugin_names.setdefault(plugin_name, None)

        for plugin_name in plugin_names:
            aggregated["summary"][plugin_name] = self._aggregate_plugin_results(
                plugin_name,
                [f["results"].get(plugin_name, {}) for f in frame_results]
            )

        return aggregated

    @staticmethod
    def _collect_items(results: List[Any], key: str) -> List[Any]:
        """Concatenate the list stored under ``key`` in each dict result."""
        collected: List[Any] = []
        for result in results:
            # Skip non-dict results (e.g. a plugin that returned None) and
            # treat a missing or None list as empty.
            if isinstance(result, dict):
                collected.extend(result.get(key) or [])
        return collected

    @staticmethod
    def _count_names(items: List[Any]) -> Dict[str, int]:
        """Count items by their ``"name"`` entry, ignoring items without one."""
        counts: Dict[str, int] = {}
        for item in items:
            if not isinstance(item, dict) or "name" not in item:
                continue
            name = item["name"]
            counts[name] = counts.get(name, 0) + 1
        return counts

    def _aggregate_plugin_results(
        self,
        plugin_name: str,
        results: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """
        Aggregate results for a specific plugin across frames.

        Plugin-specific aggregation exists for ``"object_detector"``,
        ``"caption_generator"`` and ``"color_analyzer"``; any other plugin
        only gets the ``"frames_processed"`` count. Malformed per-frame data
        (non-dict results, missing lists, entries without a ``"name"``) is
        skipped rather than raising.

        Args:
            plugin_name: Name of the plugin
            results: List of results from each frame

        Returns:
            Aggregated result for the plugin
        """
        aggregated: Dict[str, Any] = {
            "frames_processed": len(results),
        }

        if plugin_name == "object_detector":
            all_objects = self._collect_items(results, "objects")
            object_counts = self._count_names(all_objects)

            aggregated["total_objects"] = len(all_objects)
            aggregated["unique_objects"] = len(object_counts)
            aggregated["object_frequency"] = object_counts

        elif plugin_name == "caption_generator":
            # Keep only non-empty captions.
            captions = [
                r["caption"]
                for r in results
                if isinstance(r, dict) and r.get("caption")
            ]
            aggregated["captions"] = captions
            aggregated["caption_count"] = len(captions)

        elif plugin_name == "color_analyzer":
            all_colors = self._collect_items(results, "dominant_colors")
            aggregated["color_frequency"] = self._count_names(all_colors)

        return aggregated

    def analyze(
        self,
        file_path: Union[str, Path],
        **kwargs
    ) -> Dict[str, Any]:
        """
        Automatically detect file type and analyze.

        The lower-cased file extension is looked up in
        ``config.ALLOWED_IMAGE_FORMATS`` first, then in
        ``config.ALLOWED_VIDEO_FORMATS``.

        Args:
            file_path: Path to image or video file
            **kwargs: Additional arguments, forwarded unchanged to
                ``analyze_image`` or ``analyze_video``

        Returns:
            Analysis results

        Raises:
            ValueError: If the extension is in neither allowed-format list
        """
        file_path = Path(file_path)
        ext = file_path.suffix.lower()

        if ext in config.ALLOWED_IMAGE_FORMATS:
            return self.analyze_image(file_path, **kwargs)
        if ext in config.ALLOWED_VIDEO_FORMATS:
            return self.analyze_video(file_path, **kwargs)

        raise ValueError(f"Unsupported file format: {ext}")

    def __repr__(self) -> str:
        """Object representation."""
        return (f"AnalysisEngine(plugins={len(self.plugins)}, "
                f"registered={self.get_registered_plugins()})")
|
|
|