Spaces:
Runtime error
Runtime error
| """ | |
| Result Manager Module | |
| Manages and aggregates results from multiple plugins, | |
| generates final JSON output with metadata. | |
| """ | |
| import json | |
| from datetime import datetime | |
| from pathlib import Path | |
| from typing import Dict, List, Any, Optional, Union | |
| from loguru import logger | |
| from core.config import config | |
| from core.exceptions import ResultError | |
class ResultManager:
    """
    Manage and aggregate analysis results.

    Collects the result dictionary produced by each plugin (keyed by plugin
    name), keeps a separate metadata dictionary (file info, processing info,
    arbitrary extras) and renders both as a structured dict / JSON document.

    Attributes:
        results: Mapping of plugin name -> that plugin's result dictionary.
        metadata: Free-form metadata attached to the output document.
    """

    def __init__(self):
        """Initialize an empty ResultManager."""
        self.results: Dict[str, Any] = {}
        self.metadata: Dict[str, Any] = {}
        logger.info("ResultManager initialized")

    def add_result(
        self,
        plugin_name: str,
        result: Dict[str, Any],
        merge: bool = False
    ) -> None:
        """
        Add result from a plugin.

        Args:
            plugin_name: Name of the plugin
            result: Result dictionary from plugin
            merge: If True and a result is already stored for this plugin,
                deep-merge the new result into it; otherwise replace it.
        """
        if merge and plugin_name in self.results:
            # Merge with existing results
            self.results[plugin_name] = self._merge_dicts(
                self.results[plugin_name],
                result
            )
        else:
            # Replace existing results
            self.results[plugin_name] = result
        logger.debug(f"Added result from plugin: {plugin_name}")

    def add_metadata(self, metadata: Dict[str, Any]) -> None:
        """
        Add metadata to results.

        Existing keys are overwritten by the new values.

        Args:
            metadata: Metadata dictionary
        """
        self.metadata.update(metadata)
        logger.debug(f"Added metadata: {list(metadata.keys())}")

    def set_file_info(
        self,
        filename: str,
        file_type: str,
        file_size: int,
        **kwargs
    ) -> None:
        """
        Set file information in metadata (stored under the "file" key).

        Args:
            filename: Name of the file
            file_type: Type of file (image/video)
            file_size: Size of file in bytes
            **kwargs: Additional file information; these are applied last,
                so a keyword such as ``size_mb`` overrides the computed entry.
        """
        self.metadata["file"] = {
            "filename": filename,
            "type": file_type,
            "size": file_size,
            "size_mb": round(file_size / 1024 / 1024, 2),
            **kwargs
        }

    def set_processing_info(
        self,
        start_time: datetime,
        end_time: datetime,
        plugins_used: List[str]
    ) -> None:
        """
        Set processing information in metadata (stored under "processing").

        Args:
            start_time: Processing start time
            end_time: Processing end time
            plugins_used: List of plugin names used
        """
        duration = (end_time - start_time).total_seconds()
        self.metadata["processing"] = {
            "start_time": start_time.isoformat(),
            "end_time": end_time.isoformat(),
            "duration_seconds": round(duration, 3),
            "plugins_used": plugins_used,
            "plugin_count": len(plugins_used),
        }

    def _merge_dicts(self, dict1: Dict, dict2: Dict) -> Dict:
        """
        Deep merge two dictionaries without modifying either input.

        Merge rules, applied per key of ``dict2``:
          * both values are dicts  -> merged recursively
          * both values are lists  -> concatenated (dict1's items first)
          * anything else          -> dict2's value wins

        Args:
            dict1: First dictionary
            dict2: Second dictionary

        Returns:
            Merged dictionary (a new top-level dict)
        """
        result = dict1.copy()
        for key, value in dict2.items():
            existing = result.get(key)
            if key in result and isinstance(existing, dict) and isinstance(value, dict):
                result[key] = self._merge_dicts(existing, value)
            elif key in result and isinstance(existing, list) and isinstance(value, list):
                # Build a NEW list: dict1.copy() is shallow, so extending in
                # place would silently modify the caller's original list.
                result[key] = [*existing, *value]
            else:
                result[key] = value
        return result

    def merge_results(self, results_list: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Merge multiple result dictionaries, left to right.

        Args:
            results_list: List of result dictionaries

        Returns:
            Merged dictionary (empty if ``results_list`` is empty)
        """
        merged: Dict[str, Any] = {}
        for result in results_list:
            merged = self._merge_dicts(merged, result)
        return merged

    def get_result(self, plugin_name: Optional[str] = None) -> Union[Dict, Any]:
        """
        Get result from specific plugin or all results.

        Args:
            plugin_name: Name of plugin (None for all results)

        Returns:
            The full results mapping when ``plugin_name`` is None, otherwise
            that plugin's result (None if the plugin has no stored result).
        """
        if plugin_name is None:
            return self.results
        return self.results.get(plugin_name)

    def to_dict(self, include_metadata: bool = True) -> Dict[str, Any]:
        """
        Convert results to dictionary.

        The output always carries a "metadata" entry containing at least
        "timestamp" and "version"; ``include_metadata`` controls whether the
        manager's stored metadata is included alongside them.

        Args:
            include_metadata: Whether to include the stored metadata

        Returns:
            Complete results dictionary
        """
        # Work on a copy so the timestamp/version stamped below never leak
        # into self.metadata (otherwise the first timestamp would be frozen
        # and reused by every later call).
        metadata: Dict[str, Any] = dict(self.metadata) if include_metadata else {}
        # Add timestamp if not present
        if "timestamp" not in metadata:
            metadata["timestamp"] = datetime.now().isoformat()
        # Add version
        metadata["version"] = config.APP_VERSION
        return {
            "results": self.results,
            "metadata": metadata,
        }

    def to_json(
        self,
        include_metadata: bool = True,
        pretty: Optional[bool] = None,
        ensure_ascii: bool = False
    ) -> str:
        """
        Convert results to JSON string.

        Values that are not natively JSON-serializable are converted with
        ``str`` (``default=str``).

        Args:
            include_metadata: Whether to include metadata
            pretty: Whether to format JSON (None uses config.PRETTY_JSON)
            ensure_ascii: Whether to escape non-ASCII characters

        Returns:
            JSON string
        """
        if pretty is None:
            pretty = config.PRETTY_JSON
        data = self.to_dict(include_metadata=include_metadata)
        # indent=None is json.dumps' compact default, so one call covers both modes.
        return json.dumps(
            data,
            indent=2 if pretty else None,
            ensure_ascii=ensure_ascii,
            default=str
        )

    def save_json(
        self,
        output_path: Union[str, Path],
        include_metadata: bool = True,
        pretty: Optional[bool] = None
    ) -> None:
        """
        Save results to JSON file (UTF-8), creating parent directories.

        Args:
            output_path: Path to output file
            include_metadata: Whether to include metadata
            pretty: Whether to format JSON (None uses config)

        Raises:
            ResultError: If serialization or writing fails; the original
                exception is attached as the cause.
        """
        try:
            output_path = Path(output_path)
            output_path.parent.mkdir(parents=True, exist_ok=True)
            json_str = self.to_json(
                include_metadata=include_metadata,
                pretty=pretty
            )
            output_path.write_text(json_str, encoding="utf-8")
            logger.info(f"Saved results to: {output_path}")
        except Exception as e:
            logger.error(f"Failed to save JSON: {e}")
            # Chain explicitly so the underlying failure stays in the traceback.
            raise ResultError(
                f"Cannot save results to file: {str(e)}",
                {"path": str(output_path), "error": str(e)}
            ) from e

    def generate_prompt(self) -> str:
        """
        Generate a text prompt from results.

        Combines, in order: the caption, up to five object names, up to three
        dominant color names, and the first 50 characters of extracted text.
        Only the plugins present in the results contribute.

        Returns:
            Generated prompt string with its first character upper-cased, or
            "No description available" when nothing could be assembled.
        """
        prompt_parts = []
        # Add captions
        if "caption_generator" in self.results:
            caption = self.results["caption_generator"].get("caption", "")
            if caption:
                prompt_parts.append(caption)
        # Add objects
        if "object_detector" in self.results:
            objects = self.results["object_detector"].get("objects", [])
            if objects:
                object_names = [obj["name"] for obj in objects[:5]]
                prompt_parts.append(f"showing {', '.join(object_names)}")
        # Add colors
        if "color_analyzer" in self.results:
            colors = self.results["color_analyzer"].get("dominant_colors", [])
            if colors:
                color_names = [c["name"] for c in colors[:3]]
                prompt_parts.append(f"with {', '.join(color_names)} colors")
        # Add text
        if "text_extractor" in self.results:
            text = self.results["text_extractor"].get("text", "")
            if text:
                prompt_parts.append(f'containing text "{text[:50]}"')
        prompt = ", ".join(prompt_parts)
        if not prompt:
            return "No description available"
        # Upper-case only the first character. str.capitalize() would also
        # lower-case the rest, mangling acronyms, names and quoted OCR text.
        return prompt[0].upper() + prompt[1:]

    def get_summary(self) -> Dict[str, Any]:
        """
        Get summary of results.

        Returns:
            Summary dictionary with the plugin count and names, plus a
            per-plugin entry for the known plugins (object_detector,
            caption_generator, color_analyzer, text_extractor).
        """
        summary: Dict[str, Any] = {
            "total_plugins": len(self.results),
            "plugins": list(self.results.keys()),
        }
        # Add plugin-specific summaries
        for plugin_name, result in self.results.items():
            if plugin_name == "object_detector":
                summary["object_count"] = len(result.get("objects", []))
            elif plugin_name == "caption_generator":
                summary["has_caption"] = bool(result.get("caption"))
            elif plugin_name == "color_analyzer":
                summary["color_count"] = len(result.get("dominant_colors", []))
            elif plugin_name == "text_extractor":
                summary["has_text"] = bool(result.get("text"))
        return summary

    def clear(self) -> None:
        """Clear all results and metadata."""
        self.results.clear()
        self.metadata.clear()
        logger.debug("Cleared all results")

    def __str__(self) -> str:
        """String representation: the pretty-printed JSON document."""
        return self.to_json(pretty=True)

    def __repr__(self) -> str:
        """Object representation: counts of stored plugins and metadata keys."""
        return f"ResultManager(plugins={len(self.results)}, metadata={len(self.metadata)})"