scievo-test / streamlit-client /logger_handler.py
harry-lu-0708's picture
clean HF Space commit (no binary history)
0913c52
"""
Loguru Logger Handler for Streamlit
Captures loguru logger output and makes it available for display in Streamlit.
"""
import sys
import time
from typing import Any
try:
from loguru import logger
LOGURU_AVAILABLE = True
except ImportError:
LOGURU_AVAILABLE = False
logger = None
from workflow_monitor import PhaseType, get_monitor
class StreamlitLogHandler:
"""Handler that captures loguru logs and sends them to workflow monitor."""
def __init__(self):
self.logs: list[dict[str, Any]] = []
self.monitor = get_monitor()
self._original_handlers = []
def setup(self, min_level: str = "DEBUG"):
"""Setup the handler to capture logs."""
if not LOGURU_AVAILABLE:
return
# Remove default handler
logger.remove()
# Add our custom handler
logger.add(
self._log_handler,
level=min_level,
format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{function}:{line} | {message}",
colorize=False, # We'll handle colors in Streamlit
)
# Also keep console output (optional)
logger.add(
sys.stderr,
level=min_level,
format="<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> | <level>{message}</level>",
)
def _log_handler(self, message):
"""Handle log messages from loguru."""
record = message.record
# Extract information
try:
if hasattr(record["time"], "timestamp"):
timestamp = record["time"].timestamp()
else:
timestamp = time.time()
except Exception:
timestamp = time.time()
# Get raw message text without HTML tags
# Use record["message"] to get the original message text
raw_message = record.get("message", str(message))
# Clean HTML tags if present
import re
clean_message = re.sub(r"<[^>]+>", "", raw_message)
log_entry = {
"timestamp": timestamp,
"level": record["level"].name,
"message": clean_message,
"module": record.get("name", "unknown"),
"function": record.get("function", "unknown"),
"line": record.get("line", 0),
"file": record["file"].name if record.get("file") else None,
}
# Store log
self.logs.append(log_entry)
# Also send to workflow monitor
# Map log levels to message types
level_to_message_type = {
"TRACE": "status",
"DEBUG": "status",
"INFO": "status",
"SUCCESS": "result",
"WARNING": "action",
"ERROR": "error",
"CRITICAL": "error",
}
# Try to infer agent/node from module name
agent_name = None
node_name = None
module_name = record["name"]
if "data_agent" in module_name:
agent_name = "Data Agent"
elif "experiment_agent" in module_name:
agent_name = "Experiment Agent"
elif "ideation_agent" in module_name:
agent_name = "Ideation Agent"
elif "critic_agent" in module_name:
agent_name = "Critic Agent"
# Try to infer node from function name
function_name = record["function"]
if "_node" in function_name:
node_name = function_name.replace("_node", "")
elif function_name.endswith("node"):
node_name = function_name[:-4]
# Determine phase
phase = PhaseType.DATA_EXECUTION # Default
if "experiment" in module_name:
phase = PhaseType.EXPERIMENT_EXEC
elif "ideation" in module_name:
phase = PhaseType.IDEATION_LITERATURE_SEARCH
# Send to monitor
self.monitor.log_update(
phase=phase,
status="progress",
message=f"[{record['level'].name}] {str(message)}",
agent_name=agent_name,
message_type=level_to_message_type.get(record["level"].name, "status"),
node_name=node_name,
intermediate_output={
"log_level": record["level"].name,
"module": module_name,
"function": function_name,
"line": record["line"],
"file": record["file"].name if record.get("file") else None,
},
)
def get_logs(self, level: str | None = None, limit: int | None = None) -> list[dict[str, Any]]:
"""Get logs, optionally filtered by level."""
logs = self.logs
if level:
logs = [log for log in logs if log["level"] == level.upper()]
if limit:
logs = logs[-limit:]
return logs
def clear(self):
"""Clear all logs."""
self.logs.clear()
# Global logger handler instance
_global_log_handler: StreamlitLogHandler | None = None
def get_log_handler() -> StreamlitLogHandler:
"""Get the global log handler instance."""
global _global_log_handler
if _global_log_handler is None:
_global_log_handler = StreamlitLogHandler()
return _global_log_handler
def setup_streamlit_logging(min_level: str = "DEBUG"):
"""Setup loguru to capture logs for Streamlit display."""
handler = get_log_handler()
handler.setup(min_level=min_level)
return handler
def reset_log_handler():
"""Reset the global log handler."""
global _global_log_handler
if _global_log_handler:
_global_log_handler.clear()
_global_log_handler = StreamlitLogHandler()
if LOGURU_AVAILABLE:
setup_streamlit_logging()