# --- Real-Time Log Streaming (Websockets, FastAPI) --- import asyncio import logging import time import os from fastapi import FastAPI, WebSocket, WebSocketDisconnect import uvicorn from typing import Set class WebsocketLogHandler(logging.Handler): """Streams logs to connected websocket clients.""" clients: Set[WebSocket] = set() def __init__(self): super().__init__() def emit(self, record: logging.LogRecord): log_entry = self.format(record) # Schedule sending log to all clients for ws in list(WebsocketLogHandler.clients): try: loop = asyncio.get_event_loop() if loop.is_running(): asyncio.create_task(ws.send_text(log_entry)) else: loop.run_until_complete(ws.send_text(log_entry)) except Exception: WebsocketLogHandler.clients.discard(ws) app = FastAPI() @app.websocket("/ws/logs") async def websocket_endpoint(websocket: WebSocket): await websocket.accept() WebsocketLogHandler.clients.add(websocket) try: while True: await websocket.receive_text() # Keep alive except WebSocketDisconnect: WebsocketLogHandler.clients.discard(websocket) def run_log_stream_server(): uvicorn.run(app, host="0.0.0.0", port=8765) import shutil from apscheduler.schedulers.background import BackgroundScheduler def rotate_logs(log_dir: str, max_files: int = 10): files = sorted([f for f in os.listdir(log_dir) if f.endswith('.log')], reverse=True) for i, f in enumerate(files[max_files:], start=max_files): os.remove(os.path.join(log_dir, f)) # Archive oldest if len(files) > max_files: archive_dir = os.path.join(log_dir, 'archive') os.makedirs(archive_dir, exist_ok=True) shutil.move(os.path.join(log_dir, files[max_files]), os.path.join(archive_dir, files[max_files])) def schedule_log_rotation(log_dir: str, max_files: int = 10, interval_minutes: int = 60): scheduler = BackgroundScheduler() scheduler.add_job(lambda: rotate_logs(log_dir, max_files), 'interval', minutes=interval_minutes) scheduler.start() try: from elasticsearch import Elasticsearch except ImportError: Elasticsearch = None class ElasticsearchHandler(logging.Handler): def __init__(self, es_url: str, index: str = 'logs'): super().__init__() if Elasticsearch is None: raise ImportError("elasticsearch package is not installed") self.es = Elasticsearch(es_url) self.index = index def emit(self, record: logging.LogRecord): log_entry = self.format(record) self.es.index(index=self.index, body={'message': log_entry, 'timestamp': time.time()}) def create_elasticsearch_dashboard(es_url: str, index: str = 'logs'): # This is a stub for real dashboard creation (Kibana, Grafana, etc.) # In production, use Kibana API or Grafana provisioning print(f"Dashboard for index '{index}' available at {es_url}/_plugin/kibana/app/discover#/?_a=(index:'{index}')") # --- Advanced Log Search, Filtering, Alerting --- from typing import Callable def search_logs(log_file: str, keyword: str) -> list[str]: results: list[str] = [] with open(log_file, encoding="utf-8") as f: for line in f: if keyword in line: results.append(line.strip()) return results def alert_on_log_pattern(log_file: str, pattern: str, alert_fn: Callable[[str], None]): with open(log_file, encoding="utf-8") as f: for line in f: if pattern in line: alert_fn(line) # --- Log Correlation, Trace IDs, Distributed Context --- def add_trace_id(record: logging.LogRecord, trace_id: str) -> logging.LogRecord: record.trace_id = trace_id return record import re from typing import List, Optional def redact_log_message(message: str, patterns: List[str]) -> str: for pat in patterns: message = re.sub(pat, "[REDACTED]", message) return message class RedactingFormatter(logging.Formatter): def __init__(self, patterns: List[str], *args: object, **kwargs: object): super().__init__(*args, **kwargs) self.patterns = patterns def format(self, record: logging.LogRecord) -> str: msg = super().format(record) return redact_log_message(msg, self.patterns) def enrich_log_record(record: logging.LogRecord, user_id: Optional[str]=None, session_id: Optional[str]=None, request_id: Optional[str]=None): if user_id: record.user_id = user_id if session_id: record.session_id = session_id if request_id: record.request_id = request_id return record # --- Expanded Test Harness --- def test_advanced_logging(): logger = configure_logging("development") # Start log streaming server in a thread import threading server_thread = threading.Thread(target=run_log_stream_server, daemon=True) server_thread.start() time.sleep(2) # Attach websocket handler ws_handler = WebsocketLogHandler() logger.addHandler(ws_handler) logger.info("Websocket log test.") # Elasticsearch es_handler = ElasticsearchHandler("http://localhost:9200") logger.addHandler(es_handler) logger.info("Elasticsearch log test.") create_elasticsearch_dashboard("http://localhost:9200") # Log rotation/archival schedule_log_rotation(LOG_DIR, max_files=5, interval_minutes=1) logger.info("Log rotation scheduled.") # Search/filter/alert log_file = os.path.join(LOG_DIR, "app.log") print("Search logs:", search_logs(log_file, "Test")) alert_on_log_pattern(log_file, "ERROR", lambda l: print("ALERT:", l)) # Redaction redacting_formatter = RedactingFormatter([r"password=\w+", r"secret=\w+"]) for handler in logger.handlers: handler.setFormatter(redacting_formatter) logger.info("User login password=12345 secret=abcdefg") # Enrich log record logger = get_logger("enriched", context={"user_id": "u42", "session_id": "sess99", "request_id": "req777"}) logger.info("Enriched log with context.") # Simulate log ingestion from syslog (stub) print("[Stub] Ingest logs from syslog/cloudwatch.") if __name__ == "__main__": test_advanced_logging() import logging import os import sys import json from logging.handlers import RotatingFileHandler, SMTPHandler, HTTPHandler from typing import Optional, Dict, Any # --- Sentry Integration --- SENTRY_DSN = os.getenv("SENTRY_DSN") try: import sentry_sdk if SENTRY_DSN: sentry_sdk.init( dsn=SENTRY_DSN, traces_sample_rate=0.1, # Adjust for production environment=os.getenv("ENVIRONMENT", "development"), ) except ImportError: pass # Sentry is optional # --- Advanced Handlers --- LOG_DIR = os.getenv("LOG_DIR", "logs") os.makedirs(LOG_DIR, exist_ok=True) def get_file_handler(log_name: str = "app.log", max_bytes: int = 10**7, backup_count: int = 5): return RotatingFileHandler( os.path.join(LOG_DIR, log_name), maxBytes=max_bytes, backupCount=backup_count, encoding="utf-8" ) def get_email_handler(): mailhost = (os.getenv("SMTP_HOST", "localhost"), int(os.getenv("SMTP_PORT", 25))) fromaddr = os.getenv("LOG_EMAIL_FROM", "noreply@example.com") toaddrs = os.getenv("LOG_EMAIL_TO", "admin@example.com").split(",") subject = os.getenv("LOG_EMAIL_SUBJECT", "App Error") user = os.getenv("SMTP_USER") passwd = os.getenv("SMTP_PASS") credentials = (user, passwd) if user and passwd else None return SMTPHandler(mailhost, fromaddr, toaddrs, subject, credentials=credentials, secure=()) def get_http_handler(): host = os.getenv("LOG_HTTP_HOST", "localhost:8000") url = os.getenv("LOG_HTTP_URL", "/log") method = os.getenv("LOG_HTTP_METHOD", "POST") return HTTPHandler(host, url, method=method) class SlackHandler(logging.Handler): """Send logs to Slack via webhook.""" def __init__(self, webhook_url: str): super().__init__() self.webhook_url = webhook_url def emit(self, record): import requests log_entry = self.format(record) try: requests.post(self.webhook_url, json={"text": log_entry}) except Exception: pass # --- Structured Logging --- class JsonFormatter(logging.Formatter): def format(self, record): log_record = { "timestamp": self.formatTime(record, self.datefmt), "level": record.levelname, "name": record.name, "message": record.getMessage(), "module": record.module, "funcName": record.funcName, "lineno": record.lineno, } if hasattr(record, 'extra'): log_record.update(record.extra) return json.dumps(log_record) # --- Dynamic Log Level and Filtering --- def set_log_level(logger: logging.Logger, level: str): logger.setLevel(getattr(logging, level.upper(), logging.INFO)) class ContextFilter(logging.Filter): def __init__(self, context: Optional[Dict[str, Any]] = None): super().__init__() self.context = context or {} def filter(self, record): for k, v in self.context.items(): setattr(record, k, v) return True # --- Distributed Tracing and Metrics (Stubs) --- def trace_log(logger: logging.Logger, trace_id: str, span_id: str, message: str): logger.info(f"[trace_id={trace_id} span_id={span_id}] {message}") def log_metric(logger: logging.Logger, metric_name: str, value: float, tags: Optional[Dict[str, str]] = None): logger.info(f"[metric] {metric_name}={value} tags={tags}") # --- Log Analysis and Visualization Utilities --- from collections import Counter def analyze_logs(log_file: str) -> Dict[str, Any]: levels = Counter() with open(log_file, encoding="utf-8") as f: for line in f: try: entry = json.loads(line) levels[entry.get("level", "INFO")] += 1 except Exception: continue return dict(levels) def visualize_log_levels(log_file: str): import matplotlib.pyplot as plt stats = analyze_logs(log_file) plt.bar(stats.keys(), stats.values()) plt.title("Log Level Distribution") plt.xlabel("Level") plt.ylabel("Count") plt.show() # --- Main Logging Configuration --- def configure_logging(env: str = "development"): env = env or os.getenv("ENVIRONMENT", "development") logger = logging.getLogger() logger.handlers.clear() formatter = logging.Formatter('%(asctime)s %(levelname)s %(name)s %(message)s') json_formatter = JsonFormatter() # Console handler ch = logging.StreamHandler(sys.stdout) ch.setFormatter(formatter) logger.addHandler(ch) # File handler fh = get_file_handler("app.log") fh.setFormatter(json_formatter) logger.addHandler(fh) # Websocket handler (for real-time streaming) ws_handler = WebsocketLogHandler() logger.addHandler(ws_handler) # Email handler (errors only) if env == "production": eh = get_email_handler() eh.setLevel(logging.ERROR) logger.addHandler(eh) # HTTP handler (optional) if os.getenv("LOG_HTTP_HOST"): hh = get_http_handler() logger.addHandler(hh) # Slack handler (optional) slack_url = os.getenv("SLACK_WEBHOOK_URL") if slack_url: sh = SlackHandler(slack_url) sh.setLevel(logging.WARNING) logger.addHandler(sh) logger.setLevel(logging.INFO) logger.info(f"Logging configured for {env} environment.") return logger def get_logger(name: str, context: Optional[Dict[str, Any]] = None, level: str = "INFO") -> logging.Logger: logger = logging.getLogger(name) set_log_level(logger, level) if context: logger.addFilter(ContextFilter(context)) return logger # --- Example/Test Harness --- def test_logging(): logger = configure_logging("development") logger.info("Test info log.") logger.warning("Test warning log.") logger.error("Test error log.") trace_log(logger, "trace123", "span456", "Tracing example.") log_metric(logger, "accuracy", 0.98, tags={"model": "test"}) # Visualize log levels log_file = os.path.join(LOG_DIR, "app.log") visualize_log_levels(log_file) if __name__ == "__main__": test_logging()