# mlforge / observability / logger.py
# Author: senthil2421
# Commit message: "Deploy cloud brain to HF Spaces"
# Commit: ac5551d
"""
observability/logger.py — Structured JSON logging & audit trail.
Every module imports get_logger(); every API call writes to audit_log.
"""
from __future__ import annotations
import logging
import sys
import traceback
import random
import asyncio
from datetime import datetime, timezone
from typing import Any
import structlog
from structlog.types import EventDict, WrappedLogger
from config import settings
# ── Processors ────────────────────────────────────────────────────────────────
def _add_timestamp(
_logger: WrappedLogger, _method: str, event_dict: EventDict
) -> EventDict:
event_dict["timestamp"] = datetime.now(timezone.utc).isoformat()
return event_dict
def _add_service(
    _logger: WrappedLogger, _method: str, event_dict: EventDict
) -> EventDict:
    """structlog processor: tag the event with the service name and app version.

    Adds ``"service"`` (always ``"mlforge-backend"``) and ``"version"`` (read
    from ``settings.version``), then returns the same, mutated event dict.
    """
    event_dict.update(service="mlforge-backend", version=settings.version)
    return event_dict
# ── Bootstrap ─────────────────────────────────────────────────────────────────
def configure_logging() -> None:
    """Wire stdlib ``logging`` and structlog together so every record is JSON.

    Call once at startup, before any log is emitted. Afterwards the root logger
    has exactly two handlers:

    * a UTF-8 file handler on ``settings.logs_dir / "backend.log"`` (DEBUG+);
    * a stdout handler (DEBUG+ when ``settings.debug`` is true, else INFO+).

    Both render through one ``structlog.stdlib.ProcessorFormatter`` with a
    ``JSONRenderer``. As a result, structlog events and plain ``logging``
    records get the same fields and the same JSON output.

    Any handlers already attached to the root logger are removed and closed.
    This also means a repeated call replaces the handlers rather than
    duplicating or leaking them.
    """
    # Presumably creates settings.logs_dir so the FileHandler can open its file.
    settings.ensure_dirs()

    # Applied to every event. structlog-native events get them through
    # structlog.configure(); stdlib records get them through foreign_pre_chain.
    shared_processors: list[Any] = [
        structlog.stdlib.add_log_level,
        structlog.stdlib.add_logger_name,
        _add_timestamp,
        _add_service,
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
    ]

    # The single rendering step, shared by both handlers.
    formatter = structlog.stdlib.ProcessorFormatter(
        processor=structlog.processors.JSONRenderer(),
        foreign_pre_chain=shared_processors,
    )

    # File handler — JSON, everything from DEBUG up.
    file_handler = logging.FileHandler(
        settings.logs_dir / "backend.log", encoding="utf-8"
    )
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(formatter)

    # Console handler — also JSON; only its level depends on settings.debug.
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(logging.DEBUG if settings.debug else logging.INFO)
    console_handler.setFormatter(formatter)

    # force=True is required: without it basicConfig() silently does nothing
    # whenever the root logger already has handlers. backend.log would then
    # never be written, and the FileHandler above would stay open unused.
    logging.basicConfig(
        level=logging.DEBUG,
        handlers=[file_handler, console_handler],
        force=True,
    )

    structlog.configure(
        processors=shared_processors
        + [
            # Hand the event dict to ProcessorFormatter for final rendering.
            structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
        ],
        wrapper_class=structlog.stdlib.BoundLogger,
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        cache_logger_on_first_use=True,
    )
# ── Global System Log Queue (for Unified Dashboard) ──────────────────────────
# Module-wide system-event queue, bounded at 1000 items.
# NOTE(review): nothing in this module puts onto it (log_system_event only fans
# out to _sys_log_subs); any producer/consumer must live elsewhere — confirm.
# NOTE(review): on Python < 3.10 an asyncio.Queue binds to the event loop that
# is current at construction time, which here is import time — verify.
_sys_log_queue: asyncio.Queue[dict[str, Any]] = asyncio.Queue(1000)

