# --- Real-Time Log Streaming (WebSockets, FastAPI) ---
import asyncio
import logging
import time
import os
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import uvicorn

from typing import Optional, Set

class WebsocketLogHandler(logging.Handler):
    """Streams formatted log records to all connected websocket clients."""
    clients: Set[WebSocket] = set()
    loop: Optional[asyncio.AbstractEventLoop] = None  # captured when a client connects

    def emit(self, record: logging.LogRecord):
        log_entry = self.format(record)
        loop = WebsocketLogHandler.loop
        if loop is None or not loop.is_running():
            return  # no server event loop yet; drop the record rather than block
        for ws in list(WebsocketLogHandler.clients):
            try:
                # emit() may be called from any thread, so hand the send off to
                # the server's event loop instead of touching the socket directly
                asyncio.run_coroutine_threadsafe(ws.send_text(log_entry), loop)
            except Exception:
                WebsocketLogHandler.clients.discard(ws)

app = FastAPI()

@app.websocket("/ws/logs")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    # Record the server's event loop so the log handler can schedule sends onto it
    WebsocketLogHandler.loop = asyncio.get_running_loop()
    WebsocketLogHandler.clients.add(websocket)
    try:
        while True:
            await websocket.receive_text()  # keep the connection alive
    except WebSocketDisconnect:
        WebsocketLogHandler.clients.discard(websocket)

def run_log_stream_server():
    uvicorn.run(app, host="0.0.0.0", port=8765)
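
# A minimal consumer for the stream above — a sketch, assuming the third-party
# `websockets` package is installed (it is not otherwise required by this module):
def tail_logs(url: str = "ws://localhost:8765/ws/logs"):
    import websockets

    async def _tail():
        async with websockets.connect(url) as ws:
            while True:
                print(await ws.recv())

    asyncio.run(_tail())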

# --- Log Rotation & Archival ---
import shutil
from apscheduler.schedulers.background import BackgroundScheduler

def rotate_logs(log_dir: str, max_files: int = 10):
    """Keep the newest `max_files` logs and move the rest into an archive folder.

    Assumes log filenames sort chronologically (e.g. timestamped names), so a
    reverse sort puts the newest files first.
    """
    files = sorted([f for f in os.listdir(log_dir) if f.endswith('.log')], reverse=True)
    excess = files[max_files:]
    if not excess:
        return
    archive_dir = os.path.join(log_dir, 'archive')
    os.makedirs(archive_dir, exist_ok=True)
    for f in excess:
        shutil.move(os.path.join(log_dir, f), os.path.join(archive_dir, f))

def schedule_log_rotation(log_dir: str, max_files: int = 10, interval_minutes: int = 60):
    scheduler = BackgroundScheduler()
    scheduler.add_job(lambda: rotate_logs(log_dir, max_files), 'interval', minutes=interval_minutes)
    scheduler.start()
    return scheduler  # returned so the caller can shut it down cleanly
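
# Usage sketch (APScheduler runs in-process, so this needs a long-lived process):
#     scheduler = schedule_log_rotation(LOG_DIR, max_files=10, interval_minutes=60)
#     import atexit; atexit.register(scheduler.shutdown)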

# --- Elasticsearch Integration (optional dependency) ---
try:
    from elasticsearch import Elasticsearch
except ImportError:
    Elasticsearch = None  # the handler below raises a clear error if used anyway

class ElasticsearchHandler(logging.Handler):
    def __init__(self, es_url: str, index: str = 'logs'):
        super().__init__()
        if Elasticsearch is None:
            raise ImportError("elasticsearch package is not installed")
        self.es = Elasticsearch(es_url)
        self.index = index
    def emit(self, record: logging.LogRecord):
        log_entry = self.format(record)
        # `body=` matches the 7.x client API; the 8.x client prefers `document=`
        self.es.index(index=self.index, body={'message': log_entry, 'timestamp': time.time()})

def create_elasticsearch_dashboard(es_url: str, index: str = 'logs'):
    # This is a stub for real dashboard creation (Kibana, Grafana, etc.)
    # In production, use Kibana API or Grafana provisioning
    print(f"Dashboard for index '{index}' available at {es_url}/_plugin/kibana/app/discover#/?_a=(index:'{index}')")

# --- Advanced Log Search, Filtering, Alerting ---
from typing import Callable
def search_logs(log_file: str, keyword: str) -> list[str]:
    results: list[str] = []
    with open(log_file, encoding="utf-8") as f:
        for line in f:
            if keyword in line:
                results.append(line.strip())
    return results

def alert_on_log_pattern(log_file: str, pattern: str, alert_fn: Callable[[str], None]):
    with open(log_file, encoding="utf-8") as f:
        for line in f:
            if pattern in line:
                alert_fn(line)
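
# A tail-follow variant for live files — a polling sketch; inotify/watchdog
# would be the production-grade alternative:
def follow_log_for_pattern(log_file: str, pattern: str, alert_fn: Callable[[str], None],
                           poll_seconds: float = 0.5):
    with open(log_file, encoding="utf-8") as f:
        f.seek(0, os.SEEK_END)  # start at the end; only watch newly appended lines
        while True:
            line = f.readline()
            if not line:
                time.sleep(poll_seconds)
                continue
            if pattern in line:
                alert_fn(line)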

# --- Log Correlation, Trace IDs, Distributed Context ---
def add_trace_id(record: logging.LogRecord, trace_id: str) -> logging.LogRecord:
    record.trace_id = trace_id
    return record
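
# A sketch of propagating trace ids automatically via contextvars instead of
# mutating records by hand (TraceIdFilter and current_trace_id are illustrative
# names, not from any library):
import contextvars
current_trace_id = contextvars.ContextVar("trace_id", default="-")

class TraceIdFilter(logging.Filter):
    def filter(self, record: logging.LogRecord) -> bool:
        record.trace_id = current_trace_id.get()
        return True

# Usage: current_trace_id.set("req-123"); logger.addFilter(TraceIdFilter())
# and include %(trace_id)s in the formatter string.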

# --- Log Redaction & Enrichment ---
import re
from typing import List, Optional
def redact_log_message(message: str, patterns: List[str]) -> str:
    for pat in patterns:
        message = re.sub(pat, "[REDACTED]", message)
    return message

class RedactingFormatter(logging.Formatter):
    def __init__(self, patterns: List[str], *args: object, **kwargs: object):
        super().__init__(*args, **kwargs)
        self.patterns = patterns
    def format(self, record: logging.LogRecord) -> str:
        msg = super().format(record)
        return redact_log_message(msg, self.patterns)
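
# Usage sketch (the patterns are examples, not an exhaustive secret list):
#     handler.setFormatter(RedactingFormatter([r"password=\S+", r"token=\S+"],
#                                             fmt="%(asctime)s %(levelname)s %(message)s"))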

def enrich_log_record(record: logging.LogRecord, user_id: Optional[str]=None, session_id: Optional[str]=None, request_id: Optional[str]=None):
    if user_id:
        record.user_id = user_id
    if session_id:
        record.session_id = session_id
    if request_id:
        record.request_id = request_id
    return record

# --- Expanded Test Harness ---
def test_advanced_logging():
    logger = configure_logging("development")
    # Start log streaming server in a thread
    import threading
    server_thread = threading.Thread(target=run_log_stream_server, daemon=True)
    server_thread.start()
    time.sleep(2)
    # Attach websocket handler
    ws_handler = WebsocketLogHandler()
    logger.addHandler(ws_handler)
    logger.info("Websocket log test.")
    # Elasticsearch (skipped when the client library or server is unavailable)
    try:
        es_handler = ElasticsearchHandler("http://localhost:9200")
        logger.addHandler(es_handler)
        logger.info("Elasticsearch log test.")
        create_elasticsearch_dashboard("http://localhost:9200")
    except Exception as exc:
        print(f"Elasticsearch test skipped: {exc}")
    # Log rotation/archival
    schedule_log_rotation(LOG_DIR, max_files=5, interval_minutes=1)
    logger.info("Log rotation scheduled.")
    # Search/filter/alert
    log_file = os.path.join(LOG_DIR, "app.log")
    print("Search logs:", search_logs(log_file, "Test"))
    alert_on_log_pattern(log_file, "ERROR", lambda l: print("ALERT:", l))
    # Redaction
    redacting_formatter = RedactingFormatter([r"password=\w+", r"secret=\w+"])
    for handler in logger.handlers:
        handler.setFormatter(redacting_formatter)
    logger.info("User login password=12345 secret=abcdefg")
    # Enrich log record
    logger = get_logger("enriched", context={"user_id": "u42", "session_id": "sess99", "request_id": "req777"})
    logger.info("Enriched log with context.")
    # Simulate log ingestion from syslog (stub)
    print("[Stub] Ingest logs from syslog/cloudwatch.")

# configure_logging(), get_logger() and LOG_DIR are defined further down in this
# module, so test_advanced_logging() runs from the __main__ block at the end of
# the file instead of here.

import sys
import json
from logging.handlers import RotatingFileHandler, SMTPHandler, HTTPHandler
from typing import Optional, Dict, Any

# --- Sentry Integration ---
SENTRY_DSN = os.getenv("SENTRY_DSN")
try:
    import sentry_sdk
    if SENTRY_DSN:
        sentry_sdk.init(
            dsn=SENTRY_DSN,
            traces_sample_rate=0.1,  # Adjust for production
            environment=os.getenv("ENVIRONMENT", "development"),
        )
except ImportError:
    pass  # Sentry is optional

# --- Advanced Handlers ---
LOG_DIR = os.getenv("LOG_DIR", "logs")
os.makedirs(LOG_DIR, exist_ok=True)

def get_file_handler(log_name: str = "app.log", max_bytes: int = 10**7, backup_count: int = 5):
    return RotatingFileHandler(
        os.path.join(LOG_DIR, log_name),
        maxBytes=max_bytes,
        backupCount=backup_count,
        encoding="utf-8"
    )

def get_email_handler():
    mailhost = (os.getenv("SMTP_HOST", "localhost"), int(os.getenv("SMTP_PORT", 25)))
    fromaddr = os.getenv("LOG_EMAIL_FROM", "noreply@example.com")
    toaddrs = os.getenv("LOG_EMAIL_TO", "admin@example.com").split(",")
    subject = os.getenv("LOG_EMAIL_SUBJECT", "App Error")
    user = os.getenv("SMTP_USER")
    passwd = os.getenv("SMTP_PASS")
    credentials = (user, passwd) if user and passwd else None
    # secure=() triggers STARTTLS; SMTPHandler only applies it when credentials are set
    return SMTPHandler(mailhost, fromaddr, toaddrs, subject, credentials=credentials, secure=())

def get_http_handler():
    host = os.getenv("LOG_HTTP_HOST", "localhost:8000")
    url = os.getenv("LOG_HTTP_URL", "/log")
    method = os.getenv("LOG_HTTP_METHOD", "POST")
    return HTTPHandler(host, url, method=method)

class SlackHandler(logging.Handler):
    """Send logs to Slack via webhook."""
    def __init__(self, webhook_url: str):
        super().__init__()
        self.webhook_url = webhook_url
    def emit(self, record: logging.LogRecord):
        import requests  # imported lazily so the dependency stays optional
        log_entry = self.format(record)
        try:
            requests.post(self.webhook_url, json={"text": log_entry}, timeout=5)
        except Exception:
            pass  # never let log delivery failures crash the application

# --- Structured Logging ---
class JsonFormatter(logging.Formatter):
    def format(self, record):
        log_record = {
            "timestamp": self.formatTime(record, self.datefmt),
            "level": record.levelname,
            "name": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "funcName": record.funcName,
            "lineno": record.lineno,
        }
        # Note: logging's `extra=` kwarg sets attributes on the record directly;
        # this branch only fires when an `extra` dict is attached explicitly.
        if hasattr(record, 'extra'):
            log_record.update(record.extra)
        return json.dumps(log_record)
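
# Illustrative output line (values are examples):
# {"timestamp": "2024-01-01 12:00:00,000", "level": "INFO", "name": "root",
#  "message": "hello", "module": "app", "funcName": "main", "lineno": 10}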

# --- Dynamic Log Level and Filtering ---
def set_log_level(logger: logging.Logger, level: str):
    logger.setLevel(getattr(logging, level.upper(), logging.INFO))

class ContextFilter(logging.Filter):
    def __init__(self, context: Optional[Dict[str, Any]] = None):
        super().__init__()
        self.context = context or {}
    def filter(self, record):
        for k, v in self.context.items():
            setattr(record, k, v)
        return True
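
# Usage sketch — attach per-request context so formatters can reference it:
#     logger.addFilter(ContextFilter({"request_id": "req-1"}))
#     handler.setFormatter(logging.Formatter("%(asctime)s %(request_id)s %(message)s"))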

# --- Distributed Tracing and Metrics (Stubs) ---
def trace_log(logger: logging.Logger, trace_id: str, span_id: str, message: str):
    logger.info(f"[trace_id={trace_id} span_id={span_id}] {message}")

def log_metric(logger: logging.Logger, metric_name: str, value: float, tags: Optional[Dict[str, str]] = None):
    logger.info(f"[metric] {metric_name}={value} tags={tags}")

# --- Log Analysis and Visualization Utilities ---
from collections import Counter
def analyze_logs(log_file: str) -> Dict[str, Any]:
    levels = Counter()
    with open(log_file, encoding="utf-8") as f:
        for line in f:
            try:
                entry = json.loads(line)
                levels[entry.get("level", "INFO")] += 1
            except Exception:
                continue
    return dict(levels)

def visualize_log_levels(log_file: str):
    import matplotlib.pyplot as plt
    stats = analyze_logs(log_file)
    plt.bar(stats.keys(), stats.values())
    plt.title("Log Level Distribution")
    plt.xlabel("Level")
    plt.ylabel("Count")
    plt.show()

# --- Main Logging Configuration ---
def configure_logging(env: Optional[str] = None):
    # Fall back to the ENVIRONMENT variable when no explicit env is passed
    env = env or os.getenv("ENVIRONMENT", "development")
    logger = logging.getLogger()
    logger.handlers.clear()
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(name)s %(message)s')
    json_formatter = JsonFormatter()
    # Console handler
    ch = logging.StreamHandler(sys.stdout)
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    # File handler
    fh = get_file_handler("app.log")
    fh.setFormatter(json_formatter)
    logger.addHandler(fh)
    # Websocket handler (for real-time streaming)
    ws_handler = WebsocketLogHandler()
    logger.addHandler(ws_handler)
    # Email handler (errors only)
    if env == "production":
        eh = get_email_handler()
        eh.setLevel(logging.ERROR)
        logger.addHandler(eh)
    # HTTP handler (optional)
    if os.getenv("LOG_HTTP_HOST"):
        hh = get_http_handler()
        logger.addHandler(hh)
    # Slack handler (optional)
    slack_url = os.getenv("SLACK_WEBHOOK_URL")
    if slack_url:
        sh = SlackHandler(slack_url)
        sh.setLevel(logging.WARNING)
        logger.addHandler(sh)
    logger.setLevel(logging.INFO)
    logger.info(f"Logging configured for {env} environment.")
    return logger

def get_logger(name: str, context: Optional[Dict[str, Any]] = None, level: str = "INFO") -> logging.Logger:
    logger = logging.getLogger(name)
    set_log_level(logger, level)
    if context:
        logger.addFilter(ContextFilter(context))
    return logger
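
# Usage sketch:
#     log = get_logger("payments", context={"service": "payments"}, level="DEBUG")
#     log.debug("charge attempted")  # the record carries .service for formatters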

# --- Example/Test Harness ---
def test_logging():
    logger = configure_logging("development")
    logger.info("Test info log.")
    logger.warning("Test warning log.")
    logger.error("Test error log.")
    trace_log(logger, "trace123", "span456", "Tracing example.")
    log_metric(logger, "accuracy", 0.98, tags={"model": "test"})
    # Visualize log levels (skipped when matplotlib is unavailable or headless)
    log_file = os.path.join(LOG_DIR, "app.log")
    try:
        visualize_log_levels(log_file)
    except Exception as exc:
        print(f"Visualization skipped: {exc}")

if __name__ == "__main__":
    test_logging()
    test_advanced_logging()