import asyncio
import logging
import os
import time
from typing import Optional, Set

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import uvicorn

class WebsocketLogHandler(logging.Handler):
    """Streams log records to all connected websocket clients."""

    clients: Set[WebSocket] = set()
    # Event loop of the server thread; set when the first client connects.
    loop: Optional[asyncio.AbstractEventLoop] = None

    def emit(self, record: logging.LogRecord) -> None:
        log_entry = self.format(record)
        loop = WebsocketLogHandler.loop
        if loop is None:
            return  # no client has connected yet
        for ws in list(WebsocketLogHandler.clients):
            try:
                # emit() may run on any thread, so hand the send off to the
                # server's event loop instead of touching the socket directly.
                asyncio.run_coroutine_threadsafe(ws.send_text(log_entry), loop)
            except Exception:
                WebsocketLogHandler.clients.discard(ws)

app = FastAPI()


@app.websocket("/ws/logs")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    # Remember the server's event loop so emit() can schedule sends onto it.
    WebsocketLogHandler.loop = asyncio.get_running_loop()
    WebsocketLogHandler.clients.add(websocket)
    try:
        while True:
            # Keep the connection open; clients are not expected to send data.
            await websocket.receive_text()
    except WebSocketDisconnect:
        WebsocketLogHandler.clients.discard(websocket)

def run_log_stream_server():
    uvicorn.run(app, host="0.0.0.0", port=8765)

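# Example client (a sketch, not part of the server): tail the stream from
# another process. Assumes the third-party `websockets` package is installed.
#
#   import asyncio, websockets
#
#   async def tail_logs():
#       async with websockets.connect("ws://localhost:8765/ws/logs") as ws:
#           while True:
#               print(await ws.recv())
#
#   asyncio.run(tail_logs())
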
import shutil

from apscheduler.schedulers.background import BackgroundScheduler


def rotate_logs(log_dir: str, max_files: int = 10):
    """Keep the newest ``max_files`` log files; move the rest to an archive dir."""
    files = sorted([f for f in os.listdir(log_dir) if f.endswith('.log')], reverse=True)
    if len(files) <= max_files:
        return
    archive_dir = os.path.join(log_dir, 'archive')
    os.makedirs(archive_dir, exist_ok=True)
    for f in files[max_files:]:
        shutil.move(os.path.join(log_dir, f), os.path.join(archive_dir, f))

def schedule_log_rotation(log_dir: str, max_files: int = 10, interval_minutes: int = 60):
    scheduler = BackgroundScheduler()
    scheduler.add_job(lambda: rotate_logs(log_dir, max_files), 'interval', minutes=interval_minutes)
    scheduler.start()

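# Usage sketch: archive all but the five newest logs, checking every hour.
#   schedule_log_rotation("logs", max_files=5, interval_minutes=60)
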
try:
    from elasticsearch import Elasticsearch
except ImportError:
    Elasticsearch = None

class ElasticsearchHandler(logging.Handler):
    def __init__(self, es_url: str, index: str = 'logs'):
        super().__init__()
        if Elasticsearch is None:
            raise ImportError("elasticsearch package is not installed")
        self.es = Elasticsearch(es_url)
        self.index = index

    def emit(self, record: logging.LogRecord):
        log_entry = self.format(record)
        try:
            # `document=` is the elasticsearch-py 8.x spelling; 7.x used `body=`.
            self.es.index(index=self.index, document={'message': log_entry, 'timestamp': time.time()})
        except Exception:
            self.handleError(record)

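# Query sketch (assumes elasticsearch-py 8.x and a reachable cluster):
#   es = Elasticsearch("http://localhost:9200")
#   hits = es.search(index="logs", query={"match": {"message": "error"}})
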
def create_elasticsearch_dashboard(es_url: str, index: str = 'logs'):
    """Stub: print a convenience link to a Kibana Discover view for the index."""
    print(f"Dashboard for index '{index}' available at {es_url}/_plugin/kibana/app/discover#/?_a=(index:'{index}')")

from typing import Callable


def search_logs(log_file: str, keyword: str) -> list[str]:
    """Return every line in ``log_file`` that contains ``keyword``."""
    results: list[str] = []
    with open(log_file, encoding="utf-8") as f:
        for line in f:
            if keyword in line:
                results.append(line.strip())
    return results

def alert_on_log_pattern(log_file: str, pattern: str, alert_fn: Callable[[str], None]):
    """Call ``alert_fn`` on every line of ``log_file`` that contains ``pattern``."""
    with open(log_file, encoding="utf-8") as f:
        for line in f:
            if pattern in line:
                alert_fn(line)

def add_trace_id(record: logging.LogRecord, trace_id: str) -> logging.LogRecord:
    record.trace_id = trace_id
    return record

import re
from typing import List


def redact_log_message(message: str, patterns: List[str]) -> str:
    for pat in patterns:
        message = re.sub(pat, "[REDACTED]", message)
    return message

class RedactingFormatter(logging.Formatter):
    def __init__(self, patterns: List[str], *args: object, **kwargs: object):
        super().__init__(*args, **kwargs)
        self.patterns = patterns

    def format(self, record: logging.LogRecord) -> str:
        msg = super().format(record)
        return redact_log_message(msg, self.patterns)

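# Usage sketch: apply to any handler so secrets never reach its sink.
#   handler.setFormatter(RedactingFormatter([r"password=\w+"], "%(levelname)s %(message)s"))
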
def enrich_log_record(record: logging.LogRecord, user_id: Optional[str] = None, session_id: Optional[str] = None, request_id: Optional[str] = None):
    if user_id:
        record.user_id = user_id
    if session_id:
        record.session_id = session_id
    if request_id:
        record.request_id = request_id
    return record

def test_advanced_logging():
    # configure_logging, get_logger, and LOG_DIR come from the base
    # configuration section below.
    logger = configure_logging("development")

    # Stream logs over websockets from a background server thread.
    import threading
    server_thread = threading.Thread(target=run_log_stream_server, daemon=True)
    server_thread.start()
    time.sleep(2)  # give uvicorn a moment to start

    ws_handler = WebsocketLogHandler()
    logger.addHandler(ws_handler)
    logger.info("Websocket log test.")

    # Ship logs to Elasticsearch.
    es_handler = ElasticsearchHandler("http://localhost:9200")
    logger.addHandler(es_handler)
    logger.info("Elasticsearch log test.")
    create_elasticsearch_dashboard("http://localhost:9200")

    schedule_log_rotation(LOG_DIR, max_files=5, interval_minutes=1)
    logger.info("Log rotation scheduled.")

    log_file = os.path.join(LOG_DIR, "app.log")
    print("Search logs:", search_logs(log_file, "Test"))
    alert_on_log_pattern(log_file, "ERROR", lambda l: print("ALERT:", l))

    # Redact secrets before they reach any sink.
    redacting_formatter = RedactingFormatter([r"password=\w+", r"secret=\w+"])
    for handler in logger.handlers:
        handler.setFormatter(redacting_formatter)
    logger.info("User login password=12345 secret=abcdefg")

    logger = get_logger("enriched", context={"user_id": "u42", "session_id": "sess99", "request_id": "req777"})
    logger.info("Enriched log with context.")

    print("[Stub] Ingest logs from syslog/cloudwatch.")


if __name__ == "__main__":
    test_advanced_logging()

import logging
import os
import sys
import json
from logging.handlers import RotatingFileHandler, SMTPHandler, HTTPHandler
from typing import Optional, Dict, Any

# Optional Sentry error tracking; enabled only when SENTRY_DSN is set.
SENTRY_DSN = os.getenv("SENTRY_DSN")
try:
    import sentry_sdk
    if SENTRY_DSN:
        sentry_sdk.init(
            dsn=SENTRY_DSN,
            traces_sample_rate=0.1,
            environment=os.getenv("ENVIRONMENT", "development"),
        )
except ImportError:
    pass

LOG_DIR = os.getenv("LOG_DIR", "logs")
os.makedirs(LOG_DIR, exist_ok=True)

def get_file_handler(log_name: str = "app.log", max_bytes: int = 10**7, backup_count: int = 5):
    return RotatingFileHandler(
        os.path.join(LOG_DIR, log_name),
        maxBytes=max_bytes,
        backupCount=backup_count,
        encoding="utf-8",
    )

def get_email_handler():
    mailhost = (os.getenv("SMTP_HOST", "localhost"), int(os.getenv("SMTP_PORT", 25)))
    fromaddr = os.getenv("LOG_EMAIL_FROM", "noreply@example.com")
    toaddrs = os.getenv("LOG_EMAIL_TO", "admin@example.com").split(",")
    subject = os.getenv("LOG_EMAIL_SUBJECT", "App Error")
    user = os.getenv("SMTP_USER")
    passwd = os.getenv("SMTP_PASS")
    credentials = (user, passwd) if user and passwd else None
    # SMTPHandler only applies ``secure`` when credentials are supplied.
    secure = () if credentials else None
    return SMTPHandler(mailhost, fromaddr, toaddrs, subject, credentials=credentials, secure=secure)

def get_http_handler():
    host = os.getenv("LOG_HTTP_HOST", "localhost:8000")
    url = os.getenv("LOG_HTTP_URL", "/log")
    method = os.getenv("LOG_HTTP_METHOD", "POST")
    return HTTPHandler(host, url, method=method)

class SlackHandler(logging.Handler):
    """Send logs to Slack via an incoming webhook."""

    def __init__(self, webhook_url: str):
        super().__init__()
        self.webhook_url = webhook_url

    def emit(self, record):
        import requests  # imported lazily so the dependency stays optional
        log_entry = self.format(record)
        try:
            # A timeout keeps a slow webhook from blocking the logging call.
            requests.post(self.webhook_url, json={"text": log_entry}, timeout=5)
        except Exception:
            pass

class JsonFormatter(logging.Formatter):
    def format(self, record):
        log_record = {
            "timestamp": self.formatTime(record, self.datefmt),
            "level": record.levelname,
            "name": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "funcName": record.funcName,
            "lineno": record.lineno,
        }
        # Merge in a dict passed as ``extra={"extra": {...}}`` at the call site.
        if hasattr(record, 'extra'):
            log_record.update(record.extra)
        return json.dumps(log_record)

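# Illustrative output line (field values depend on the record):
#   {"timestamp": "2024-01-01 12:00:00,000", "level": "INFO", "name": "root",
#    "message": "Test info log.", "module": "app", "funcName": "main", "lineno": 42}
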
def set_log_level(logger: logging.Logger, level: str):
    logger.setLevel(getattr(logging, level.upper(), logging.INFO))

class ContextFilter(logging.Filter):
    def __init__(self, context: Optional[Dict[str, Any]] = None):
        super().__init__()
        self.context = context or {}

    def filter(self, record):
        # Attach every context key to the record so formatters can use it.
        for k, v in self.context.items():
            setattr(record, k, v)
        return True

def trace_log(logger: logging.Logger, trace_id: str, span_id: str, message: str):
    logger.info(f"[trace_id={trace_id} span_id={span_id}] {message}")


def log_metric(logger: logging.Logger, metric_name: str, value: float, tags: Optional[Dict[str, str]] = None):
    logger.info(f"[metric] {metric_name}={value} tags={tags}")

from collections import Counter


def analyze_logs(log_file: str) -> Dict[str, Any]:
    """Count entries per level in a file of JSON-formatted log lines."""
    levels = Counter()
    with open(log_file, encoding="utf-8") as f:
        for line in f:
            try:
                entry = json.loads(line)
                levels[entry.get("level", "INFO")] += 1
            except Exception:
                continue  # skip non-JSON lines
    return dict(levels)

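# Usage sketch:
#   stats = analyze_logs(os.path.join(LOG_DIR, "app.log"))
#   print(stats)  # e.g. {"INFO": 120, "WARNING": 7, "ERROR": 2}
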
def visualize_log_levels(log_file: str):
    import matplotlib.pyplot as plt
    stats = analyze_logs(log_file)
    plt.bar(stats.keys(), stats.values())
    plt.title("Log Level Distribution")
    plt.xlabel("Level")
    plt.ylabel("Count")
    plt.show()

def configure_logging(env: str = "development"):
    env = env or os.getenv("ENVIRONMENT", "development")
    logger = logging.getLogger()
    logger.handlers.clear()
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(name)s %(message)s')
    json_formatter = JsonFormatter()

    # Human-readable console output.
    ch = logging.StreamHandler(sys.stdout)
    ch.setFormatter(formatter)
    logger.addHandler(ch)

    # Structured JSON file output with size-based rotation.
    fh = get_file_handler("app.log")
    fh.setFormatter(json_formatter)
    logger.addHandler(fh)

    # Live streaming to any connected websocket clients.
    ws_handler = WebsocketLogHandler()
    logger.addHandler(ws_handler)

    # Email alerts for errors, in production only.
    if env == "production":
        eh = get_email_handler()
        eh.setLevel(logging.ERROR)
        logger.addHandler(eh)

    if os.getenv("LOG_HTTP_HOST"):
        hh = get_http_handler()
        logger.addHandler(hh)

    slack_url = os.getenv("SLACK_WEBHOOK_URL")
    if slack_url:
        sh = SlackHandler(slack_url)
        sh.setLevel(logging.WARNING)
        logger.addHandler(sh)

    logger.setLevel(logging.INFO)
    logger.info(f"Logging configured for {env} environment.")
    return logger

def get_logger(name: str, context: Optional[Dict[str, Any]] = None, level: str = "INFO") -> logging.Logger:
    logger = logging.getLogger(name)
    set_log_level(logger, level)
    if context:
        logger.addFilter(ContextFilter(context))
    return logger

def test_logging():
    logger = configure_logging("development")
    logger.info("Test info log.")
    logger.warning("Test warning log.")
    logger.error("Test error log.")
    trace_log(logger, "trace123", "span456", "Tracing example.")
    log_metric(logger, "accuracy", 0.98, tags={"model": "test"})

    log_file = os.path.join(LOG_DIR, "app.log")
    visualize_log_levels(log_file)


if __name__ == "__main__":
    test_logging()