Spaces:
Running
Running
File size: 5,787 Bytes
0913c52 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 |
"""
Loguru Logger Handler for Streamlit
Captures loguru logger output and makes it available for display in Streamlit.
"""
import sys
import time
from typing import Any
try:
from loguru import logger
LOGURU_AVAILABLE = True
except ImportError:
LOGURU_AVAILABLE = False
logger = None
from workflow_monitor import PhaseType, get_monitor
class StreamlitLogHandler:
"""Handler that captures loguru logs and sends them to workflow monitor."""
def __init__(self):
self.logs: list[dict[str, Any]] = []
self.monitor = get_monitor()
self._original_handlers = []
def setup(self, min_level: str = "DEBUG"):
"""Setup the handler to capture logs."""
if not LOGURU_AVAILABLE:
return
# Remove default handler
logger.remove()
# Add our custom handler
logger.add(
self._log_handler,
level=min_level,
format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{function}:{line} | {message}",
colorize=False, # We'll handle colors in Streamlit
)
# Also keep console output (optional)
logger.add(
sys.stderr,
level=min_level,
format="<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> | <level>{message}</level>",
)
def _log_handler(self, message):
"""Handle log messages from loguru."""
record = message.record
# Extract information
try:
if hasattr(record["time"], "timestamp"):
timestamp = record["time"].timestamp()
else:
timestamp = time.time()
except Exception:
timestamp = time.time()
# Get raw message text without HTML tags
# Use record["message"] to get the original message text
raw_message = record.get("message", str(message))
# Clean HTML tags if present
import re
clean_message = re.sub(r"<[^>]+>", "", raw_message)
log_entry = {
"timestamp": timestamp,
"level": record["level"].name,
"message": clean_message,
"module": record.get("name", "unknown"),
"function": record.get("function", "unknown"),
"line": record.get("line", 0),
"file": record["file"].name if record.get("file") else None,
}
# Store log
self.logs.append(log_entry)
# Also send to workflow monitor
# Map log levels to message types
level_to_message_type = {
"TRACE": "status",
"DEBUG": "status",
"INFO": "status",
"SUCCESS": "result",
"WARNING": "action",
"ERROR": "error",
"CRITICAL": "error",
}
# Try to infer agent/node from module name
agent_name = None
node_name = None
module_name = record["name"]
if "data_agent" in module_name:
agent_name = "Data Agent"
elif "experiment_agent" in module_name:
agent_name = "Experiment Agent"
elif "ideation_agent" in module_name:
agent_name = "Ideation Agent"
elif "critic_agent" in module_name:
agent_name = "Critic Agent"
# Try to infer node from function name
function_name = record["function"]
if "_node" in function_name:
node_name = function_name.replace("_node", "")
elif function_name.endswith("node"):
node_name = function_name[:-4]
# Determine phase
phase = PhaseType.DATA_EXECUTION # Default
if "experiment" in module_name:
phase = PhaseType.EXPERIMENT_EXEC
elif "ideation" in module_name:
phase = PhaseType.IDEATION_LITERATURE_SEARCH
# Send to monitor
self.monitor.log_update(
phase=phase,
status="progress",
message=f"[{record['level'].name}] {str(message)}",
agent_name=agent_name,
message_type=level_to_message_type.get(record["level"].name, "status"),
node_name=node_name,
intermediate_output={
"log_level": record["level"].name,
"module": module_name,
"function": function_name,
"line": record["line"],
"file": record["file"].name if record.get("file") else None,
},
)
def get_logs(self, level: str | None = None, limit: int | None = None) -> list[dict[str, Any]]:
"""Get logs, optionally filtered by level."""
logs = self.logs
if level:
logs = [log for log in logs if log["level"] == level.upper()]
if limit:
logs = logs[-limit:]
return logs
def clear(self):
"""Clear all logs."""
self.logs.clear()
# Module-level singleton handler; None until get_log_handler() creates it,
# and replaced with a fresh instance by reset_log_handler().
_global_log_handler: StreamlitLogHandler | None = None
def get_log_handler() -> StreamlitLogHandler:
    """Return the shared StreamlitLogHandler, creating it on first use."""
    global _global_log_handler
    shared = _global_log_handler
    if shared is None:
        # First access: build the singleton and remember it for later calls.
        shared = StreamlitLogHandler()
        _global_log_handler = shared
    return shared
def setup_streamlit_logging(min_level: str = "DEBUG"):
    """Wire loguru into the shared handler and return that handler.

    Args:
        min_level: Lowest loguru level name to capture.

    Returns:
        The shared StreamlitLogHandler after its ``setup`` has been called.
    """
    log_handler = get_log_handler()
    log_handler.setup(min_level=min_level)
    return log_handler
def reset_log_handler():
    """Discard the shared handler's logs and replace it with a fresh handler.

    The old handler (if any) is cleared, a new StreamlitLogHandler becomes the
    shared instance, and, when loguru is available, logging is set up again so
    the new instance is the one registered with loguru.
    """
    global _global_log_handler
    previous = _global_log_handler
    if previous:
        previous.clear()
    _global_log_handler = StreamlitLogHandler()
    if LOGURU_AVAILABLE:
        # Re-register so loguru delivers records to the new instance.
        setup_streamlit_logging()
|