# Axiovora-X/backend/core/logging_config.py
# --- Real-Time Log Streaming (Websockets, FastAPI) ---
import asyncio
import logging
import time
import os
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import uvicorn
from typing import Optional, Set
class WebsocketLogHandler(logging.Handler):
    """Streams formatted log records to all connected websocket clients."""

    clients: Set[WebSocket] = set()
    # Event loop of the streaming server, captured by the websocket endpoint
    # so emit() can hand sends off to it from any thread.
    loop: Optional[asyncio.AbstractEventLoop] = None

    def emit(self, record: logging.LogRecord):
        log_entry = self.format(record)
        loop = WebsocketLogHandler.loop
        if loop is None or not WebsocketLogHandler.clients:
            return  # no server loop captured yet, or nobody listening
        for ws in list(WebsocketLogHandler.clients):
            try:
                # emit() may run on any thread, so schedule the send on the
                # server's loop instead of calling asyncio APIs directly.
                asyncio.run_coroutine_threadsafe(ws.send_text(log_entry), loop)
            except Exception:
                # Drop clients whose connection has gone away.
                WebsocketLogHandler.clients.discard(ws)
app = FastAPI()

@app.websocket("/ws/logs")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    # Capture the server's event loop so the handler can stream from any thread.
    WebsocketLogHandler.loop = asyncio.get_running_loop()
    WebsocketLogHandler.clients.add(websocket)
    try:
        while True:
            await websocket.receive_text()  # Keep the connection alive
    except WebSocketDisconnect:
        WebsocketLogHandler.clients.discard(websocket)
def run_log_stream_server():
uvicorn.run(app, host="0.0.0.0", port=8765)
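# Client-side sketch (assumes the server above on localhost:8765): tail logs
# with any websocket client, e.g. `websocat ws://localhost:8765/ws/logs`, or
# from Python with the third-party `websockets` package:
#
#   import asyncio, websockets
#   async def tail():
#       async with websockets.connect("ws://localhost:8765/ws/logs") as ws:
#           while True:
#               print(await ws.recv())
#   asyncio.run(tail())

# --- Log Rotation and Archival (APScheduler) ---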
import shutil
from apscheduler.schedulers.background import BackgroundScheduler
def rotate_logs(log_dir: str, max_files: int = 10):
    """Keep the newest `max_files` logs; archive the next-oldest, delete the rest."""
    files = sorted(
        (f for f in os.listdir(log_dir) if f.endswith('.log')),
        key=lambda f: os.path.getmtime(os.path.join(log_dir, f)),
        reverse=True,  # newest first
    )
    excess = files[max_files:]
    if not excess:
        return
    # Archive the most recent excess file *before* anything is deleted
    # (the original deleted it first, so the move could never succeed).
    archive_dir = os.path.join(log_dir, 'archive')
    os.makedirs(archive_dir, exist_ok=True)
    shutil.move(os.path.join(log_dir, excess[0]), os.path.join(archive_dir, excess[0]))
    for f in excess[1:]:
        os.remove(os.path.join(log_dir, f))
def schedule_log_rotation(log_dir: str, max_files: int = 10, interval_minutes: int = 60):
    scheduler = BackgroundScheduler()
    scheduler.add_job(lambda: rotate_logs(log_dir, max_files), 'interval', minutes=interval_minutes)
    scheduler.start()
    return scheduler  # callers can scheduler.shutdown() on application exit
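# Usage sketch: keep the ten newest .log files, rotating hourly. The returned
# scheduler runs in a daemon thread; call scheduler.shutdown() for a clean stop.
#   scheduler = schedule_log_rotation("logs", max_files=10, interval_minutes=60)

# --- Elasticsearch Log Shipping ---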
try:
from elasticsearch import Elasticsearch
except ImportError:
Elasticsearch = None
class ElasticsearchHandler(logging.Handler):
def __init__(self, es_url: str, index: str = 'logs'):
super().__init__()
if Elasticsearch is None:
raise ImportError("elasticsearch package is not installed")
self.es = Elasticsearch(es_url)
self.index = index
    def emit(self, record: logging.LogRecord):
        log_entry = self.format(record)
        # `body=` matches the elasticsearch-py 7.x API; the 8.x client takes
        # `document=` instead.
        self.es.index(index=self.index, body={'message': log_entry, 'timestamp': time.time()})
def create_elasticsearch_dashboard(es_url: str, index: str = 'logs'):
    # Stub: real dashboards belong in Kibana (saved-objects API) or Grafana
    # provisioning. The URL below follows the legacy AWS-hosted Kibana plugin
    # path and is illustrative only.
    print(f"Dashboard for index '{index}' available at {es_url}/_plugin/kibana/app/discover#/?_a=(index:'{index}')")
# --- Advanced Log Search, Filtering, Alerting ---
from typing import Callable
def search_logs(log_file: str, keyword: str) -> list[str]:
results: list[str] = []
with open(log_file, encoding="utf-8") as f:
for line in f:
if keyword in line:
results.append(line.strip())
return results
def alert_on_log_pattern(log_file: str, pattern: str, alert_fn: Callable[[str], None]):
    """Invoke `alert_fn` for every line matching the regular expression `pattern`."""
    matcher = re.compile(pattern)  # `re` is imported at module level below
    with open(log_file, encoding="utf-8") as f:
        for line in f:
            if matcher.search(line):
                alert_fn(line)
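# Usage sketch: fire an alert for every line matching the regex "ERROR":
#   alert_on_log_pattern("logs/app.log", "ERROR",
#                        lambda line: print("ALERT:", line.strip()))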
# --- Log Correlation, Trace IDs, Distributed Context ---
def add_trace_id(record: logging.LogRecord, trace_id: str) -> logging.LogRecord:
record.trace_id = trace_id
return record
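# Note: add_trace_id mutates the record in place; in practice you would call
# it from a logging.Filter (see ContextFilter below) so every record is stamped.

# --- Log Redaction and Sensitive-Data Scrubbing ---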
import re
from typing import List, Optional
def redact_log_message(message: str, patterns: List[str]) -> str:
for pat in patterns:
message = re.sub(pat, "[REDACTED]", message)
return message
class RedactingFormatter(logging.Formatter):
def __init__(self, patterns: List[str], *args: object, **kwargs: object):
super().__init__(*args, **kwargs)
self.patterns = patterns
def format(self, record: logging.LogRecord) -> str:
msg = super().format(record)
return redact_log_message(msg, self.patterns)
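# Usage sketch: scrub credentials before they reach any sink. The formatter
# accepts the usual logging.Formatter arguments after the pattern list:
#   handler.setFormatter(RedactingFormatter(
#       [r"password=\S+", r"secret=\S+"],
#       fmt="%(asctime)s %(levelname)s %(message)s"))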
def enrich_log_record(record: logging.LogRecord, user_id: Optional[str]=None, session_id: Optional[str]=None, request_id: Optional[str]=None):
if user_id:
record.user_id = user_id
if session_id:
record.session_id = session_id
if request_id:
record.request_id = request_id
return record
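# Like add_trace_id, this mutates the record in place; wire it into a
# logging.Filter (see ContextFilter below) to enrich every record automatically.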
# --- Expanded Test Harness ---
def test_advanced_logging():
logger = configure_logging("development")
    # Start the log streaming server in a background thread
    import threading
    server_thread = threading.Thread(target=run_log_stream_server, daemon=True)
    server_thread.start()
    time.sleep(2)  # give uvicorn a moment to bind before logging
# Attach websocket handler
ws_handler = WebsocketLogHandler()
logger.addHandler(ws_handler)
logger.info("Websocket log test.")
    # Elasticsearch (skipped when the client or the server is unavailable)
    if Elasticsearch is not None:
        try:
            es_handler = ElasticsearchHandler("http://localhost:9200")
            logger.addHandler(es_handler)
            logger.info("Elasticsearch log test.")
            create_elasticsearch_dashboard("http://localhost:9200")
        except Exception as exc:
            print(f"Elasticsearch unavailable: {exc}")
# Log rotation/archival
schedule_log_rotation(LOG_DIR, max_files=5, interval_minutes=1)
logger.info("Log rotation scheduled.")
# Search/filter/alert
log_file = os.path.join(LOG_DIR, "app.log")
print("Search logs:", search_logs(log_file, "Test"))
alert_on_log_pattern(log_file, "ERROR", lambda l: print("ALERT:", l))
# Redaction
redacting_formatter = RedactingFormatter([r"password=\w+", r"secret=\w+"])
for handler in logger.handlers:
handler.setFormatter(redacting_formatter)
logger.info("User login password=12345 secret=abcdefg")
# Enrich log record
logger = get_logger("enriched", context={"user_id": "u42", "session_id": "sess99", "request_id": "req777"})
logger.info("Enriched log with context.")
# Simulate log ingestion from syslog (stub)
print("[Stub] Ingest logs from syslog/cloudwatch.")
import sys
import json
from logging.handlers import RotatingFileHandler, SMTPHandler, HTTPHandler
from typing import Any, Dict
# --- Sentry Integration ---
SENTRY_DSN = os.getenv("SENTRY_DSN")
try:
import sentry_sdk
if SENTRY_DSN:
sentry_sdk.init(
dsn=SENTRY_DSN,
traces_sample_rate=0.1, # Adjust for production
environment=os.getenv("ENVIRONMENT", "development"),
)
except ImportError:
pass # Sentry is optional
# --- Advanced Handlers ---
LOG_DIR = os.getenv("LOG_DIR", "logs")
os.makedirs(LOG_DIR, exist_ok=True)
def get_file_handler(log_name: str = "app.log", max_bytes: int = 10**7, backup_count: int = 5):
return RotatingFileHandler(
os.path.join(LOG_DIR, log_name),
maxBytes=max_bytes,
backupCount=backup_count,
encoding="utf-8"
)
def get_email_handler():
mailhost = (os.getenv("SMTP_HOST", "localhost"), int(os.getenv("SMTP_PORT", 25)))
fromaddr = os.getenv("LOG_EMAIL_FROM", "noreply@example.com")
toaddrs = os.getenv("LOG_EMAIL_TO", "admin@example.com").split(",")
subject = os.getenv("LOG_EMAIL_SUBJECT", "App Error")
user = os.getenv("SMTP_USER")
passwd = os.getenv("SMTP_PASS")
credentials = (user, passwd) if user and passwd else None
    # Note: SMTPHandler only applies `secure` (STARTTLS) when credentials are set.
    return SMTPHandler(mailhost, fromaddr, toaddrs, subject, credentials=credentials, secure=())
def get_http_handler():
host = os.getenv("LOG_HTTP_HOST", "localhost:8000")
url = os.getenv("LOG_HTTP_URL", "/log")
method = os.getenv("LOG_HTTP_METHOD", "POST")
return HTTPHandler(host, url, method=method)
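# Configuration sketch (these are the environment variables read above; the
# host name is hypothetical):
#   export LOG_HTTP_HOST=log-collector.internal:8080
#   export LOG_HTTP_URL=/ingest
#   export LOG_HTTP_METHOD=POST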
class SlackHandler(logging.Handler):
"""Send logs to Slack via webhook."""
def __init__(self, webhook_url: str):
super().__init__()
self.webhook_url = webhook_url
    def emit(self, record):
        import requests  # lazy import keeps the dependency optional
        log_entry = self.format(record)
        try:
            # Slack incoming webhooks accept a plain {"text": ...} payload.
            requests.post(self.webhook_url, json={"text": log_entry}, timeout=5)
        except Exception:
            pass  # never let log delivery crash the application
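# Usage sketch: configure_logging below attaches this handler at WARNING level
# whenever SLACK_WEBHOOK_URL is set, so warnings and errors land in Slack.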
# --- Structured Logging ---
class JsonFormatter(logging.Formatter):
def format(self, record):
log_record = {
"timestamp": self.formatTime(record, self.datefmt),
"level": record.levelname,
"name": record.name,
"message": record.getMessage(),
"module": record.module,
"funcName": record.funcName,
"lineno": record.lineno,
}
        # logging merges the `extra` dict straight into the record's __dict__,
        # so surface known context attributes (set by ContextFilter or the
        # enrichment helpers) instead of looking for a nested 'extra' key.
        for key in ("trace_id", "user_id", "session_id", "request_id"):
            if hasattr(record, key):
                log_record[key] = getattr(record, key)
return json.dumps(log_record)
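# Example output shape (values illustrative):
#   {"timestamp": "2024-01-01 12:00:00,000", "level": "INFO", "name": "root",
#    "message": "Test info log.", "module": "logging_config",
#    "funcName": "test_logging", "lineno": 1}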
# --- Dynamic Log Level and Filtering ---
def set_log_level(logger: logging.Logger, level: str):
logger.setLevel(getattr(logging, level.upper(), logging.INFO))
class ContextFilter(logging.Filter):
def __init__(self, context: Optional[Dict[str, Any]] = None):
super().__init__()
self.context = context or {}
def filter(self, record):
for k, v in self.context.items():
setattr(record, k, v)
return True
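# Usage sketch: every record on this logger gains the given attributes, which
# JsonFormatter above will surface for the known context keys:
#   api_logger = logging.getLogger("api")
#   api_logger.addFilter(ContextFilter({"user_id": "u42", "request_id": "req777"}))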
# --- Distributed Tracing and Metrics (Stubs) ---
def trace_log(logger: logging.Logger, trace_id: str, span_id: str, message: str):
logger.info(f"[trace_id={trace_id} span_id={span_id}] {message}")
def log_metric(logger: logging.Logger, metric_name: str, value: float, tags: Optional[Dict[str, str]] = None):
logger.info(f"[metric] {metric_name}={value} tags={tags}")
# --- Log Analysis and Visualization Utilities ---
from collections import Counter
def analyze_logs(log_file: str) -> Dict[str, Any]:
levels = Counter()
with open(log_file, encoding="utf-8") as f:
for line in f:
try:
entry = json.loads(line)
levels[entry.get("level", "INFO")] += 1
except Exception:
continue
return dict(levels)
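# Usage sketch: analyze_logs("logs/app.log") -> e.g. {"INFO": 12, "ERROR": 3}
# (only JSON-formatted lines are counted; plain-text lines are skipped).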
def visualize_log_levels(log_file: str):
import matplotlib.pyplot as plt
stats = analyze_logs(log_file)
plt.bar(stats.keys(), stats.values())
plt.title("Log Level Distribution")
plt.xlabel("Level")
plt.ylabel("Count")
plt.show()
# --- Main Logging Configuration ---
def configure_logging(env: Optional[str] = None):
    # Default to the ENVIRONMENT variable so the fallback actually takes effect.
    env = env or os.getenv("ENVIRONMENT", "development")
logger = logging.getLogger()
logger.handlers.clear()
formatter = logging.Formatter('%(asctime)s %(levelname)s %(name)s %(message)s')
json_formatter = JsonFormatter()
# Console handler
ch = logging.StreamHandler(sys.stdout)
ch.setFormatter(formatter)
logger.addHandler(ch)
# File handler
fh = get_file_handler("app.log")
fh.setFormatter(json_formatter)
logger.addHandler(fh)
# Websocket handler (for real-time streaming)
ws_handler = WebsocketLogHandler()
logger.addHandler(ws_handler)
# Email handler (errors only)
if env == "production":
eh = get_email_handler()
eh.setLevel(logging.ERROR)
logger.addHandler(eh)
# HTTP handler (optional)
if os.getenv("LOG_HTTP_HOST"):
hh = get_http_handler()
logger.addHandler(hh)
# Slack handler (optional)
slack_url = os.getenv("SLACK_WEBHOOK_URL")
if slack_url:
sh = SlackHandler(slack_url)
sh.setLevel(logging.WARNING)
logger.addHandler(sh)
logger.setLevel(logging.INFO)
logger.info(f"Logging configured for {env} environment.")
return logger
def get_logger(name: str, context: Optional[Dict[str, Any]] = None, level: str = "INFO") -> logging.Logger:
logger = logging.getLogger(name)
set_log_level(logger, level)
if context:
logger.addFilter(ContextFilter(context))
return logger
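# Usage sketch: a named logger with context stamped onto every record:
#   logger = get_logger("payments", context={"request_id": "req777"}, level="DEBUG")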
# --- Example/Test Harness ---
def test_logging():
logger = configure_logging("development")
logger.info("Test info log.")
logger.warning("Test warning log.")
logger.error("Test error log.")
trace_log(logger, "trace123", "span456", "Tracing example.")
log_metric(logger, "accuracy", 0.98, tags={"model": "test"})
# Visualize log levels
log_file = os.path.join(LOG_DIR, "app.log")
visualize_log_levels(log_file)
if __name__ == "__main__":
    # Run both harnesses here, after every helper they rely on is defined
    # (the original called test_advanced_logging() mid-file, before
    # configure_logging, get_logger, and LOG_DIR existed, raising NameError).
    test_logging()
    test_advanced_logging()