Salman Abjam
Initial deployment: DeepVision Prompt Builder v0.1.0
eb5a9e1
"""
Core Analysis Engine
Main orchestration engine for DeepVision Prompt Builder.
Manages image/video processing, plugin execution, and result generation.
"""
from collections import Counter
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Any, Optional, Union

from loguru import logger

from core.config import config
from core.image_processor import ImageProcessor
from core.video_processor import VideoProcessor
from core.result_manager import ResultManager
from core.exceptions import DeepVisionError
class AnalysisEngine:
    """
    Main analysis engine for processing images and videos.

    Orchestrates the complete analysis pipeline:
    1. File validation and preprocessing
    2. Plugin execution
    3. Result aggregation
    4. JSON output generation
    """

    def __init__(self):
        """Initialize AnalysisEngine with its processors and an empty plugin registry."""
        self.image_processor = ImageProcessor()
        self.video_processor = VideoProcessor()
        self.result_manager = ResultManager()
        # Registered plugin instances, keyed by plugin name.
        self.plugins: Dict[str, Any] = {}
        # Plugin names in first-registration order; this drives execution order.
        self.plugin_order: List[str] = []
        logger.info(f"AnalysisEngine initialized - {config.APP_NAME} v{config.APP_VERSION}")

    def register_plugin(self, plugin_name: str, plugin_instance: Any) -> None:
        """
        Register a plugin for analysis.

        Re-registering an existing name replaces the instance but keeps the
        plugin's original position in the execution order.

        Args:
            plugin_name: Unique name for the plugin
            plugin_instance: Instance of the plugin class; it is later called
                as ``plugin_instance.analyze(media, media_path)``
        """
        if plugin_name in self.plugins:
            logger.warning(f"Plugin '{plugin_name}' already registered, replacing")
        self.plugins[plugin_name] = plugin_instance

        # Maintain execution order
        if plugin_name not in self.plugin_order:
            self.plugin_order.append(plugin_name)
        logger.info(f"Registered plugin: {plugin_name}")

    def unregister_plugin(self, plugin_name: str) -> None:
        """
        Unregister a plugin.

        Unknown names are ignored (nothing is removed and nothing is logged).

        Args:
            plugin_name: Name of plugin to remove
        """
        if plugin_name not in self.plugins:
            return
        del self.plugins[plugin_name]
        if plugin_name in self.plugin_order:
            self.plugin_order.remove(plugin_name)
        logger.info(f"Unregistered plugin: {plugin_name}")

    def get_registered_plugins(self) -> List[str]:
        """
        Get list of registered plugins.

        Returns:
            List of plugin names (a new list; safe for the caller to mutate)
        """
        return list(self.plugins)

    def analyze_image(
        self,
        image_path: Union[str, Path],
        plugins: Optional[List[str]] = None,
        **kwargs
    ) -> Dict[str, Any]:
        """
        Analyze a single image.

        Args:
            image_path: Path to image file
            plugins: List of plugin names to use (None for all)
            **kwargs: Additional arguments for processing. ``resize``
                (default True) and ``normalize`` (default False) are read
                and passed to the image processor.

        Returns:
            Analysis results dictionary

        Raises:
            DeepVisionError: If any step of the analysis fails; the original
                exception is attached as the cause.
        """
        start_time = datetime.now()
        image_path = Path(image_path)
        logger.info(f"Starting image analysis: {image_path.name}")

        try:
            # Clear previous results
            self.result_manager.clear()

            # Process image
            image = self.image_processor.process(
                image_path,
                resize=kwargs.get("resize", True),
                normalize=kwargs.get("normalize", False)
            )

            # Get image info
            image_info = self.image_processor.get_image_info(image_path)

            # Set file metadata
            self.result_manager.set_file_info(
                filename=image_info["filename"],
                file_type="image",
                file_size=image_info["file_size"],
                width=image_info["width"],
                height=image_info["height"],
                format=image_info["format"],
                hash=image_info["hash"],
            )

            # Execute plugins
            plugins_used = self._execute_plugins(
                image,
                image_path,
                plugins,
                media_type="image"
            )

            # Set processing metadata
            end_time = datetime.now()
            self.result_manager.set_processing_info(
                start_time=start_time,
                end_time=end_time,
                plugins_used=plugins_used
            )

            # Get final results
            results = self.result_manager.to_dict(
                include_metadata=config.INCLUDE_METADATA
            )
            logger.info(f"Image analysis completed: {image_path.name} "
                        f"({len(plugins_used)} plugins)")
            return results
        except Exception as e:
            logger.error(f"Image analysis failed: {e}")
            # Chain explicitly so the root cause stays visible in tracebacks.
            raise DeepVisionError(
                f"Analysis failed for {image_path.name}: {str(e)}",
                {"path": str(image_path), "error": str(e)}
            ) from e

    def analyze_video(
        self,
        video_path: Union[str, Path],
        plugins: Optional[List[str]] = None,
        extract_method: str = "keyframes",
        num_frames: int = 5,
        **kwargs
    ) -> Dict[str, Any]:
        """
        Analyze a video by extracting and analyzing frames.

        Args:
            video_path: Path to video file
            plugins: List of plugin names to use (None for all)
            extract_method: Frame extraction method. "keyframes" uses key
                frame extraction; any other value (e.g. "fps") uses the
                generic frame extractor.
            num_frames: Number of frames to extract
            **kwargs: Additional arguments; forwarded to the generic frame
                extractor only (ignored for "keyframes")

        Returns:
            Analysis results dictionary. The recorded ``plugins_used`` is the
            ordered union of plugins that succeeded on at least one frame.

        Raises:
            DeepVisionError: If any step of the analysis fails; the original
                exception is attached as the cause.
        """
        start_time = datetime.now()
        video_path = Path(video_path)
        logger.info(f"Starting video analysis: {video_path.name}")

        try:
            # Clear previous results
            self.result_manager.clear()

            # Get video info
            video_info = self.video_processor.get_video_info(video_path)

            # Set file metadata
            self.result_manager.set_file_info(
                filename=video_info["filename"],
                file_type="video",
                file_size=video_info["file_size"],
                width=video_info["width"],
                height=video_info["height"],
                fps=video_info["fps"],
                duration=video_info["duration"],
                frame_count=video_info["frame_count"],
            )

            # Extract frames
            if extract_method == "keyframes":
                frame_paths = self.video_processor.extract_key_frames(
                    video_path,
                    num_frames=num_frames
                )
            else:
                frame_paths = self.video_processor.extract_frames(
                    video_path,
                    max_frames=num_frames,
                    **kwargs
                )
            logger.info(f"Extracted {len(frame_paths)} frames from video")
            if not frame_paths:
                logger.warning("No frames extracted from video; results will be empty")

            # Bound before the loop so a video yielding zero frames still
            # produces a well-formed (empty) result instead of crashing.
            plugins_used: List[str] = []

            # Analyze each frame
            frame_results = []
            for idx, frame_path in enumerate(frame_paths):
                logger.info(f"Analyzing frame {idx + 1}/{len(frame_paths)}")

                # Process frame
                image = self.image_processor.process(frame_path, resize=True)

                # Execute plugins on frame
                executed = self._execute_plugins(
                    image,
                    frame_path,
                    plugins,
                    media_type="video_frame"
                )
                # Accumulate across frames (first-seen order) rather than
                # reporting only whatever ran on the last frame.
                for name in executed:
                    if name not in plugins_used:
                        plugins_used.append(name)

                # Snapshot this frame's results before the shared store is
                # cleared for the next frame.
                frame_results.append({
                    "frame_index": idx,
                    "frame_path": Path(frame_path).name,
                    "results": dict(self.result_manager.results)
                })

                # Clear for next frame
                self.result_manager.results.clear()

            # Aggregate frame results and publish them as the final results
            self.result_manager.results = self._aggregate_video_results(frame_results)

            # Set processing metadata
            end_time = datetime.now()
            self.result_manager.set_processing_info(
                start_time=start_time,
                end_time=end_time,
                plugins_used=plugins_used
            )

            # Add video-specific metadata
            self.result_manager.add_metadata({
                "frames_analyzed": len(frame_paths),
                "extraction_method": extract_method,
            })

            # Get final results
            results = self.result_manager.to_dict(
                include_metadata=config.INCLUDE_METADATA
            )
            logger.info(f"Video analysis completed: {video_path.name} "
                        f"({len(frame_paths)} frames, {len(plugins_used)} plugins)")
            return results
        except Exception as e:
            logger.error(f"Video analysis failed: {e}")
            # Chain explicitly so the root cause stays visible in tracebacks.
            raise DeepVisionError(
                f"Analysis failed for {video_path.name}: {str(e)}",
                {"path": str(video_path), "error": str(e)}
            ) from e

    def _select_plugins(
        self,
        plugin_names: Optional[List[str]] = None
    ) -> List[str]:
        """
        Resolve which registered plugins should run, in registration order.

        Args:
            plugin_names: Requested plugin names, a single name, or None for
                every registered plugin

        Returns:
            New list of plugin names to run; requested names that are not in
            the execution order are dropped
        """
        if plugin_names is None:
            # Copy so callers never iterate the live registry list.
            return list(self.plugin_order)
        if isinstance(plugin_names, str):
            plugin_names = [plugin_names]
        requested = set(plugin_names)
        return [name for name in self.plugin_order if name in requested]

    def _execute_plugins(
        self,
        media: Any,
        media_path: Path,
        plugin_names: Optional[List[str]] = None,
        media_type: str = "image"
    ) -> List[str]:
        """
        Execute registered plugins on media.

        A failing plugin does not abort the run: its error is stored as that
        plugin's result and execution continues with the next plugin.

        Args:
            media: Processed media (image or frame)
            media_path: Path to media file
            plugin_names: List of plugins to execute (None for all)
            media_type: Type of media being processed (currently informational
                only; it is not passed to plugins)

        Returns:
            List of plugin names that executed successfully
        """
        plugins_to_run = self._select_plugins(plugin_names)

        # Surface requested names that can never run instead of silently
        # dropping them.
        if plugin_names is not None:
            requested = [plugin_names] if isinstance(plugin_names, str) else plugin_names
            for name in requested:
                if name not in self.plugin_order:
                    logger.warning(f"Requested plugin '{name}' is not registered, skipping")

        executed = []
        for plugin_name in plugins_to_run:
            # Guards against the order list and the registry drifting apart.
            if plugin_name not in self.plugins:
                logger.warning(f"Plugin '{plugin_name}' not found, skipping")
                continue
            try:
                logger.debug(f"Executing plugin: {plugin_name}")
                plugin = self.plugins[plugin_name]

                # Execute plugin
                result = plugin.analyze(media, media_path)

                # Add result
                self.result_manager.add_result(plugin_name, result)
                executed.append(plugin_name)
                logger.debug(f"Plugin '{plugin_name}' completed successfully")
            except Exception as e:
                logger.error(f"Plugin '{plugin_name}' failed: {e}")
                # Add error to results
                self.result_manager.add_result(
                    plugin_name,
                    {
                        "error": str(e),
                        "status": "failed"
                    }
                )
        return executed

    def _aggregate_video_results(
        self,
        frame_results: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """
        Aggregate results from multiple video frames.

        Args:
            frame_results: List of results from each frame; each entry holds
                that frame's per-plugin results under the "results" key

        Returns:
            Dictionary with the per-frame results under "frames" and one
            aggregated entry per plugin under "summary"
        """
        aggregated = {
            "frames": frame_results,
            "summary": {}
        }
        if not frame_results:
            return aggregated

        # Ordered union of plugin names over all frames, so a plugin that is
        # missing from the first frame is still summarised.
        plugin_names = dict.fromkeys(
            name for frame in frame_results for name in frame["results"]
        )
        for plugin_name in plugin_names:
            aggregated["summary"][plugin_name] = self._aggregate_plugin_results(
                plugin_name,
                [frame["results"].get(plugin_name, {}) for frame in frame_results]
            )
        return aggregated

    @staticmethod
    def _collect_items(results: List[Any], key: str) -> List[Any]:
        """Flatten the lists stored under *key* in every dict-shaped frame result."""
        items: List[Any] = []
        for result in results:
            if isinstance(result, dict):
                # `or []` also covers an explicit None stored under the key.
                items.extend(result.get(key) or [])
        return items

    @staticmethod
    def _name_frequency(items: List[Any]) -> Dict[str, int]:
        """Count occurrences of each item's "name"; items without one are skipped."""
        return dict(Counter(
            item["name"]
            for item in items
            if isinstance(item, dict) and "name" in item
        ))

    def _aggregate_plugin_results(
        self,
        plugin_name: str,
        results: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """
        Aggregate results for a specific plugin across frames.

        Frame results that are not dictionaries (for example None) and
        failed-plugin error entries contribute nothing beyond the frame count.

        Args:
            plugin_name: Name of the plugin
            results: List of results from each frame

        Returns:
            Aggregated result for the plugin. Always contains
            "frames_processed"; "object_detector", "caption_generator" and
            "color_analyzer" add their own summary fields.
        """
        # Default aggregation: only the number of frames seen.
        aggregated: Dict[str, Any] = {
            "frames_processed": len(results),
        }

        # Plugin-specific aggregation logic
        if plugin_name == "object_detector":
            all_objects = self._collect_items(results, "objects")
            object_counts = self._name_frequency(all_objects)
            aggregated["total_objects"] = len(all_objects)
            aggregated["unique_objects"] = len(object_counts)
            aggregated["object_frequency"] = object_counts
        elif plugin_name == "caption_generator":
            # Keep only non-empty captions.
            captions = [
                result["caption"]
                for result in results
                if isinstance(result, dict) and result.get("caption")
            ]
            aggregated["captions"] = captions
            aggregated["caption_count"] = len(captions)
        elif plugin_name == "color_analyzer":
            all_colors = self._collect_items(results, "dominant_colors")
            aggregated["color_frequency"] = self._name_frequency(all_colors)
        return aggregated

    def analyze(
        self,
        file_path: Union[str, Path],
        **kwargs
    ) -> Dict[str, Any]:
        """
        Automatically detect file type and analyze.

        The lower-cased file suffix is matched against the configured image
        and video formats (presumably stored with a leading dot, e.g. ".jpg";
        verify against config).

        Args:
            file_path: Path to image or video file
            **kwargs: Additional arguments passed through to
                analyze_image / analyze_video

        Returns:
            Analysis results

        Raises:
            ValueError: If the file extension is in neither allowed list
            DeepVisionError: If the underlying analysis fails
        """
        file_path = Path(file_path)

        # Detect file type
        ext = file_path.suffix.lower()
        if ext in config.ALLOWED_IMAGE_FORMATS:
            return self.analyze_image(file_path, **kwargs)
        if ext in config.ALLOWED_VIDEO_FORMATS:
            return self.analyze_video(file_path, **kwargs)
        raise ValueError(f"Unsupported file format: {ext}")

    def __repr__(self) -> str:
        """Object representation."""
        return (f"AnalysisEngine(plugins={len(self.plugins)}, "
                f"registered={self.get_registered_plugins()})")