# Subscriber queues (one per active SSE subscriber) that log_system_event()
# broadcasts every event to; queues that fail on delivery are pruned there.
_sys_log_subs: list[asyncio.Queue[dict[str, Any]]] = list()
def log_system_event(
    level: str,
    message: str,
    source: str = "system",
    payload: dict[str, Any] | None = None,
) -> None:
    """Broadcast a structured system event to every live dashboard subscriber.

    The event is delivered with ``put_nowait`` to each queue in
    ``_sys_log_subs``. It is not placed on ``_sys_log_queue``.

    Each subscriber holds at most 100 pending events, or fewer if its own
    ``maxsize`` is smaller. When a queue is at that limit, its oldest events
    are discarded first, so a slow consumer sees the most recent events
    instead of being dropped. A subscriber whose queue raises during delivery
    is treated as dead and removed from ``_sys_log_subs``.

    Args:
        level: Severity label; stored upper-cased under ``"level"``.
        message: Human-readable text.
        source: Origin label for the event (default ``"system"``).
        payload: Optional extra data, exposed under the ``"metrics"`` key.
    """
    import time

    # One clock reading, so id, ts and timestamp all describe the same instant.
    now = time.time()
    event = {
        "id": f"sys-{now}-{random.random()}",
        "ts": time.strftime("%H:%M:%S", time.localtime(now)),
        "timestamp": int(now * 1000),
        "level": level.upper(),
        "source": source,
        "message": message,
        "metrics": payload,
        "source_type": "system",
    }

    # Broadcast to all active SSE subscribers.
    dead: list[asyncio.Queue] = []
    for q in _sys_log_subs:
        try:
            # Make room by evicting the oldest entries. The 100-event cap bounds
            # unbounded queues. q.full() covers subscribers created with a
            # smaller maxsize: for them put_nowait() would otherwise raise
            # QueueFull, and the subscriber would be wrongly dropped as dead.
            while q.qsize() >= 100 or q.full():
                q.get_nowait()
            q.put_nowait(event)
        except Exception:
            # Best-effort delivery: one broken subscriber must not break logging.
            dead.append(q)
    for q in dead:
        if q in _sys_log_subs:
            _sys_log_subs.remove(q)
def get_logger(name: str = "mlforge") -> structlog.stdlib.BoundLogger:
    """Return the structlog logger for *name* (default ``"mlforge"``).

    This is the single entry point modules use to obtain a logger, rather than
    calling structlog directly.
    """
    logger = structlog.get_logger(name)
    return logger
async def audit(
    event_type: str,
    payload: dict[str, Any] | None = None,
    model_id: str | None = None,
    job_id: str | None = None,
    level: str = "info",
) -> None:
    """Persist one structured audit record into the ``audit_log`` table.

    Args:
        event_type: Kind of event being recorded.
        payload: Extra details, serialized with ``json.dumps``. ``None`` (or any
            other falsy value) is stored as ``"{}"``.
        model_id: Optional identifier of the related model.
        job_id: Optional identifier of the related job.
        level: Severity label, stored verbatim (default ``"info"``).

    Raises:
        TypeError: propagated from ``json.dumps`` if *payload* is not
            JSON-serializable.

    The insert is committed immediately.
    NOTE(review): the connection returned by ``get_db()`` is not closed here —
    presumably it is a shared connection; confirm.
    """
    # Imported at call time — presumably to avoid an import cycle or startup
    # cost from the database layer; TODO confirm.
    import json
    from database.connection import get_db

    connection = await get_db()
    statement = (
        "INSERT INTO audit_log (event_type, model_id, job_id, payload, level)\n"
        "VALUES (?, ?, ?, ?, ?)"
    )
    row = (event_type, model_id, job_id, json.dumps(payload or {}), level)
    await connection.execute(statement, row)
    await connection.commit()