| | from dataclasses import dataclass, field |
| | from typing import Any, Dict, List, Tuple, Optional |
| | import logging |
| | from datetime import datetime |
| |
|
| | from open_storyline.utils.logging import get_logger |
| |
|
| |
|
| | @dataclass |
| | class LogEntry: |
| | """Single log entry""" |
| | level: str |
| | message: str |
| | timestamp: str |
| | artifact_id: Optional[str] = None |
| | extra_data: Dict[str, Any] = field(default_factory=dict) |
| |
|
| | @dataclass |
| | class NodeSummary: |
| | """ |
| | Node Execution Status Summary - Reuses existing logger module |
| | |
| | Features: |
| | 1. ERROR - Error messages for LLM |
| | 2. WARNING - Warning messages for LLM |
| | 3. DEBUG - Debug information for developers |
| | 4. INFO_LLM - Detailed information for LLM |
| | 5. INFO_USER - Brief information for users |
| | |
| | Capabilities: |
| | - Reuses get_logger() configuration |
| | - Hierarchical log storage and extraction |
| | - Colored console output |
| | - Log compression functionality |
| | - Supports artifact tracking |
| | """ |
| | ERROR: str = "ERROR" |
| | DEBUG: str = "DEBUG" |
| | WARNING: str = "WARNING" |
| | INFO_LLM: str = "INFO_LLM" |
| | INFO_USER: str = "INFO_USER" |
| | LOGGER_LEVELS: Tuple[str, ...] = (ERROR, DEBUG, WARNING, INFO_LLM, INFO_USER) |
| |
|
| |
|
| | |
| | log_error: List[LogEntry] = field(default_factory=list) |
| | log_warn: List[LogEntry] = field(default_factory=list) |
| | log_info_llm: List[LogEntry] = field(default_factory=list) |
| | log_info_user: List[LogEntry] = field(default_factory=list) |
| | log_debug: List[LogEntry] = field(default_factory=list) |
| | |
| | |
| | artifact_warnings: Dict[str, List[str]] = field(default_factory=dict) |
| | artifact_errors: Dict[str, List[str]] = field(default_factory=dict) |
| | |
| | |
| | logger_name: Optional[str] = field(default=None) |
| | auto_console: bool = field(default=True) |
| | summary_levels: Optional[List[str]] = field(default=None) |
| |
|
| | |
| | _logger: Optional[logging.Logger] = field(default=None, init=False, repr=False) |
| |
|
| | def __post_init__(self): |
| | """Initialize logger - reuses get_logger""" |
| | if self.logger_name is None: |
| | self.logger_name = "NodeSummary" |
| | self._logger = get_logger(self.logger_name) |
| | if self.summary_levels is None: |
| | self.summary_levels = [self.ERROR, self.WARNING, self.INFO_LLM, self.INFO_USER] |
| |
|
| | def _log_to_console(self, level: int, message: str, artifact_id: Optional[str] = None): |
| | """Output to console (using configured logger)""" |
| | if not self.auto_console: |
| | return |
| | |
| | prefix = f"[ARTIFACT:{artifact_id}] " if artifact_id else "" |
| | self._logger.log(level, f"{prefix}{message}") |
| | |
| | def add_error(self, message: str, artifact_id: Optional[str] = None, **kwargs: Any): |
| | """Log error messages - for LLM""" |
| | entry = LogEntry( |
| | level=self.ERROR, |
| | message=message, |
| | timestamp=datetime.now().isoformat(), |
| | artifact_id=artifact_id, |
| | extra_data=kwargs |
| | ) |
| | self.log_error.append(entry) |
| | |
| | if artifact_id: |
| | self.artifact_errors.setdefault(artifact_id, []).append(message) |
| | |
| | self._log_to_console(logging.ERROR, message, artifact_id) |
| | |
| | def add_warning(self, message: str, artifact_id: Optional[str] = None, **kwargs: Any): |
| | """Log warning messages - for LLM""" |
| | entry = LogEntry( |
| | level="WARNING", |
| | message=message, |
| | timestamp=datetime.now().isoformat(), |
| | artifact_id=artifact_id, |
| | extra_data=kwargs |
| | ) |
| | self.log_warn.append(entry) |
| | |
| | if artifact_id: |
| | self.artifact_warnings.setdefault(artifact_id, []).append(message) |
| | |
| | self._log_to_console(logging.WARNING, message, artifact_id) |
| | |
| | def info_for_llm(self, message: str, artifact_id: Optional[str] = None, **kwargs: Any): |
| | """Log detailed information - for LLM""" |
| | entry = LogEntry( |
| | level="INFO_LLM", |
| | message=message, |
| | timestamp=datetime.now().isoformat(), |
| | artifact_id=artifact_id, |
| | extra_data=kwargs |
| | ) |
| | self.log_info_llm.append(entry) |
| | self._log_to_console(logging.INFO, f"[{self.INFO_LLM}] {message}", artifact_id) |
| | |
| | def info_for_user(self, message: str, artifact_id: Optional[str] = None, **kwargs: Any): |
| | """Log general information - for users""" |
| | entry = LogEntry( |
| | level="INFO_USER", |
| | message=message, |
| | timestamp=datetime.now().isoformat(), |
| | artifact_id=artifact_id, |
| | extra_data=kwargs |
| | ) |
| | self.log_info_user.append(entry) |
| | self._log_to_console(logging.INFO, f"[{self.INFO_USER}] {message}", artifact_id) |
| | |
| | def debug_for_dev(self, message: str, artifact_id: Optional[str] = None, **kwargs: Any): |
| | """Log debug information - for developers""" |
| | entry = LogEntry( |
| | level=self.DEBUG, |
| | message=message, |
| | timestamp=datetime.now().isoformat(), |
| | artifact_id=artifact_id, |
| | extra_data=kwargs |
| | ) |
| | self.log_debug.append(entry) |
| | self._log_to_console(logging.DEBUG, f"[{self.DEBUG}] {message}", artifact_id) |
| | |
| | def get_logs_by_level( |
| | self, |
| | level:str , |
| | compress_log: bool=False, |
| | ) -> Dict[str,Any]: |
| | self.all_logs = { |
| | self.ERROR: self.log_error, |
| | self.DEBUG: self.log_debug, |
| | self.WARNING: self.log_warn, |
| | self.INFO_LLM: self.log_info_llm, |
| | self.INFO_USER: self.log_info_user |
| | } |
| |
|
| | selected_log = self.all_logs[level] |
| |
|
| | return self._extract_log(selected_log) |
| | |
| | def _extract_log( |
| | self, |
| | log_content: List[LogEntry], |
| | ) -> Dict[str,Any]: |
| | """ |
| | Extract log content into string format. |
| | |
| | Args: |
| | log_content: List of log entries |
| | |
| | Returns: |
| | Formatted log string with each log entry on a separate line |
| | """ |
| | if not log_content: |
| | return {} |
| | |
| | log_lines: List[str] = [] |
| | extra_data_list: List[Dict[str,Any]] = [] |
| | for entry in log_content: |
| | log_line = f"[{entry.timestamp}] {entry.message}" |
| |
|
| | if entry.artifact_id: |
| | log_line += f" [artifact_id: {entry.artifact_id}]" |
| | |
| | log_lines.append(log_line) |
| | extra_data_list.append(entry.extra_data) |
| | result: Dict[str,Any] = { |
| | "log_lines": "\n".join(log_lines), |
| | "extra_data_list": extra_data_list |
| | } |
| | return result |
| | |
| | def _get_preview_urls( |
| | self, |
| | extra_data_list: List[Dict[str,Any]], |
| | ) -> List[str]: |
| | preview_urls: List[str] = [] |
| | for extra_data in extra_data_list: |
| | preview_urls.extend([str(url) for url in extra_data.get('preview_urls', [])]) |
| | return preview_urls |
| |
|
| | def get_summary( |
| | self, |
| | artifact_id: str, |
| | compress_log: bool=True, |
| | **kwargs: Dict[str,Any], |
| | ) -> Dict[str,Any]: |
| | summary: Dict[str,Any] = {} |
| | preview_urls: List[str] = [] |
| | if self.summary_levels is None: |
| | return summary |
| |
|
| | for level in self.summary_levels: |
| | summary_log = self.get_logs_by_level(level, compress_log) |
| | log_lines = summary_log.get('log_lines', "") |
| | extra_data_list = summary_log.get('extra_data_list', []) |
| | preview_urls.extend(self._get_preview_urls(extra_data_list)) |
| | summary[level] = log_lines |
| | |
| | summary['preview_urls'] = preview_urls |
| | summary['artifact_id'] = artifact_id |
| | return summary |
| |
|
| | def clear(self): |
| | """Clear all logs""" |
| | self.log_error.clear() |
| | self.log_warn.clear() |
| | self.log_info_llm.clear() |
| | self.log_info_user.clear() |
| | self.log_debug.clear() |
| | self.artifact_warnings.clear() |
| | self.artifact_errors.clear() |