import logging
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

from open_storyline.utils.logging import get_logger


@dataclass
class LogEntry:
    """Single log entry."""

    level: str
    message: str
    timestamp: str
    artifact_id: Optional[str] = None
    extra_data: Dict[str, Any] = field(default_factory=dict)


@dataclass
class NodeSummary:
    """
    Node Execution Status Summary - Reuses existing logger module

    Features:
    1. ERROR - Error messages for LLM
    2. WARNING - Warning messages for LLM
    3. DEBUG - Debug information for developers
    4. INFO_LLM - Detailed information for LLM
    5. INFO_USER - Brief information for users

    Capabilities:
    - Reuses get_logger() configuration for console output
    - Per-level log storage and extraction
    - Artifact tracking for errors and warnings
    - Log compression is accepted as a flag but not implemented yet
    """

    # NOTE(review): these level names are ordinary dataclass fields, so they
    # are also __init__ parameters. typing.ClassVar would make them true class
    # constants, but that would change the constructor signature; left as-is.
    ERROR: str = "ERROR"
    DEBUG: str = "DEBUG"
    WARNING: str = "WARNING"
    INFO_LLM: str = "INFO_LLM"
    INFO_USER: str = "INFO_USER"

    LOGGER_LEVELS: Tuple[str, ...] = (ERROR, DEBUG, WARNING, INFO_LLM, INFO_USER)

    # Log storage
    log_error: List[LogEntry] = field(default_factory=list)
    log_warn: List[LogEntry] = field(default_factory=list)
    log_info_llm: List[LogEntry] = field(default_factory=list)
    log_info_user: List[LogEntry] = field(default_factory=list)
    log_debug: List[LogEntry] = field(default_factory=list)

    # Artifact mapping: artifact_id -> messages recorded for that artifact
    artifact_warnings: Dict[str, List[str]] = field(default_factory=dict)
    artifact_errors: Dict[str, List[str]] = field(default_factory=dict)

    # Configuration options
    logger_name: Optional[str] = field(default=None)
    auto_console: bool = field(default=True)  # Auto output to console
    summary_levels: Optional[List[str]] = field(default=None)

    # Internal state
    _logger: Optional[logging.Logger] = field(default=None, init=False, repr=False)

    def __post_init__(self):
        """Initialize logger (via get_logger) and the default summary levels."""
        if self.logger_name is None:
            self.logger_name = "NodeSummary"

        self._logger = get_logger(self.logger_name)

        if self.summary_levels is None:
            # DEBUG is left out of summaries by default.
            self.summary_levels = [self.ERROR, self.WARNING, self.INFO_LLM, self.INFO_USER]

    @property
    def all_logs(self) -> Dict[str, List[LogEntry]]:
        """Mapping from level name to the list that stores that level's entries."""
        return {
            self.ERROR: self.log_error,
            self.DEBUG: self.log_debug,
            self.WARNING: self.log_warn,
            self.INFO_LLM: self.log_info_llm,
            self.INFO_USER: self.log_info_user,
        }

    def _log_to_console(self, level: int, message: str, artifact_id: Optional[str] = None):
        """Output to console (using configured logger); no-op if auto_console is off."""
        if not self.auto_console:
            return

        prefix = f"[ARTIFACT:{artifact_id}] " if artifact_id else ""
        self._logger.log(level, f"{prefix}{message}")

    def _record(
        self,
        level: str,
        store: List[LogEntry],
        message: str,
        artifact_id: Optional[str],
        extra_data: Dict[str, Any],
    ) -> LogEntry:
        """Create a timestamped LogEntry, append it to ``store`` and return it."""
        entry = LogEntry(
            level=level,
            message=message,
            timestamp=datetime.now().isoformat(),
            artifact_id=artifact_id,
            extra_data=extra_data,
        )
        store.append(entry)
        return entry

    def add_error(self, message: str, artifact_id: Optional[str] = None, **kwargs: Any):
        """Log error messages - for LLM.

        Extra keyword arguments are stored on the entry as ``extra_data``.
        When ``artifact_id`` is given the message is also indexed in
        ``artifact_errors``.
        """
        self._record(self.ERROR, self.log_error, message, artifact_id, kwargs)

        if artifact_id:
            self.artifact_errors.setdefault(artifact_id, []).append(message)

        self._log_to_console(logging.ERROR, message, artifact_id)

    def add_warning(self, message: str, artifact_id: Optional[str] = None, **kwargs: Any):
        """Log warning messages - for LLM.

        Extra keyword arguments are stored on the entry as ``extra_data``.
        When ``artifact_id`` is given the message is also indexed in
        ``artifact_warnings``.
        """
        self._record(self.WARNING, self.log_warn, message, artifact_id, kwargs)

        if artifact_id:
            self.artifact_warnings.setdefault(artifact_id, []).append(message)

        self._log_to_console(logging.WARNING, message, artifact_id)

    def info_for_llm(self, message: str, artifact_id: Optional[str] = None, **kwargs: Any):
        """Log detailed information - for LLM."""
        self._record(self.INFO_LLM, self.log_info_llm, message, artifact_id, kwargs)
        self._log_to_console(logging.INFO, f"[{self.INFO_LLM}] {message}", artifact_id)

    def info_for_user(self, message: str, artifact_id: Optional[str] = None, **kwargs: Any):
        """Log general information - for users."""
        self._record(self.INFO_USER, self.log_info_user, message, artifact_id, kwargs)
        self._log_to_console(logging.INFO, f"[{self.INFO_USER}] {message}", artifact_id)

    def debug_for_dev(self, message: str, artifact_id: Optional[str] = None, **kwargs: Any):
        """Log debug information - for developers."""
        self._record(self.DEBUG, self.log_debug, message, artifact_id, kwargs)
        self._log_to_console(logging.DEBUG, f"[{self.DEBUG}] {message}", artifact_id)

    def get_logs_by_level(
        self,
        level: str,
        compress_log: bool = False,  # TODO: not implemented yet; currently ignored
    ) -> Dict[str, Any]:
        """
        Return the extracted logs stored for one level.

        Args:
            level: One of the level names (ERROR, DEBUG, WARNING, INFO_LLM, INFO_USER)
            compress_log: Reserved for log compression; currently has no effect

        Returns:
            The dict produced by _extract_log for that level ({} when empty)

        Raises:
            KeyError: If ``level`` is not a known level name
        """
        logs_by_level = self.all_logs
        try:
            selected_log = logs_by_level[level]
        except KeyError:
            raise KeyError(
                f"Unknown log level {level!r}; expected one of {list(logs_by_level)}"
            ) from None
        return self._extract_log(selected_log)

    def _extract_log(
        self,
        log_content: List[LogEntry],
    ) -> Dict[str, Any]:
        """
        Extract log entries into a formatted text block plus their extra data.

        Args:
            log_content: List of log entries

        Returns:
            An empty dict when there are no entries; otherwise a dict with
            "log_lines" (one formatted line per entry, joined by newlines)
            and "extra_data_list" (each entry's extra_data, in the same order)
        """
        if not log_content:
            return {}

        log_lines: List[str] = []
        extra_data_list: List[Dict[str, Any]] = []
        for entry in log_content:
            log_line = f"[{entry.timestamp}] {entry.message}"
            if entry.artifact_id:
                log_line += f" [artifact_id: {entry.artifact_id}]"
            log_lines.append(log_line)
            extra_data_list.append(entry.extra_data)

        result: Dict[str, Any] = {
            "log_lines": "\n".join(log_lines),
            "extra_data_list": extra_data_list,
        }
        return result

    def _get_preview_urls(
        self,
        extra_data_list: List[Dict[str, Any]],
    ) -> List[str]:
        """
        Collect every 'preview_urls' value found in the given extra-data dicts.

        Args:
            extra_data_list: Extra-data dicts taken from log entries

        Returns:
            Flat list of URLs converted to str, in encounter order
        """
        preview_urls: List[str] = []
        for extra_data in extra_data_list:
            # A missing key or an explicit None both mean "no URLs".
            urls = extra_data.get('preview_urls') or []
            if isinstance(urls, str):
                # A bare string is one URL, not an iterable of characters.
                urls = [urls]
            preview_urls.extend(str(url) for url in urls)
        return preview_urls

    def get_summary(
        self,
        artifact_id: str,
        compress_log: bool = True,
        **kwargs: Any,
    ) -> Dict[str, Any]:
        """
        Build a summary of the logs for every level in summary_levels.

        Args:
            artifact_id: Echoed back under 'artifact_id'; logs are NOT filtered by it
            compress_log: Forwarded to get_logs_by_level (currently has no effect)
            **kwargs: Accepted for call compatibility; unused

        Returns:
            Dict mapping each summary level to its joined log lines ("" when
            empty), plus 'preview_urls' and 'artifact_id'. An empty dict is
            returned when summary_levels is None.

        Raises:
            KeyError: If summary_levels contains an unknown level name
        """
        summary: Dict[str, Any] = {}
        preview_urls: List[str] = []
        if self.summary_levels is None:
            return summary

        for level in self.summary_levels:
            summary_log = self.get_logs_by_level(level, compress_log)
            log_lines = summary_log.get('log_lines', "")
            extra_data_list = summary_log.get('extra_data_list', [])
            preview_urls.extend(self._get_preview_urls(extra_data_list))
            summary[level] = log_lines

        summary['preview_urls'] = preview_urls
        summary['artifact_id'] = artifact_id

        return summary

    def clear(self):
        """Clear all logs and artifact mappings."""
        self.log_error.clear()
        self.log_warn.clear()
        self.log_info_llm.clear()
        self.log_info_user.clear()
        self.log_debug.clear()
        self.artifact_warnings.clear()
        self.artifact_errors.clear()