Spaces:
Sleeping
Sleeping
File size: 4,576 Bytes
ac5551d | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 | """
observability/logger.py — Structured JSON logging & audit trail.
Every module imports get_logger(); every API call writes to audit_log.
"""
from __future__ import annotations
import logging
import sys
import traceback
import random
import asyncio
from datetime import datetime, timezone
from typing import Any
import structlog
from structlog.types import EventDict, WrappedLogger
from config import settings
# ── Processors ────────────────────────────────────────────────────────────────
def _add_timestamp(
    _logger: WrappedLogger, _method: str, event_dict: EventDict
) -> EventDict:
    """Stamp the event with the current UTC time as an ISO-8601 string.

    structlog processor; the logger and method-name arguments are ignored.
    The ``timestamp`` key is written (replacing any existing value) and the
    same mapping object is handed back.
    """
    now_utc = datetime.now(tz=timezone.utc)
    event_dict["timestamp"] = now_utc.isoformat()
    return event_dict
def _add_service(
    _logger: WrappedLogger, _method: str, event_dict: EventDict
) -> EventDict:
    """Tag the event with the service name and the configured version.

    structlog processor; the logger and method-name arguments are ignored.
    Sets ``service`` to the fixed backend name and ``version`` to
    ``settings.version``, then returns the same mapping.
    """
    service_name = "mlforge-backend"
    event_dict["service"] = service_name
    event_dict["version"] = settings.version
    return event_dict
# ── Bootstrap ─────────────────────────────────────────────────────────────────
def configure_logging() -> None:
    """Wire stdlib logging and structlog together; call once at startup.

    Steps, in order:

    1. ``settings.ensure_dirs()`` runs before the log file is opened.
    2. A UTF-8 file handler on ``settings.logs_dir / "backend.log"`` (DEBUG)
       and a stdout handler (DEBUG when ``settings.debug`` is truthy, INFO
       otherwise) are passed to ``logging.basicConfig``.
    3. structlog is configured to emit through stdlib loggers.
    4. Every handler currently on the root logger receives a JSON
       ``ProcessorFormatter``, so structlog events and records from foreign
       stdlib loggers are rendered through the same processor chain.
    """
    settings.ensure_dirs()

    # Chain applied both to structlog events and (as the foreign pre-chain)
    # to records that originate from plain stdlib loggers.
    common_chain: list[Any] = [
        structlog.stdlib.add_log_level,
        structlog.stdlib.add_logger_name,
        _add_timestamp,
        _add_service,
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
    ]

    # File handler — captures everything from DEBUG upwards.
    log_file_handler = logging.FileHandler(
        settings.logs_dir / "backend.log", encoding="utf-8"
    )
    log_file_handler.setLevel(logging.DEBUG)

    # Console handler — only its verbosity depends on settings.debug; it is
    # given the same JSON formatter as the file handler further down.
    stdout_handler = logging.StreamHandler(sys.stdout)
    if settings.debug:
        stdout_handler.setLevel(logging.DEBUG)
    else:
        stdout_handler.setLevel(logging.INFO)

    # NOTE: per the stdlib docs, basicConfig does nothing when the root
    # logger already has handlers, so a second call will not attach these.
    logging.basicConfig(
        level=logging.DEBUG,
        handlers=[log_file_handler, stdout_handler],
        format="%(message)s",
    )

    structlog.configure(
        processors=[
            *common_chain,
            # Hands the event dict over to the ProcessorFormatter below.
            structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
        ],
        wrapper_class=structlog.stdlib.BoundLogger,
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        cache_logger_on_first_use=True,
    )

    # One JSON formatter shared by every handler on the root logger.
    json_formatter = structlog.stdlib.ProcessorFormatter(
        processor=structlog.processors.JSONRenderer(),
        foreign_pre_chain=common_chain,
    )
    root_logger = logging.getLogger()
    for attached in root_logger.handlers:
        attached.setFormatter(json_formatter)
# ── Global System Log Queue (for Unified Dashboard) ──────────────────────────
# Bounded asyncio queue holding at most 1000 items.
# NOTE(review): log_system_event() below never touches this queue — confirm
# whether another module produces to or consumes from it.
_sys_log_queue: asyncio.Queue = asyncio.Queue(maxsize=1000)
# Subscriber queues. log_system_event() puts each new event on every queue in
# this list and removes any queue whose delivery raised.
_sys_log_subs: list[asyncio.Queue] = []
def log_system_event(
    level: str,
    message: str,
    source: str = "system",
    payload: dict[str, Any] | None = None,
) -> None:
    """Build a system-log event and fan it out to every subscriber queue.

    Args:
        level: Severity label; stored upper-cased under ``level``.
        message: Human-readable text, stored under ``message``.
        source: Origin of the event, stored under ``source``.
        payload: Optional extra data, stored as-is under ``metrics``.

    Each queue in ``_sys_log_subs`` receives the same event dict. A queue
    that already holds 100 or more items has its oldest item discarded first.
    Any queue whose delivery raises is removed from ``_sys_log_subs``.
    """
    import time

    event_id = f"sys-{time.time()}-{random.random()}"
    wall_clock = time.strftime("%H:%M:%S")
    epoch_millis = int(time.time() * 1000)
    event = {
        "id": event_id,
        "ts": wall_clock,
        "timestamp": epoch_millis,
        "level": level.upper(),
        "source": source,
        "message": message,
        "metrics": payload,
        "source_type": "system",
    }

    # Deliver to every subscriber; collect the failures instead of removing
    # them mid-iteration so the list is not mutated while it is walked.
    failed = []
    for subscriber in _sys_log_subs:
        try:
            backlog_full = subscriber.qsize() >= 100
            if backlog_full:
                # Drop the oldest pending event to make room for the new one.
                subscriber.get_nowait()
            subscriber.put_nowait(event)
        except Exception:
            failed.append(subscriber)

    # Prune in place so anyone holding a reference to the list sees the change.
    for subscriber in failed:
        if subscriber in _sys_log_subs:
            _sys_log_subs.remove(subscriber)
def get_logger(name: str = "mlforge") -> structlog.stdlib.BoundLogger:
    """Return the structlog logger for *name* (defaults to ``"mlforge"``).

    Thin wrapper over ``structlog.get_logger`` so that other modules obtain
    their loggers from this module.
    """
    bound_logger = structlog.get_logger(name)
    return bound_logger
async def audit(
    event_type: str,
    payload: dict[str, Any] | None = None,
    model_id: str | None = None,
    job_id: str | None = None,
    level: str = "info",
) -> None:
    """Insert one row into the ``audit_log`` table and commit it.

    Args:
        event_type: Value for the ``event_type`` column.
        payload: Optional data, JSON-encoded into the ``payload`` column;
            ``None`` or an empty dict is stored as ``"{}"``.
        model_id: Optional value for the ``model_id`` column.
        job_id: Optional value for the ``job_id`` column.
        level: Value for the ``level`` column.

    The connection comes from ``database.connection.get_db()``, and all
    values are bound through ``?`` placeholders, not formatted into the SQL.
    """
    import json
    from database.connection import get_db

    insert_sql = (
        "INSERT INTO audit_log (event_type, model_id, job_id, payload, level)\n"
        "VALUES (?, ?, ?, ?, ?)"
    )

    # Acquire the connection before serialising, as the original flow did.
    db = await get_db()
    encoded_payload = json.dumps(payload if payload else {})
    row = (event_type, model_id, job_id, encoded_payload, level)
    await db.execute(insert_sql, row)
    await db.commit()
|