File size: 5,787 Bytes
0913c52
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
"""
Loguru Logger Handler for Streamlit

Captures loguru logger output and makes it available for display in Streamlit.
"""

import sys
import time
from typing import Any

try:
    from loguru import logger

    LOGURU_AVAILABLE = True
except ImportError:
    LOGURU_AVAILABLE = False
    logger = None

from workflow_monitor import PhaseType, get_monitor


class StreamlitLogHandler:
    """Handler that captures loguru logs and sends them to workflow monitor."""

    def __init__(self):
        self.logs: list[dict[str, Any]] = []
        self.monitor = get_monitor()
        self._original_handlers = []

    def setup(self, min_level: str = "DEBUG"):
        """Setup the handler to capture logs."""
        if not LOGURU_AVAILABLE:
            return

        # Remove default handler
        logger.remove()

        # Add our custom handler
        logger.add(
            self._log_handler,
            level=min_level,
            format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{function}:{line} | {message}",
            colorize=False,  # We'll handle colors in Streamlit
        )

        # Also keep console output (optional)
        logger.add(
            sys.stderr,
            level=min_level,
            format="<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> | <level>{message}</level>",
        )

    def _log_handler(self, message):
        """Handle log messages from loguru."""
        record = message.record

        # Extract information
        try:
            if hasattr(record["time"], "timestamp"):
                timestamp = record["time"].timestamp()
            else:
                timestamp = time.time()
        except Exception:
            timestamp = time.time()

        # Get raw message text without HTML tags
        # Use record["message"] to get the original message text
        raw_message = record.get("message", str(message))

        # Clean HTML tags if present
        import re

        clean_message = re.sub(r"<[^>]+>", "", raw_message)

        log_entry = {
            "timestamp": timestamp,
            "level": record["level"].name,
            "message": clean_message,
            "module": record.get("name", "unknown"),
            "function": record.get("function", "unknown"),
            "line": record.get("line", 0),
            "file": record["file"].name if record.get("file") else None,
        }

        # Store log
        self.logs.append(log_entry)

        # Also send to workflow monitor
        # Map log levels to message types
        level_to_message_type = {
            "TRACE": "status",
            "DEBUG": "status",
            "INFO": "status",
            "SUCCESS": "result",
            "WARNING": "action",
            "ERROR": "error",
            "CRITICAL": "error",
        }

        # Try to infer agent/node from module name
        agent_name = None
        node_name = None
        module_name = record["name"]

        if "data_agent" in module_name:
            agent_name = "Data Agent"
        elif "experiment_agent" in module_name:
            agent_name = "Experiment Agent"
        elif "ideation_agent" in module_name:
            agent_name = "Ideation Agent"
        elif "critic_agent" in module_name:
            agent_name = "Critic Agent"

        # Try to infer node from function name
        function_name = record["function"]
        if "_node" in function_name:
            node_name = function_name.replace("_node", "")
        elif function_name.endswith("node"):
            node_name = function_name[:-4]

        # Determine phase
        phase = PhaseType.DATA_EXECUTION  # Default
        if "experiment" in module_name:
            phase = PhaseType.EXPERIMENT_EXEC
        elif "ideation" in module_name:
            phase = PhaseType.IDEATION_LITERATURE_SEARCH

        # Send to monitor
        self.monitor.log_update(
            phase=phase,
            status="progress",
            message=f"[{record['level'].name}] {str(message)}",
            agent_name=agent_name,
            message_type=level_to_message_type.get(record["level"].name, "status"),
            node_name=node_name,
            intermediate_output={
                "log_level": record["level"].name,
                "module": module_name,
                "function": function_name,
                "line": record["line"],
                "file": record["file"].name if record.get("file") else None,
            },
        )

    def get_logs(self, level: str | None = None, limit: int | None = None) -> list[dict[str, Any]]:
        """Get logs, optionally filtered by level."""
        logs = self.logs
        if level:
            logs = [log for log in logs if log["level"] == level.upper()]
        if limit:
            logs = logs[-limit:]
        return logs

    def clear(self):
        """Clear all logs."""
        self.logs.clear()


# Global logger handler instance
_global_log_handler: StreamlitLogHandler | None = None


def get_log_handler() -> StreamlitLogHandler:
    """Get the global log handler instance."""
    global _global_log_handler
    if _global_log_handler is None:
        _global_log_handler = StreamlitLogHandler()
    return _global_log_handler


def setup_streamlit_logging(min_level: str = "DEBUG"):
    """Setup loguru to capture logs for Streamlit display."""
    handler = get_log_handler()
    handler.setup(min_level=min_level)
    return handler


def reset_log_handler():
    """Reset the global log handler."""
    global _global_log_handler
    if _global_log_handler:
        _global_log_handler.clear()
    _global_log_handler = StreamlitLogHandler()
    if LOGURU_AVAILABLE:
        setup_streamlit_logging()