"""
app/core/sre_logging.py – SRE Log Aggregation (No Circular Dependencies)
==========================================================================
Central log aggregator and emitter functions that can be safely imported
by any service without causing circular imports.
"""
|
|
import logging
import threading
from collections import deque
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional
|
|
| |
class LogAggregator:
    """Thread-safe ring buffer storing the most recent logs from all services.

    Each entry is a plain dict with ``timestamp`` (naive UTC, ISO 8601 string),
    ``service``, ``level`` and ``message`` keys, plus any extra keyword fields
    given to :meth:`emit`. Once ``max_size`` entries are held, appending a new
    entry discards the oldest one. All buffer access is guarded by ``lock``.
    """

    # Levels counted as failures by get_error_rate() (exact, case-sensitive match).
    _ERROR_LEVELS = ("error", "critical")

    def __init__(self, max_size: int = 1000):
        """Create an empty aggregator that keeps at most ``max_size`` entries."""
        self.max_size = max_size
        self.buffer: deque = deque(maxlen=max_size)
        self.lock = threading.Lock()

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive ``datetime``.

        ``datetime.utcnow()`` is deprecated since Python 3.12. Dropping the
        tzinfo keeps ``isoformat()`` output free of a UTC offset, so stored
        timestamps keep the exact format the deprecated call produced.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    def emit(self, service: str, level: str, message: str, **kwargs):
        """Add a log entry from any service.

        Args:
            service: Name of the emitting service.
            level: Severity label, stored as given.
            message: Log text.
            **kwargs: Extra fields stored on the entry. They are merged last,
                so a ``timestamp`` keyword replaces the generated timestamp.
        """
        with self.lock:
            # Stamp inside the lock so buffer order matches timestamp order.
            entry = {
                "timestamp": self._utcnow().isoformat(),
                "service": service,
                "level": level,
                "message": message,
                **kwargs,
            }
            self.buffer.append(entry)

    def get_logs(
        self,
        service: Optional[str] = None,
        level: Optional[str] = None,
        limit: int = 100,
    ) -> List[Dict]:
        """Retrieve the newest ``limit`` logs matching the filters.

        Args:
            service: Only return entries from this service; falsy means all.
            level: Only return entries with this exact level; falsy means all.
            limit: Maximum number of entries to return.

        Returns:
            Up to ``limit`` of the most recent matching entries, in insertion
            (oldest-to-newest) order. Empty when ``limit`` is zero or negative.
        """
        # ``seq[-0:]`` is the whole sequence, so a non-positive limit must be
        # handled explicitly instead of falling through to the slice below.
        if limit <= 0:
            return []
        with self.lock:
            matching = [
                log
                for log in self.buffer
                if (not service or log["service"] == service)
                and (not level or log["level"] == level)
            ]
        return matching[-limit:]

    def get_error_rate(
        self, service: Optional[str] = None, window_minutes: int = 5
    ) -> float:
        """Calculate the error rate for a service (or all if ``service`` is None).

        Args:
            service: Service to measure; falsy means every service.
            window_minutes: Only entries from the last this-many minutes count.

        Returns:
            Fraction of in-window entries whose level is ``"error"`` or
            ``"critical"``; ``0.0`` when no entry falls inside the window.
        """
        cutoff = self._utcnow() - timedelta(minutes=window_minutes)
        # Timestamps are stored as ISO 8601 strings in the same naive-UTC
        # format, so comparing the strings follows chronological order.
        cutoff_str = cutoff.isoformat()

        with self.lock:
            recent_levels = [
                log["level"]
                for log in self.buffer
                if log["timestamp"] >= cutoff_str
                and (not service or log["service"] == service)
            ]
        if not recent_levels:
            return 0.0
        error_count = sum(1 for lvl in recent_levels if lvl in self._ERROR_LEVELS)
        return error_count / len(recent_levels)
|
|
| |
# Module-level shared instance; the emit_* functions in this module write to it.
log_aggregator: LogAggregator = LogAggregator(max_size=1000)
|
|
| |
def emit_worker_log(level: str, message: str, **kwargs):
    """Record a log entry under the ``analytics_worker`` service name.

    ``level``, ``message`` and any extra keyword fields are forwarded
    unchanged to the shared ``log_aggregator``.
    """
    service_name = "analytics_worker"
    log_aggregator.emit(service_name, level, message, **kwargs)
|
|
def emit_vector_log(level: str, message: str, **kwargs):
    """Record a log entry under the ``vector_service`` service name.

    ``level``, ``message`` and any extra keyword fields are forwarded
    unchanged to the shared ``log_aggregator``.
    """
    service_name = "vector_service"
    log_aggregator.emit(service_name, level, message, **kwargs)
|
|
def emit_llm_log(level: str, message: str, **kwargs):
    """Record a log entry under the ``llm_service`` service name.

    ``level``, ``message`` and any extra keyword fields are forwarded
    unchanged to the shared ``log_aggregator``.
    """
    service_name = "llm_service"
    log_aggregator.emit(service_name, level, message, **kwargs)
|
|
def emit_mapper_log(level: str, message: str, **kwargs):
    """Record a log entry under the ``mapper`` service name.

    ``level``, ``message`` and any extra keyword fields are forwarded
    unchanged to the shared ``log_aggregator``.
    """
    service_name = "mapper"
    log_aggregator.emit(service_name, level, message, **kwargs)
|
|
def emit_deps_log(level: str, message: str, **kwargs):
    """Record a log entry under the ``dependencies`` service name.

    ``level``, ``message`` and any extra keyword fields are forwarded
    unchanged to the shared ``log_aggregator``.
    """
    service_name = "dependencies"
    log_aggregator.emit(service_name, level, message, **kwargs)