File size: 4,576 Bytes
ac5551d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
"""
observability/logger.py — Structured JSON logging & audit trail.
Every module imports get_logger(); every API call writes to audit_log.
"""
from __future__ import annotations

import logging
import sys
import traceback
import random
import asyncio
from datetime import datetime, timezone
from typing import Any

import structlog
from structlog.types import EventDict, WrappedLogger

from config import settings


# ── Processors ────────────────────────────────────────────────────────────────

def _add_timestamp(
    _logger: WrappedLogger, _method: str, event_dict: EventDict
) -> EventDict:
    """structlog processor that stamps the event with the current UTC time.

    The time is written under the ``"timestamp"`` key as an ISO-8601 string.
    The logger and method-name arguments are part of the processor call
    signature and are not used. The same mapping is mutated and returned.
    """
    now_utc = datetime.now(tz=timezone.utc)
    event_dict["timestamp"] = now_utc.isoformat()
    return event_dict


def _add_service(
    _logger: WrappedLogger, _method: str, event_dict: EventDict
) -> EventDict:
    """structlog processor that tags the event with service identity.

    Adds a fixed ``"service"`` name and a ``"version"`` read from
    ``settings.version``. The logger and method-name arguments are not used.
    The same mapping is mutated and returned.
    """
    event_dict["service"] = "mlforge-backend"
    # Looked up on every call rather than captured at import time.
    release = settings.version
    event_dict["version"] = release
    return event_dict


# ── Bootstrap ─────────────────────────────────────────────────────────────────

def configure_logging() -> None:
    """Configure stdlib logging and structlog for JSON output.

    Call once at startup before any log is emitted.

    Two handlers are installed on the root logger:

    * a file handler writing ``backend.log`` under ``settings.logs_dir``
      at DEBUG and above;
    * a stdout handler at DEBUG when ``settings.debug`` is true, otherwise
      INFO.

    Both share one ``ProcessorFormatter`` with a JSON renderer, so structlog
    events and plain ``logging`` records (via ``foreign_pre_chain``) pass
    through the same processor list.

    Handlers already attached to the root logger are removed and closed, so
    the handlers created here are always the ones in effect.
    """
    settings.ensure_dirs()

    shared_processors: list[Any] = [
        structlog.stdlib.add_log_level,
        structlog.stdlib.add_logger_name,
        _add_timestamp,
        _add_service,
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
    ]

    # One JSON formatter shared by both handlers.
    formatter = structlog.stdlib.ProcessorFormatter(
        processor=structlog.processors.JSONRenderer(),
        foreign_pre_chain=shared_processors,
    )

    # File handler — JSON, keeps everything from DEBUG up.
    file_handler = logging.FileHandler(
        settings.logs_dir / "backend.log", encoding="utf-8"
    )
    file_handler.setLevel(logging.DEBUG)

    # Console handler — also JSON; only its verbosity depends on settings.debug.
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(logging.DEBUG if settings.debug else logging.INFO)

    # Set the formatter on the handlers we own instead of on whatever happens
    # to be attached to the root logger.
    handlers = [file_handler, console_handler]
    for handler in handlers:
        handler.setFormatter(formatter)

    # Without force=True, basicConfig() is a silent no-op when the root logger
    # already has handlers: ours would never be attached and the file handle
    # opened above would leak.
    logging.basicConfig(
        level=logging.DEBUG,
        handlers=handlers,
        force=True,
    )

    structlog.configure(
        processors=shared_processors
        + [
            # Hand the event dict to ProcessorFormatter for final rendering.
            structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
        ],
        wrapper_class=structlog.stdlib.BoundLogger,
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        cache_logger_on_first_use=True,
    )


# ── Global System Log Queue (for Unified Dashboard) ──────────────────────────

# Bounded queue (1000 entries) created at import time.
# NOTE(review): no function visible in this file reads or writes it — confirm
# whether another module still depends on it.
_sys_log_queue: asyncio.Queue = asyncio.Queue(maxsize=1000)
# Subscriber queues that log_system_event() delivers every event to.
# NOTE(review): presumably each dashboard/SSE stream appends its own queue
# here — confirm against the subscriber code.
_sys_log_subs: list[asyncio.Queue] = []

def log_system_event(
    level: str,
    message: str,
    source: str = "system",
    payload: dict[str, Any] | None = None
) -> None:
    """Broadcast a structured system event to every registered subscriber.

    The event is put on each queue in ``_sys_log_subs``. Nothing is stored
    when there are no subscribers. Delivery is best-effort: a subscriber whose
    queue raises is dropped from the list instead of failing the caller.

    Args:
        level: Severity name; upper-cased before it is published.
        message: Human-readable event text.
        source: Label of the component that produced the event.
        payload: Optional extra data, published under the ``"metrics"`` key.
    """
    import time

    # Read the clock once so "id", "ts" and "timestamp" describe the same
    # instant instead of three slightly different ones.
    now = time.time()
    event = {
        "id": f"sys-{now}-{random.random()}",
        "ts": time.strftime("%H:%M:%S", time.localtime(now)),
        "timestamp": int(now * 1000),
        "level": level.upper(),
        "source": source,
        "message": message,
        "metrics": payload,
        "source_type": "system",
    }

    # Per-subscriber backlog limit: once reached, the oldest entry is
    # discarded to make room for the new one.
    max_backlog = 100

    # Broadcast to all active subscribers, collecting the ones that fail.
    dead = []
    for q in _sys_log_subs:
        try:
            if q.qsize() >= max_backlog:
                q.get_nowait()
            q.put_nowait(event)
        except Exception:
            # Deliberately broad: any failure marks the subscriber as dead.
            dead.append(q)

    # Remove after the loop so the list is not mutated while iterating.
    for d in dead:
        if d in _sys_log_subs:
            _sys_log_subs.remove(d)


def get_logger(name: str = "mlforge") -> structlog.stdlib.BoundLogger:
    """Return the structlog logger obtained for *name* (default ``"mlforge"``)."""
    bound = structlog.get_logger(name)
    return bound


async def audit(
    event_type: str,
    payload: dict[str, Any] | None = None,
    model_id: str | None = None,
    job_id: str | None = None,
    level: str = "info",
) -> None:
    """Insert one record into the ``audit_log`` table and commit it.

    Args:
        event_type: Value for the ``event_type`` column.
        payload: Extra data, stored as a JSON string; ``None`` or an empty
            dict is stored as ``"{}"``.
        model_id: Value for the ``model_id`` column, if any.
        job_id: Value for the ``job_id`` column, if any.
        level: Value for the ``level`` column.
    """
    import json
    from database.connection import get_db

    insert_sql = """INSERT INTO audit_log (event_type, model_id, job_id, payload, level)
           VALUES (?, ?, ?, ?, ?)"""

    db = await get_db()
    serialized = json.dumps(payload if payload else {})
    row = (event_type, model_id, job_id, serialized, level)
    await db.execute(insert_sql, row)
    await db.commit()