doc-ingestion / src /utils /log.py
Vamshi Pokala
feat: React UI, Docker Space, sessions, Ollama toggle on HF
2af1cca
'''
- Structured logging with JSON format
- Performance metrics collection (delegates to src.monitoring.metrics)
- Error tracking and alerting
- Request/response logging
'''
import contextlib
import json
import logging
import time
from typing import Any, Dict, Iterator, Optional
from src.monitoring.metrics import get_metrics_collector
class JSONFormatter(logging.Formatter):
def format(self, record: logging.LogRecord) -> str:
log_record: Dict[str, Any] = {
"timestamp": self.formatTime(record, self.datefmt),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
}
if record.exc_info:
log_record["exception"] = self.formatException(record.exc_info)
if hasattr(record, "extra"):
log_record.update(record.extra)
return json.dumps(log_record)
def get_logger(name: str, level: int = logging.INFO) -> logging.Logger:
logger = logging.getLogger(name)
if not logger.handlers:
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
logger.setLevel(level)
return logger
class MetricsCollector:
"""
Deprecated: use src.monitoring.metrics.MetricsCollector instead.
This class now delegates to the new metrics collector for backward compatibility.
"""
def __init__(self) -> None:
self._metrics: Dict[str, list] = {}
self._new_collector = get_metrics_collector()
def record(self, name: str, value: float) -> None:
self._metrics.setdefault(name, []).append(value)
def summary(self) -> Dict[str, Dict[str, float]]:
result = {}
for name, values in self._metrics.items():
result[name] = {
"count": len(values),
"total": sum(values),
"mean": sum(values) / len(values),
"min": min(values),
"max": max(values),
}
return result
metrics = MetricsCollector()
@contextlib.contextmanager
def track_duration(operation: str, logger: Optional[logging.Logger] = None) -> Iterator[None]:
"""Context manager that measures wall-clock duration and records it."""
start = time.perf_counter()
try:
yield
finally:
elapsed = time.perf_counter() - start
metrics.record(operation, elapsed)
if logger:
_log_extra(
logger,
logging.DEBUG,
f"{operation} completed",
{"duration_seconds": round(elapsed, 4), "operation": operation},
)
def log_request(logger: logging.Logger, method: str, path: str, extra: Optional[Dict] = None) -> None:
payload = {"event": "request", "method": method, "path": path}
if extra:
payload.update(extra)
_log_extra(logger, logging.INFO, f"{method} {path}", payload)
def log_response(
logger: logging.Logger,
method: str,
path: str,
status_code: int,
duration_seconds: float,
extra: Optional[Dict] = None,
) -> None:
payload = {
"event": "response",
"method": method,
"path": path,
"status_code": status_code,
"duration_seconds": round(duration_seconds, 4),
}
if extra:
payload.update(extra)
level = logging.WARNING if status_code >= 400 else logging.INFO
_log_extra(logger, level, f"{method} {path} -> {status_code}", payload)
def log_error(
logger: logging.Logger,
message: str,
exc: Optional[BaseException] = None,
extra: Optional[Dict] = None,
) -> None:
payload: Dict[str, Any] = {"event": "error"}
if extra:
payload.update(extra)
logger.error(message, exc_info=exc, stack_info=exc is not None)
if payload:
_log_extra(logger, logging.ERROR, message, payload)
# --- internal helper ---
def _log_extra(logger: logging.Logger, level: int, message: str, extra: Dict) -> None:
record = logger.makeRecord(
logger.name, level, "(unknown)", 0, message, (), None
)
record.extra = extra # type: ignore[attr-defined]
logger.handle(record)