"""
Loguru Logger Handler for Streamlit
Captures loguru logger output and makes it available for display in Streamlit.
"""
import sys
import time
from typing import Any
try:
from loguru import logger
LOGURU_AVAILABLE = True
except ImportError:
LOGURU_AVAILABLE = False
logger = None
from workflow_monitor import PhaseType, get_monitor
class StreamlitLogHandler:
"""Handler that captures loguru logs and sends them to workflow monitor."""
def __init__(self):
self.logs: list[dict[str, Any]] = []
self.monitor = get_monitor()
self._original_handlers = []
def setup(self, min_level: str = "DEBUG"):
"""Setup the handler to capture logs."""
if not LOGURU_AVAILABLE:
return
# Remove default handler
logger.remove()
# Add our custom handler
logger.add(
self._log_handler,
level=min_level,
format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{function}:{line} | {message}",
colorize=False, # We'll handle colors in Streamlit
)
# Also keep console output (optional)
logger.add(
sys.stderr,
level=min_level,
format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{function}:{line} | {message}",
)
def _log_handler(self, message):
"""Handle log messages from loguru."""
record = message.record
# Extract information
try:
if hasattr(record["time"], "timestamp"):
timestamp = record["time"].timestamp()
else:
timestamp = time.time()
except Exception:
timestamp = time.time()
# Get raw message text without HTML tags
# Use record["message"] to get the original message text
raw_message = record.get("message", str(message))
# Clean HTML tags if present
import re
clean_message = re.sub(r"<[^>]+>", "", raw_message)
log_entry = {
"timestamp": timestamp,
"level": record["level"].name,
"message": clean_message,
"module": record.get("name", "unknown"),
"function": record.get("function", "unknown"),
"line": record.get("line", 0),
"file": record["file"].name if record.get("file") else None,
}
# Store log
self.logs.append(log_entry)
# Also send to workflow monitor
# Map log levels to message types
level_to_message_type = {
"TRACE": "status",
"DEBUG": "status",
"INFO": "status",
"SUCCESS": "result",
"WARNING": "action",
"ERROR": "error",
"CRITICAL": "error",
}
# Try to infer agent/node from module name
agent_name = None
node_name = None
module_name = record["name"]
if "data_agent" in module_name:
agent_name = "Data Agent"
elif "experiment_agent" in module_name:
agent_name = "Experiment Agent"
elif "ideation_agent" in module_name:
agent_name = "Ideation Agent"
elif "critic_agent" in module_name:
agent_name = "Critic Agent"
# Try to infer node from function name
function_name = record["function"]
if "_node" in function_name:
node_name = function_name.replace("_node", "")
elif function_name.endswith("node"):
node_name = function_name[:-4]
# Determine phase
phase = PhaseType.DATA_EXECUTION # Default
if "experiment" in module_name:
phase = PhaseType.EXPERIMENT_EXEC
elif "ideation" in module_name:
phase = PhaseType.IDEATION_LITERATURE_SEARCH
# Send to monitor
self.monitor.log_update(
phase=phase,
status="progress",
message=f"[{record['level'].name}] {str(message)}",
agent_name=agent_name,
message_type=level_to_message_type.get(record["level"].name, "status"),
node_name=node_name,
intermediate_output={
"log_level": record["level"].name,
"module": module_name,
"function": function_name,
"line": record["line"],
"file": record["file"].name if record.get("file") else None,
},
)
def get_logs(self, level: str | None = None, limit: int | None = None) -> list[dict[str, Any]]:
"""Get logs, optionally filtered by level."""
logs = self.logs
if level:
logs = [log for log in logs if log["level"] == level.upper()]
if limit:
logs = logs[-limit:]
return logs
def clear(self):
"""Clear all logs."""
self.logs.clear()
# Global logger handler instance
_global_log_handler: StreamlitLogHandler | None = None
def get_log_handler() -> StreamlitLogHandler:
"""Get the global log handler instance."""
global _global_log_handler
if _global_log_handler is None:
_global_log_handler = StreamlitLogHandler()
return _global_log_handler
def setup_streamlit_logging(min_level: str = "DEBUG"):
"""Setup loguru to capture logs for Streamlit display."""
handler = get_log_handler()
handler.setup(min_level=min_level)
return handler
def reset_log_handler():
"""Reset the global log handler."""
global _global_log_handler
if _global_log_handler:
_global_log_handler.clear()
_global_log_handler = StreamlitLogHandler()
if LOGURU_AVAILABLE:
setup_streamlit_logging()